Skip to content

Commit

Permalink
Streaming decoder pools, reply pools (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Dec 29, 2022
1 parent df11358 commit 804264f
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 80 deletions.
144 changes: 68 additions & 76 deletions decode_stream.go
Expand Up @@ -4,121 +4,113 @@ import (
"bufio"
"encoding/binary"
"io"
"sync"

"github.com/segmentio/encoding/json"
)

// StreamCommandDecoder is EXPERIMENTAL.
var (
streamJsonCommandDecoderPool sync.Pool
streamProtobufCommandDecoderPool sync.Pool
)

func GetStreamCommandDecoder(protoType Type, reader io.Reader) StreamCommandDecoder {
if protoType == TypeJSON {
e := streamJsonCommandDecoderPool.Get()
if e == nil {
return NewJSONStreamCommandDecoder(reader)
}
commandDecoder := e.(*JSONStreamCommandDecoder)
commandDecoder.Reset(reader)
return commandDecoder
}
e := streamProtobufCommandDecoderPool.Get()
if e == nil {
return NewProtobufStreamCommandDecoder(reader)
}
commandDecoder := e.(*ProtobufStreamCommandDecoder)
commandDecoder.Reset(reader)
return commandDecoder
}

func PutStreamCommandDecoder(protoType Type, e StreamCommandDecoder) {
e.Reset(nil)
if protoType == TypeJSON {
streamJsonCommandDecoderPool.Put(e)
return
}
streamProtobufCommandDecoderPool.Put(e)
}

type StreamCommandDecoder interface {
Decode() (*Command, []byte, error)
Decode() (*Command, error)
Reset(reader io.Reader)
}

// JSONStreamCommandDecoder is EXPERIMENTAL.
type JSONStreamCommandDecoder struct {
reader *bufio.Reader
}

// NewJSONStreamCommandDecoder is EXPERIMENTAL.
func NewJSONStreamCommandDecoder(reader io.Reader) *JSONStreamCommandDecoder {
return &JSONStreamCommandDecoder{reader: bufio.NewReader(reader)}
}

func (d *JSONStreamCommandDecoder) Decode() (*Command, []byte, error) {
cmdBytes, err := d.reader.ReadBytes('\n')
func (d *JSONStreamCommandDecoder) Decode() (*Command, error) {
cmdBytes, err := d.reader.ReadSlice('\n')
if err != nil {
return nil, nil, err
if err == io.EOF && len(cmdBytes) > 0 {
var c Command
_, err = json.Parse(cmdBytes, &c, 0)
if err != nil {
return nil, err
}
return &c, err
}
return nil, err
}
var c Command
_, err = json.Parse(cmdBytes, &c, json.ZeroCopy)
_, err = json.Parse(cmdBytes, &c, 0)
if err != nil {
return nil, nil, err
return nil, err
}
return &c, cmdBytes, nil
return &c, nil
}

func (d *JSONStreamCommandDecoder) Reset(reader io.Reader) {
d.reader.Reset(reader)
}

// ProtobufStreamCommandDecoder is EXPERIMENTAL.
type ProtobufStreamCommandDecoder struct {
reader *bufio.Reader
}

// NewProtobufStreamCommandDecoder is EXPERIMENTAL.
func NewProtobufStreamCommandDecoder(reader io.Reader) *ProtobufStreamCommandDecoder {
return &ProtobufStreamCommandDecoder{reader: bufio.NewReader(reader)}
}

func (d *ProtobufStreamCommandDecoder) Decode() (*Command, []byte, error) {
func (d *ProtobufStreamCommandDecoder) Decode() (*Command, error) {
msgLength, err := binary.ReadUvarint(d.reader)
if err != nil {
return nil, nil, err
return nil, err
}
cmdBytes := make([]byte, msgLength)
n, err := d.reader.Read(cmdBytes)
bb := getByteBuffer(int(msgLength))
defer putByteBuffer(bb)

n, err := d.reader.Read(bb.B[:int(msgLength)])
if err != nil {
return nil, nil, err
return nil, err
}
if uint64(n) != msgLength {
return nil, nil, io.ErrShortBuffer
return nil, io.ErrShortBuffer
}
var c Command
err = c.UnmarshalVT(cmdBytes)
err = c.UnmarshalVT(bb.B[:int(msgLength)])
if err != nil {
return nil, nil, err
return nil, err
}
return &c, cmdBytes, nil
return &c, nil
}

//
//// StreamReplyDecoder ...
//type StreamReplyDecoder interface {
// Decode() (*Reply, error)
//}
//
//type JSONStreamReplyDecoder struct {
// reader *bufio.Reader
//}
//
//func NewJSONStreamReplyDecoder(reader io.Reader) *JSONStreamReplyDecoder {
// return &JSONStreamReplyDecoder{reader: bufio.NewReader(reader)}
//}
//
//func (d *JSONStreamReplyDecoder) Decode() (*Reply, error) {
// cmdBytes, err := d.reader.ReadBytes('\n')
// if err != nil {
// return nil, err
// }
// var c Reply
// _, err = json.Parse(cmdBytes, &c, json.ZeroCopy)
// if err != nil {
// return nil, err
// }
// return &c, nil
//}
//
//type ProtobufStreamReplyDecoder struct {
// reader *bufio.Reader
//}
//
//func NewProtobufStreamReplyDecoder(reader io.Reader) *ProtobufStreamReplyDecoder {
// return &ProtobufStreamReplyDecoder{reader: bufio.NewReader(reader)}
//}
//
//func (d *ProtobufStreamReplyDecoder) Decode() (*Reply, error) {
// msgLength, err := binary.ReadUvarint(d.reader)
// if err != nil {
// return nil, err
// }
// cmdBytes := make([]byte, msgLength)
// n, err := d.reader.Read(cmdBytes)
// if err != nil {
// return nil, err
// }
// if uint64(n) != msgLength {
// return nil, io.ErrShortBuffer
// }
// var c Reply
// err = c.UnmarshalVT(cmdBytes)
// if err != nil {
// return nil, err
// }
// return &c, nil
//}
func (d *ProtobufStreamCommandDecoder) Reset(reader io.Reader) {
d.reader.Reset(reader)
}
8 changes: 4 additions & 4 deletions encode_helpers.go
Expand Up @@ -79,10 +79,10 @@ func newRefreshPush(data Raw) *Push {
}
}

// At the moment this is hardcoded to a value which should be enough for most our messages
// sent. If we will have a message with field names total size greater than this value then
// byte buffer won't be reused in JSON case (so need to take care of this to not loose
// performance at some point). Would be nice to add additional size for messages like
// MaxJSONPushFieldsSize at the moment this is hardcoded to a value which should be enough for
// most our messages sent. If we have a message with field names total size greater than
// this value then byte buffer won't be reused in JSON case (so need to take care of this to not
// lose performance at some point). Would be nice to add additional size for messages like
// Connect push which can have variable length Connect.Subs field.
const MaxJSONPushFieldsSize = 64

Expand Down
189 changes: 189 additions & 0 deletions replies.go
@@ -0,0 +1,189 @@
package protocol

import "sync"

type ReplyPoolCollection struct {
connectReplyPool sync.Pool
subscribeReplyPool sync.Pool
unsubscribeReplyPool sync.Pool
publishReplyPool sync.Pool
rpcReplyPool sync.Pool
presenceReplyPool sync.Pool
presenceStatsReplyPool sync.Pool
historyReplyPool sync.Pool
refreshReplyPool sync.Pool
subRefreshReplyPool sync.Pool
}

//goland:noinspection GoUnusedGlobalVariable
var ReplyPool = &ReplyPoolCollection{}

func (p *ReplyPoolCollection) AcquireConnectReply(result *ConnectResult) *Reply {
r := p.connectReplyPool.Get()
if r == nil {
return &Reply{
Connect: result,
}
}
reply := r.(*Reply)
reply.Connect = result
return reply
}

func (p *ReplyPoolCollection) ReleaseConnectReply(r *Reply) {
r.Connect = nil
p.connectReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireSubscribeReply(result *SubscribeResult) *Reply {
r := p.subscribeReplyPool.Get()
if r == nil {
return &Reply{
Subscribe: result,
}
}
reply := r.(*Reply)
reply.Subscribe = result
return reply
}

func (p *ReplyPoolCollection) ReleaseSubscribeReply(r *Reply) {
r.Subscribe = nil
p.subscribeReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireUnsubscribeReply(result *UnsubscribeResult) *Reply {
r := p.unsubscribeReplyPool.Get()
if r == nil {
return &Reply{
Unsubscribe: result,
}
}
reply := r.(*Reply)
reply.Unsubscribe = result
return reply
}

func (p *ReplyPoolCollection) ReleaseUnsubscribeReply(r *Reply) {
r.Unsubscribe = nil
p.unsubscribeReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquirePublishReply(result *PublishResult) *Reply {
r := p.publishReplyPool.Get()
if r == nil {
return &Reply{
Publish: result,
}
}
reply := r.(*Reply)
reply.Publish = result
return reply
}

func (p *ReplyPoolCollection) ReleasePublishReply(r *Reply) {
r.Publish = nil
p.publishReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireRPCReply(result *RPCResult) *Reply {
r := p.rpcReplyPool.Get()
if r == nil {
return &Reply{
Rpc: result,
}
}
reply := r.(*Reply)
reply.Rpc = result
return reply
}

func (p *ReplyPoolCollection) ReleaseRPCReply(r *Reply) {
r.Rpc = nil
p.rpcReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquirePresenceReply(result *PresenceResult) *Reply {
r := p.presenceReplyPool.Get()
if r == nil {
return &Reply{
Presence: result,
}
}
reply := r.(*Reply)
reply.Presence = result
return reply
}

func (p *ReplyPoolCollection) ReleasePresenceReply(r *Reply) {
r.Presence = nil
p.presenceReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquirePresenceStatsReply(result *PresenceStatsResult) *Reply {
r := p.presenceStatsReplyPool.Get()
if r == nil {
return &Reply{
PresenceStats: result,
}
}
reply := r.(*Reply)
reply.PresenceStats = result
return reply
}

func (p *ReplyPoolCollection) ReleasePresenceStatsReply(r *Reply) {
r.PresenceStats = nil
p.presenceStatsReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireHistoryReply(result *HistoryResult) *Reply {
r := p.historyReplyPool.Get()
if r == nil {
return &Reply{
History: result,
}
}
reply := r.(*Reply)
reply.History = result
return reply
}

func (p *ReplyPoolCollection) ReleaseHistoryReply(r *Reply) {
r.History = nil
p.historyReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireRefreshReply(result *RefreshResult) *Reply {
r := p.refreshReplyPool.Get()
if r == nil {
return &Reply{
Refresh: result,
}
}
reply := r.(*Reply)
reply.Refresh = result
return reply
}

func (p *ReplyPoolCollection) ReleaseRefreshReply(r *Reply) {
r.Refresh = nil
p.refreshReplyPool.Put(r)
}

func (p *ReplyPoolCollection) AcquireSubRefreshReply(result *SubRefreshResult) *Reply {
r := p.subRefreshReplyPool.Get()
if r == nil {
return &Reply{
SubRefresh: result,
}
}
reply := r.(*Reply)
reply.SubRefresh = result
return reply
}

func (p *ReplyPoolCollection) ReleaseSubRefreshReply(r *Reply) {
r.SubRefresh = nil
p.subRefreshReplyPool.Put(r)
}

0 comments on commit 804264f

Please sign in to comment.