diff --git a/core/frame/data_frame.go b/core/frame/data_frame.go index eb06c641d..bce556d1d 100644 --- a/core/frame/data_frame.go +++ b/core/frame/data_frame.go @@ -70,6 +70,16 @@ func (d *DataFrame) SourceID() string { return d.metaFrame.SourceID() } +// SetBroadcast set broadcast mode +func (d *DataFrame) SetBroadcast(enabled bool) { + d.metaFrame.SetBroadcast(enabled) +} + +// IsBroadcast returns the broadcast mode is enabled +func (d *DataFrame) IsBroadcast() bool { + return d.metaFrame.IsBroadcast() +} + // Encode return Y3 encoded bytes of `DataFrame` func (d *DataFrame) Encode() []byte { data := y3.NewNodePacketEncoder(byte(d.Type())) diff --git a/core/frame/data_frame_test.go b/core/frame/data_frame_test.go index 0d1d506f4..f8cc3836b 100644 --- a/core/frame/data_frame_test.go +++ b/core/frame/data_frame_test.go @@ -10,14 +10,16 @@ func TestDataFrameEncode(t *testing.T) { var userDataTag byte = 0x15 d := NewDataFrame() d.SetCarriage(userDataTag, []byte("yomo")) + d.SetBroadcast(true) tidbuf := []byte(d.TransactionID()) result := []byte{ - 0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2), - 0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2), + 0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2 + 3), + 0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2 + 3), byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) result = append(result, byte(TagOfSourceID), 0x0) + result = append(result, byte(TagOfBroadcast), 0x1, 0x1) result = append(result, 0x80|byte(TagOfPayloadFrame), 0x06, userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F) assert.Equal(t, result, d.Encode()) @@ -26,9 +28,10 @@ func TestDataFrameEncode(t *testing.T) { func TestDataFrameDecode(t *testing.T) { var userDataTag byte = 0x15 buf := []byte{ - 0x80 | byte(TagOfDataFrame), 0x10, - 0x80 | byte(TagOfMetaFrame), 0x06, + 0x80 | byte(TagOfDataFrame), 0x10 + 3, + 0x80 | byte(TagOfMetaFrame), 0x06 + 3, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, + byte(TagOfBroadcast), 0x01, 0x01, 0x80 | byte(TagOfPayloadFrame), 0x06, userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F} data, err := DecodeToDataFrame(buf) @@ -36,4 +39,5 @@ func TestDataFrameDecode(t *testing.T) { assert.EqualValues(t, "1234", data.TransactionID()) assert.EqualValues(t, userDataTag, data.GetDataTag()) assert.EqualValues(t, []byte("yomo"), data.GetCarriage()) + assert.EqualValues(t, true, data.IsBroadcast()) } diff --git a/core/frame/frame.go b/core/frame/frame.go index 326c31deb..0654804bf 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -17,6 +17,7 @@ const ( TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 TagOfSourceID Type = 0x02 + TagOfBroadcast Type = 0x04 // PayloadFrame of DataFrame TagOfPayloadFrame Type = 0x2E TagOfBackflowFrame Type = 0x2D diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 712c00976..8c9a70375 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -11,9 +11,10 @@ import ( // MetaFrame is a Y3 encoded bytes, SeqID is a fixed value of TYPE_ID_TRANSACTION. // used for describes metadata for a DataFrame. type MetaFrame struct { - tid string - metadata []byte - sourceID string + tid string + metadata []byte + sourceID string + broadcast bool } // NewMetaFrame creates a new MetaFrame instance. @@ -55,6 +56,16 @@ func (m *MetaFrame) SourceID() string { return m.sourceID } +// SetBroadcast set broadcast mode +func (m *MetaFrame) SetBroadcast(enabled bool) { + m.broadcast = enabled +} + +// IsBroadcast returns the broadcast mode is enabled +func (m *MetaFrame) IsBroadcast() bool { + return m.broadcast +} + // Encode implements Frame.Encode method. func (m *MetaFrame) Encode() []byte { meta := y3.NewNodePacketEncoder(byte(TagOfMetaFrame)) @@ -75,6 +86,11 @@ func (m *MetaFrame) Encode() []byte { meta.AddPrimitivePacket(metadata) } + // broadcast mode + broadcast := y3.NewPrimitivePacketEncoder(byte(TagOfBroadcast)) + broadcast.SetBoolValue(m.broadcast) + meta.AddPrimitivePacket(broadcast) + return meta.Encode() } @@ -95,17 +111,20 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { return nil, err } meta.tid = val - break case byte(TagOfMetadata): meta.metadata = v.ToBytes() - break case byte(TagOfSourceID): sourceID, err := v.ToUTF8String() if err != nil { return nil, err } meta.sourceID = sourceID - break + case byte(TagOfBroadcast): + broadcast, err := v.ToBool() + if err != nil { + return nil, err + } + meta.broadcast = broadcast } } diff --git a/core/frame/meta_frame_test.go b/core/frame/meta_frame_test.go index 31e58b22c..463b227ba 100644 --- a/core/frame/meta_frame_test.go +++ b/core/frame/meta_frame_test.go @@ -8,18 +8,21 @@ import ( func TestMetaFrameEncode(t *testing.T) { m := NewMetaFrame() + m.SetBroadcast(true) tidbuf := []byte(m.tid) - result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2), byte(TagOfTransactionID), byte(len(tidbuf))} + result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2 + 3), byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) result = append(result, byte(TagOfSourceID), 0x0) + result = append(result, byte(TagOfBroadcast), 0x1, 0x1) assert.Equal(t, result, m.Encode()) } func TestMetaFrameDecode(t *testing.T) { - buf := []byte{0x80 | byte(TagOfMetaFrame), 0x09, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31} + buf := []byte{0x80 | byte(TagOfMetaFrame), 0x0C, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31, byte(TagOfBroadcast), 0x01, 0x01} meta, err := DecodeToMetaFrame(buf) assert.NoError(t, err) assert.EqualValues(t, "1234", meta.TransactionID()) assert.EqualValues(t, "1", meta.SourceID()) + assert.EqualValues(t, true, meta.IsBroadcast()) t.Logf("%# x", buf) } diff --git a/core/server.go b/core/server.go index 0ac9946a5..8e4445c48 100644 --- a/core/server.go +++ b/core/server.go @@ -231,7 +231,7 @@ func (s *Server) handleConnection(c *Context) { // before frame handlers for _, handler := range s.beforeHandlers { if err := handler(c); err != nil { - logger.Errorf("%safterFrameHandler err: %s", ServerLogPrefix, err) + logger.Errorf("%sbeforeFrameHandler err: %s", ServerLogPrefix, err) c.CloseWithError(yerr.ErrorCodeBeforeHandler, err.Error()) return } @@ -279,8 +279,10 @@ func (s *Server) mainFrameHandler(c *Context) error { conn := s.connector.Get(c.connID) if conn != nil && conn.ClientType() == ClientTypeSource { f := c.Frame.(*frame.DataFrame) - f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) - s.dispatchToDownstreams(f) + if f.IsBroadcast() { + f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) + s.dispatchToDownstreams(f) + } } // observe datatags backflow s.handleBackflowFrame(c) diff --git a/example/4-cascading-zipper/source/source.go b/example/4-cascading-zipper/source/source.go index 95c4aee8c..d48855d5d 100644 --- a/example/4-cascading-zipper/source/source.go +++ b/example/4-cascading-zipper/source/source.go @@ -41,6 +41,8 @@ func generateAndSendData(stream yomo.Source) error { data := []byte(fmt.Sprintf("%d", rnd.Uint32())) // send data via QUIC stream. _, err := stream.Write(data) + // using the following code, zipper will broadcast this message to cascading zippers + // err := stream.Broadcast(data) if err != nil { log.Printf("[source] ❌ Emit %v to YoMo-Zipper failure with err: %v", data, err) time.Sleep(500 * time.Millisecond) diff --git a/source.go b/source.go index 4883588a0..5c7653c5b 100644 --- a/source.go +++ b/source.go @@ -19,14 +19,16 @@ type Source interface { Connect() error // SetDataTag will set the tag of data when invoking Write(). SetDataTag(tag uint8) - // Write the data to downstream. - Write(p []byte) (n int, err error) + // Write the data to directed downstream. + Write(data []byte) (n int, err error) // WriteWithTag will write data with specified tag, default transactionID is epoch time. WriteWithTag(tag uint8, data []byte) error // SetErrorHandler set the error handler function when server error occurs SetErrorHandler(fn func(err error)) // [Experimental] SetReceiveHandler set the observe handler function SetReceiveHandler(fn func(tag byte, data []byte)) + // Write the data to all downstream + Broadcast(data []byte) error } // YoMo-Source @@ -108,3 +110,14 @@ func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) { s.fn = fn s.client.Logger().Debugf("%sSetReceiveHandler(%v)", sourceLogPrefix, s.fn) } + +// Broadcast Write the data to all downstream +func (s *yomoSource) Broadcast(data []byte) error { + f := frame.NewDataFrame() + f.SetCarriage(byte(s.tag), data) + f.SetSourceID(s.client.ClientID()) + f.SetBroadcast(true) + s.client.Logger().Debugf("%sBroadcast: tid=%s, source_id=%s, data[%d]=%# x", + sourceLogPrefix, f.TransactionID(), f.SourceID(), len(data), frame.Shortly(data)) + return s.client.WriteFrame(f) +}