From dad23ddb3db6881915817c0629336b68a36565e6 Mon Sep 17 00:00:00 2001 From: venjiang Date: Tue, 16 Aug 2022 22:50:12 +0800 Subject: [PATCH 1/6] add dispatch field in metaframe --- core/frame/data_frame.go | 10 ++++++++++ core/frame/data_frame_test.go | 12 ++++++++---- core/frame/frame.go | 9 +++++++++ core/frame/meta_frame.go | 29 +++++++++++++++++++++++++---- core/frame/meta_frame_test.go | 7 +++++-- core/server.go | 2 +- 6 files changed, 58 insertions(+), 11 deletions(-) diff --git a/core/frame/data_frame.go b/core/frame/data_frame.go index eb06c641d..5612e976d 100644 --- a/core/frame/data_frame.go +++ b/core/frame/data_frame.go @@ -70,6 +70,16 @@ func (d *DataFrame) SourceID() string { return d.metaFrame.SourceID() } +// SetDispatch set dispatch mode +func (d *DataFrame) SetDispatch(mode Dispatch) { + d.metaFrame.dispatch = mode +} + +// Dispatch get dispatch mode +func (d *DataFrame) Dispatch() Dispatch { + return d.metaFrame.dispatch +} + // Encode return Y3 encoded bytes of `DataFrame` func (d *DataFrame) Encode() []byte { data := y3.NewNodePacketEncoder(byte(d.Type())) diff --git a/core/frame/data_frame_test.go b/core/frame/data_frame_test.go index 0d1d506f4..18d4d8810 100644 --- a/core/frame/data_frame_test.go +++ b/core/frame/data_frame_test.go @@ -10,14 +10,16 @@ func TestDataFrameEncode(t *testing.T) { var userDataTag byte = 0x15 d := NewDataFrame() d.SetCarriage(userDataTag, []byte("yomo")) + d.SetDispatch(DispatchBroadcast) tidbuf := []byte(d.TransactionID()) result := []byte{ - 0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2), - 0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2), + 0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2 + 3), + 0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2 + 3), byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) result = append(result, byte(TagOfSourceID), 0x0) + result = append(result, byte(TagOfDispatch), 0x1, 0x1) result = append(result, 0x80|byte(TagOfPayloadFrame), 0x06, userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F) assert.Equal(t, result, d.Encode()) @@ -26,9 +28,10 @@ func TestDataFrameEncode(t *testing.T) { func TestDataFrameDecode(t *testing.T) { var userDataTag byte = 0x15 buf := []byte{ - 0x80 | byte(TagOfDataFrame), 0x10, - 0x80 | byte(TagOfMetaFrame), 0x06, + 0x80 | byte(TagOfDataFrame), 0x10 + 3, + 0x80 | byte(TagOfMetaFrame), 0x06 + 3, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, + byte(TagOfDispatch), 0x01, 0x01, 0x80 | byte(TagOfPayloadFrame), 0x06, userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F} data, err := DecodeToDataFrame(buf) @@ -36,4 +39,5 @@ func TestDataFrameDecode(t *testing.T) { assert.EqualValues(t, "1234", data.TransactionID()) assert.EqualValues(t, userDataTag, data.GetDataTag()) assert.EqualValues(t, []byte("yomo"), data.GetCarriage()) + assert.EqualValues(t, DispatchBroadcast, data.Dispatch()) } diff --git a/core/frame/frame.go b/core/frame/frame.go index 326c31deb..fc1a874d8 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -17,6 +17,7 @@ const ( TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 TagOfSourceID Type = 0x02 + TagOfDispatch Type = 0x04 // PayloadFrame of DataFrame TagOfPayloadFrame Type = 0x2E TagOfBackflowFrame Type = 0x2D @@ -42,6 +43,14 @@ const ( TagOfGoawayMessage Type = 0x02 ) +// frame dispatch mode +type Dispatch = byte + +const ( + DispatchDirected Dispatch = 0x00 // directed + DispatchBroadcast Dispatch = 0x01 // broadcast +) + // Type represents the type of frame. type Type uint8 diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 712c00976..6ce61108d 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -1,6 +1,7 @@ package frame import ( + "fmt" "strconv" "time" @@ -14,6 +15,7 @@ type MetaFrame struct { tid string metadata []byte sourceID string + dispatch Dispatch } // NewMetaFrame creates a new MetaFrame instance. @@ -22,7 +24,7 @@ func NewMetaFrame() *MetaFrame { if err != nil { tid = strconv.FormatInt(time.Now().Unix(), 10) // todo: UnixMicro since go 1.17 } - return &MetaFrame{tid: tid} + return &MetaFrame{tid: tid, dispatch: DispatchDirected} } // SetTransactionID set the transaction ID. @@ -55,6 +57,16 @@ func (m *MetaFrame) SourceID() string { return m.sourceID } +// SetDispatch set dispatch mode +func (m *MetaFrame) SetDispatch(mode Dispatch) { + m.dispatch = mode +} + +// Dispatch get dispatch mode +func (m *MetaFrame) Dispatch() Dispatch { + return m.dispatch +} + // Encode implements Frame.Encode method. func (m *MetaFrame) Encode() []byte { meta := y3.NewNodePacketEncoder(byte(TagOfMetaFrame)) @@ -75,6 +87,11 @@ func (m *MetaFrame) Encode() []byte { meta.AddPrimitivePacket(metadata) } + // dispatch mode + dispatch := y3.NewPrimitivePacketEncoder(byte(TagOfDispatch)) + dispatch.SetBytesValue([]byte{m.dispatch}) + meta.AddPrimitivePacket(dispatch) + return meta.Encode() } @@ -95,17 +112,21 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { return nil, err } meta.tid = val - break case byte(TagOfMetadata): meta.metadata = v.ToBytes() - break case byte(TagOfSourceID): sourceID, err := v.ToUTF8String() if err != nil { return nil, err } meta.sourceID = sourceID - break + case byte(TagOfDispatch): + dispatch := v.ToBytes() + fmt.Printf("dispatch: %v", dispatch) + if len(dispatch) < 1 { + meta.dispatch = DispatchDirected + } + meta.dispatch = dispatch[0] } } diff --git a/core/frame/meta_frame_test.go b/core/frame/meta_frame_test.go index 31e58b22c..41a70fecf 100644 --- a/core/frame/meta_frame_test.go +++ b/core/frame/meta_frame_test.go @@ -9,17 +9,20 @@ import ( func TestMetaFrameEncode(t *testing.T) { m := NewMetaFrame() tidbuf := []byte(m.tid) - result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2), byte(TagOfTransactionID), byte(len(tidbuf))} + result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2 + 3), byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) result = append(result, byte(TagOfSourceID), 0x0) + m.SetDispatch(DispatchBroadcast) + result = append(result, byte(TagOfDispatch), 0x1, 0x1) assert.Equal(t, result, m.Encode()) } func TestMetaFrameDecode(t *testing.T) { - buf := []byte{0x80 | byte(TagOfMetaFrame), 0x09, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31} + buf := []byte{0x80 | byte(TagOfMetaFrame), 0x0C, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31, byte(TagOfDispatch), 0x01, 0x01} meta, err := DecodeToMetaFrame(buf) assert.NoError(t, err) assert.EqualValues(t, "1234", meta.TransactionID()) assert.EqualValues(t, "1", meta.SourceID()) + assert.EqualValues(t, DispatchBroadcast, meta.Dispatch()) t.Logf("%# x", buf) } diff --git a/core/server.go b/core/server.go index 0ac9946a5..f4917bde0 100644 --- a/core/server.go +++ b/core/server.go @@ -231,7 +231,7 @@ func (s *Server) handleConnection(c *Context) { // before frame handlers for _, handler := range s.beforeHandlers { if err := handler(c); err != nil { - logger.Errorf("%safterFrameHandler err: %s", ServerLogPrefix, err) + logger.Errorf("%sbeforeFrameHandler err: %s", ServerLogPrefix, err) c.CloseWithError(yerr.ErrorCodeBeforeHandler, err.Error()) return } From 59901e8b470f03491647383a655ae228e8e0bfb0 Mon Sep 17 00:00:00 2001 From: venjiang Date: Tue, 16 Aug 2022 23:00:57 +0800 Subject: [PATCH 2/6] add broadcast method in source --- core/server.go | 6 ++++-- source.go | 17 +++++++++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/core/server.go b/core/server.go index f4917bde0..4068ff209 100644 --- a/core/server.go +++ b/core/server.go @@ -279,8 +279,10 @@ func (s *Server) mainFrameHandler(c *Context) error { conn := s.connector.Get(c.connID) if conn != nil && conn.ClientType() == ClientTypeSource { f := c.Frame.(*frame.DataFrame) - f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) - s.dispatchToDownstreams(f) + if f.Dispatch() == frame.DispatchBroadcast { + f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) + s.dispatchToDownstreams(f) + } } // observe datatags backflow s.handleBackflowFrame(c) diff --git a/source.go b/source.go index 4883588a0..c10060943 100644 --- a/source.go +++ b/source.go @@ -19,14 +19,16 @@ type Source interface { Connect() error // SetDataTag will set the tag of data when invoking Write(). SetDataTag(tag uint8) - // Write the data to downstream. - Write(p []byte) (n int, err error) + // Write the data to directed downstream. + Write(data []byte) (n int, err error) // WriteWithTag will write data with specified tag, default transactionID is epoch time. WriteWithTag(tag uint8, data []byte) error // SetErrorHandler set the error handler function when server error occurs SetErrorHandler(fn func(err error)) // [Experimental] SetReceiveHandler set the observe handler function SetReceiveHandler(fn func(tag byte, data []byte)) + // Write the data to all downstream + Broadcast(data []byte) error } // YoMo-Source @@ -108,3 +110,14 @@ func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) { s.fn = fn s.client.Logger().Debugf("%sSetReceiveHandler(%v)", sourceLogPrefix, s.fn) } + +// Broadcast Write the data to all downstream +func (s *yomoSource) Broadcast(data []byte) error { + f := frame.NewDataFrame() + f.SetCarriage(byte(s.tag), data) + f.SetSourceID(s.client.ClientID()) + f.SetDispatch(frame.DispatchBroadcast) + s.client.Logger().Debugf("%sBroadcast: tid=%s, source_id=%s, data[%d]=%# x", + sourceLogPrefix, f.TransactionID(), f.SourceID(), len(data), frame.Shortly(data)) + return s.client.WriteFrame(f) +} From 0843ca05c666e123796370256ca361d621b6009f Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 17 Aug 2022 16:51:10 +0800 Subject: [PATCH 3/6] clean --- core/frame/meta_frame.go | 2 -- example/4-cascading-zipper/source/source.go | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 6ce61108d..bcfc0f16e 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -1,7 +1,6 @@ package frame import ( - "fmt" "strconv" "time" @@ -122,7 +121,6 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { meta.sourceID = sourceID case byte(TagOfDispatch): dispatch := v.ToBytes() - fmt.Printf("dispatch: %v", dispatch) if len(dispatch) < 1 { meta.dispatch = DispatchDirected } diff --git a/example/4-cascading-zipper/source/source.go b/example/4-cascading-zipper/source/source.go index 95c4aee8c..d48855d5d 100644 --- a/example/4-cascading-zipper/source/source.go +++ b/example/4-cascading-zipper/source/source.go @@ -41,6 +41,8 @@ func generateAndSendData(stream yomo.Source) error { data := []byte(fmt.Sprintf("%d", rnd.Uint32())) // send data via QUIC stream. _, err := stream.Write(data) + // using the following code, zipper will broadcast this message to cascading zippers + // err := stream.Broadcast(data) if err != nil { log.Printf("[source] ❌ Emit %v to YoMo-Zipper failure with err: %v", data, err) time.Sleep(500 * time.Millisecond) From e5de0364b5535a9496169c31db07209109832ec0 Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 17 Aug 2022 22:23:13 +0800 Subject: [PATCH 4/6] add comments --- core/frame/frame.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/frame/frame.go b/core/frame/frame.go index fc1a874d8..9a6c52bbe 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -43,12 +43,14 @@ const ( TagOfGoawayMessage Type = 0x02 ) -// frame dispatch mode +// Dispatch frame dispatch mode type Dispatch = byte const ( - DispatchDirected Dispatch = 0x00 // directed - DispatchBroadcast Dispatch = 0x01 // broadcast + // DispatchNormal dispatch directed mode + DispatchDirected Dispatch = 0x00 + // DispatchBroadcast dispatch broadcast mode + DispatchBroadcast Dispatch = 0x01 ) // Type represents the type of frame. From 3923830511a447542e5bbf020b7560e21e838292 Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 17 Aug 2022 22:29:55 +0800 Subject: [PATCH 5/6] update comments --- core/frame/frame.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/frame/frame.go b/core/frame/frame.go index 9a6c52bbe..96f808663 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -47,7 +47,7 @@ const ( type Dispatch = byte const ( - // DispatchNormal dispatch directed mode + // DispatchDirected dispatch directed mode DispatchDirected Dispatch = 0x00 // DispatchBroadcast dispatch broadcast mode DispatchBroadcast Dispatch = 0x01 From b068dbf12b7f50771fe32885ef8ab83248652c0a Mon Sep 17 00:00:00 2001 From: venjiang Date: Fri, 19 Aug 2022 15:50:03 +0800 Subject: [PATCH 6/6] rename the dispatch to broadcast and change the type --- core/frame/data_frame.go | 12 +++++------ core/frame/data_frame_test.go | 8 +++---- core/frame/frame.go | 12 +---------- core/frame/meta_frame.go | 40 +++++++++++++++++------------------ core/frame/meta_frame_test.go | 8 +++---- core/server.go | 2 +- source.go | 2 +- 7 files changed, 37 insertions(+), 47 deletions(-) diff --git a/core/frame/data_frame.go b/core/frame/data_frame.go index 5612e976d..bce556d1d 100644 --- a/core/frame/data_frame.go +++ b/core/frame/data_frame.go @@ -70,14 +70,14 @@ func (d *DataFrame) SourceID() string { return d.metaFrame.SourceID() } -// SetDispatch set dispatch mode -func (d *DataFrame) SetDispatch(mode Dispatch) { - d.metaFrame.dispatch = mode +// SetBroadcast set broadcast mode +func (d *DataFrame) SetBroadcast(enabled bool) { + d.metaFrame.SetBroadcast(enabled) } -// Dispatch get dispatch mode -func (d *DataFrame) Dispatch() Dispatch { - return d.metaFrame.dispatch +// IsBroadcast returns the broadcast mode is enabled +func (d *DataFrame) IsBroadcast() bool { + return d.metaFrame.IsBroadcast() } // Encode return Y3 encoded bytes of `DataFrame` diff --git a/core/frame/data_frame_test.go b/core/frame/data_frame_test.go index 18d4d8810..f8cc3836b 100644 --- a/core/frame/data_frame_test.go +++ b/core/frame/data_frame_test.go @@ -10,7 +10,7 @@ func TestDataFrameEncode(t *testing.T) { var userDataTag byte = 0x15 d := NewDataFrame() d.SetCarriage(userDataTag, []byte("yomo")) - d.SetDispatch(DispatchBroadcast) + d.SetBroadcast(true) tidbuf := []byte(d.TransactionID()) result := []byte{ @@ -19,7 +19,7 @@ func TestDataFrameEncode(t *testing.T) { byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) result = append(result, byte(TagOfSourceID), 0x0) - result = append(result, byte(TagOfDispatch), 0x1, 0x1) + result = append(result, byte(TagOfBroadcast), 0x1, 0x1) result = append(result, 0x80|byte(TagOfPayloadFrame), 0x06, userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F) assert.Equal(t, result, d.Encode()) @@ -31,7 +31,7 @@ func TestDataFrameDecode(t *testing.T) { 0x80 | byte(TagOfDataFrame), 0x10 + 3, 0x80 | byte(TagOfMetaFrame), 0x06 + 3, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, - byte(TagOfDispatch), 0x01, 0x01, + byte(TagOfBroadcast), 0x01, 0x01, 0x80 | byte(TagOfPayloadFrame), 0x06, userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F} data, err := DecodeToDataFrame(buf) @@ -39,5 +39,5 @@ func TestDataFrameDecode(t *testing.T) { assert.EqualValues(t, "1234", data.TransactionID()) assert.EqualValues(t, userDataTag, data.GetDataTag()) assert.EqualValues(t, []byte("yomo"), data.GetCarriage()) - assert.EqualValues(t, DispatchBroadcast, data.Dispatch()) + assert.EqualValues(t, true, data.IsBroadcast()) } diff --git a/core/frame/frame.go b/core/frame/frame.go index 96f808663..0654804bf 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -17,7 +17,7 @@ const ( TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 TagOfSourceID Type = 0x02 - TagOfDispatch Type = 0x04 + TagOfBroadcast Type = 0x04 // PayloadFrame of DataFrame TagOfPayloadFrame Type = 0x2E TagOfBackflowFrame Type = 0x2D @@ -43,16 +43,6 @@ const ( TagOfGoawayMessage Type = 0x02 ) -// Dispatch frame dispatch mode -type Dispatch = byte - -const ( - // DispatchDirected dispatch directed mode - DispatchDirected Dispatch = 0x00 - // DispatchBroadcast dispatch broadcast mode - DispatchBroadcast Dispatch = 0x01 -) - // Type represents the type of frame. type Type uint8 diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index bcfc0f16e..8c9a70375 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -11,10 +11,10 @@ import ( // MetaFrame is a Y3 encoded bytes, SeqID is a fixed value of TYPE_ID_TRANSACTION. // used for describes metadata for a DataFrame. type MetaFrame struct { - tid string - metadata []byte - sourceID string - dispatch Dispatch + tid string + metadata []byte + sourceID string + broadcast bool } // NewMetaFrame creates a new MetaFrame instance. @@ -23,7 +23,7 @@ func NewMetaFrame() *MetaFrame { if err != nil { tid = strconv.FormatInt(time.Now().Unix(), 10) // todo: UnixMicro since go 1.17 } - return &MetaFrame{tid: tid, dispatch: DispatchDirected} + return &MetaFrame{tid: tid} } // SetTransactionID set the transaction ID. @@ -56,14 +56,14 @@ func (m *MetaFrame) SourceID() string { return m.sourceID } -// SetDispatch set dispatch mode -func (m *MetaFrame) SetDispatch(mode Dispatch) { - m.dispatch = mode +// SetBroadcast set broadcast mode +func (m *MetaFrame) SetBroadcast(enabled bool) { + m.broadcast = enabled } -// Dispatch get dispatch mode -func (m *MetaFrame) Dispatch() Dispatch { - return m.dispatch +// IsBroadcast returns the broadcast mode is enabled +func (m *MetaFrame) IsBroadcast() bool { + return m.broadcast } // Encode implements Frame.Encode method. @@ -86,10 +86,10 @@ func (m *MetaFrame) Encode() []byte { meta.AddPrimitivePacket(metadata) } - // dispatch mode - dispatch := y3.NewPrimitivePacketEncoder(byte(TagOfDispatch)) - dispatch.SetBytesValue([]byte{m.dispatch}) - meta.AddPrimitivePacket(dispatch) + // broadcast mode + broadcast := y3.NewPrimitivePacketEncoder(byte(TagOfBroadcast)) + broadcast.SetBoolValue(m.broadcast) + meta.AddPrimitivePacket(broadcast) return meta.Encode() } @@ -119,12 +119,12 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { return nil, err } meta.sourceID = sourceID - case byte(TagOfDispatch): - dispatch := v.ToBytes() - if len(dispatch) < 1 { - meta.dispatch = DispatchDirected + case byte(TagOfBroadcast): + broadcast, err := v.ToBool() + if err != nil { + return nil, err } - meta.dispatch = dispatch[0] + meta.broadcast = broadcast } } diff --git a/core/frame/meta_frame_test.go b/core/frame/meta_frame_test.go index 41a70fecf..463b227ba 100644 --- a/core/frame/meta_frame_test.go +++ b/core/frame/meta_frame_test.go @@ -8,21 +8,21 @@ import ( func TestMetaFrameEncode(t *testing.T) { m := NewMetaFrame() + m.SetBroadcast(true) tidbuf := []byte(m.tid) result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2 + 3), byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) result = append(result, byte(TagOfSourceID), 0x0) - m.SetDispatch(DispatchBroadcast) - result = append(result, byte(TagOfDispatch), 0x1, 0x1) + result = append(result, byte(TagOfBroadcast), 0x1, 0x1) assert.Equal(t, result, m.Encode()) } func TestMetaFrameDecode(t *testing.T) { - buf := []byte{0x80 | byte(TagOfMetaFrame), 0x0C, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31, byte(TagOfDispatch), 0x01, 0x01} + buf := []byte{0x80 | byte(TagOfMetaFrame), 0x0C, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31, byte(TagOfBroadcast), 0x01, 0x01} meta, err := DecodeToMetaFrame(buf) assert.NoError(t, err) assert.EqualValues(t, "1234", meta.TransactionID()) assert.EqualValues(t, "1", meta.SourceID()) - assert.EqualValues(t, DispatchBroadcast, meta.Dispatch()) + assert.EqualValues(t, true, meta.IsBroadcast()) t.Logf("%# x", buf) } diff --git a/core/server.go b/core/server.go index 4068ff209..8e4445c48 100644 --- a/core/server.go +++ b/core/server.go @@ -279,7 +279,7 @@ func (s *Server) mainFrameHandler(c *Context) error { conn := s.connector.Get(c.connID) if conn != nil && conn.ClientType() == ClientTypeSource { f := c.Frame.(*frame.DataFrame) - if f.Dispatch() == frame.DispatchBroadcast { + if f.IsBroadcast() { f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) s.dispatchToDownstreams(f) } diff --git a/source.go b/source.go index c10060943..5c7653c5b 100644 --- a/source.go +++ b/source.go @@ -116,7 +116,7 @@ func (s *yomoSource) Broadcast(data []byte) error { f := frame.NewDataFrame() f.SetCarriage(byte(s.tag), data) f.SetSourceID(s.client.ClientID()) - f.SetDispatch(frame.DispatchBroadcast) + f.SetBroadcast(true) s.client.Logger().Debugf("%sBroadcast: tid=%s, source_id=%s, data[%d]=%# x", sourceLogPrefix, f.TransactionID(), f.SourceID(), len(data), frame.Shortly(data)) return s.client.WriteFrame(f)