diff --git a/core/client.go b/core/client.go index 2c57b12d5..d22b4ac2f 100644 --- a/core/client.go +++ b/core/client.go @@ -14,6 +14,7 @@ import ( "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/core/log" "github.com/yomorun/yomo/core/yerr" + "github.com/yomorun/yomo/pkg/id" "github.com/yomorun/yomo/pkg/logger" pkgtls "github.com/yomorun/yomo/pkg/tls" ) @@ -27,13 +28,15 @@ type ConnState = string // Client is the abstraction of a YoMo-Client. a YoMo-Client can be // Source, Upstream Zipper or StreamFunction. type Client struct { - name string // name of the client - clientType ClientType // type of the connection - conn quic.Connection // quic connection - stream quic.Stream // quic stream - state ConnState // state of the connection - processor func(*frame.DataFrame) // functions to invoke when data arrived - addr string // the address of server connected to + name string // name of the client + clientID string // id of the client + clientType ClientType // type of the connection + conn quic.Connection // quic connection + stream quic.Stream // quic stream + state ConnState // state of the connection + processor func(*frame.DataFrame) // functions to invoke when data arrived + receiver func(*frame.BackflowFrame) // functions to invoke when data is processed + addr string // the address of server connected to mu sync.Mutex opts ClientOptions localAddr string // client local addr, it will be changed on reconnect @@ -47,6 +50,7 @@ type Client struct { func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client { c := &Client{ name: appName, + clientID: id.New(), clientType: connType, state: ConnStateReady, opts: ClientOptions{}, @@ -111,6 +115,7 @@ func (c *Client) connect(ctx context.Context, addr string) error { // send handshake handshake := frame.NewHandshakeFrame( c.name, + c.clientID, byte(c.clientType), c.opts.ObserveDataTags, c.opts.Credential.Name(), @@ -124,7 +129,7 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.state = ConnStateConnected c.localAddr = c.conn.LocalAddr().String() - c.logger.Printf("%s❤️ [%s](%s) is connected to YoMo-Zipper %s", ClientLogPrefix, c.name, c.localAddr, addr) + c.logger.Printf("%s❤️ [%s][%s](%s) is connected to YoMo-Zipper %s", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr) // receiving frames go c.handleFrame() @@ -221,7 +226,7 @@ func (c *Client) handleFrame() { case frame.TagOfDataFrame: // DataFrame carries user's data if v, ok := f.(*frame.DataFrame); ok { c.setState(ConnStateTransportData) - c.logger.Debugf("%sreceive DataFrame, tag=%# x, tid=%s, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.TransactionID(), v.GetCarriage()) + c.logger.Debugf("%sreceive DataFrame, tag=%#x, tid=%s, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.TransactionID(), v.GetCarriage()) if c.processor == nil { c.logger.Warnf("%sprocessor is nil", ClientLogPrefix) } else { @@ -230,6 +235,16 @@ func (c *Client) handleFrame() { c.processor(v) } } + case frame.TagOfBackflowFrame: + if v, ok := f.(*frame.BackflowFrame); ok { + c.logger.Debugf("%sreceive BackflowFrame, tag=%#x, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.GetCarriage()) + if c.receiver == nil { + c.logger.Warnf("%sreceiver is nil", ClientLogPrefix) + } else { + c.setState(ConnStateBackflow) + c.receiver(v) + } + } default: c.logger.Errorf("%sunknown signal", ClientLogPrefix) } @@ -239,7 +254,7 @@ func (c *Client) handleFrame() { // Close the client. func (c *Client) Close() (err error) { if c.conn != nil { - c.logger.Printf("%sclose the connection, name:%s, addr:%s", ClientLogPrefix, c.name, c.conn.RemoteAddr().String()) + c.logger.Printf("%sclose the connection, name:%s, id:%s, addr:%s", ClientLogPrefix, c.name, c.clientID, c.conn.RemoteAddr().String()) } if c.stream != nil { err = c.stream.Close() @@ -333,6 +348,12 @@ func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) { c.logger.Debugf("%sSetDataFrameObserver(%v)", ClientLogPrefix, c.processor) } +// SetBackflowFrameObserver sets the backflow frame handler. +func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame)) { + c.receiver = fn + c.logger.Debugf("%sSetBackflowFrameObserver(%v)", ClientLogPrefix, c.receiver) +} + // reconnect the connection between client and server. func (c *Client) reconnect(ctx context.Context, addr string) { t := time.NewTicker(1 * time.Second) @@ -348,10 +369,10 @@ func (c *Client) reconnect(ctx context.Context, addr string) { return case <-t.C: if c.getState() == ConnStateDisconnected { - c.logger.Printf("%s[%s](%s) is reconnecting to YoMo-Zipper %s...", ClientLogPrefix, c.name, c.localAddr, addr) + c.logger.Printf("%s[%s][%s](%s) is reconnecting to YoMo-Zipper %s...", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr) err := c.connect(ctx, addr) if err != nil { - c.logger.Errorf("%s[%s](%s) reconnect error:%v", ClientLogPrefix, c.name, c.localAddr, err) + c.logger.Errorf("%s[%s][%s](%s) reconnect error:%v", ClientLogPrefix, c.name, c.clientID, c.localAddr, err) } } } @@ -443,3 +464,8 @@ func (c *Client) SetErrorHandler(fn func(err error)) { }() } } + +// ClientID return the client ID +func (c *Client) ClientID() string { + return c.clientID +} diff --git a/core/connection.go b/core/connection.go index 55ac28c32..54d69fc99 100644 --- a/core/connection.go +++ b/core/connection.go @@ -13,12 +13,16 @@ type Connection interface { // Name returns the name of the connection, which is set by clients Name() string + // ClientID connection client ID + ClientID() string // ClientType returns the type of the client (Source | SFN | UpstreamZipper) ClientType() ClientType // Metadata returns the extra info of the application Metadata() Metadata // Write should goroutine-safely send y3 frames to peer side Write(f frame.Frame) error + // ObserveDataTags observed data tags + ObserveDataTags() []byte } type connection struct { @@ -26,13 +30,17 @@ type connection struct { clientType ClientType metadata Metadata stream io.ReadWriteCloser + clientID string + observed []byte // observed data tags mu sync.Mutex } -func newConnection(name string, clientType ClientType, metadata Metadata, stream io.ReadWriteCloser) Connection { +func newConnection(name string, clientID string, clientType ClientType, metadata Metadata, stream io.ReadWriteCloser, observed []byte) Connection { return &connection{ name: name, + clientID: clientID, clientType: clientType, + observed: observed, metadata: metadata, stream: stream, } @@ -65,3 +73,13 @@ func (c *connection) Write(f frame.Frame) error { _, err := c.stream.Write(f.Encode()) return err } + +// ObserveDataTags observed data tags +func (c *connection) ObserveDataTags() []byte { + return c.observed +} + +// ClientID connection client ID +func (c *connection) ClientID() string { + return c.clientID +} diff --git a/core/connector.go b/core/connector.go index aa180a0a7..035d6d24b 100644 --- a/core/connector.go +++ b/core/connector.go @@ -18,6 +18,8 @@ type Connector interface { Get(connID string) Connection // GetSnapshot gets the snapshot of all connections. GetSnapshot() map[string]string + // GetSourceConns gets the connections by source observe tags. + GetSourceConns(sourceID string, tags byte) []Connection // Clean the connector. Clean() } @@ -51,6 +53,23 @@ func (c *connector) Get(connID string) Connection { return nil } +// GetSourceConns gets the source connection by tag. +func (c *connector) GetSourceConns(sourceID string, tag byte) []Connection { + conns := make([]Connection, 0) + + c.conns.Range(func(key interface{}, val interface{}) bool { + conn := val.(Connection) + for _, v := range conn.ObserveDataTags() { + if v == tag && conn.ClientType() == ClientTypeSource && conn.ClientID() == sourceID { + conns = append(conns, conn) + } + } + return true + }) + + return conns +} + // GetSnapshot gets the snapshot of all connections. func (c *connector) GetSnapshot() map[string]string { result := make(map[string]string) diff --git a/core/core.go b/core/core.go index 664db9de7..4dc282fa6 100644 --- a/core/core.go +++ b/core/core.go @@ -25,6 +25,7 @@ const ( ConnStateAborted ConnState = "Aborted" ConnStateClosed ConnState = "Closed" // close connection by server ConnStateGoaway ConnState = "Goaway" + ConnStateBackflow ConnState = "Backflow" ) // Prefix is the prefix for logger. diff --git a/core/frame/backflow_frame.go b/core/frame/backflow_frame.go new file mode 100644 index 000000000..46a885334 --- /dev/null +++ b/core/frame/backflow_frame.go @@ -0,0 +1,70 @@ +package frame + +import ( + "github.com/yomorun/y3" +) + +// BackflowFrame is a Y3 encoded bytes +// It's used to receive stream function processed result +type BackflowFrame struct { + Tag byte + Carriage []byte +} + +// NewBackflowFrame creates a new BackflowFrame with a given tag and carriage +func NewBackflowFrame(tag byte, carriage []byte) *BackflowFrame { + return &BackflowFrame{ + Tag: tag, + Carriage: carriage, + } +} + +// Type gets the type of Frame. +func (f *BackflowFrame) Type() Type { + return TagOfBackflowFrame +} + +// SetCarriage sets the user's raw data +func (f *BackflowFrame) SetCarriage(buf []byte) *BackflowFrame { + f.Carriage = buf + return f +} + +// Encode to Y3 encoded bytes +func (f *BackflowFrame) Encode() []byte { + carriage := y3.NewPrimitivePacketEncoder(f.Tag) + carriage.SetBytesValue(f.Carriage) + + node := y3.NewNodePacketEncoder(byte(TagOfBackflowFrame)) + node.AddPrimitivePacket(carriage) + + return node.Encode() +} + +// GetDataTag return the Tag of user's data +func (f *BackflowFrame) GetDataTag() byte { + return f.Tag +} + +// GetCarriage return data +func (f *BackflowFrame) GetCarriage() []byte { + return f.Carriage +} + +// DecodeToBackflowFrame decodes Y3 encoded bytes to BackflowFrame +func DecodeToBackflowFrame(buf []byte) (*BackflowFrame, error) { + nodeBlock := y3.NodePacket{} + _, err := y3.DecodeToNodePacket(buf, &nodeBlock) + if err != nil { + return nil, err + } + + payload := &BackflowFrame{} + for _, v := range nodeBlock.PrimitivePackets { + payload.Tag = v.SeqID() + payload.Carriage = v.GetValBuf() + break + } + + return payload, nil +} diff --git a/core/frame/data_frame.go b/core/frame/data_frame.go index 44ed7af66..eb06c641d 100644 --- a/core/frame/data_frame.go +++ b/core/frame/data_frame.go @@ -60,6 +60,16 @@ func (d *DataFrame) GetDataTag() byte { return d.payloadFrame.Tag } +// SetSourceID set the source ID. +func (d *DataFrame) SetSourceID(sourceID string) { + d.metaFrame.SetSourceID(sourceID) +} + +// SourceID returns source ID +func (d *DataFrame) SourceID() string { + return d.metaFrame.SourceID() +} + // Encode return Y3 encoded bytes of `DataFrame` func (d *DataFrame) Encode() []byte { data := y3.NewNodePacketEncoder(byte(d.Type())) diff --git a/core/frame/data_frame_test.go b/core/frame/data_frame_test.go index 878cb3141..0d1d506f4 100644 --- a/core/frame/data_frame_test.go +++ b/core/frame/data_frame_test.go @@ -13,10 +13,11 @@ func TestDataFrameEncode(t *testing.T) { tidbuf := []byte(d.TransactionID()) result := []byte{ - 0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8), - 0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2), + 0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2), + 0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2), byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) + result = append(result, byte(TagOfSourceID), 0x0) result = append(result, 0x80|byte(TagOfPayloadFrame), 0x06, userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F) assert.Equal(t, result, d.Encode()) diff --git a/core/frame/frame.go b/core/frame/frame.go index 63b3df642..326c31deb 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -16,16 +16,17 @@ const ( TagOfMetaFrame Type = 0x2F TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 + TagOfSourceID Type = 0x02 // PayloadFrame of DataFrame - TagOfPayloadFrame Type = 0x2E + TagOfPayloadFrame Type = 0x2E + TagOfBackflowFrame Type = 0x2D TagOfTokenFrame Type = 0x3E // HandshakeFrame - TagOfHandshakeFrame Type = 0x3D - TagOfHandshakeName Type = 0x01 - TagOfHandshakeType Type = 0x02 - // TagOfHandshakeAppID Type = 0x03 - // TagOfHandshakeAuthType Type = 0x04 + TagOfHandshakeFrame Type = 0x3D + TagOfHandshakeName Type = 0x01 + TagOfHandshakeType Type = 0x02 + TagOfHandshakeID Type = 0x03 TagOfHandshakeAuthName Type = 0x04 TagOfHandshakeAuthPayload Type = 0x05 TagOfHandshakeObserveDataTags Type = 0x06 @@ -71,6 +72,8 @@ func (f Type) String() string { return "RejectedFrame" case TagOfGoawayFrame: return "GoawayFrame" + case TagOfBackflowFrame: + return "BackflowFrame" case TagOfMetaFrame: return "MetaFrame" case TagOfPayloadFrame: diff --git a/core/frame/handshake_frame.go b/core/frame/handshake_frame.go index d095b44bd..9154e5e29 100644 --- a/core/frame/handshake_frame.go +++ b/core/frame/handshake_frame.go @@ -8,6 +8,8 @@ import ( type HandshakeFrame struct { // Name is client name Name string + // ClientID represents client ID + ClientID string // ClientType represents client type (source or sfn) ClientType byte // ObserveDataTags are the client data tag list. @@ -18,9 +20,10 @@ type HandshakeFrame struct { } // NewHandshakeFrame creates a new HandshakeFrame. -func NewHandshakeFrame(name string, clientType byte, observeDataTags []byte, authName string, authPayload string) *HandshakeFrame { +func NewHandshakeFrame(name string, clientID string, clientType byte, observeDataTags []byte, authName string, authPayload string) *HandshakeFrame { return &HandshakeFrame{ Name: name, + ClientID: clientID, ClientType: clientType, ObserveDataTags: observeDataTags, authName: authName, @@ -38,7 +41,10 @@ func (h *HandshakeFrame) Encode() []byte { // name nameBlock := y3.NewPrimitivePacketEncoder(byte(TagOfHandshakeName)) nameBlock.SetStringValue(h.Name) - // type + // client ID + idBlock := y3.NewPrimitivePacketEncoder(byte(TagOfHandshakeID)) + idBlock.SetStringValue(h.ClientID) + // client type typeBlock := y3.NewPrimitivePacketEncoder(byte(TagOfHandshakeType)) typeBlock.SetBytesValue([]byte{h.ClientType}) // observe data tags @@ -52,6 +58,7 @@ func (h *HandshakeFrame) Encode() []byte { // handshake frame handshake := y3.NewNodePacketEncoder(byte(h.Type())) handshake.AddPrimitivePacket(nameBlock) + handshake.AddPrimitivePacket(idBlock) handshake.AddPrimitivePacket(typeBlock) handshake.AddPrimitivePacket(observeDataTagsBlock) handshake.AddPrimitivePacket(authNameBlock) @@ -77,7 +84,15 @@ func DecodeToHandshakeFrame(buf []byte) (*HandshakeFrame, error) { } handshake.Name = name } - // type + // client ID + if idBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeID)]; ok { + id, err := idBlock.ToUTF8String() + if err != nil { + return nil, err + } + handshake.ClientID = id + } + // client type if typeBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeType)]; ok { clientType := typeBlock.ToBytes() handshake.ClientType = clientType[0] diff --git a/core/frame/handshake_frame_test.go b/core/frame/handshake_frame_test.go index 1737d9309..480ddf51c 100644 --- a/core/frame/handshake_frame_test.go +++ b/core/frame/handshake_frame_test.go @@ -9,10 +9,11 @@ import ( func TestHandshakeFrameEncode(t *testing.T) { expectedName := "1234" var expectedType byte = 0xD3 - m := NewHandshakeFrame(expectedName, expectedType, []byte{0x01, 0x02}, "token", "a") + m := NewHandshakeFrame(expectedName, "", expectedType, []byte{0x01, 0x02}, "token", "a") assert.Equal(t, []byte{ - 0x80 | byte(TagOfHandshakeFrame), 0x17, + 0x80 | byte(TagOfHandshakeFrame), 0x19, byte(TagOfHandshakeName), 0x04, 0x31, 0x32, 0x33, 0x34, + byte(TagOfHandshakeID), 0x0, byte(TagOfHandshakeType), 0x01, 0xD3, byte(TagOfHandshakeObserveDataTags), 0x02, 0x01, 0x02, // byte(TagOfHandshakeAppID), 0x0, diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 7794994f9..1d54a21b0 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -13,6 +13,7 @@ import ( type MetaFrame struct { tid string metadata []byte + sourceID string } // NewMetaFrame creates a new MetaFrame instance. @@ -44,19 +45,27 @@ func (m *MetaFrame) Metadata() []byte { return m.metadata } +// SetSourceID set the source ID. +func (m *MetaFrame) SetSourceID(sourceID string) { + m.sourceID = sourceID +} + +// SourceID returns source ID +func (m *MetaFrame) SourceID() string { + return m.sourceID +} + // Encode implements Frame.Encode method. func (m *MetaFrame) Encode() []byte { meta := y3.NewNodePacketEncoder(byte(TagOfMetaFrame)) - + // transaction ID transactionID := y3.NewPrimitivePacketEncoder(byte(TagOfTransactionID)) transactionID.SetStringValue(m.tid) meta.AddPrimitivePacket(transactionID) - - if m.metadata != nil { - metadata := y3.NewPrimitivePacketEncoder(byte(TagOfMetadata)) - metadata.SetBytesValue(m.metadata) - meta.AddPrimitivePacket(metadata) - } + // source ID + sourceID := y3.NewPrimitivePacketEncoder(byte(TagOfSourceID)) + sourceID.SetStringValue(m.sourceID) + meta.AddPrimitivePacket(sourceID) return meta.Encode() } @@ -82,6 +91,13 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { case byte(TagOfMetadata): meta.metadata = v.ToBytes() break + case byte(TagOfSourceID): + sourceID, err := v.ToUTF8String() + if err != nil { + return nil, err + } + meta.sourceID = sourceID + break } } diff --git a/core/frame/meta_frame_test.go b/core/frame/meta_frame_test.go index 94781a0ec..31e58b22c 100644 --- a/core/frame/meta_frame_test.go +++ b/core/frame/meta_frame_test.go @@ -9,14 +9,17 @@ import ( func TestMetaFrameEncode(t *testing.T) { m := NewMetaFrame() tidbuf := []byte(m.tid) - result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf)), byte(TagOfTransactionID), byte(len(tidbuf))} + result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2), byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) + result = append(result, byte(TagOfSourceID), 0x0) assert.Equal(t, result, m.Encode()) } func TestMetaFrameDecode(t *testing.T) { - buf := []byte{0x80 | byte(TagOfMetaFrame), 0x06, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34} + buf := []byte{0x80 | byte(TagOfMetaFrame), 0x09, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31} meta, err := DecodeToMetaFrame(buf) assert.NoError(t, err) assert.EqualValues(t, "1234", meta.TransactionID()) + assert.EqualValues(t, "1", meta.SourceID()) + t.Logf("%# x", buf) } diff --git a/core/server.go b/core/server.go index 128b5ed8f..c244c7cf1 100644 --- a/core/server.go +++ b/core/server.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "os" "sync" "sync/atomic" @@ -102,7 +103,7 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { return err } defer listener.Close() - logger.Printf("%s✅ [%s] Listening on: %s, MODE: %s, QUIC: %v, AUTH: %s", ServerLogPrefix, s.name, listener.Addr(), mode(), listener.Versions(), s.authNames()) + logger.Printf("%s✅ [%s][%d] Listening on: %s, MODE: %s, QUIC: %v, AUTH: %s", ServerLogPrefix, s.name, os.Getpid(), listener.Addr(), mode(), listener.Versions(), s.authNames()) s.state = ConnStateConnected for { @@ -135,20 +136,19 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { } logger.Printf("%s💔 [%s](%s) close the connection", ServerLogPrefix, conn.Name(), connID) } else { - logger.Errorf("%s❤️3/ [unknown](%s) on stream %v", ServerLogPrefix, connID, err) - // qconn.CloseWithError(quic.ApplicationErrorCode(yerr.ErrorCodeUnknown), err.Error()) + logger.Errorf("%s💙 [unknown](%s) on stream %v", ServerLogPrefix, connID, err) } break } defer stream.Close() - logger.Infof("%s❤️4/ [stream:%d] created, connID=%s", ServerLogPrefix, stream.StreamID(), connID) + logger.Infof("%s❤️3/ [stream:%d] created, connID=%s", ServerLogPrefix, stream.StreamID(), connID) // process frames on stream // c := newContext(connID, stream) c := newContext(conn, stream) defer c.Clean() s.handleConnection(c) - logger.Infof("%s❤️5/ [stream:%d] handleConnection DONE", ServerLogPrefix, stream.StreamID()) + logger.Infof("%s❤️4/ [stream:%d] handleConnection DONE", ServerLogPrefix, stream.StreamID()) } }(sctx, conn) } @@ -267,6 +267,8 @@ func (s *Server) mainFrameHandler(c *Context) error { f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) s.dispatchToDownstreams(f) } + // observe datatags backflow + s.handleBackflowFrame(c) } default: logger.Errorf("%serr=%v, frame=%v", ServerLogPrefix, err, frame.Shortly(c.Frame.Encode())) @@ -281,10 +283,11 @@ func (s *Server) handleHandshakeFrame(c *Context) error { logger.Debugf("%sGOT ❤️ HandshakeFrame : %# x", ServerLogPrefix, f) // basic info connID := c.ConnID() + clientID := f.ClientID clientType := ClientType(f.ClientType) stream := c.Stream // credential - logger.Debugf("%sClientType=%# x is %s, Credential=%s", ServerLogPrefix, f.ClientType, ClientType(f.ClientType), authName(f.AuthName())) + logger.Debugf("%sClientType=%# x is %s, ClientID=%s, Credential=%s", ServerLogPrefix, f.ClientType, ClientType(f.ClientType), clientID, authName(f.AuthName())) // authenticate if !s.authenticate(f) { err := fmt.Errorf("handshake authentication fails, client credential name is %s", authName(f.AuthName())) @@ -307,7 +310,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { if err != nil { return err } - conn = newConnection(f.Name, clientType, metadata, stream) + conn = newConnection(f.Name, f.ClientID, clientType, metadata, stream, f.ObserveDataTags) if clientType == ClientTypeStreamFunction { // route @@ -333,16 +336,17 @@ func (s *Server) handleHandshakeFrame(c *Context) error { } } case ClientTypeUpstreamZipper: - conn = newConnection(f.Name, clientType, nil, stream) + conn = newConnection(f.Name, f.ClientID, clientType, nil, stream, f.ObserveDataTags) default: // unknown client type + s.connector.Remove(connID) logger.Errorf("%sClientType=%# x, ilegal!", ServerLogPrefix, f.ClientType) c.CloseWithError(yerr.ErrorCodeUnknownClient, "Unknown ClientType, illegal!") return errors.New("core.server: Unknown ClientType, illegal") } s.connector.Add(connID, conn) - logger.Printf("%s❤️ <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, connID) + logger.Printf("%s❤️ <%s> [%s][%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, clientID, connID) return nil } @@ -415,6 +419,28 @@ func (s *Server) handleDataFrame(c *Context) error { return nil } +func (s *Server) handleBackflowFrame(c *Context) error { + f := c.Frame.(*frame.DataFrame) + tag := f.GetDataTag() + carriage := f.GetCarriage() + sourceID := f.SourceID() + // write to source with BackflowFrame + bf := frame.NewBackflowFrame(tag, carriage) + sourceConns := s.connector.GetSourceConns(sourceID, tag) + // conn := s.connector.Get(c.connID) + // logger.Printf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%s", ServerLogPrefix, tag, sourceID, carriage) + for _, source := range sourceConns { + if source != nil { + logger.Debugf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) + if err := source.Write(bf); err != nil { + logger.Errorf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, error=%v", ServerLogPrefix, tag, sourceID, err) + return err + } + } + } + return nil +} + // StatsFunctions returns the sfn stats of server. func (s *Server) StatsFunctions() map[string]string { return s.connector.GetSnapshot() diff --git a/core/stream_parser.go b/core/stream_parser.go index 349af9c96..29c7dbf0c 100644 --- a/core/stream_parser.go +++ b/core/stream_parser.go @@ -37,6 +37,8 @@ func ParseFrame(stream io.Reader) (frame.Frame, error) { return frame.DecodeToRejectedFrame(buf) case 0x80 | byte(frame.TagOfGoawayFrame): return frame.DecodeToGoawayFrame(buf) + case 0x80 | byte(frame.TagOfBackflowFrame): + return frame.DecodeToBackflowFrame(buf) default: return nil, fmt.Errorf("unknown frame type, buf[0]=%#x", buf[0]) } diff --git a/example/0-basic/Taskfile.yml b/example/0-basic/Taskfile.yml index cf0cfb417..c1404be8d 100644 --- a/example/0-basic/Taskfile.yml +++ b/example/0-basic/Taskfile.yml @@ -7,7 +7,7 @@ output: "prefixed" tasks: run: desc: run - deps: [zipper, source, sfn] + deps: [zipper, source, sfn1, sfn2] cmds: - echo 'basic example run' @@ -18,8 +18,8 @@ tasks: - rm -rf ./bin build: - desc: build source, sfn and zipper - deps: [source-build, sfn-build] + desc: build source, sfns and zipper + deps: [source-build, sfn1-build, sfn2-build] cmds: - echo 'building done' @@ -28,10 +28,10 @@ tasks: cmds: - "go build -o ./bin/source{{exeExt}} source/main.go" - sfn-build: - desc: build sfn + sfn1-build: + desc: build sfn1 cmds: - - "go build -o ./bin/sfn{{exeExt}} sfn/main.go" + - "go build -o ./bin/sfn1{{exeExt}} sfn/main.go" source: desc: run source @@ -41,11 +41,24 @@ tasks: env: YOMO_LOG_LEVEL: error - sfn: - desc: run sfn - deps: [sfn-build] + sfn1: + desc: run sfn1 + deps: [sfn1-build] cmds: - - "./bin/sfn{{exeExt}}" + - "./bin/sfn1{{exeExt}}" + env: + YOMO_LOG_LEVEL: error + + sfn2-build: + desc: build sfn2 + cmds: + - "go build -o ./bin/sfn2{{exeExt}} sfn2/main.go" + + sfn2: + desc: run sfn2 + deps: [sfn2-build] + cmds: + - "./bin/sfn2{{exeExt}}" env: YOMO_LOG_LEVEL: error diff --git a/example/0-basic/sfn/main.go b/example/0-basic/sfn/main.go index d090988f9..6934bb0ed 100644 --- a/example/0-basic/sfn/main.go +++ b/example/0-basic/sfn/main.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "fmt" "os" "github.com/yomorun/yomo" @@ -31,12 +32,12 @@ func main() { // start err := sfn.Connect() if err != nil { - logger.Errorf("[sfn] connect err=%v", err) + logger.Errorf("[sfn1] connect err=%v", err) os.Exit(1) } // set the error handler function when server error occurs sfn.SetErrorHandler(func(err error) { - logger.Errorf("[sfn] receive server error: %v", err) + logger.Errorf("[sfn1] receive server error: %v", err) sfn.Close() os.Exit(1) }) @@ -48,10 +49,10 @@ func handler(data []byte) (byte, []byte) { var model noiseData err := json.Unmarshal(data, &model) if err != nil { - logger.Errorf("[sfn] json.Marshal err=%v", err) + logger.Errorf("[sfn1] json.Marshal err=%v", err) os.Exit(-2) } else { - logger.Printf(">> [sfn] got tag=0x33, data=%+v", model) + logger.Printf(">> [sfn1] got tag=0x33, data=%+v", model) } - return 0x0, nil + return 0x34, []byte(fmt.Sprintf("sfn1 processed result: %v", model.Noise)) } diff --git a/example/0-basic/sfn2/main.go b/example/0-basic/sfn2/main.go new file mode 100644 index 000000000..8b6ebbd66 --- /dev/null +++ b/example/0-basic/sfn2/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "bytes" + "os" + + "github.com/yomorun/yomo" + "github.com/yomorun/yomo/pkg/logger" +) + +func main() { + addr := "localhost:9000" + if v := os.Getenv("YOMO_ADDR"); v != "" { + addr = v + } + sfn := yomo.NewStreamFunction( + "Noise2", + yomo.WithZipperAddr(addr), + yomo.WithObserveDataTags(0x34), + ) + defer sfn.Close() + + // set handler + sfn.SetHandler(handler) + // start + err := sfn.Connect() + if err != nil { + logger.Errorf("[sfn2] connect err=%v", err) + os.Exit(1) + } + // set the error handler function when server error occurs + sfn.SetErrorHandler(func(err error) { + logger.Errorf("[sfn2] receive server error: %v", err) + sfn.Close() + os.Exit(1) + }) + + select {} +} + +func handler(data []byte) (byte, []byte) { + logger.Printf(">> [sfn2] got tag=0x34, data=%s", string(data)) + result := bytes.ReplaceAll(data, []byte("sfn1"), []byte("sfn2")) + return 0x0, result +} diff --git a/example/0-basic/source/main.go b/example/0-basic/source/main.go index 08bda2728..d94baa60d 100644 --- a/example/0-basic/source/main.go +++ b/example/0-basic/source/main.go @@ -30,6 +30,7 @@ func main() { "yomo-source", yomo.WithZipperAddr(addr), yomo.WithLogger(logger), + yomo.WithObserveDataTags(0x34, 0x0), ) err := source.Connect() if err != nil { @@ -45,6 +46,10 @@ func main() { logger.Printf("[source] receive server error: %v", err) os.Exit(1) }) + // set receive handler for the observe datatags + source.SetReceiveHandler(func(tag byte, data []byte) { + logger.Printf("[source] ♻️ receive backflow: tag=%#v, data=%v", tag, string(data)) + }) // generate mock data and send it to YoMo-Zipper in every 100 ms. err = generateAndSendData(source) @@ -70,7 +75,8 @@ func generateAndSendData(stream yomo.Source) error { // send data via QUIC stream. _, err = stream.Write(sendingBuf) - if i++; i > 6 { + i++ + if i > 6 { stream.Close() return nil } @@ -83,7 +89,7 @@ func generateAndSendData(stream yomo.Source) error { logger.Printf("[source] ✅ Emit %v to YoMo-Zipper", data) } - time.Sleep(500 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) } } diff --git a/example/0-basic/workflow.yaml b/example/0-basic/workflow.yaml index 7d11e02f0..99ca9a154 100644 --- a/example/0-basic/workflow.yaml +++ b/example/0-basic/workflow.yaml @@ -3,3 +3,4 @@ host: localhost port: 9000 functions: - name: Noise + - name: Noise2 diff --git a/pkg/id/id.go b/pkg/id/id.go new file mode 100644 index 000000000..a1edeab1d --- /dev/null +++ b/pkg/id/id.go @@ -0,0 +1,16 @@ +package id + +import ( + gonanoid "github.com/matoous/go-nanoid/v2" + "github.com/yomorun/yomo/pkg/logger" +) + +// New generate id +func New() string { + id, err := gonanoid.New() + if err != nil { + logger.Errorf("generated id err=%v", err) + return "" + } + return id +} diff --git a/sfn.go b/sfn.go index 9846df99d..5ca0489cb 100644 --- a/sfn.go +++ b/sfn.go @@ -157,6 +157,8 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { frame := frame.NewDataFrame() // reuse transactionID frame.SetTransactionID(metaFrame.TransactionID()) + // reuse sourceID + frame.SetSourceID(metaFrame.SourceID()) frame.SetCarriage(tag, resp) s.client.WriteFrame(frame) } diff --git a/source.go b/source.go index 78b252348..4883588a0 100644 --- a/source.go +++ b/source.go @@ -25,6 +25,8 @@ type Source interface { WriteWithTag(tag uint8, data []byte) error // SetErrorHandler set the error handler function when server error occurs SetErrorHandler(fn func(err error)) + // [Experimental] SetReceiveHandler set the observe handler function + SetReceiveHandler(fn func(tag byte, data []byte)) } // YoMo-Source @@ -33,6 +35,7 @@ type yomoSource struct { zipperEndpoint string client *core.Client tag uint8 + fn func(byte, []byte) } var _ Source = &yomoSource{} @@ -71,6 +74,13 @@ func (s *yomoSource) Close() error { // Connect to YoMo-Zipper. func (s *yomoSource) Connect() error { + // set backflowframe handler + s.client.SetBackflowFrameObserver(func(frm *frame.BackflowFrame) { + if s.fn != nil { + s.fn(frm.GetDataTag(), frm.GetCarriage()) + } + }) + err := s.client.Connect(context.Background(), s.zipperEndpoint) if err != nil { s.client.Logger().Errorf("%sConnect() error: %s", sourceLogPrefix, err) @@ -80,13 +90,21 @@ func (s *yomoSource) Connect() error { // WriteWithTag will write data with specified tag, default transactionID is epoch time. func (s *yomoSource) WriteWithTag(tag uint8, data []byte) error { - s.client.Logger().Debugf("%sWriteWithTag: len(data)=%d, data=%# x", sourceLogPrefix, len(data), frame.Shortly(data)) - frame := frame.NewDataFrame() - frame.SetCarriage(byte(tag), data) - return s.client.WriteFrame(frame) + f := frame.NewDataFrame() + f.SetCarriage(byte(tag), data) + f.SetSourceID(s.client.ClientID()) + s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x", + sourceLogPrefix, f.TransactionID(), f.SourceID(), len(data), frame.Shortly(data)) + return s.client.WriteFrame(f) } // SetErrorHandler set the error handler function when server error occurs func (s *yomoSource) SetErrorHandler(fn func(err error)) { s.client.SetErrorHandler(fn) } + +// [Experimental] SetReceiveHandler set the observe handler function +func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) { + s.fn = fn + s.client.Logger().Debugf("%sSetReceiveHandler(%v)", sourceLogPrefix, s.fn) +}