Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepared message concept #207

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
144 changes: 143 additions & 1 deletion conn.go
Expand Up @@ -6,6 +6,7 @@ package websocket

import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"io"
Expand Down Expand Up @@ -659,12 +660,153 @@ func (w *messageWriter) Close() error {
return nil
}

// netConn is a fake network connection used to get PreparedMessage
// prebuilt payloads.
type netConn struct {
bytes.Buffer
}

func (netConn) Read(p []byte) (int, error) { return 0, nil }

// netAddr is a fake net.Addr implementation to be used in netConn.
type netAddr int

func (a netAddr) Network() string { return "" }
func (a netAddr) String() string { return "" }

func (c netConn) Close() error { return nil }
func (c netConn) LocalAddr() net.Addr { return netAddr(0) }
func (c netConn) RemoteAddr() net.Addr { return netAddr(0) }
func (c netConn) SetDeadline(t time.Time) error { return nil }
func (c netConn) SetReadDeadline(t time.Time) error { return nil }
func (c netConn) SetWriteDeadline(t time.Time) error { return nil }

var (
preparingServerConnPool = sync.Pool{New: func() interface{} {
var buf bytes.Buffer
return newConn(&netConn{Buffer: buf}, true, 0, 0)
}}
preparingClientConnPool = sync.Pool{New: func() interface{} {
var buf bytes.Buffer
return newConn(&netConn{Buffer: buf}, false, 0, 0)
}}
)

// PreparedMessage caches on the wire representations of a message payload.
// Use PreparedMessage to efficiently send a message payload to multiple
// connections. PreparedMessage is especially useful when compression
// is used because the CPU and memory expensive compression operation
// can be executed once for a given set of compression options.
type PreparedMessage struct {
frameType int
data []byte
mu sync.Mutex
frames map[frameKey]*preparedFrame
}

// frameKey defines a unique set of options to cache prepared frames in PreparedMessage.
type frameKey struct {
isServer bool
compress bool
compressionLevel int
}

// preparedFrame contains data in wire representation.
type preparedFrame struct {
once sync.Once
data []byte
}

// NewPreparedMessage returns initialized PreparedMessage. You can then send
// it to connection using WritePreparedMessage method. Valid wire representation
// will be calculated lazily only once for a set of current connection options.
func NewPreparedMessage(messageType int, data []byte) *PreparedMessage {
if !isData(messageType) {
panic("Prepared message type can only be TextMessage or BinaryMessage")
}
return &PreparedMessage{
frameType: messageType,
data: data,
frames: make(map[frameKey]*preparedFrame),
}
}

func (pm *PreparedMessage) frame(key frameKey) (int, []byte, error) {
pm.mu.Lock()
frame, ok := pm.frames[key]
if !ok {
frame = &preparedFrame{}
pm.frames[key] = frame
}
pm.mu.Unlock()

var writeErr error

frame.once.Do(func() {
// Create frame data once for a given frameKey.
var c *Conn
if key.isServer {
c = preparingServerConnPool.Get().(*Conn)
} else {
c = preparingClientConnPool.Get().(*Conn)
}

defer func() {
c.conn.(*netConn).Buffer.Reset()
c.enableWriteCompression = false
c.newCompressionWriter = nil
c.SetCompressionLevel(0)
if key.isServer {
preparingServerConnPool.Put(c)
} else {
preparingClientConnPool.Put(c)
}
}()

if key.compress {
c.enableWriteCompression = true
c.newCompressionWriter = compressNoContextTakeover
c.SetCompressionLevel(key.compressionLevel)
}
writeErr := c.WriteMessage(pm.frameType, pm.data)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shadowing here, need writeErr = c.WriteMessage(pm.frameType, pm.data)

if writeErr == nil {
preparedData := c.conn.(*netConn).Buffer.Bytes()
data := make([]byte, len(preparedData))
copy(data, preparedData)
frame.data = data
}
})

return pm.frameType, frame.data, writeErr
}

// WritePreparedMessage writes prepared message into connection.
func (c *Conn) WritePreparedMessage(msg *PreparedMessage) error {
frameType, frameData, err := msg.frame(frameKey{
isServer: c.isServer,
compress: c.newCompressionWriter != nil && c.enableWriteCompression,
compressionLevel: c.compressionLevel,
})
if err != nil {
return err
}
if c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = true
err = c.write(frameType, c.writeDeadline, frameData, nil)
if !c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = false
return err
}

// WriteMessage is a helper method for getting a writer using NextWriter,
// writing the message and closing the writer.
func (c *Conn) WriteMessage(messageType int, data []byte) error {

if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {

// Fast path with no allocations and single frame.

if err := c.prepWrite(messageType); err != nil {
Expand Down