Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming decoder pools, reply pools #16

Merged
merged 3 commits into from Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
144 changes: 68 additions & 76 deletions decode_stream.go
Expand Up @@ -4,121 +4,113 @@ import (
"bufio"
"encoding/binary"
"io"
"sync"

"github.com/segmentio/encoding/json"
)

// StreamCommandDecoder is EXPERIMENTAL.
var (
streamJsonCommandDecoderPool sync.Pool
streamProtobufCommandDecoderPool sync.Pool
)

func GetStreamCommandDecoder(protoType Type, reader io.Reader) StreamCommandDecoder {
if protoType == TypeJSON {
e := streamJsonCommandDecoderPool.Get()
if e == nil {
return NewJSONStreamCommandDecoder(reader)
}
commandDecoder := e.(*JSONStreamCommandDecoder)
commandDecoder.Reset(reader)
return commandDecoder
}
e := streamProtobufCommandDecoderPool.Get()
if e == nil {
return NewProtobufStreamCommandDecoder(reader)
}
commandDecoder := e.(*ProtobufStreamCommandDecoder)
commandDecoder.Reset(reader)
return commandDecoder
}

func PutStreamCommandDecoder(protoType Type, e StreamCommandDecoder) {
e.Reset(nil)
if protoType == TypeJSON {
streamJsonCommandDecoderPool.Put(e)
return
}
streamProtobufCommandDecoderPool.Put(e)
}

type StreamCommandDecoder interface {
Decode() (*Command, []byte, error)
Decode() (*Command, error)
Reset(reader io.Reader)
}

// JSONStreamCommandDecoder is EXPERIMENTAL.
type JSONStreamCommandDecoder struct {
reader *bufio.Reader
}

// NewJSONStreamCommandDecoder is EXPERIMENTAL.
func NewJSONStreamCommandDecoder(reader io.Reader) *JSONStreamCommandDecoder {
return &JSONStreamCommandDecoder{reader: bufio.NewReader(reader)}
}

func (d *JSONStreamCommandDecoder) Decode() (*Command, []byte, error) {
cmdBytes, err := d.reader.ReadBytes('\n')
func (d *JSONStreamCommandDecoder) Decode() (*Command, error) {
cmdBytes, err := d.reader.ReadSlice('\n')
if err != nil {
return nil, nil, err
if err == io.EOF && len(cmdBytes) > 0 {
var c Command
_, err = json.Parse(cmdBytes, &c, 0)
if err != nil {
return nil, err
}
return &c, err
}
return nil, err
}
var c Command
_, err = json.Parse(cmdBytes, &c, json.ZeroCopy)
_, err = json.Parse(cmdBytes, &c, 0)
if err != nil {
return nil, nil, err
return nil, err
}
return &c, cmdBytes, nil
return &c, nil
}

func (d *JSONStreamCommandDecoder) Reset(reader io.Reader) {
d.reader.Reset(reader)
}

// ProtobufStreamCommandDecoder is EXPERIMENTAL.
type ProtobufStreamCommandDecoder struct {
reader *bufio.Reader
}

// NewProtobufStreamCommandDecoder is EXPERIMENTAL.
func NewProtobufStreamCommandDecoder(reader io.Reader) *ProtobufStreamCommandDecoder {
return &ProtobufStreamCommandDecoder{reader: bufio.NewReader(reader)}
}

func (d *ProtobufStreamCommandDecoder) Decode() (*Command, []byte, error) {
func (d *ProtobufStreamCommandDecoder) Decode() (*Command, error) {
msgLength, err := binary.ReadUvarint(d.reader)
if err != nil {
return nil, nil, err
return nil, err
}
cmdBytes := make([]byte, msgLength)
n, err := d.reader.Read(cmdBytes)
bb := getByteBuffer(int(msgLength))
defer putByteBuffer(bb)

n, err := d.reader.Read(bb.B[:int(msgLength)])
if err != nil {
return nil, nil, err
return nil, err
}
if uint64(n) != msgLength {
return nil, nil, io.ErrShortBuffer
return nil, io.ErrShortBuffer
}
var c Command
err = c.UnmarshalVT(cmdBytes)
err = c.UnmarshalVT(bb.B[:int(msgLength)])
if err != nil {
return nil, nil, err
return nil, err
}
return &c, cmdBytes, nil
return &c, nil
}

//
//// StreamReplyDecoder ...
//type StreamReplyDecoder interface {
// Decode() (*Reply, error)
//}
//
//type JSONStreamReplyDecoder struct {
// reader *bufio.Reader
//}
//
//func NewJSONStreamReplyDecoder(reader io.Reader) *JSONStreamReplyDecoder {
// return &JSONStreamReplyDecoder{reader: bufio.NewReader(reader)}
//}
//
//func (d *JSONStreamReplyDecoder) Decode() (*Reply, error) {
// cmdBytes, err := d.reader.ReadBytes('\n')
// if err != nil {
// return nil, err
// }
// var c Reply
// _, err = json.Parse(cmdBytes, &c, json.ZeroCopy)
// if err != nil {
// return nil, err
// }
// return &c, nil
//}
//
//type ProtobufStreamReplyDecoder struct {
// reader *bufio.Reader
//}
//
//func NewProtobufStreamReplyDecoder(reader io.Reader) *ProtobufStreamReplyDecoder {
// return &ProtobufStreamReplyDecoder{reader: bufio.NewReader(reader)}
//}
//
//func (d *ProtobufStreamReplyDecoder) Decode() (*Reply, error) {
// msgLength, err := binary.ReadUvarint(d.reader)
// if err != nil {
// return nil, err
// }
// cmdBytes := make([]byte, msgLength)
// n, err := d.reader.Read(cmdBytes)
// if err != nil {
// return nil, err
// }
// if uint64(n) != msgLength {
// return nil, io.ErrShortBuffer
// }
// var c Reply
// err = c.UnmarshalVT(cmdBytes)
// if err != nil {
// return nil, err
// }
// return &c, nil
//}
func (d *ProtobufStreamCommandDecoder) Reset(reader io.Reader) {
d.reader.Reset(reader)
}
8 changes: 4 additions & 4 deletions encode_helpers.go
Expand Up @@ -79,10 +79,10 @@ func newRefreshPush(data Raw) *Push {
}
}

// At the moment this is hardcoded to a value which should be enough for most our messages
// sent. If we will have a message with field names total size greater than this value then
// byte buffer won't be reused in JSON case (so need to take care of this to not loose
// performance at some point). Would be nice to add additional size for messages like
// MaxJSONPushFieldsSize at the moment this is hardcoded to a value which should be enough for
// most our messages sent. If we have a message with field names total size greater than
// this value then byte buffer won't be reused in JSON case (so need to take care of this to not
// lose performance at some point). Would be nice to add additional size for messages like
// Connect push which can have variable length Connect.Subs field.
const MaxJSONPushFieldsSize = 64

Expand Down
189 changes: 189 additions & 0 deletions replies.go
@@ -0,0 +1,189 @@
package protocol

import "sync"

type ReplyPoolCollection struct {
connectReplyPool sync.Pool
subscribeReplyPool sync.Pool
unsubscribeReplyPool sync.Pool
publishReplyPool sync.Pool
rpcReplyPool sync.Pool
presenceReplyPool sync.Pool
presenceStatsReplyPool sync.Pool
historyReplyPool sync.Pool
refreshReplyPool sync.Pool
subRefreshReplyPool sync.Pool
}

//goland:noinspection GoUnusedGlobalVariable
var ReplyPool = &ReplyPoolCollection{}

func (p *ReplyPoolCollection) AcquireConnectReply(result *ConnectResult) *Reply {
r := p.connectReplyPool.Get()
if r == nil {
return &Reply{
Connect: result,
}
}
reply := r.(*Reply)
reply.Connect = result
return reply
}

func (p *ReplyPoolCollection) ReleaseConnectReply(r *Reply) {
r.Connect = nil
p.connectReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireSubscribeReply(result *SubscribeResult) *Reply {
r := p.subscribeReplyPool.Get()
if r == nil {
return &Reply{
Subscribe: result,
}
}
reply := r.(*Reply)
reply.Subscribe = result
return reply
}

func (p *ReplyPoolCollection) ReleaseSubscribeReply(r *Reply) {
r.Subscribe = nil
p.subscribeReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireUnsubscribeReply(result *UnsubscribeResult) *Reply {
r := p.unsubscribeReplyPool.Get()
if r == nil {
return &Reply{
Unsubscribe: result,
}
}
reply := r.(*Reply)
reply.Unsubscribe = result
return reply
}

func (p *ReplyPoolCollection) ReleaseUnsubscribeReply(r *Reply) {
r.Unsubscribe = nil
p.unsubscribeReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquirePublishReply(result *PublishResult) *Reply {
r := p.publishReplyPool.Get()
if r == nil {
return &Reply{
Publish: result,
}
}
reply := r.(*Reply)
reply.Publish = result
return reply
}

func (p *ReplyPoolCollection) ReleasePublishReply(r *Reply) {
r.Publish = nil
p.publishReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireRPCReply(result *RPCResult) *Reply {
r := p.rpcReplyPool.Get()
if r == nil {
return &Reply{
Rpc: result,
}
}
reply := r.(*Reply)
reply.Rpc = result
return reply
}

func (p *ReplyPoolCollection) ReleaseRPCReply(r *Reply) {
r.Rpc = nil
p.rpcReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquirePresenceReply(result *PresenceResult) *Reply {
r := p.presenceReplyPool.Get()
if r == nil {
return &Reply{
Presence: result,
}
}
reply := r.(*Reply)
reply.Presence = result
return reply
}

func (p *ReplyPoolCollection) ReleasePresenceReply(r *Reply) {
r.Presence = nil
p.presenceReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquirePresenceStatsReply(result *PresenceStatsResult) *Reply {
r := p.presenceStatsReplyPool.Get()
if r == nil {
return &Reply{
PresenceStats: result,
}
}
reply := r.(*Reply)
reply.PresenceStats = result
return reply
}

func (p *ReplyPoolCollection) ReleasePresenceStatsReply(r *Reply) {
r.PresenceStats = nil
p.presenceStatsReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireHistoryReply(result *HistoryResult) *Reply {
r := p.historyReplyPool.Get()
if r == nil {
return &Reply{
History: result,
}
}
reply := r.(*Reply)
reply.History = result
return reply
}

func (p *ReplyPoolCollection) ReleaseHistoryReply(r *Reply) {
r.History = nil
p.historyReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireRefreshReply(result *RefreshResult) *Reply {
r := p.refreshReplyPool.Get()
if r == nil {
return &Reply{
Refresh: result,
}
}
reply := r.(*Reply)
reply.Refresh = result
return reply
}

func (p *ReplyPoolCollection) ReleaseRefreshReply(r *Reply) {
r.Refresh = nil
p.refreshReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireSubRefreshReply(result *SubRefreshResult) *Reply {
r := p.subRefreshReplyPool.Get()
if r == nil {
return &Reply{
SubRefresh: result,
}
}
reply := r.(*Reply)
reply.SubRefresh = result
return reply
}

func (p *ReplyPoolCollection) ReleaseSubRefreshReply(r *Reply) {
r.SubRefresh = nil
p.subRefreshReplyPool.Put(r)
}