Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: broadcast frame to cascading zippers #366

Merged
merged 6 commits into from Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/frame/data_frame.go
Expand Up @@ -70,6 +70,16 @@ func (d *DataFrame) SourceID() string {
return d.metaFrame.SourceID()
}

// SetBroadcast set broadcast mode
func (d *DataFrame) SetBroadcast(enabled bool) {
d.metaFrame.SetBroadcast(enabled)
}

// IsBroadcast returns the broadcast mode is enabled
func (d *DataFrame) IsBroadcast() bool {
return d.metaFrame.IsBroadcast()
}

// Encode return Y3 encoded bytes of `DataFrame`
func (d *DataFrame) Encode() []byte {
data := y3.NewNodePacketEncoder(byte(d.Type()))
Expand Down
12 changes: 8 additions & 4 deletions core/frame/data_frame_test.go
Expand Up @@ -10,14 +10,16 @@ func TestDataFrameEncode(t *testing.T) {
var userDataTag byte = 0x15
d := NewDataFrame()
d.SetCarriage(userDataTag, []byte("yomo"))
d.SetBroadcast(true)

tidbuf := []byte(d.TransactionID())
result := []byte{
0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2),
0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2),
0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2 + 3),
0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2 + 3),
byte(TagOfTransactionID), byte(len(tidbuf))}
result = append(result, tidbuf...)
result = append(result, byte(TagOfSourceID), 0x0)
result = append(result, byte(TagOfBroadcast), 0x1, 0x1)
result = append(result, 0x80|byte(TagOfPayloadFrame), 0x06,
userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F)
assert.Equal(t, result, d.Encode())
Expand All @@ -26,14 +28,16 @@ func TestDataFrameEncode(t *testing.T) {
func TestDataFrameDecode(t *testing.T) {
var userDataTag byte = 0x15
buf := []byte{
0x80 | byte(TagOfDataFrame), 0x10,
0x80 | byte(TagOfMetaFrame), 0x06,
0x80 | byte(TagOfDataFrame), 0x10 + 3,
0x80 | byte(TagOfMetaFrame), 0x06 + 3,
byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34,
byte(TagOfBroadcast), 0x01, 0x01,
0x80 | byte(TagOfPayloadFrame), 0x06,
userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F}
data, err := DecodeToDataFrame(buf)
assert.NoError(t, err)
assert.EqualValues(t, "1234", data.TransactionID())
assert.EqualValues(t, userDataTag, data.GetDataTag())
assert.EqualValues(t, []byte("yomo"), data.GetCarriage())
assert.EqualValues(t, true, data.IsBroadcast())
}
1 change: 1 addition & 0 deletions core/frame/frame.go
Expand Up @@ -17,6 +17,7 @@ const (
TagOfMetadata Type = 0x03
TagOfTransactionID Type = 0x01
TagOfSourceID Type = 0x02
TagOfBroadcast Type = 0x04
// PayloadFrame of DataFrame
TagOfPayloadFrame Type = 0x2E
TagOfBackflowFrame Type = 0x2D
Expand Down
31 changes: 25 additions & 6 deletions core/frame/meta_frame.go
Expand Up @@ -11,9 +11,10 @@ import (
// MetaFrame is a Y3 encoded bytes, SeqID is a fixed value of TYPE_ID_TRANSACTION.
// used for describes metadata for a DataFrame.
type MetaFrame struct {
tid string
metadata []byte
sourceID string
tid string
metadata []byte
sourceID string
broadcast bool
}

// NewMetaFrame creates a new MetaFrame instance.
Expand Down Expand Up @@ -55,6 +56,16 @@ func (m *MetaFrame) SourceID() string {
return m.sourceID
}

// SetBroadcast set broadcast mode
func (m *MetaFrame) SetBroadcast(enabled bool) {
m.broadcast = enabled
}

// IsBroadcast returns the broadcast mode is enabled
func (m *MetaFrame) IsBroadcast() bool {
return m.broadcast
}

// Encode implements Frame.Encode method.
func (m *MetaFrame) Encode() []byte {
meta := y3.NewNodePacketEncoder(byte(TagOfMetaFrame))
Expand All @@ -75,6 +86,11 @@ func (m *MetaFrame) Encode() []byte {
meta.AddPrimitivePacket(metadata)
}

// broadcast mode
broadcast := y3.NewPrimitivePacketEncoder(byte(TagOfBroadcast))
broadcast.SetBoolValue(m.broadcast)
meta.AddPrimitivePacket(broadcast)

return meta.Encode()
}

Expand All @@ -95,17 +111,20 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) {
return nil, err
}
meta.tid = val
break
case byte(TagOfMetadata):
meta.metadata = v.ToBytes()
break
case byte(TagOfSourceID):
sourceID, err := v.ToUTF8String()
if err != nil {
return nil, err
}
meta.sourceID = sourceID
break
case byte(TagOfBroadcast):
broadcast, err := v.ToBool()
if err != nil {
return nil, err
}
meta.broadcast = broadcast
}
}

Expand Down
7 changes: 5 additions & 2 deletions core/frame/meta_frame_test.go
Expand Up @@ -8,18 +8,21 @@ import (

func TestMetaFrameEncode(t *testing.T) {
m := NewMetaFrame()
m.SetBroadcast(true)
tidbuf := []byte(m.tid)
result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2), byte(TagOfTransactionID), byte(len(tidbuf))}
result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2 + 3), byte(TagOfTransactionID), byte(len(tidbuf))}
result = append(result, tidbuf...)
result = append(result, byte(TagOfSourceID), 0x0)
result = append(result, byte(TagOfBroadcast), 0x1, 0x1)
assert.Equal(t, result, m.Encode())
}

func TestMetaFrameDecode(t *testing.T) {
buf := []byte{0x80 | byte(TagOfMetaFrame), 0x09, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31}
buf := []byte{0x80 | byte(TagOfMetaFrame), 0x0C, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31, byte(TagOfBroadcast), 0x01, 0x01}
meta, err := DecodeToMetaFrame(buf)
assert.NoError(t, err)
assert.EqualValues(t, "1234", meta.TransactionID())
assert.EqualValues(t, "1", meta.SourceID())
assert.EqualValues(t, true, meta.IsBroadcast())
t.Logf("%# x", buf)
}
8 changes: 5 additions & 3 deletions core/server.go
Expand Up @@ -231,7 +231,7 @@ func (s *Server) handleConnection(c *Context) {
// before frame handlers
for _, handler := range s.beforeHandlers {
if err := handler(c); err != nil {
logger.Errorf("%safterFrameHandler err: %s", ServerLogPrefix, err)
logger.Errorf("%sbeforeFrameHandler err: %s", ServerLogPrefix, err)
c.CloseWithError(yerr.ErrorCodeBeforeHandler, err.Error())
return
}
Expand Down Expand Up @@ -279,8 +279,10 @@ func (s *Server) mainFrameHandler(c *Context) error {
conn := s.connector.Get(c.connID)
if conn != nil && conn.ClientType() == ClientTypeSource {
f := c.Frame.(*frame.DataFrame)
f.GetMetaFrame().SetMetadata(conn.Metadata().Encode())
s.dispatchToDownstreams(f)
if f.IsBroadcast() {
f.GetMetaFrame().SetMetadata(conn.Metadata().Encode())
s.dispatchToDownstreams(f)
}
}
// observe datatags backflow
s.handleBackflowFrame(c)
Expand Down
2 changes: 2 additions & 0 deletions example/4-cascading-zipper/source/source.go
Expand Up @@ -41,6 +41,8 @@ func generateAndSendData(stream yomo.Source) error {
data := []byte(fmt.Sprintf("%d", rnd.Uint32()))
// send data via QUIC stream.
_, err := stream.Write(data)
// using the following code, zipper will broadcast this message to cascading zippers
// err := stream.Broadcast(data)
if err != nil {
log.Printf("[source] ❌ Emit %v to YoMo-Zipper failure with err: %v", data, err)
time.Sleep(500 * time.Millisecond)
Expand Down
17 changes: 15 additions & 2 deletions source.go
Expand Up @@ -19,14 +19,16 @@ type Source interface {
Connect() error
// SetDataTag will set the tag of data when invoking Write().
SetDataTag(tag uint8)
// Write the data to downstream.
Write(p []byte) (n int, err error)
// Write the data to directed downstream.
Write(data []byte) (n int, err error)
// WriteWithTag will write data with specified tag, default transactionID is epoch time.
WriteWithTag(tag uint8, data []byte) error
// SetErrorHandler set the error handler function when server error occurs
SetErrorHandler(fn func(err error))
// [Experimental] SetReceiveHandler set the observe handler function
SetReceiveHandler(fn func(tag byte, data []byte))
// Write the data to all downstream
Broadcast(data []byte) error
}

// YoMo-Source
Expand Down Expand Up @@ -108,3 +110,14 @@ func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) {
s.fn = fn
s.client.Logger().Debugf("%sSetReceiveHandler(%v)", sourceLogPrefix, s.fn)
}

// Broadcast Write the data to all downstream
func (s *yomoSource) Broadcast(data []byte) error {
f := frame.NewDataFrame()
f.SetCarriage(byte(s.tag), data)
f.SetSourceID(s.client.ClientID())
f.SetBroadcast(true)
s.client.Logger().Debugf("%sBroadcast: tid=%s, source_id=%s, data[%d]=%# x",
sourceLogPrefix, f.TransactionID(), f.SourceID(), len(data), frame.Shortly(data))
return s.client.WriteFrame(f)
}