diff --git a/decode_stream.go b/decode_stream.go index fd6e25c..e296fb9 100644 --- a/decode_stream.go +++ b/decode_stream.go @@ -4,121 +4,113 @@ import ( "bufio" "encoding/binary" "io" + "sync" "github.com/segmentio/encoding/json" ) -// StreamCommandDecoder is EXPERIMENTAL. +var ( + streamJsonCommandDecoderPool sync.Pool + streamProtobufCommandDecoderPool sync.Pool +) + +func GetStreamCommandDecoder(protoType Type, reader io.Reader) StreamCommandDecoder { + if protoType == TypeJSON { + e := streamJsonCommandDecoderPool.Get() + if e == nil { + return NewJSONStreamCommandDecoder(reader) + } + commandDecoder := e.(*JSONStreamCommandDecoder) + commandDecoder.Reset(reader) + return commandDecoder + } + e := streamProtobufCommandDecoderPool.Get() + if e == nil { + return NewProtobufStreamCommandDecoder(reader) + } + commandDecoder := e.(*ProtobufStreamCommandDecoder) + commandDecoder.Reset(reader) + return commandDecoder +} + +func PutStreamCommandDecoder(protoType Type, e StreamCommandDecoder) { + e.Reset(nil) + if protoType == TypeJSON { + streamJsonCommandDecoderPool.Put(e) + return + } + streamProtobufCommandDecoderPool.Put(e) +} + type StreamCommandDecoder interface { - Decode() (*Command, []byte, error) + Decode() (*Command, error) + Reset(reader io.Reader) } -// JSONStreamCommandDecoder is EXPERIMENTAL. type JSONStreamCommandDecoder struct { reader *bufio.Reader } -// NewJSONStreamCommandDecoder is EXPERIMENTAL. func NewJSONStreamCommandDecoder(reader io.Reader) *JSONStreamCommandDecoder { return &JSONStreamCommandDecoder{reader: bufio.NewReader(reader)} } -func (d *JSONStreamCommandDecoder) Decode() (*Command, []byte, error) { - cmdBytes, err := d.reader.ReadBytes('\n') +func (d *JSONStreamCommandDecoder) Decode() (*Command, error) { + cmdBytes, err := d.reader.ReadSlice('\n') if err != nil { - return nil, nil, err + if err == io.EOF && len(cmdBytes) > 0 { + var c Command + _, err = json.Parse(cmdBytes, &c, 0) + if err != nil { + return nil, err + } + return &c, err + } + return nil, err } var c Command - _, err = json.Parse(cmdBytes, &c, json.ZeroCopy) + _, err = json.Parse(cmdBytes, &c, 0) if err != nil { - return nil, nil, err + return nil, err } - return &c, cmdBytes, nil + return &c, nil +} + +func (d *JSONStreamCommandDecoder) Reset(reader io.Reader) { + d.reader.Reset(reader) } -// ProtobufStreamCommandDecoder is EXPERIMENTAL. type ProtobufStreamCommandDecoder struct { reader *bufio.Reader } -// NewProtobufStreamCommandDecoder is EXPERIMENTAL. func NewProtobufStreamCommandDecoder(reader io.Reader) *ProtobufStreamCommandDecoder { return &ProtobufStreamCommandDecoder{reader: bufio.NewReader(reader)} } -func (d *ProtobufStreamCommandDecoder) Decode() (*Command, []byte, error) { +func (d *ProtobufStreamCommandDecoder) Decode() (*Command, error) { msgLength, err := binary.ReadUvarint(d.reader) if err != nil { - return nil, nil, err + return nil, err } - cmdBytes := make([]byte, msgLength) - n, err := d.reader.Read(cmdBytes) + bb := getByteBuffer(int(msgLength)) + defer putByteBuffer(bb) + + n, err := d.reader.Read(bb.B[:int(msgLength)]) if err != nil { - return nil, nil, err + return nil, err } if uint64(n) != msgLength { - return nil, nil, io.ErrShortBuffer + return nil, io.ErrShortBuffer } var c Command - err = c.UnmarshalVT(cmdBytes) + err = c.UnmarshalVT(bb.B[:int(msgLength)]) if err != nil { - return nil, nil, err + return nil, err } - return &c, cmdBytes, nil + return &c, nil } -// -//// StreamReplyDecoder ... -//type StreamReplyDecoder interface { -// Decode() (*Reply, error) -//} -// -//type JSONStreamReplyDecoder struct { -// reader *bufio.Reader -//} -// -//func NewJSONStreamReplyDecoder(reader io.Reader) *JSONStreamReplyDecoder { -// return &JSONStreamReplyDecoder{reader: bufio.NewReader(reader)} -//} -// -//func (d *JSONStreamReplyDecoder) Decode() (*Reply, error) { -// cmdBytes, err := d.reader.ReadBytes('\n') -// if err != nil { -// return nil, err -// } -// var c Reply -// _, err = json.Parse(cmdBytes, &c, json.ZeroCopy) -// if err != nil { -// return nil, err -// } -// return &c, nil -//} -// -//type ProtobufStreamReplyDecoder struct { -// reader *bufio.Reader -//} -// -//func NewProtobufStreamReplyDecoder(reader io.Reader) *ProtobufStreamReplyDecoder { -// return &ProtobufStreamReplyDecoder{reader: bufio.NewReader(reader)} -//} -// -//func (d *ProtobufStreamReplyDecoder) Decode() (*Reply, error) { -// msgLength, err := binary.ReadUvarint(d.reader) -// if err != nil { -// return nil, err -// } -// cmdBytes := make([]byte, msgLength) -// n, err := d.reader.Read(cmdBytes) -// if err != nil { -// return nil, err -// } -// if uint64(n) != msgLength { -// return nil, io.ErrShortBuffer -// } -// var c Reply -// err = c.UnmarshalVT(cmdBytes) -// if err != nil { -// return nil, err -// } -// return &c, nil -//} +func (d *ProtobufStreamCommandDecoder) Reset(reader io.Reader) { + d.reader.Reset(reader) +} diff --git a/encode_helpers.go b/encode_helpers.go index 2abb6f7..3b363e8 100644 --- a/encode_helpers.go +++ b/encode_helpers.go @@ -79,10 +79,10 @@ func newRefreshPush(data Raw) *Push { } } -// At the moment this is hardcoded to a value which should be enough for most our messages -// sent. If we will have a message with field names total size greater than this value then -// byte buffer won't be reused in JSON case (so need to take care of this to not loose -// performance at some point). Would be nice to add additional size for messages like +// MaxJSONPushFieldsSize at the moment this is hardcoded to a value which should be enough for +// most our messages sent. If we have a message with field names total size greater than +// this value then byte buffer won't be reused in JSON case (so need to take care of this to not +// lose performance at some point). Would be nice to add additional size for messages like // Connect push which can have variable length Connect.Subs field. const MaxJSONPushFieldsSize = 64 diff --git a/replies.go b/replies.go new file mode 100644 index 0000000..c8bcdde --- /dev/null +++ b/replies.go @@ -0,0 +1,189 @@ +package protocol + +import "sync" + +type ReplyPoolCollection struct { + connectReplyPool sync.Pool + subscribeReplyPool sync.Pool + unsubscribeReplyPool sync.Pool + publishReplyPool sync.Pool + rpcReplyPool sync.Pool + presenceReplyPool sync.Pool + presenceStatsReplyPool sync.Pool + historyReplyPool sync.Pool + refreshReplyPool sync.Pool + subRefreshReplyPool sync.Pool +} + +//goland:noinspection GoUnusedGlobalVariable +var ReplyPool = &ReplyPoolCollection{} + +func (p *ReplyPoolCollection) AcquireConnectReply(result *ConnectResult) *Reply { + r := p.connectReplyPool.Get() + if r == nil { + return &Reply{ + Connect: result, + } + } + reply := r.(*Reply) + reply.Connect = result + return reply +} + +func (p *ReplyPoolCollection) ReleaseConnectReply(r *Reply) { + r.Connect = nil + p.connectReplyPool.Put(r) +} + +func (p *ReplyPoolCollection) AcquireSubscribeReply(result *SubscribeResult) *Reply { + r := p.subscribeReplyPool.Get() + if r == nil { + return &Reply{ + Subscribe: result, + } + } + reply := r.(*Reply) + reply.Subscribe = result + return reply +} + +func (p *ReplyPoolCollection) ReleaseSubscribeReply(r *Reply) { + r.Subscribe = nil + p.subscribeReplyPool.Put(r) +} + +func (p *ReplyPoolCollection) AcquireUnsubscribeReply(result *UnsubscribeResult) *Reply { + r := p.unsubscribeReplyPool.Get() + if r == nil { + return &Reply{ + Unsubscribe: result, + } + } + reply := r.(*Reply) + reply.Unsubscribe = result + return reply +} + +func (p *ReplyPoolCollection) ReleaseUnsubscribeReply(r *Reply) { + r.Unsubscribe = nil + p.unsubscribeReplyPool.Put(r) +} + +func (p *ReplyPoolCollection) AcquirePublishReply(result *PublishResult) *Reply { + r := p.publishReplyPool.Get() + if r == nil { + return &Reply{ + Publish: result, + } + } + reply := r.(*Reply) + reply.Publish = result + return reply +} + +func (p *ReplyPoolCollection) ReleasePublishReply(r *Reply) { + r.Publish = nil + p.publishReplyPool.Put(r) +} + +func (p *ReplyPoolCollection) AcquireRPCReply(result *RPCResult) *Reply { + r := p.rpcReplyPool.Get() + if r == nil { + return &Reply{ + Rpc: result, + } + } + reply := r.(*Reply) + reply.Rpc = result + return reply +} + +func (p *ReplyPoolCollection) ReleaseRPCReply(r *Reply) { + r.Rpc = nil + p.rpcReplyPool.Put(r) +} + +func (p *ReplyPoolCollection) AcquirePresenceReply(result *PresenceResult) *Reply { + r := p.presenceReplyPool.Get() + if r == nil { + return &Reply{ + Presence: result, + } + } + reply := r.(*Reply) + reply.Presence = result + return reply +} + +func (p *ReplyPoolCollection) ReleasePresenceReply(r *Reply) { + r.Presence = nil + p.presenceReplyPool.Put(r) +} + +func (p *ReplyPoolCollection) AcquirePresenceStatsReply(result *PresenceStatsResult) *Reply { + r := p.presenceStatsReplyPool.Get() + if r == nil { + return &Reply{ + PresenceStats: result, + } + } + reply := r.(*Reply) + reply.PresenceStats = result + return reply +} + +func (p *ReplyPoolCollection) ReleasePresenceStatsReply(r *Reply) { + r.PresenceStats = nil + p.presenceStatsReplyPool.Put(r) +} + +func (p *ReplyPoolCollection) AcquireHistoryReply(result *HistoryResult) *Reply { + r := p.historyReplyPool.Get() + if r == nil { + return &Reply{ + History: result, + } + } + reply := r.(*Reply) + reply.History = result + return reply +} + +func (p *ReplyPoolCollection) ReleaseHistoryReply(r *Reply) { + r.History = nil + p.historyReplyPool.Put(r) +} + +func (p *ReplyPoolCollection) AcquireRefreshReply(result *RefreshResult) *Reply { + r := p.refreshReplyPool.Get() + if r == nil { + return &Reply{ + Refresh: result, + } + } + reply := r.(*Reply) + reply.Refresh = result + return reply +} + +func (p *ReplyPoolCollection) ReleaseRefreshReply(r *Reply) { + r.Refresh = nil + p.refreshReplyPool.Put(r) +} + +func (p *ReplyPoolCollection) AcquireSubRefreshReply(result *SubRefreshResult) *Reply { + r := p.subRefreshReplyPool.Get() + if r == nil { + return &Reply{ + SubRefresh: result, + } + } + reply := r.(*Reply) + reply.SubRefresh = result + return reply +} + +func (p *ReplyPoolCollection) ReleaseSubRefreshReply(r *Reply) { + r.SubRefresh = nil + p.subRefreshReplyPool.Put(r) +}