Skip to content

Commit

Permalink
feat: broadcast frame to cascading zippers (#366)
Browse files Browse the repository at this point in the history
* add dispatch field in metaframe

* add broadcast method in source

* clean

* add comments

* update comments

* rename the dispatch to broadcast and change the type
  • Loading branch information
venjiang committed Aug 22, 2022
1 parent 979023f commit 3acb5e4
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 17 deletions.
10 changes: 10 additions & 0 deletions core/frame/data_frame.go
Expand Up @@ -70,6 +70,16 @@ func (d *DataFrame) SourceID() string {
return d.metaFrame.SourceID()
}

// SetBroadcast set broadcast mode
func (d *DataFrame) SetBroadcast(enabled bool) {
d.metaFrame.SetBroadcast(enabled)
}

// IsBroadcast returns the broadcast mode is enabled
func (d *DataFrame) IsBroadcast() bool {
return d.metaFrame.IsBroadcast()
}

// Encode return Y3 encoded bytes of `DataFrame`
func (d *DataFrame) Encode() []byte {
data := y3.NewNodePacketEncoder(byte(d.Type()))
Expand Down
12 changes: 8 additions & 4 deletions core/frame/data_frame_test.go
Expand Up @@ -10,14 +10,16 @@ func TestDataFrameEncode(t *testing.T) {
var userDataTag byte = 0x15
d := NewDataFrame()
d.SetCarriage(userDataTag, []byte("yomo"))
d.SetBroadcast(true)

tidbuf := []byte(d.TransactionID())
result := []byte{
0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2),
0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2),
0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2 + 3),
0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2 + 3),
byte(TagOfTransactionID), byte(len(tidbuf))}
result = append(result, tidbuf...)
result = append(result, byte(TagOfSourceID), 0x0)
result = append(result, byte(TagOfBroadcast), 0x1, 0x1)
result = append(result, 0x80|byte(TagOfPayloadFrame), 0x06,
userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F)
assert.Equal(t, result, d.Encode())
Expand All @@ -26,14 +28,16 @@ func TestDataFrameEncode(t *testing.T) {
func TestDataFrameDecode(t *testing.T) {
var userDataTag byte = 0x15
buf := []byte{
0x80 | byte(TagOfDataFrame), 0x10,
0x80 | byte(TagOfMetaFrame), 0x06,
0x80 | byte(TagOfDataFrame), 0x10 + 3,
0x80 | byte(TagOfMetaFrame), 0x06 + 3,
byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34,
byte(TagOfBroadcast), 0x01, 0x01,
0x80 | byte(TagOfPayloadFrame), 0x06,
userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F}
data, err := DecodeToDataFrame(buf)
assert.NoError(t, err)
assert.EqualValues(t, "1234", data.TransactionID())
assert.EqualValues(t, userDataTag, data.GetDataTag())
assert.EqualValues(t, []byte("yomo"), data.GetCarriage())
assert.EqualValues(t, true, data.IsBroadcast())
}
1 change: 1 addition & 0 deletions core/frame/frame.go
Expand Up @@ -17,6 +17,7 @@ const (
TagOfMetadata Type = 0x03
TagOfTransactionID Type = 0x01
TagOfSourceID Type = 0x02
TagOfBroadcast Type = 0x04
// PayloadFrame of DataFrame
TagOfPayloadFrame Type = 0x2E
TagOfBackflowFrame Type = 0x2D
Expand Down
31 changes: 25 additions & 6 deletions core/frame/meta_frame.go
Expand Up @@ -11,9 +11,10 @@ import (
// MetaFrame is a Y3 encoded bytes, SeqID is a fixed value of TYPE_ID_TRANSACTION.
// used for describes metadata for a DataFrame.
type MetaFrame struct {
tid string
metadata []byte
sourceID string
tid string
metadata []byte
sourceID string
broadcast bool
}

// NewMetaFrame creates a new MetaFrame instance.
Expand Down Expand Up @@ -55,6 +56,16 @@ func (m *MetaFrame) SourceID() string {
return m.sourceID
}

// SetBroadcast set broadcast mode
func (m *MetaFrame) SetBroadcast(enabled bool) {
m.broadcast = enabled
}

// IsBroadcast returns the broadcast mode is enabled
func (m *MetaFrame) IsBroadcast() bool {
return m.broadcast
}

// Encode implements Frame.Encode method.
func (m *MetaFrame) Encode() []byte {
meta := y3.NewNodePacketEncoder(byte(TagOfMetaFrame))
Expand All @@ -75,6 +86,11 @@ func (m *MetaFrame) Encode() []byte {
meta.AddPrimitivePacket(metadata)
}

// broadcast mode
broadcast := y3.NewPrimitivePacketEncoder(byte(TagOfBroadcast))
broadcast.SetBoolValue(m.broadcast)
meta.AddPrimitivePacket(broadcast)

return meta.Encode()
}

Expand All @@ -95,17 +111,20 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) {
return nil, err
}
meta.tid = val
break
case byte(TagOfMetadata):
meta.metadata = v.ToBytes()
break
case byte(TagOfSourceID):
sourceID, err := v.ToUTF8String()
if err != nil {
return nil, err
}
meta.sourceID = sourceID
break
case byte(TagOfBroadcast):
broadcast, err := v.ToBool()
if err != nil {
return nil, err
}
meta.broadcast = broadcast
}
}

Expand Down
7 changes: 5 additions & 2 deletions core/frame/meta_frame_test.go
Expand Up @@ -8,18 +8,21 @@ import (

func TestMetaFrameEncode(t *testing.T) {
m := NewMetaFrame()
m.SetBroadcast(true)
tidbuf := []byte(m.tid)
result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2), byte(TagOfTransactionID), byte(len(tidbuf))}
result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2 + 3), byte(TagOfTransactionID), byte(len(tidbuf))}
result = append(result, tidbuf...)
result = append(result, byte(TagOfSourceID), 0x0)
result = append(result, byte(TagOfBroadcast), 0x1, 0x1)
assert.Equal(t, result, m.Encode())
}

func TestMetaFrameDecode(t *testing.T) {
buf := []byte{0x80 | byte(TagOfMetaFrame), 0x09, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31}
buf := []byte{0x80 | byte(TagOfMetaFrame), 0x0C, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31, byte(TagOfBroadcast), 0x01, 0x01}
meta, err := DecodeToMetaFrame(buf)
assert.NoError(t, err)
assert.EqualValues(t, "1234", meta.TransactionID())
assert.EqualValues(t, "1", meta.SourceID())
assert.EqualValues(t, true, meta.IsBroadcast())
t.Logf("%# x", buf)
}
8 changes: 5 additions & 3 deletions core/server.go
Expand Up @@ -231,7 +231,7 @@ func (s *Server) handleConnection(c *Context) {
// before frame handlers
for _, handler := range s.beforeHandlers {
if err := handler(c); err != nil {
logger.Errorf("%safterFrameHandler err: %s", ServerLogPrefix, err)
logger.Errorf("%sbeforeFrameHandler err: %s", ServerLogPrefix, err)
c.CloseWithError(yerr.ErrorCodeBeforeHandler, err.Error())
return
}
Expand Down Expand Up @@ -279,8 +279,10 @@ func (s *Server) mainFrameHandler(c *Context) error {
conn := s.connector.Get(c.connID)
if conn != nil && conn.ClientType() == ClientTypeSource {
f := c.Frame.(*frame.DataFrame)
f.GetMetaFrame().SetMetadata(conn.Metadata().Encode())
s.dispatchToDownstreams(f)
if f.IsBroadcast() {
f.GetMetaFrame().SetMetadata(conn.Metadata().Encode())
s.dispatchToDownstreams(f)
}
}
// observe datatags backflow
s.handleBackflowFrame(c)
Expand Down
2 changes: 2 additions & 0 deletions example/4-cascading-zipper/source/source.go
Expand Up @@ -41,6 +41,8 @@ func generateAndSendData(stream yomo.Source) error {
data := []byte(fmt.Sprintf("%d", rnd.Uint32()))
// send data via QUIC stream.
_, err := stream.Write(data)
// using the following code, zipper will broadcast this message to cascading zippers
// err := stream.Broadcast(data)
if err != nil {
log.Printf("[source] ❌ Emit %v to YoMo-Zipper failure with err: %v", data, err)
time.Sleep(500 * time.Millisecond)
Expand Down
17 changes: 15 additions & 2 deletions source.go
Expand Up @@ -19,14 +19,16 @@ type Source interface {
Connect() error
// SetDataTag will set the tag of data when invoking Write().
SetDataTag(tag uint8)
// Write the data to downstream.
Write(p []byte) (n int, err error)
// Write the data to directed downstream.
Write(data []byte) (n int, err error)
// WriteWithTag will write data with specified tag, default transactionID is epoch time.
WriteWithTag(tag uint8, data []byte) error
// SetErrorHandler set the error handler function when server error occurs
SetErrorHandler(fn func(err error))
// [Experimental] SetReceiveHandler set the observe handler function
SetReceiveHandler(fn func(tag byte, data []byte))
// Write the data to all downstream
Broadcast(data []byte) error
}

// YoMo-Source
Expand Down Expand Up @@ -112,3 +114,14 @@ func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) {
s.fn = fn
s.client.Logger().Debugf("%sSetReceiveHandler(%v)", sourceLogPrefix, s.fn)
}

// Broadcast Write the data to all downstream
func (s *yomoSource) Broadcast(data []byte) error {
f := frame.NewDataFrame()
f.SetCarriage(byte(s.tag), data)
f.SetSourceID(s.client.ClientID())
f.SetBroadcast(true)
s.client.Logger().Debugf("%sBroadcast: tid=%s, source_id=%s, data[%d]=%# x",
sourceLogPrefix, f.TransactionID(), f.SourceID(), len(data), frame.Shortly(data))
return s.client.WriteFrame(f)
}

0 comments on commit 3acb5e4

Please sign in to comment.