From 0cb2823414ba65356421255520552f89bc3633ff Mon Sep 17 00:00:00 2001 From: venjiang Date: Tue, 26 Apr 2022 19:13:00 +0800 Subject: [PATCH 01/18] plan a test --- core/client.go | 22 ++++++++++------- core/connector.go | 20 +++++++++++++++- core/server.go | 25 ++++++++++++++++++++ example/0-basic/Taskfile.yml | 13 ++++++++++ example/0-basic/sfn/main.go | 2 +- example/0-basic/sfn2/main.go | 43 ++++++++++++++++++++++++++++++++++ example/0-basic/source/main.go | 16 +++++++------ example/0-basic/workflow.yaml | 1 + sfn.go | 16 +++++++++---- source.go | 11 +++++++++ 10 files changed, 147 insertions(+), 22 deletions(-) create mode 100644 example/0-basic/sfn2/main.go diff --git a/core/client.go b/core/client.go index 10b47e87f..4d6ce82b0 100644 --- a/core/client.go +++ b/core/client.go @@ -27,13 +27,13 @@ type ConnState = string // Client is the abstraction of a YoMo-Client. a YoMo-Client can be // Source, Upstream Zipper or StreamFunction. type Client struct { - name string // name of the client - clientType ClientType // type of the connection - conn quic.Connection // quic connection - stream quic.Stream // quic stream - state ConnState // state of the connection - processor func(*frame.DataFrame) // functions to invoke when data arrived - addr string // the address of server connected to + name string // name of the client + clientType ClientType // type of the connection + conn quic.Connection // quic connection + stream quic.Stream // quic stream + state ConnState // state of the connection + processor func(*frame.DataFrame, ...func(data []byte)) // functions to invoke when data arrived + addr string // the address of server connected to mu sync.Mutex opts ClientOptions localAddr string // client local addr, it will be changed on reconnect @@ -222,7 +222,7 @@ func (c *Client) handleFrame() { } else { // TODO: should c.processor accept a DataFrame as parameter? // c.processor(v.GetDataTagID(), v.GetCarriage(), v.GetMetaFrame()) - c.processor(v) + c.processor(v, c.callbacks...) } } default: @@ -323,11 +323,15 @@ func (c *Client) setLocalAddr(addr string) { } // SetDataFrameObserver sets the data frame handler. -func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) { +func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame, ...func([]byte))) { c.processor = fn c.logger.Debugf("%sSetDataFrameObserver(%v)", ClientLogPrefix, c.processor) } +func (c *Client) SetDataFrameCallbacks(callbacks ...func(data []byte)) { + c.callbacks = callbacks +} + // reconnect the connection between client and server. func (c *Client) reconnect(ctx context.Context, addr string) { t := time.NewTicker(1 * time.Second) diff --git a/core/connector.go b/core/connector.go index aa180a0a7..59ea742e3 100644 --- a/core/connector.go +++ b/core/connector.go @@ -18,16 +18,22 @@ type Connector interface { Get(connID string) Connection // GetSnapshot gets the snapshot of all connections. GetSnapshot() map[string]string + GetSourceConnIDs(tags byte) []string + LinkSource(connID string, name string, observed []byte) // Clean the connector. Clean() } type connector struct { conns sync.Map + sources sync.Map } func newConnector() Connector { - return &connector{conns: sync.Map{}} + return &connector{ + conns sync.Map + sources sync.Map + } } // Add a connection. @@ -36,10 +42,16 @@ func (c *connector) Add(connID string, conn Connection) { c.conns.Store(connID, conn) } +// func (c *connector) AddSource(connID string, stream io.ReadWriteCloser) { +// logger.Debugf("%sconnector add: connID=%s", ServerLogPrefix, connID) +// c.sconns.Store(connID, stream) +// } + // Remove a connection. func (c *connector) Remove(connID string) { logger.Debugf("%sconnector remove: connID=%s", ServerLogPrefix, connID) c.conns.Delete(connID) + c.sources.Delete(connID) } // Get a connection by connection id. @@ -63,6 +75,12 @@ func (c *connector) GetSnapshot() map[string]string { return result } +// LinkSource links the source and connection. +func (c *connector) LinkSource(connID string, name string, observed []byte) { + logger.Debugf("%sconnector link source: connID[%s] --> source[%s]", ServerLogPrefix, connID, name) + c.sources.Store(connID, &app{name, observed}) +} + // Clean the connector. func (c *connector) Clean() { c.conns = sync.Map{} diff --git a/core/server.go b/core/server.go index 8c4510bdc..5796f900a 100644 --- a/core/server.go +++ b/core/server.go @@ -408,6 +408,31 @@ func (s *Server) handleDataFrame(c *Context) error { return nil } +func (s *Server) handleObserver(c *Context) error { + f := c.Frame.(*frame.DataFrame) + // write to source + // TODO: 回流 + tag := f.GetDataTag() + sourceIDs := s.connector.GetSourceConnIDs(tag) + for _, sourceID := range sourceIDs { + // if targetStream != nil { + result :=f.GetCarriage() + // sfn := s.connector.Get("127.0.0.1:53697") + // if sfn != nil { + // // fs := NewFrameStream(c.Stream) + // // f,err:=fs.ReadFrame() + // // if err!=nil{ + // // logger.Errorf("解析frame出错") + // // return err + // result = string(f.GetCarriage()) + // // } + // } + logger.Printf("[回流] tag:%# v -> source:%s, result=%s", tag, sourceID, string(result)) + // } + } + return nil +} + // StatsFunctions returns the sfn stats of server. func (s *Server) StatsFunctions() map[string]string { return s.connector.GetSnapshot() diff --git a/example/0-basic/Taskfile.yml b/example/0-basic/Taskfile.yml index cf0cfb417..9f1b27c63 100644 --- a/example/0-basic/Taskfile.yml +++ b/example/0-basic/Taskfile.yml @@ -49,6 +49,19 @@ tasks: env: YOMO_LOG_LEVEL: error + sfn2-build: + desc: build sfn2 + cmds: + - "go build -o ./bin/sfn2{{exeExt}} sfn2/main.go" + + sfn2: + desc: run sfn2 + deps: [sfn2-build] + cmds: + - "./bin/sfn2{{exeExt}}" + env: + YOMO_LOG_LEVEL: error + zipper: desc: run zipper cmds: diff --git a/example/0-basic/sfn/main.go b/example/0-basic/sfn/main.go index d090988f9..7b263e361 100644 --- a/example/0-basic/sfn/main.go +++ b/example/0-basic/sfn/main.go @@ -53,5 +53,5 @@ func handler(data []byte) (byte, []byte) { } else { logger.Printf(">> [sfn] got tag=0x33, data=%+v", model) } - return 0x0, nil + return 0x34, []byte("sfn-1 处理结果") } diff --git a/example/0-basic/sfn2/main.go b/example/0-basic/sfn2/main.go new file mode 100644 index 000000000..3f3fc4bb1 --- /dev/null +++ b/example/0-basic/sfn2/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "os" + + "github.com/yomorun/yomo" + "github.com/yomorun/yomo/pkg/logger" +) + +func main() { + addr := "localhost:9000" + if v := os.Getenv("YOMO_ADDR"); v != "" { + addr = v + } + sfn := yomo.NewStreamFunction( + "Noise2", + yomo.WithZipperAddr(addr), + yomo.WithObserveDataTags(0x34), + ) + defer sfn.Close() + + // set handler + sfn.SetHandler(handler) + // start + err := sfn.Connect() + if err != nil { + logger.Errorf("[sfn-2] connect err=%v", err) + os.Exit(1) + } + // set the error handler function when server error occurs + sfn.SetErrorHandler(func(err error) { + logger.Errorf("[sfn-2] receive server error: %v", err) + sfn.Close() + os.Exit(1) + }) + + select {} +} + +func handler(data []byte) (byte, []byte) { + logger.Printf(">> [sfn-2] got tag=0x34, data=%s", string(data)) + return 0x35, []byte("sfn-2 处理结果") +} diff --git a/example/0-basic/source/main.go b/example/0-basic/source/main.go index 89367c64d..a2e332413 100644 --- a/example/0-basic/source/main.go +++ b/example/0-basic/source/main.go @@ -30,6 +30,7 @@ func main() { "yomo-source", yomo.WithZipperAddr(addr), yomo.WithLogger(logger), + yomo.WithObserveDataTags(0x34), ) err := source.Connect() if err != nil { @@ -70,13 +71,7 @@ func generateAndSendData(stream yomo.Source) error { // send data via QUIC stream. _, err = stream.Write(sendingBuf) - if i++; i > 6 { - stream.Close() - return nil - // logger.Printf("[source] send GoawayFrame") - // goawayFrame := frame.NewGoawayFrame("客户端发送Goaway") - // stream.WriteFrame(goawayFrame) - } + i++ if err != nil { logger.Errorf("[source] ❌ Emit %v to YoMo-Zipper failure with err: %v", data, err) time.Sleep(500 * time.Millisecond) @@ -85,6 +80,13 @@ func generateAndSendData(stream yomo.Source) error { } else { logger.Printf("[source] ✅ Emit %v to YoMo-Zipper", data) } + if i > 6 { + stream.Close() + return nil + // logger.Printf("[source] send GoawayFrame") + // goawayFrame := frame.NewGoawayFrame("客户端发送Goaway") + // stream.WriteFrame(goawayFrame) + } time.Sleep(500 * time.Millisecond) } diff --git a/example/0-basic/workflow.yaml b/example/0-basic/workflow.yaml index 7d11e02f0..99ca9a154 100644 --- a/example/0-basic/workflow.yaml +++ b/example/0-basic/workflow.yaml @@ -3,3 +3,4 @@ host: localhost port: 9000 functions: - name: Noise + - name: Noise2 diff --git a/sfn.go b/sfn.go index 9846df99d..468886db5 100644 --- a/sfn.go +++ b/sfn.go @@ -83,9 +83,9 @@ func (s *streamFunction) SetPipeHandler(fn core.PipeHandler) error { func (s *streamFunction) Connect() error { s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix) // notify underlying network operations, when data with tag we observed arrived, invoke the func - s.client.SetDataFrameObserver(func(data *frame.DataFrame) { + s.client.SetDataFrameObserver(func(data *frame.DataFrame, callbacks ...func(data []byte)) { s.client.Logger().Debugf("%sreceive DataFrame, tag=%# x, carraige=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage()) - s.onDataFrame(data.GetCarriage(), data.GetMetaFrame()) + s.onDataFrame(data.GetCarriage(), data.GetMetaFrame(), callbacks...) }) if s.pfn != nil { @@ -140,7 +140,7 @@ func (s *streamFunction) Close() error { } // when DataFrame we observed arrived, invoke the user's function -func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { +func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame, callbacks ...func(data []byte)) { s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name) if s.fn != nil { @@ -158,7 +158,15 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { // reuse transactionID frame.SetTransactionID(metaFrame.TransactionID()) frame.SetCarriage(tag, resp) - s.client.WriteFrame(frame) + err := s.client.WriteFrame(frame) + // TODO: sfn 处理完成后,处理回流 + if err != nil { + // TODO: 回流 + for _, callback := range callbacks { + callback(resp) + } + } + } }() } else if s.pfn != nil { diff --git a/source.go b/source.go index e9ded49b4..ee60088ab 100644 --- a/source.go +++ b/source.go @@ -27,6 +27,8 @@ type Source interface { SetErrorHandler(fn func(err error)) // WriteFrame writes a frame to the connection WriteFrame(frm frame.Frame) error + // SetObserveHandler set the observe handler function + SetObserveHandler(fn func(data []byte)) error } // YoMo-Source @@ -35,6 +37,7 @@ type yomoSource struct { zipperEndpoint string client *core.Client tag uint8 + fn core.AsyncHandler } var _ Source = &yomoSource{} @@ -97,3 +100,11 @@ func (s *yomoSource) SetErrorHandler(fn func(err error)) { func (s *yomoSource) WriteFrame(frm frame.Frame) error { return s.client.WriteFrame(frm) } + +// SetObserveHandler set the observe handler function +func (s *yomoSource) SetObserveHandler(fn func(data []byte)) error { + // s.fn = fn + // s.client.Logger().Debugf("%sSetObserveHandler(%v)", sourceLogPrefix, s.fn) + // s.client.SetDataFrameObserver(fn) + return nil +} From ccb98432e66f2d0d1dc806b5b188405bfd13a40b Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 27 Apr 2022 00:06:29 +0800 Subject: [PATCH 02/18] use backflow frame --- core/client.go | 35 +++++++++++------ core/frame/backflow_frame.go | 69 ++++++++++++++++++++++++++++++++++ core/frame/frame.go | 3 +- core/server.go | 28 ++++++-------- core/stream_parser.go | 2 + example/0-basic/sfn/main.go | 3 +- example/0-basic/source/main.go | 19 ++++++---- sfn.go | 16 ++------ source.go | 25 +++++++----- 9 files changed, 141 insertions(+), 59 deletions(-) create mode 100644 core/frame/backflow_frame.go diff --git a/core/client.go b/core/client.go index 4d6ce82b0..e9daa9d2b 100644 --- a/core/client.go +++ b/core/client.go @@ -27,13 +27,14 @@ type ConnState = string // Client is the abstraction of a YoMo-Client. a YoMo-Client can be // Source, Upstream Zipper or StreamFunction. type Client struct { - name string // name of the client - clientType ClientType // type of the connection - conn quic.Connection // quic connection - stream quic.Stream // quic stream - state ConnState // state of the connection - processor func(*frame.DataFrame, ...func(data []byte)) // functions to invoke when data arrived - addr string // the address of server connected to + name string // name of the client + clientType ClientType // type of the connection + conn quic.Connection // quic connection + stream quic.Stream // quic stream + state ConnState // state of the connection + processor func(*frame.DataFrame) // functions to invoke when data arrived + receiver func(*frame.BackflowFrame) // functions to invoke when data is processed + addr string // the address of server connected to mu sync.Mutex opts ClientOptions localAddr string // client local addr, it will be changed on reconnect @@ -222,7 +223,17 @@ func (c *Client) handleFrame() { } else { // TODO: should c.processor accept a DataFrame as parameter? // c.processor(v.GetDataTagID(), v.GetCarriage(), v.GetMetaFrame()) - c.processor(v, c.callbacks...) + c.processor(v) + } + } + case frame.TagOfBackflowFrame: + if v, ok := f.(*frame.BackflowFrame); ok { + c.setState(ConnStateTransportData) + c.logger.Debugf("%sreceive BackflowFrame, tag=%# x, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.GetCarriage()) + if c.receiver == nil { + c.logger.Warnf("%sreceiver is nil", ClientLogPrefix) + } else { + c.receiver(v) } } default: @@ -323,13 +334,15 @@ func (c *Client) setLocalAddr(addr string) { } // SetDataFrameObserver sets the data frame handler. -func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame, ...func([]byte))) { +func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) { c.processor = fn c.logger.Debugf("%sSetDataFrameObserver(%v)", ClientLogPrefix, c.processor) } -func (c *Client) SetDataFrameCallbacks(callbacks ...func(data []byte)) { - c.callbacks = callbacks +// SetBackflowFrameObserver sets the backflow frame handler. +func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame)) { + c.receiver = fn + c.logger.Debugf("%sSetBackflowFrameObserver(%v)", ClientLogPrefix, c.receiver) } // reconnect the connection between client and server. diff --git a/core/frame/backflow_frame.go b/core/frame/backflow_frame.go new file mode 100644 index 000000000..f394b910a --- /dev/null +++ b/core/frame/backflow_frame.go @@ -0,0 +1,69 @@ +package frame + +import ( + "github.com/yomorun/y3" +) + +// BackflowFrame is a Y3 encoded bytes +type BackflowFrame struct { + Tag byte + Carriage []byte +} + +// NewBackflowPayloadFrame creates a new PayloadFrame with a given TagID of user's data +func NewBackflowFrame(tag byte,carriage []byte) *BackflowFrame { + return &BackflowFrame{ + Tag: tag, + Carriage:carriage, + } +} + +// Type gets the type of Frame. +func (f *BackflowFrame) Type() Type { + return TagOfBackflowFrame +} + +// SetCarriage sets the user's raw data +func (f *BackflowFrame) SetCarriage(buf []byte) *BackflowFrame { + f.Carriage = buf + return f +} + +// Encode to Y3 encoded bytes +func (f *BackflowFrame) Encode() []byte { + carriage := y3.NewPrimitivePacketEncoder(f.Tag) + carriage.SetBytesValue(f.Carriage) + + node := y3.NewNodePacketEncoder(byte(TagOfBackflowFrame)) + node.AddPrimitivePacket(carriage) + + return node.Encode() +} + +// GetDataTag return the Tag of user's data +func (f *BackflowFrame) GetDataTag() byte { + return f.Tag +} + +// GetCarriage return data +func (f *BackflowFrame) GetCarriage() []byte { + return f.Carriage +} + +// DecodeToBackflowFrame decodes Y3 encoded bytes to BackflowFrame +func DecodeToBackflowFrame(buf []byte) (*BackflowFrame, error) { + nodeBlock := y3.NodePacket{} + _, err := y3.DecodeToNodePacket(buf, &nodeBlock) + if err != nil { + return nil, err + } + + payload := &BackflowFrame{} + for _, v := range nodeBlock.PrimitivePackets { + payload.Tag = v.SeqID() + payload.Carriage = v.GetValBuf() + break + } + + return payload, nil +} diff --git a/core/frame/frame.go b/core/frame/frame.go index 63b3df642..96073f999 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -17,7 +17,8 @@ const ( TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 // PayloadFrame of DataFrame - TagOfPayloadFrame Type = 0x2E + TagOfPayloadFrame Type = 0x2E + TagOfBackflowFrame Type = 0x2D TagOfTokenFrame Type = 0x3E // HandshakeFrame diff --git a/core/server.go b/core/server.go index 5796f900a..d7e1220c0 100644 --- a/core/server.go +++ b/core/server.go @@ -266,6 +266,8 @@ func (s *Server) mainFrameHandler(c *Context) error { if conn != nil && conn.ClientType() == ClientTypeSource { f := c.Frame.(*frame.DataFrame) f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) + // observe datatags backflow + s.handleBackflowFrame(c) s.dispatchToDownstreams(f) } } @@ -408,27 +410,19 @@ func (s *Server) handleDataFrame(c *Context) error { return nil } -func (s *Server) handleObserver(c *Context) error { +func (s *Server) handleBackflowFrame(c *Context) error { f := c.Frame.(*frame.DataFrame) - // write to source - // TODO: 回流 tag := f.GetDataTag() + carriage := f.GetCarriage() + // write to source with BackflowFrame + bf := frame.NewBackflowFrame(tag, carriage) sourceIDs := s.connector.GetSourceConnIDs(tag) for _, sourceID := range sourceIDs { - // if targetStream != nil { - result :=f.GetCarriage() - // sfn := s.connector.Get("127.0.0.1:53697") - // if sfn != nil { - // // fs := NewFrameStream(c.Stream) - // // f,err:=fs.ReadFrame() - // // if err!=nil{ - // // logger.Errorf("解析frame出错") - // // return err - // result = string(f.GetCarriage()) - // // } - // } - logger.Printf("[回流] tag:%# v -> source:%s, result=%s", tag, sourceID, string(result)) - // } + logger.Debugf("%shandleBackflowFrame tag:%# v -> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) + source := s.connector.Get(sourceID) + if source != nil { + source.Write(bf.Encode()) + } } return nil } diff --git a/core/stream_parser.go b/core/stream_parser.go index 349af9c96..29c7dbf0c 100644 --- a/core/stream_parser.go +++ b/core/stream_parser.go @@ -37,6 +37,8 @@ func ParseFrame(stream io.Reader) (frame.Frame, error) { return frame.DecodeToRejectedFrame(buf) case 0x80 | byte(frame.TagOfGoawayFrame): return frame.DecodeToGoawayFrame(buf) + case 0x80 | byte(frame.TagOfBackflowFrame): + return frame.DecodeToBackflowFrame(buf) default: return nil, fmt.Errorf("unknown frame type, buf[0]=%#x", buf[0]) } diff --git a/example/0-basic/sfn/main.go b/example/0-basic/sfn/main.go index 7b263e361..20b316211 100644 --- a/example/0-basic/sfn/main.go +++ b/example/0-basic/sfn/main.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "fmt" "os" "github.com/yomorun/yomo" @@ -53,5 +54,5 @@ func handler(data []byte) (byte, []byte) { } else { logger.Printf(">> [sfn] got tag=0x33, data=%+v", model) } - return 0x34, []byte("sfn-1 处理结果") + return 0x34, []byte(fmt.Sprintf("sfn-1 处理结果: %v", model.Noise)) } diff --git a/example/0-basic/source/main.go b/example/0-basic/source/main.go index a2e332413..0efd65565 100644 --- a/example/0-basic/source/main.go +++ b/example/0-basic/source/main.go @@ -30,7 +30,7 @@ func main() { "yomo-source", yomo.WithZipperAddr(addr), yomo.WithLogger(logger), - yomo.WithObserveDataTags(0x34), + yomo.WithObserveDataTags(0x34, 0x35), ) err := source.Connect() if err != nil { @@ -46,6 +46,9 @@ func main() { logger.Printf("[source] receive server error: %v", err) os.Exit(1) }) + source.SetReceiverHandler(func(tag byte, data []byte) { + logger.Printf("[source] receive backflow: tag=%#v, data=%v", tag, string(data)) + }) // generate mock data and send it to YoMo-Zipper in every 100 ms. err = generateAndSendData(source) @@ -80,13 +83,13 @@ func generateAndSendData(stream yomo.Source) error { } else { logger.Printf("[source] ✅ Emit %v to YoMo-Zipper", data) } - if i > 6 { - stream.Close() - return nil - // logger.Printf("[source] send GoawayFrame") - // goawayFrame := frame.NewGoawayFrame("客户端发送Goaway") - // stream.WriteFrame(goawayFrame) - } + // if i > 6 { + // stream.Close() + // return nil + // // logger.Printf("[source] send GoawayFrame") + // // goawayFrame := frame.NewGoawayFrame("客户端发送Goaway") + // // stream.WriteFrame(goawayFrame) + // } time.Sleep(500 * time.Millisecond) } diff --git a/sfn.go b/sfn.go index 468886db5..9846df99d 100644 --- a/sfn.go +++ b/sfn.go @@ -83,9 +83,9 @@ func (s *streamFunction) SetPipeHandler(fn core.PipeHandler) error { func (s *streamFunction) Connect() error { s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix) // notify underlying network operations, when data with tag we observed arrived, invoke the func - s.client.SetDataFrameObserver(func(data *frame.DataFrame, callbacks ...func(data []byte)) { + s.client.SetDataFrameObserver(func(data *frame.DataFrame) { s.client.Logger().Debugf("%sreceive DataFrame, tag=%# x, carraige=%# x", streamFunctionLogPrefix, data.Tag(), data.GetCarriage()) - s.onDataFrame(data.GetCarriage(), data.GetMetaFrame(), callbacks...) + s.onDataFrame(data.GetCarriage(), data.GetMetaFrame()) }) if s.pfn != nil { @@ -140,7 +140,7 @@ func (s *streamFunction) Close() error { } // when DataFrame we observed arrived, invoke the user's function -func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame, callbacks ...func(data []byte)) { +func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name) if s.fn != nil { @@ -158,15 +158,7 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame, ca // reuse transactionID frame.SetTransactionID(metaFrame.TransactionID()) frame.SetCarriage(tag, resp) - err := s.client.WriteFrame(frame) - // TODO: sfn 处理完成后,处理回流 - if err != nil { - // TODO: 回流 - for _, callback := range callbacks { - callback(resp) - } - } - + s.client.WriteFrame(frame) } }() } else if s.pfn != nil { diff --git a/source.go b/source.go index ee60088ab..ce03a3844 100644 --- a/source.go +++ b/source.go @@ -27,8 +27,10 @@ type Source interface { SetErrorHandler(fn func(err error)) // WriteFrame writes a frame to the connection WriteFrame(frm frame.Frame) error - // SetObserveHandler set the observe handler function - SetObserveHandler(fn func(data []byte)) error + // SetReceiverHandler set the observe handler function + SetReceiverHandler(fn func(tag byte, data []byte)) + // Read reads up to len(p) bytes into p. It returns the number of bytes + // Read(p []byte) (n int, err error) } // YoMo-Source @@ -37,7 +39,7 @@ type yomoSource struct { zipperEndpoint string client *core.Client tag uint8 - fn core.AsyncHandler + fn func(byte, []byte) } var _ Source = &yomoSource{} @@ -76,6 +78,13 @@ func (s *yomoSource) Close() error { // Connect to YoMo-Zipper. func (s *yomoSource) Connect() error { + // set backflowframe handler + s.client.SetBackflowFrameObserver(func(frm *frame.BackflowFrame) { + if s.fn != nil { + s.fn(frm.GetDataTag(), frm.GetCarriage()) + } + }) + err := s.client.Connect(context.Background(), s.zipperEndpoint) if err != nil { s.client.Logger().Errorf("%sConnect() error: %s", sourceLogPrefix, err) @@ -101,10 +110,8 @@ func (s *yomoSource) WriteFrame(frm frame.Frame) error { return s.client.WriteFrame(frm) } -// SetObserveHandler set the observe handler function -func (s *yomoSource) SetObserveHandler(fn func(data []byte)) error { - // s.fn = fn - // s.client.Logger().Debugf("%sSetObserveHandler(%v)", sourceLogPrefix, s.fn) - // s.client.SetDataFrameObserver(fn) - return nil +// SetReceiverHandler set the receiver handler function +func (s *yomoSource) SetReceiverHandler(fn func(byte, []byte)) { + s.fn = fn + s.client.Logger().Debugf("%sSetObserveHandler(%v)", sourceLogPrefix, s.fn) } From 390e85f60b603109d839a144509d17a83e940913 Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 27 Apr 2022 12:47:40 +0800 Subject: [PATCH 03/18] add comments, modify taskfile --- core/client.go | 6 +++--- core/connector.go | 2 ++ core/core.go | 1 + core/frame/frame.go | 2 ++ core/server.go | 9 +++++++-- example/0-basic/Taskfile.yml | 20 ++++++++++---------- example/0-basic/sfn/main.go | 10 +++++----- example/0-basic/sfn2/main.go | 8 ++++---- example/0-basic/source/main.go | 2 +- source.go | 10 ++++------ 10 files changed, 39 insertions(+), 31 deletions(-) diff --git a/core/client.go b/core/client.go index e9daa9d2b..34a330060 100644 --- a/core/client.go +++ b/core/client.go @@ -217,7 +217,7 @@ func (c *Client) handleFrame() { case frame.TagOfDataFrame: // DataFrame carries user's data if v, ok := f.(*frame.DataFrame); ok { c.setState(ConnStateTransportData) - c.logger.Debugf("%sreceive DataFrame, tag=%# x, tid=%s, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.TransactionID(), v.GetCarriage()) + c.logger.Debugf("%sreceive DataFrame, tag=%#x, tid=%s, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.TransactionID(), v.GetCarriage()) if c.processor == nil { c.logger.Warnf("%sprocessor is nil", ClientLogPrefix) } else { @@ -228,8 +228,8 @@ func (c *Client) handleFrame() { } case frame.TagOfBackflowFrame: if v, ok := f.(*frame.BackflowFrame); ok { - c.setState(ConnStateTransportData) - c.logger.Debugf("%sreceive BackflowFrame, tag=%# x, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.GetCarriage()) + c.setState(ConnStateBackflow) + c.logger.Debugf("%sreceive BackflowFrame, tag=%#x, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.GetCarriage()) if c.receiver == nil { c.logger.Warnf("%sreceiver is nil", ClientLogPrefix) } else { diff --git a/core/connector.go b/core/connector.go index 59ea742e3..9ff19192a 100644 --- a/core/connector.go +++ b/core/connector.go @@ -18,7 +18,9 @@ type Connector interface { Get(connID string) Connection // GetSnapshot gets the snapshot of all connections. GetSnapshot() map[string]string + // GetSourceConnIDs gets the connection ids by source observe tag. GetSourceConnIDs(tags byte) []string + // LinkSource links the source and connection. LinkSource(connID string, name string, observed []byte) // Clean the connector. Clean() diff --git a/core/core.go b/core/core.go index 664db9de7..4dc282fa6 100644 --- a/core/core.go +++ b/core/core.go @@ -25,6 +25,7 @@ const ( ConnStateAborted ConnState = "Aborted" ConnStateClosed ConnState = "Closed" // close connection by server ConnStateGoaway ConnState = "Goaway" + ConnStateBackflow ConnState = "Backflow" ) // Prefix is the prefix for logger. diff --git a/core/frame/frame.go b/core/frame/frame.go index 96073f999..38f2a3ffa 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -72,6 +72,8 @@ func (f Type) String() string { return "RejectedFrame" case TagOfGoawayFrame: return "GoawayFrame" + case TagOfBackflowFrame: + return "BackflowFrame" case TagOfMetaFrame: return "MetaFrame" case TagOfPayloadFrame: diff --git a/core/server.go b/core/server.go index d7e1220c0..049f05d93 100644 --- a/core/server.go +++ b/core/server.go @@ -418,10 +418,15 @@ func (s *Server) handleBackflowFrame(c *Context) error { bf := frame.NewBackflowFrame(tag, carriage) sourceIDs := s.connector.GetSourceConnIDs(tag) for _, sourceID := range sourceIDs { - logger.Debugf("%shandleBackflowFrame tag:%# v -> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) + logger.Debugf("%shandleBackflowFrame tag:%#v --> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) + // get source's quic.Stream source := s.connector.Get(sourceID) if source != nil { - source.Write(bf.Encode()) + _, err := source.Write(bf.Encode()) + if err != nil { + logger.Errorf("%shandleBackflowFrame tag:%#v --> source:%s, error=%v", ServerLogPrefix, tag, sourceID, err) + return err + } } } return nil diff --git a/example/0-basic/Taskfile.yml b/example/0-basic/Taskfile.yml index 9f1b27c63..c1404be8d 100644 --- a/example/0-basic/Taskfile.yml +++ b/example/0-basic/Taskfile.yml @@ -7,7 +7,7 @@ output: "prefixed" tasks: run: desc: run - deps: [zipper, source, sfn] + deps: [zipper, source, sfn1, sfn2] cmds: - echo 'basic example run' @@ -18,8 +18,8 @@ tasks: - rm -rf ./bin build: - desc: build source, sfn and zipper - deps: [source-build, sfn-build] + desc: build source, sfns and zipper + deps: [source-build, sfn1-build, sfn2-build] cmds: - echo 'building done' @@ -28,10 +28,10 @@ tasks: cmds: - "go build -o ./bin/source{{exeExt}} source/main.go" - sfn-build: - desc: build sfn + sfn1-build: + desc: build sfn1 cmds: - - "go build -o ./bin/sfn{{exeExt}} sfn/main.go" + - "go build -o ./bin/sfn1{{exeExt}} sfn/main.go" source: desc: run source @@ -41,11 +41,11 @@ tasks: env: YOMO_LOG_LEVEL: error - sfn: - desc: run sfn - deps: [sfn-build] + sfn1: + desc: run sfn1 + deps: [sfn1-build] cmds: - - "./bin/sfn{{exeExt}}" + - "./bin/sfn1{{exeExt}}" env: YOMO_LOG_LEVEL: error diff --git a/example/0-basic/sfn/main.go b/example/0-basic/sfn/main.go index 20b316211..6934bb0ed 100644 --- a/example/0-basic/sfn/main.go +++ b/example/0-basic/sfn/main.go @@ -32,12 +32,12 @@ func main() { // start err := sfn.Connect() if err != nil { - logger.Errorf("[sfn] connect err=%v", err) + logger.Errorf("[sfn1] connect err=%v", err) os.Exit(1) } // set the error handler function when server error occurs sfn.SetErrorHandler(func(err error) { - logger.Errorf("[sfn] receive server error: %v", err) + logger.Errorf("[sfn1] receive server error: %v", err) sfn.Close() os.Exit(1) }) @@ -49,10 +49,10 @@ func handler(data []byte) (byte, []byte) { var model noiseData err := json.Unmarshal(data, &model) if err != nil { - logger.Errorf("[sfn] json.Marshal err=%v", err) + logger.Errorf("[sfn1] json.Marshal err=%v", err) os.Exit(-2) } else { - logger.Printf(">> [sfn] got tag=0x33, data=%+v", model) + logger.Printf(">> [sfn1] got tag=0x33, data=%+v", model) } - return 0x34, []byte(fmt.Sprintf("sfn-1 处理结果: %v", model.Noise)) + return 0x34, []byte(fmt.Sprintf("sfn1 processed result: %v", model.Noise)) } diff --git a/example/0-basic/sfn2/main.go b/example/0-basic/sfn2/main.go index 3f3fc4bb1..4df2dc9aa 100644 --- a/example/0-basic/sfn2/main.go +++ b/example/0-basic/sfn2/main.go @@ -24,12 +24,12 @@ func main() { // start err := sfn.Connect() if err != nil { - logger.Errorf("[sfn-2] connect err=%v", err) + logger.Errorf("[sfn2] connect err=%v", err) os.Exit(1) } // set the error handler function when server error occurs sfn.SetErrorHandler(func(err error) { - logger.Errorf("[sfn-2] receive server error: %v", err) + logger.Errorf("[sfn2] receive server error: %v", err) sfn.Close() os.Exit(1) }) @@ -38,6 +38,6 @@ func main() { } func handler(data []byte) (byte, []byte) { - logger.Printf(">> [sfn-2] got tag=0x34, data=%s", string(data)) - return 0x35, []byte("sfn-2 处理结果") + logger.Printf(">> [sfn2] got tag=0x34, data=%s", string(data)) + return 0x35, []byte("sfn2 processed result") } diff --git a/example/0-basic/source/main.go b/example/0-basic/source/main.go index 0efd65565..040ff06e0 100644 --- a/example/0-basic/source/main.go +++ b/example/0-basic/source/main.go @@ -46,7 +46,7 @@ func main() { logger.Printf("[source] receive server error: %v", err) os.Exit(1) }) - source.SetReceiverHandler(func(tag byte, data []byte) { + source.SetReceiveHandler(func(tag byte, data []byte) { logger.Printf("[source] receive backflow: tag=%#v, data=%v", tag, string(data)) }) diff --git a/source.go b/source.go index ce03a3844..015d62759 100644 --- a/source.go +++ b/source.go @@ -27,10 +27,8 @@ type Source interface { SetErrorHandler(fn func(err error)) // WriteFrame writes a frame to the connection WriteFrame(frm frame.Frame) error - // SetReceiverHandler set the observe handler function - SetReceiverHandler(fn func(tag byte, data []byte)) - // Read reads up to len(p) bytes into p. It returns the number of bytes - // Read(p []byte) (n int, err error) + // SetReceiveHandler set the observe handler function + SetReceiveHandler(fn func(tag byte, data []byte)) } // YoMo-Source @@ -110,8 +108,8 @@ func (s *yomoSource) WriteFrame(frm frame.Frame) error { return s.client.WriteFrame(frm) } -// SetReceiverHandler set the receiver handler function -func (s *yomoSource) SetReceiverHandler(fn func(byte, []byte)) { +// SetReceiveHandler set the receive handler function +func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) { s.fn = fn s.client.Logger().Debugf("%sSetObserveHandler(%v)", sourceLogPrefix, s.fn) } From 7c429da0a11548b1a21c5a89adf412a6da992f74 Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 27 Apr 2022 16:01:51 +0800 Subject: [PATCH 04/18] multiple tag tests --- core/client.go | 2 +- core/server.go | 10 +++++----- example/0-basic/sfn2/main.go | 2 +- example/0-basic/source/main.go | 5 +++-- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/core/client.go b/core/client.go index 34a330060..9b50398e0 100644 --- a/core/client.go +++ b/core/client.go @@ -228,11 +228,11 @@ func (c *Client) handleFrame() { } case frame.TagOfBackflowFrame: if v, ok := f.(*frame.BackflowFrame); ok { - c.setState(ConnStateBackflow) c.logger.Debugf("%sreceive BackflowFrame, tag=%#x, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.GetCarriage()) if c.receiver == nil { c.logger.Warnf("%sreceiver is nil", ClientLogPrefix) } else { + c.setState(ConnStateBackflow) c.receiver(v) } } diff --git a/core/server.go b/core/server.go index 049f05d93..c31c8ac11 100644 --- a/core/server.go +++ b/core/server.go @@ -133,19 +133,19 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { } logger.Printf("%s💔 [%s](%s) close the connection", ServerLogPrefix, conn.Name(), connID) } else { - logger.Errorf("%s❤️3/ [unknown](%s) on stream %v", ServerLogPrefix, connID, err) + logger.Errorf("%s💙 [unknown](%s) on stream %v", ServerLogPrefix, connID, err) } break } defer stream.Close() - logger.Infof("%s❤️4/ [stream:%d] created, connID=%s", ServerLogPrefix, stream.StreamID(), connID) + logger.Infof("%s❤️3/ [stream:%d] created, connID=%s", ServerLogPrefix, stream.StreamID(), connID) // process frames on stream // c := newContext(connID, stream) c := newContext(conn, stream) defer c.Clean() s.handleConnection(c) - logger.Infof("%s❤️5/ [stream:%d] handleConnection DONE", ServerLogPrefix, stream.StreamID()) + logger.Infof("%s❤️4/ [stream:%d] handleConnection DONE", ServerLogPrefix, stream.StreamID()) } }(sctx, conn) } @@ -418,13 +418,13 @@ func (s *Server) handleBackflowFrame(c *Context) error { bf := frame.NewBackflowFrame(tag, carriage) sourceIDs := s.connector.GetSourceConnIDs(tag) for _, sourceID := range sourceIDs { - logger.Debugf("%shandleBackflowFrame tag:%#v --> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) // get source's quic.Stream source := s.connector.Get(sourceID) if source != nil { + logger.Debugf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) _, err := source.Write(bf.Encode()) if err != nil { - logger.Errorf("%shandleBackflowFrame tag:%#v --> source:%s, error=%v", ServerLogPrefix, tag, sourceID, err) + logger.Errorf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, error=%v", ServerLogPrefix, tag, sourceID, err) return err } } diff --git a/example/0-basic/sfn2/main.go b/example/0-basic/sfn2/main.go index 4df2dc9aa..2d6c30582 100644 --- a/example/0-basic/sfn2/main.go +++ b/example/0-basic/sfn2/main.go @@ -39,5 +39,5 @@ func main() { func handler(data []byte) (byte, []byte) { logger.Printf(">> [sfn2] got tag=0x34, data=%s", string(data)) - return 0x35, []byte("sfn2 processed result") + return 0x0, []byte("sfn2 processed result") } diff --git a/example/0-basic/source/main.go b/example/0-basic/source/main.go index 040ff06e0..f67227898 100644 --- a/example/0-basic/source/main.go +++ b/example/0-basic/source/main.go @@ -30,7 +30,7 @@ func main() { "yomo-source", yomo.WithZipperAddr(addr), yomo.WithLogger(logger), - yomo.WithObserveDataTags(0x34, 0x35), + yomo.WithObserveDataTags(0x34, 0x0), ) err := source.Connect() if err != nil { @@ -46,8 +46,9 @@ func main() { logger.Printf("[source] receive server error: %v", err) os.Exit(1) }) + // set receive handler for the observe datatags source.SetReceiveHandler(func(tag byte, data []byte) { - logger.Printf("[source] receive backflow: tag=%#v, data=%v", tag, string(data)) + logger.Printf("[source] ♻️ receive backflow: tag=%#v, data=%v", tag, string(data)) }) // generate mock data and send it to YoMo-Zipper in every 100 ms. From 675f379e70ab456f498249c572b9772031b38e4d Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 27 Apr 2022 16:29:28 +0800 Subject: [PATCH 05/18] backflow frame comments --- core/frame/backflow_frame.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/frame/backflow_frame.go b/core/frame/backflow_frame.go index f394b910a..46a885334 100644 --- a/core/frame/backflow_frame.go +++ b/core/frame/backflow_frame.go @@ -5,16 +5,17 @@ import ( ) // BackflowFrame is a Y3 encoded bytes +// It's used to receive stream function processed result type BackflowFrame struct { Tag byte Carriage []byte } -// NewBackflowPayloadFrame creates a new PayloadFrame with a given TagID of user's data -func NewBackflowFrame(tag byte,carriage []byte) *BackflowFrame { +// NewBackflowFrame creates a new BackflowFrame with a given tag and carriage +func NewBackflowFrame(tag byte, carriage []byte) *BackflowFrame { return &BackflowFrame{ - Tag: tag, - Carriage:carriage, + Tag: tag, + Carriage: carriage, } } From 21e1db6822c2bcc8cd8c8974e716d071d2ea7982 Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 27 Apr 2022 18:13:21 +0800 Subject: [PATCH 06/18] change source's receive handler log message --- source.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source.go b/source.go index 015d62759..6791f2093 100644 --- a/source.go +++ b/source.go @@ -111,5 +111,5 @@ func (s *yomoSource) WriteFrame(frm frame.Frame) error { // SetReceiveHandler set the receive handler function func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) { s.fn = fn - s.client.Logger().Debugf("%sSetObserveHandler(%v)", sourceLogPrefix, s.fn) + s.client.Logger().Debugf("%sSetReceiveHandler(%v)", sourceLogPrefix, s.fn) } From aef5da571359ff3e368e96bb13c440d8edea7312 Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 27 Apr 2022 23:38:27 +0800 Subject: [PATCH 07/18] clean up plan B legacy code --- core/connector.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/core/connector.go b/core/connector.go index 9ff19192a..a00a3c7cd 100644 --- a/core/connector.go +++ b/core/connector.go @@ -65,6 +65,37 @@ func (c *connector) Get(connID string) Connection { return nil } +// GetSourceConnIDs gets the source connection ids by tag. +func (c *connector) GetSourceConnIDs(tag byte) []string { + connIDs := make([]string, 0) + + c.sources.Range(func(key interface{}, val interface{}) bool { + app := val.(*app) + for _, v := range app.observed { + if v == tag { + connIDs = append(connIDs, key.(string)) + // break + } + } + return true + }) + + return connIDs +} + +// Write a Frame to a connection. +func (c *connector) Write(f frame.Frame, toID string) error { + targetStream := c.Get(toID) + if targetStream == nil { + logger.Warnf("%swill write to: [%s], target stream is nil", ServerLogPrefix, toID) + return fmt.Errorf("target[%s] stream is nil", toID) + } + c.mu.Lock() + _, err := targetStream.Write(f.Encode()) + c.mu.Unlock() + return err +} + // GetSnapshot gets the snapshot of all connections. func (c *connector) GetSnapshot() map[string]string { result := make(map[string]string) From 4cbdc3f87b253111c74f9b3a9bbb2b64518b7c2c Mon Sep 17 00:00:00 2001 From: venjiang Date: Thu, 5 May 2022 23:57:04 +0800 Subject: [PATCH 08/18] refactor backflow --- core/frame/meta_frame.go | 1 + core/server.go | 2 +- source.go | 10 ++++++---- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 534c6b855..e3e6fcf1b 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -13,6 +13,7 @@ import ( type MetaFrame struct { tid string metadata []byte + sourceID string } // NewMetaFrame creates a new MetaFrame instance. diff --git a/core/server.go b/core/server.go index c31c8ac11..ffda58844 100644 --- a/core/server.go +++ b/core/server.go @@ -287,7 +287,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { clientType := ClientType(f.ClientType) stream := c.Stream // credential - logger.Debugf("%sClientType=%# x is %s, Credential=%s", ServerLogPrefix, f.ClientType, ClientType(f.ClientType), authName(f.AuthName())) + logger.Debugf("%sClientType=%# x is %s, sourceID=%s, Credential=%s", ServerLogPrefix, f.ClientType, ClientType(f.ClientType), f.SourceID(), authName(f.AuthName())) // authenticate if !s.authenticate(f) { err := fmt.Errorf("handshake authentication fails, client credential name is %s", authName(f.AuthName())) diff --git a/source.go b/source.go index 6791f2093..ee5704d26 100644 --- a/source.go +++ b/source.go @@ -92,10 +92,12 @@ func (s *yomoSource) Connect() error { // WriteWithTag will write data with specified tag, default transactionID is epoch time. func (s *yomoSource) WriteWithTag(tag uint8, data []byte) error { - s.client.Logger().Debugf("%sWriteWithTag: len(data)=%d, data=%# x", sourceLogPrefix, len(data), frame.Shortly(data)) - frame := frame.NewDataFrame() - frame.SetCarriage(byte(tag), data) - return s.client.WriteFrame(frame) + f := frame.NewDataFrame() + f.SetCarriage(byte(tag), data) + f.SetSourceID(s.client.ClientID()) + s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x", + sourceLogPrefix, f.TransactionID(), f.SourceID(), len(data), frame.Shortly(data)) + return s.WriteFrame(f) } // SetErrorHandler set the error handler function when server error occurs From 8335bd6d13374d8577b888539489d71dd5b4e440 Mon Sep 17 00:00:00 2001 From: venjiang Date: Fri, 6 May 2022 00:43:59 +0800 Subject: [PATCH 09/18] add test --- core/frame/data_frame_test.go | 5 +++-- core/frame/handshake_frame_test.go | 7 ++++++- core/frame/meta_frame_test.go | 7 +++++-- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/core/frame/data_frame_test.go b/core/frame/data_frame_test.go index 878cb3141..0d1d506f4 100644 --- a/core/frame/data_frame_test.go +++ b/core/frame/data_frame_test.go @@ -13,10 +13,11 @@ func TestDataFrameEncode(t *testing.T) { tidbuf := []byte(d.TransactionID()) result := []byte{ - 0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8), - 0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2), + 0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2), + 0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2), byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) + result = append(result, byte(TagOfSourceID), 0x0) result = append(result, 0x80|byte(TagOfPayloadFrame), 0x06, userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F) assert.Equal(t, result, d.Encode()) diff --git a/core/frame/handshake_frame_test.go b/core/frame/handshake_frame_test.go index 1737d9309..26be2f4ba 100644 --- a/core/frame/handshake_frame_test.go +++ b/core/frame/handshake_frame_test.go @@ -10,14 +10,18 @@ func TestHandshakeFrameEncode(t *testing.T) { expectedName := "1234" var expectedType byte = 0xD3 m := NewHandshakeFrame(expectedName, expectedType, []byte{0x01, 0x02}, "token", "a") + m.metaFrame.SetTransactionID("1234") + m.metaFrame.SetSourceID("1") assert.Equal(t, []byte{ - 0x80 | byte(TagOfHandshakeFrame), 0x17, + 0x80 | byte(TagOfHandshakeFrame), 0x17 + 11, byte(TagOfHandshakeName), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfHandshakeType), 0x01, 0xD3, byte(TagOfHandshakeObserveDataTags), 0x02, 0x01, 0x02, // byte(TagOfHandshakeAppID), 0x0, byte(TagOfHandshakeAuthName), 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, byte(TagOfHandshakeAuthPayload), 0x01, 0x61, + // metaframe, see meta_frame_test + 0xaf, 0x09, 0x01, 0x04, 0x31, 0x32, 0x33, 0x34, 0x03, 0x01, 0x31, }, m.Encode(), ) @@ -26,4 +30,5 @@ func TestHandshakeFrameEncode(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, expectedName, Handshake.Name) assert.EqualValues(t, expectedType, Handshake.ClientType) + assert.EqualValues(t, "1", Handshake.SourceID()) } diff --git a/core/frame/meta_frame_test.go b/core/frame/meta_frame_test.go index 94781a0ec..31e58b22c 100644 --- a/core/frame/meta_frame_test.go +++ b/core/frame/meta_frame_test.go @@ -9,14 +9,17 @@ import ( func TestMetaFrameEncode(t *testing.T) { m := NewMetaFrame() tidbuf := []byte(m.tid) - result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf)), byte(TagOfTransactionID), byte(len(tidbuf))} + result := []byte{0x80 | byte(TagOfMetaFrame), byte(1 + 1 + len(tidbuf) + 2), byte(TagOfTransactionID), byte(len(tidbuf))} result = append(result, tidbuf...) + result = append(result, byte(TagOfSourceID), 0x0) assert.Equal(t, result, m.Encode()) } func TestMetaFrameDecode(t *testing.T) { - buf := []byte{0x80 | byte(TagOfMetaFrame), 0x06, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34} + buf := []byte{0x80 | byte(TagOfMetaFrame), 0x09, byte(TagOfTransactionID), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfSourceID), 0x01, 0x31} meta, err := DecodeToMetaFrame(buf) assert.NoError(t, err) assert.EqualValues(t, "1234", meta.TransactionID()) + assert.EqualValues(t, "1", meta.SourceID()) + t.Logf("%# x", buf) } From a18edfced965ffd2c49eb772b16c0797126f25d7 Mon Sep 17 00:00:00 2001 From: venjiang Date: Tue, 17 May 2022 17:12:30 +0800 Subject: [PATCH 10/18] fix: compatibility --- core/connector.go | 5 ----- core/frame/handshake_frame.go | 4 ++++ 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/connector.go b/core/connector.go index a00a3c7cd..7a4a0d0e9 100644 --- a/core/connector.go +++ b/core/connector.go @@ -44,11 +44,6 @@ func (c *connector) Add(connID string, conn Connection) { c.conns.Store(connID, conn) } -// func (c *connector) AddSource(connID string, stream io.ReadWriteCloser) { -// logger.Debugf("%sconnector add: connID=%s", ServerLogPrefix, connID) -// c.sconns.Store(connID, stream) -// } - // Remove a connection. func (c *connector) Remove(connID string) { logger.Debugf("%sconnector remove: connID=%s", ServerLogPrefix, connID) diff --git a/core/frame/handshake_frame.go b/core/frame/handshake_frame.go index d095b44bd..b1c3e0324 100644 --- a/core/frame/handshake_frame.go +++ b/core/frame/handshake_frame.go @@ -56,6 +56,10 @@ func (h *HandshakeFrame) Encode() []byte { handshake.AddPrimitivePacket(observeDataTagsBlock) handshake.AddPrimitivePacket(authNameBlock) handshake.AddPrimitivePacket(authPayloadBlock) + // metaframe + if h.metaFrame != nil { + handshake.AddBytes(h.metaFrame.Encode()) + } return handshake.Encode() } From 265bf4fb8bdc74d048ed93a8e927947750581406 Mon Sep 17 00:00:00 2001 From: venjiang Date: Tue, 17 May 2022 22:07:58 +0800 Subject: [PATCH 11/18] handshake add client id field --- core/client.go | 5 +++-- core/connector.go | 41 +++++++++++++++++++++++++++++++++-- core/frame/frame.go | 9 ++++---- core/frame/handshake_frame.go | 21 +++++++++++++++--- core/server.go | 3 ++- source.go | 4 ++-- 6 files changed, 68 insertions(+), 15 deletions(-) diff --git a/core/client.go b/core/client.go index 9b50398e0..075e0f873 100644 --- a/core/client.go +++ b/core/client.go @@ -112,6 +112,7 @@ func (c *Client) connect(ctx context.Context, addr string) error { // send handshake handshake := frame.NewHandshakeFrame( c.name, + c.clientID, byte(c.clientType), c.opts.ObserveDataTags, c.opts.Credential.Name(), @@ -125,7 +126,7 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.state = ConnStateConnected c.localAddr = c.conn.LocalAddr().String() - c.logger.Printf("%s❤️ [%s](%s) is connected to YoMo-Zipper %s", ClientLogPrefix, c.name, c.localAddr, addr) + c.logger.Printf("%s❤️ [%s][%s](%s) is connected to YoMo-Zipper %s", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr) // receiving frames go c.handleFrame() @@ -245,7 +246,7 @@ func (c *Client) handleFrame() { // Close the client. func (c *Client) Close() (err error) { if c.conn != nil { - c.logger.Printf("%sclose the connection, name:%s, addr:%s", ClientLogPrefix, c.name, c.conn.RemoteAddr().String()) + c.logger.Printf("%sclose the connection, name:%s, id:%s, addr:%s", ClientLogPrefix, c.name, c.clientID, c.conn.RemoteAddr().String()) } if c.stream != nil { err = c.stream.Close() diff --git a/core/connector.go b/core/connector.go index 7a4a0d0e9..492a2ff24 100644 --- a/core/connector.go +++ b/core/connector.go @@ -6,6 +6,24 @@ import ( "github.com/yomorun/yomo/pkg/logger" ) +<<<<<<< HEAD +======= +type app struct { + id string // client id + name string // app name + observed []byte // data tags + sourceID string // source id +} + +func (a *app) ID() string { + return a.id +} + +func (a *app) Name() string { + return a.name +} + +>>>>>>> 27a22b4 (handshake add client id field) var _ Connector = &connector{} // Connector is a interface to manage the connections and applications. @@ -17,11 +35,30 @@ type Connector interface { // Get a connection by connection id. Get(connID string) Connection // GetSnapshot gets the snapshot of all connections. +<<<<<<< HEAD GetSnapshot() map[string]string // GetSourceConnIDs gets the connection ids by source observe tag. GetSourceConnIDs(tags byte) []string // LinkSource links the source and connection. LinkSource(connID string, name string, observed []byte) +======= + GetSnapshot() map[string]io.ReadWriteCloser + // App gets the app by connID. + App(connID string) (*app, bool) + // AppID gets the ID of app by connID. + // AppID(connID string) (string, bool) + // AppName gets the name of app by connID. + AppName(connID string) (string, bool) + // LinkApp links the app and connection. + LinkApp(connID string, id string, name string, observed []byte) + // LinkSource links the source and connection. + LinkSource(connID string, id string, name string, sourceID string, observed []byte) + // UnlinkApp removes the app by connID. + UnlinkApp(connID string, name string) + // ExistsApp check app exists + ExistsApp(name string) bool + +>>>>>>> 27a22b4 (handshake add client id field) // Clean the connector. Clean() } @@ -104,9 +141,9 @@ func (c *connector) GetSnapshot() map[string]string { } // LinkSource links the source and connection. -func (c *connector) LinkSource(connID string, name string, observed []byte) { +func (c *connector) LinkSource(connID string, id string, name string, sourceID string, observed []byte) { logger.Debugf("%sconnector link source: connID[%s] --> source[%s]", ServerLogPrefix, connID, name) - c.sources.Store(connID, &app{name, observed}) + c.sources.Store(connID, &app{id, name, observed, sourceID}) } // Clean the connector. diff --git a/core/frame/frame.go b/core/frame/frame.go index 38f2a3ffa..e51fbab16 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -22,11 +22,10 @@ const ( TagOfTokenFrame Type = 0x3E // HandshakeFrame - TagOfHandshakeFrame Type = 0x3D - TagOfHandshakeName Type = 0x01 - TagOfHandshakeType Type = 0x02 - // TagOfHandshakeAppID Type = 0x03 - // TagOfHandshakeAuthType Type = 0x04 + TagOfHandshakeFrame Type = 0x3D + TagOfHandshakeName Type = 0x01 + TagOfHandshakeType Type = 0x02 + TagOfHandshakeID Type = 0x03 TagOfHandshakeAuthName Type = 0x04 TagOfHandshakeAuthPayload Type = 0x05 TagOfHandshakeObserveDataTags Type = 0x06 diff --git a/core/frame/handshake_frame.go b/core/frame/handshake_frame.go index b1c3e0324..03fdeef03 100644 --- a/core/frame/handshake_frame.go +++ b/core/frame/handshake_frame.go @@ -8,6 +8,8 @@ import ( type HandshakeFrame struct { // Name is client name Name string + // ClientID represents client ID + ClientID string // ClientType represents client type (source or sfn) ClientType byte // ObserveDataTags are the client data tag list. @@ -18,9 +20,10 @@ type HandshakeFrame struct { } // NewHandshakeFrame creates a new HandshakeFrame. -func NewHandshakeFrame(name string, clientType byte, observeDataTags []byte, authName string, authPayload string) *HandshakeFrame { +func NewHandshakeFrame(name string, clientID string, clientType byte, observeDataTags []byte, authName string, authPayload string) *HandshakeFrame { return &HandshakeFrame{ Name: name, + ClientID: clientID, ClientType: clientType, ObserveDataTags: observeDataTags, authName: authName, @@ -38,7 +41,10 @@ func (h *HandshakeFrame) Encode() []byte { // name nameBlock := y3.NewPrimitivePacketEncoder(byte(TagOfHandshakeName)) nameBlock.SetStringValue(h.Name) - // type + // client ID + idBlock := y3.NewPrimitivePacketEncoder(byte(TagOfHandshakeID)) + idBlock.SetStringValue(h.ClientID) + // client type typeBlock := y3.NewPrimitivePacketEncoder(byte(TagOfHandshakeType)) typeBlock.SetBytesValue([]byte{h.ClientType}) // observe data tags @@ -51,6 +57,7 @@ func (h *HandshakeFrame) Encode() []byte { authPayloadBlock.SetStringValue(h.authPayload) // handshake frame handshake := y3.NewNodePacketEncoder(byte(h.Type())) + handshake.AddPrimitivePacket(idBlock) handshake.AddPrimitivePacket(nameBlock) handshake.AddPrimitivePacket(typeBlock) handshake.AddPrimitivePacket(observeDataTagsBlock) @@ -81,7 +88,15 @@ func DecodeToHandshakeFrame(buf []byte) (*HandshakeFrame, error) { } handshake.Name = name } - // type + // client ID + if idBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeID)]; ok { + id, err := idBlock.ToUTF8String() + if err != nil { + return nil, err + } + handshake.ClientID = id + } + // client type if typeBlock, ok := node.PrimitivePackets[byte(TagOfHandshakeType)]; ok { clientType := typeBlock.ToBytes() handshake.ClientType = clientType[0] diff --git a/core/server.go b/core/server.go index ffda58844..0b3eb3ac6 100644 --- a/core/server.go +++ b/core/server.go @@ -284,10 +284,11 @@ func (s *Server) handleHandshakeFrame(c *Context) error { logger.Debugf("%sGOT ❤️ HandshakeFrame : %# x", ServerLogPrefix, f) // basic info connID := c.ConnID() + clientID := f.ClientID clientType := ClientType(f.ClientType) stream := c.Stream // credential - logger.Debugf("%sClientType=%# x is %s, sourceID=%s, Credential=%s", ServerLogPrefix, f.ClientType, ClientType(f.ClientType), f.SourceID(), authName(f.AuthName())) + logger.Debugf("%sClientType=%# x is %s, ClientID=%s, Credential=%s", ServerLogPrefix, f.ClientType, ClientType(f.ClientType), clientID, authName(f.AuthName())) // authenticate if !s.authenticate(f) { err := fmt.Errorf("handshake authentication fails, client credential name is %s", authName(f.AuthName())) diff --git a/source.go b/source.go index ee5704d26..ea14a5fc7 100644 --- a/source.go +++ b/source.go @@ -27,7 +27,7 @@ type Source interface { SetErrorHandler(fn func(err error)) // WriteFrame writes a frame to the connection WriteFrame(frm frame.Frame) error - // SetReceiveHandler set the observe handler function + // [Experimental] SetReceiveHandler set the observe handler function SetReceiveHandler(fn func(tag byte, data []byte)) } @@ -110,7 +110,7 @@ func (s *yomoSource) WriteFrame(frm frame.Frame) error { return s.client.WriteFrame(frm) } -// SetReceiveHandler set the receive handler function +// [Experimental] SetReceiveHandler set the observe handler function func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) { s.fn = fn s.client.Logger().Debugf("%sSetReceiveHandler(%v)", sourceLogPrefix, s.fn) From 2f7c8635f267a6a72cf76261a340c6f8a42baa8a Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 18 May 2022 00:07:26 +0800 Subject: [PATCH 12/18] test add id pkg temp save --- core/client.go | 9 +++ core/connection.go | 15 +++- core/connector.go | 111 +++++++++++------------------ core/frame/data_frame.go | 10 +++ core/frame/frame.go | 1 + core/frame/handshake_frame.go | 20 ++++-- core/frame/handshake_frame_test.go | 5 +- core/frame/meta_frame.go | 34 +++++++-- core/server.go | 17 ++--- metadata.go | 6 +- pkg/id/id.go | 16 +++++ 11 files changed, 151 insertions(+), 93 deletions(-) create mode 100644 pkg/id/id.go diff --git a/core/client.go b/core/client.go index 075e0f873..866999ad5 100644 --- a/core/client.go +++ b/core/client.go @@ -14,6 +14,7 @@ import ( "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/core/log" "github.com/yomorun/yomo/core/yerr" + "github.com/yomorun/yomo/pkg/id" "github.com/yomorun/yomo/pkg/logger" pkgtls "github.com/yomorun/yomo/pkg/tls" ) @@ -28,6 +29,7 @@ type ConnState = string // Source, Upstream Zipper or StreamFunction. type Client struct { name string // name of the client + clientID string // id of the client clientType ClientType // type of the connection conn quic.Connection // quic connection stream quic.Stream // quic stream @@ -48,6 +50,7 @@ type Client struct { func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client { c := &Client{ name: appName, + clientID: id.New(), clientType: connType, state: ConnStateReady, opts: ClientOptions{}, @@ -118,6 +121,8 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.opts.Credential.Name(), c.opts.Credential.Payload(), ) + // source ID + handshake.SetSourceID(c.clientID) err = c.WriteFrame(handshake) if err != nil { c.state = ConnStateRejected @@ -456,3 +461,7 @@ func (c *Client) SetErrorHandler(fn func(err error)) { }() } } +// ClientID return the client ID +func (c *Client) ClientID() string { + return c.clientID +} diff --git a/core/connection.go b/core/connection.go index 55ac28c32..1735043bb 100644 --- a/core/connection.go +++ b/core/connection.go @@ -19,6 +19,8 @@ type Connection interface { Metadata() Metadata // Write should goroutine-safely send y3 frames to peer side Write(f frame.Frame) error + // ObserveDataTags observed data tags + ObserveDataTags() []byte } type connection struct { @@ -26,13 +28,19 @@ type connection struct { clientType ClientType metadata Metadata stream io.ReadWriteCloser + clientID string + sourceID string + observed []byte // observed data tags mu sync.Mutex } -func newConnection(name string, clientType ClientType, metadata Metadata, stream io.ReadWriteCloser) Connection { +func newConnection(name string, clientID string, clientType ClientType, sourceID string, metadata Metadata, stream io.ReadWriteCloser, observed []byte) Connection { return &connection{ name: name, + clientID: clientID, clientType: clientType, + sourceID: sourceID, + observed: observed, metadata: metadata, stream: stream, } @@ -65,3 +73,8 @@ func (c *connection) Write(f frame.Frame) error { _, err := c.stream.Write(f.Encode()) return err } + +// ObserveDataTags observed data tags +func (c *connection) ObserveDataTags() []byte { + return c.observed +} diff --git a/core/connector.go b/core/connector.go index 492a2ff24..e961a852e 100644 --- a/core/connector.go +++ b/core/connector.go @@ -6,24 +6,6 @@ import ( "github.com/yomorun/yomo/pkg/logger" ) -<<<<<<< HEAD -======= -type app struct { - id string // client id - name string // app name - observed []byte // data tags - sourceID string // source id -} - -func (a *app) ID() string { - return a.id -} - -func (a *app) Name() string { - return a.name -} - ->>>>>>> 27a22b4 (handshake add client id field) var _ Connector = &connector{} // Connector is a interface to manage the connections and applications. @@ -35,44 +17,21 @@ type Connector interface { // Get a connection by connection id. Get(connID string) Connection // GetSnapshot gets the snapshot of all connections. -<<<<<<< HEAD GetSnapshot() map[string]string - // GetSourceConnIDs gets the connection ids by source observe tag. - GetSourceConnIDs(tags byte) []string + // GetSourceConns gets the connections by source observe tags. + GetSourceConns(sourceID string, tags byte) []Connection // LinkSource links the source and connection. - LinkSource(connID string, name string, observed []byte) -======= - GetSnapshot() map[string]io.ReadWriteCloser - // App gets the app by connID. - App(connID string) (*app, bool) - // AppID gets the ID of app by connID. - // AppID(connID string) (string, bool) - // AppName gets the name of app by connID. - AppName(connID string) (string, bool) - // LinkApp links the app and connection. - LinkApp(connID string, id string, name string, observed []byte) - // LinkSource links the source and connection. - LinkSource(connID string, id string, name string, sourceID string, observed []byte) - // UnlinkApp removes the app by connID. - UnlinkApp(connID string, name string) - // ExistsApp check app exists - ExistsApp(name string) bool - ->>>>>>> 27a22b4 (handshake add client id field) + // LinkSource(connID string, id string, name string, sourceID string, observed []byte) // Clean the connector. Clean() } type connector struct { conns sync.Map - sources sync.Map } func newConnector() Connector { - return &connector{ - conns sync.Map - sources sync.Map - } + return &connector{conns: sync.Map{}} } // Add a connection. @@ -85,7 +44,6 @@ func (c *connector) Add(connID string, conn Connection) { func (c *connector) Remove(connID string) { logger.Debugf("%sconnector remove: connID=%s", ServerLogPrefix, connID) c.conns.Delete(connID) - c.sources.Delete(connID) } // Get a connection by connection id. @@ -97,36 +55,49 @@ func (c *connector) Get(connID string) Connection { return nil } -// GetSourceConnIDs gets the source connection ids by tag. -func (c *connector) GetSourceConnIDs(tag byte) []string { - connIDs := make([]string, 0) +// GetSourceConns gets the source connection by tag. +func (c *connector) GetSourceConns(sourceID string, tag byte) []Connection { + // connIDs := make([]string, 0) + + // c.sources.Range(func(key interface{}, val interface{}) bool { + // app := val.(*app) + // for _, v := range app.observed { + // if v == tag { + // connIDs = append(connIDs, key.(string)) + // // break + // } + // } + // return true + // }) - c.sources.Range(func(key interface{}, val interface{}) bool { - app := val.(*app) - for _, v := range app.observed { + // return connection list + conns := make([]Connection, 0) + + c.conns.Range(func(key interface{}, val interface{}) bool { + conn := val.(Connection) + for _, v := range conn.ObserveDataTags() { if v == tag { - connIDs = append(connIDs, key.(string)) - // break + conns = append(conns, conn) } } return true }) - return connIDs + return conns } // Write a Frame to a connection. -func (c *connector) Write(f frame.Frame, toID string) error { - targetStream := c.Get(toID) - if targetStream == nil { - logger.Warnf("%swill write to: [%s], target stream is nil", ServerLogPrefix, toID) - return fmt.Errorf("target[%s] stream is nil", toID) - } - c.mu.Lock() - _, err := targetStream.Write(f.Encode()) - c.mu.Unlock() - return err -} +// func (c *connector) Write(f frame.Frame, toID string) error { +// targetStream := c.Get(toID) +// if targetStream == nil { +// logger.Warnf("%swill write to: [%s], target stream is nil", ServerLogPrefix, toID) +// return fmt.Errorf("target[%s] stream is nil", toID) +// } +// c.mu.Lock() +// _, err := targetStream.Write(f.Encode()) +// c.mu.Unlock() +// return err +// } // GetSnapshot gets the snapshot of all connections. func (c *connector) GetSnapshot() map[string]string { @@ -141,10 +112,10 @@ func (c *connector) GetSnapshot() map[string]string { } // LinkSource links the source and connection. -func (c *connector) LinkSource(connID string, id string, name string, sourceID string, observed []byte) { - logger.Debugf("%sconnector link source: connID[%s] --> source[%s]", ServerLogPrefix, connID, name) - c.sources.Store(connID, &app{id, name, observed, sourceID}) -} +// func (c *connector) LinkSource(connID string, id string, name string, sourceID string, observed []byte) { +// logger.Debugf("%sconnector link source: connID[%s] --> source[%s]", ServerLogPrefix, connID, name) +// c.sources.Store(connID, &app{id, name, observed, sourceID}) +// } // Clean the connector. func (c *connector) Clean() { diff --git a/core/frame/data_frame.go b/core/frame/data_frame.go index 44ed7af66..eb06c641d 100644 --- a/core/frame/data_frame.go +++ b/core/frame/data_frame.go @@ -60,6 +60,16 @@ func (d *DataFrame) GetDataTag() byte { return d.payloadFrame.Tag } +// SetSourceID set the source ID. +func (d *DataFrame) SetSourceID(sourceID string) { + d.metaFrame.SetSourceID(sourceID) +} + +// SourceID returns source ID +func (d *DataFrame) SourceID() string { + return d.metaFrame.SourceID() +} + // Encode return Y3 encoded bytes of `DataFrame` func (d *DataFrame) Encode() []byte { data := y3.NewNodePacketEncoder(byte(d.Type())) diff --git a/core/frame/frame.go b/core/frame/frame.go index e51fbab16..326c31deb 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -16,6 +16,7 @@ const ( TagOfMetaFrame Type = 0x2F TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 + TagOfSourceID Type = 0x02 // PayloadFrame of DataFrame TagOfPayloadFrame Type = 0x2E TagOfBackflowFrame Type = 0x2D diff --git a/core/frame/handshake_frame.go b/core/frame/handshake_frame.go index 03fdeef03..bb414b049 100644 --- a/core/frame/handshake_frame.go +++ b/core/frame/handshake_frame.go @@ -17,6 +17,8 @@ type HandshakeFrame struct { // auth authName string authPayload string + // sourceID source client ID + sourceID string } // NewHandshakeFrame creates a new HandshakeFrame. @@ -31,6 +33,16 @@ func NewHandshakeFrame(name string, clientID string, clientType byte, observeDat } } +// SetSourceID set the source ID. +func (h *HandshakeFrame) SetSourceID(sourceID string) { + h.sourceID = sourceID +} + +// SourceID returns source ID +func (h *HandshakeFrame) SourceID() string { + return h.sourceID +} + // Type gets the type of Frame. func (h *HandshakeFrame) Type() Type { return TagOfHandshakeFrame @@ -57,16 +69,16 @@ func (h *HandshakeFrame) Encode() []byte { authPayloadBlock.SetStringValue(h.authPayload) // handshake frame handshake := y3.NewNodePacketEncoder(byte(h.Type())) - handshake.AddPrimitivePacket(idBlock) handshake.AddPrimitivePacket(nameBlock) + handshake.AddPrimitivePacket(idBlock) handshake.AddPrimitivePacket(typeBlock) handshake.AddPrimitivePacket(observeDataTagsBlock) handshake.AddPrimitivePacket(authNameBlock) handshake.AddPrimitivePacket(authPayloadBlock) // metaframe - if h.metaFrame != nil { - handshake.AddBytes(h.metaFrame.Encode()) - } + // if h.metaFrame != nil { + // handshake.AddBytes(h.metaFrame.Encode()) + // } return handshake.Encode() } diff --git a/core/frame/handshake_frame_test.go b/core/frame/handshake_frame_test.go index 26be2f4ba..17d9a8abd 100644 --- a/core/frame/handshake_frame_test.go +++ b/core/frame/handshake_frame_test.go @@ -9,12 +9,13 @@ import ( func TestHandshakeFrameEncode(t *testing.T) { expectedName := "1234" var expectedType byte = 0xD3 - m := NewHandshakeFrame(expectedName, expectedType, []byte{0x01, 0x02}, "token", "a") + m := NewHandshakeFrame(expectedName, "", expectedType, []byte{0x01, 0x02}, "token", "a") m.metaFrame.SetTransactionID("1234") m.metaFrame.SetSourceID("1") assert.Equal(t, []byte{ - 0x80 | byte(TagOfHandshakeFrame), 0x17 + 11, + 0x80 | byte(TagOfHandshakeFrame), 0x17 + 13, byte(TagOfHandshakeName), 0x04, 0x31, 0x32, 0x33, 0x34, + byte(TagOfHandshakeID), 0x0, byte(TagOfHandshakeType), 0x01, 0xD3, byte(TagOfHandshakeObserveDataTags), 0x02, 0x01, 0x02, // byte(TagOfHandshakeAppID), 0x0, diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index e3e6fcf1b..3925c6b2a 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -45,19 +45,32 @@ func (m *MetaFrame) Metadata() []byte { return m.metadata } +// SetSourceID set the source ID. +func (m *MetaFrame) SetSourceID(sourceID string) { + m.sourceID = sourceID +} + +// SourceID returns source ID +func (m *MetaFrame) SourceID() string { + return m.sourceID +} + // Encode implements Frame.Encode method. func (m *MetaFrame) Encode() []byte { meta := y3.NewNodePacketEncoder(byte(TagOfMetaFrame)) - + // transaction ID transactionID := y3.NewPrimitivePacketEncoder(byte(TagOfTransactionID)) transactionID.SetStringValue(m.tid) meta.AddPrimitivePacket(transactionID) - - if m.metadata != nil { - metadata := y3.NewPrimitivePacketEncoder(byte(TagOfMetadata)) - metadata.SetBytesValue(m.metadata) - meta.AddPrimitivePacket(metadata) - } + // source ID + sourceID := y3.NewPrimitivePacketEncoder(byte(TagOfSourceID)) + sourceID.SetStringValue(m.sourceID) + meta.AddPrimitivePacket(sourceID) + // if m.metadata != nil { + // metadata := y3.NewPrimitivePacketEncoder(byte(TagOfMetadata)) + // metadata.SetBytesValue(m.metadata) + // meta.AddPrimitivePacket(metadata) + // } return meta.Encode() } @@ -83,6 +96,13 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { case byte(TagOfMetadata): meta.metadata = v.ToBytes() break + case byte(TagOfSourceID): + sourceID, err := v.ToUTF8String() + if err != nil { + return nil, err + } + meta.sourceID = sourceID + break } } diff --git a/core/server.go b/core/server.go index 0b3eb3ac6..faf997bc1 100644 --- a/core/server.go +++ b/core/server.go @@ -311,7 +311,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { if err != nil { return err } - conn = newConnection(f.Name, clientType, metadata, stream) + conn = newConnection(f.Name, f.ClientID, clientType, f.SourceID(), metadata, stream, f.ObserveDataTags) if clientType == ClientTypeStreamFunction { // route @@ -329,16 +329,17 @@ func (s *Server) handleHandshakeFrame(c *Context) error { } } case ClientTypeUpstreamZipper: - conn = newConnection(f.Name, clientType, nil, stream) + conn = newConnection(f.Name, f.ClientID, clientType, f.SourceID(), nil, stream, f.ObserveDataTags) default: // unknown client type + s.connector.Remove(connID) logger.Errorf("%sClientType=%# x, ilegal!", ServerLogPrefix, f.ClientType) c.CloseWithError(yerr.ErrorCodeUnknownClient, "Unknown ClientType, illegal!") return errors.New("core.server: Unknown ClientType, illegal") } s.connector.Add(connID, conn) - logger.Printf("%s❤️ <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, connID) + logger.Printf("%s❤️ <%s> [%s][%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, clientID, connID) return nil } @@ -415,16 +416,16 @@ func (s *Server) handleBackflowFrame(c *Context) error { f := c.Frame.(*frame.DataFrame) tag := f.GetDataTag() carriage := f.GetCarriage() + sourceID := f.SourceID() // write to source with BackflowFrame bf := frame.NewBackflowFrame(tag, carriage) - sourceIDs := s.connector.GetSourceConnIDs(tag) - for _, sourceID := range sourceIDs { + sourceConns := s.connector.GetSourceConns(sourceID, tag) + for _, source := range sourceConns { // get source's quic.Stream - source := s.connector.Get(sourceID) + // source := s.connector.Get(sourceID) if source != nil { logger.Debugf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) - _, err := source.Write(bf.Encode()) - if err != nil { + if err := source.Write(bf); err != nil { logger.Errorf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, error=%v", ServerLogPrefix, tag, sourceID, err) return err } diff --git a/metadata.go b/metadata.go index 5962b9fc7..999a70ac0 100644 --- a/metadata.go +++ b/metadata.go @@ -5,10 +5,13 @@ import ( "github.com/yomorun/yomo/core/frame" ) -type metadata struct{} +type metadata struct { + // sourceID string +} func (m *metadata) Encode() []byte { return nil + // return []byte(m.sourceID) } type metadataBuilder struct { @@ -22,6 +25,7 @@ func newMetadataBuilder() core.MetadataBuilder { } func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (core.Metadata, error) { + // builder.m.sourceID = f.ClientID return builder.m, nil } diff --git a/pkg/id/id.go b/pkg/id/id.go new file mode 100644 index 000000000..a1edeab1d --- /dev/null +++ b/pkg/id/id.go @@ -0,0 +1,16 @@ +package id + +import ( + gonanoid "github.com/matoous/go-nanoid/v2" + "github.com/yomorun/yomo/pkg/logger" +) + +// New generate id +func New() string { + id, err := gonanoid.New() + if err != nil { + logger.Errorf("generated id err=%v", err) + return "" + } + return id +} From 10d21324db20326e15f61619af975c833979d83a Mon Sep 17 00:00:00 2001 From: venjiang Date: Thu, 26 May 2022 23:47:49 +0800 Subject: [PATCH 13/18] refactor backflow --- core/connection.go | 7 +++++++ core/connector.go | 16 +--------------- core/server.go | 7 +++---- example/0-basic/sfn2/main.go | 4 +++- example/0-basic/source/main.go | 2 +- sfn.go | 2 ++ 6 files changed, 17 insertions(+), 21 deletions(-) diff --git a/core/connection.go b/core/connection.go index 1735043bb..57f744875 100644 --- a/core/connection.go +++ b/core/connection.go @@ -13,6 +13,8 @@ type Connection interface { // Name returns the name of the connection, which is set by clients Name() string + // ClientID connection client ID + ClientID() string // ClientType returns the type of the client (Source | SFN | UpstreamZipper) ClientType() ClientType // Metadata returns the extra info of the application @@ -78,3 +80,8 @@ func (c *connection) Write(f frame.Frame) error { func (c *connection) ObserveDataTags() []byte { return c.observed } + +// ClientID connection client ID +func (c *connection) ClientID() string { + return c.clientID +} diff --git a/core/connector.go b/core/connector.go index e961a852e..a1329fec2 100644 --- a/core/connector.go +++ b/core/connector.go @@ -57,26 +57,12 @@ func (c *connector) Get(connID string) Connection { // GetSourceConns gets the source connection by tag. func (c *connector) GetSourceConns(sourceID string, tag byte) []Connection { - // connIDs := make([]string, 0) - - // c.sources.Range(func(key interface{}, val interface{}) bool { - // app := val.(*app) - // for _, v := range app.observed { - // if v == tag { - // connIDs = append(connIDs, key.(string)) - // // break - // } - // } - // return true - // }) - - // return connection list conns := make([]Connection, 0) c.conns.Range(func(key interface{}, val interface{}) bool { conn := val.(Connection) for _, v := range conn.ObserveDataTags() { - if v == tag { + if v == tag && conn.ClientType() == ClientTypeSource && conn.ClientID() == sourceID { conns = append(conns, conn) } } diff --git a/core/server.go b/core/server.go index faf997bc1..eeb752802 100644 --- a/core/server.go +++ b/core/server.go @@ -266,10 +266,10 @@ func (s *Server) mainFrameHandler(c *Context) error { if conn != nil && conn.ClientType() == ClientTypeSource { f := c.Frame.(*frame.DataFrame) f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) - // observe datatags backflow - s.handleBackflowFrame(c) s.dispatchToDownstreams(f) } + // observe datatags backflow + s.handleBackflowFrame(c) } default: logger.Errorf("%serr=%v, frame=%v", ServerLogPrefix, err, c.Frame.Encode()) @@ -420,9 +420,8 @@ func (s *Server) handleBackflowFrame(c *Context) error { // write to source with BackflowFrame bf := frame.NewBackflowFrame(tag, carriage) sourceConns := s.connector.GetSourceConns(sourceID, tag) + // logger.Printf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%s", ServerLogPrefix, tag, sourceID, carriage) for _, source := range sourceConns { - // get source's quic.Stream - // source := s.connector.Get(sourceID) if source != nil { logger.Debugf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) if err := source.Write(bf); err != nil { diff --git a/example/0-basic/sfn2/main.go b/example/0-basic/sfn2/main.go index 2d6c30582..8b6ebbd66 100644 --- a/example/0-basic/sfn2/main.go +++ b/example/0-basic/sfn2/main.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "os" "github.com/yomorun/yomo" @@ -39,5 +40,6 @@ func main() { func handler(data []byte) (byte, []byte) { logger.Printf(">> [sfn2] got tag=0x34, data=%s", string(data)) - return 0x0, []byte("sfn2 processed result") + result := bytes.ReplaceAll(data, []byte("sfn1"), []byte("sfn2")) + return 0x0, result } diff --git a/example/0-basic/source/main.go b/example/0-basic/source/main.go index f67227898..fb14f816f 100644 --- a/example/0-basic/source/main.go +++ b/example/0-basic/source/main.go @@ -92,7 +92,7 @@ func generateAndSendData(stream yomo.Source) error { // // stream.WriteFrame(goawayFrame) // } - time.Sleep(500 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) } } diff --git a/sfn.go b/sfn.go index 9846df99d..5ca0489cb 100644 --- a/sfn.go +++ b/sfn.go @@ -157,6 +157,8 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { frame := frame.NewDataFrame() // reuse transactionID frame.SetTransactionID(metaFrame.TransactionID()) + // reuse sourceID + frame.SetSourceID(metaFrame.SourceID()) frame.SetCarriage(tag, resp) s.client.WriteFrame(frame) } From 91c22a4568940288bdc5751033aa012aa3dd26f0 Mon Sep 17 00:00:00 2001 From: venjiang Date: Sat, 28 May 2022 00:05:25 +0800 Subject: [PATCH 14/18] clean --- core/frame/meta_frame.go | 5 ----- core/server.go | 3 ++- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 3925c6b2a..b0790591f 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -66,11 +66,6 @@ func (m *MetaFrame) Encode() []byte { sourceID := y3.NewPrimitivePacketEncoder(byte(TagOfSourceID)) sourceID.SetStringValue(m.sourceID) meta.AddPrimitivePacket(sourceID) - // if m.metadata != nil { - // metadata := y3.NewPrimitivePacketEncoder(byte(TagOfMetadata)) - // metadata.SetBytesValue(m.metadata) - // meta.AddPrimitivePacket(metadata) - // } return meta.Encode() } diff --git a/core/server.go b/core/server.go index eeb752802..bc511024f 100644 --- a/core/server.go +++ b/core/server.go @@ -420,7 +420,8 @@ func (s *Server) handleBackflowFrame(c *Context) error { // write to source with BackflowFrame bf := frame.NewBackflowFrame(tag, carriage) sourceConns := s.connector.GetSourceConns(sourceID, tag) - // logger.Printf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%s", ServerLogPrefix, tag, sourceID, carriage) + // conn := s.connector.Get(c.connID) + // logger.Printf("%s♻️ handleBackflowFrame %s tag:%#v --> source:%s, result=%s", ServerLogPrefix, conn.ClientType(), tag, sourceID, carriage) for _, source := range sourceConns { if source != nil { logger.Debugf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) From c983c5285a07f6e35f0eb433b1a3fafa30dc3aee Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 1 Jun 2022 16:01:23 +0800 Subject: [PATCH 15/18] add client id on reconnect --- core/client.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/client.go b/core/client.go index c14b7e403..a266a2536 100644 --- a/core/client.go +++ b/core/client.go @@ -371,10 +371,10 @@ func (c *Client) reconnect(ctx context.Context, addr string) { return case <-t.C: if c.getState() == ConnStateDisconnected { - c.logger.Printf("%s[%s](%s) is reconnecting to YoMo-Zipper %s...", ClientLogPrefix, c.name, c.localAddr, addr) + c.logger.Printf("%s[%s][%s](%s) is reconnecting to YoMo-Zipper %s...", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr) err := c.connect(ctx, addr) if err != nil { - c.logger.Errorf("%s[%s](%s) reconnect error:%v", ClientLogPrefix, c.name, c.localAddr, err) + c.logger.Errorf("%s[%s][%s](%s) reconnect error:%v", ClientLogPrefix, c.name, c.clientID, c.localAddr, err) } } } @@ -466,6 +466,7 @@ func (c *Client) SetErrorHandler(fn func(err error)) { }() } } + // ClientID return the client ID func (c *Client) ClientID() string { return c.clientID From 44f0e201c01ff8e44deda4d7929725804c1e5b53 Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 1 Jun 2022 16:53:09 +0800 Subject: [PATCH 16/18] remove source id field from handshake frame --- core/client.go | 2 -- core/connection.go | 4 +--- core/connector.go | 21 --------------------- core/frame/handshake_frame.go | 16 ---------------- core/server.go | 6 +++--- source.go | 9 +-------- 6 files changed, 5 insertions(+), 53 deletions(-) diff --git a/core/client.go b/core/client.go index a266a2536..d22b4ac2f 100644 --- a/core/client.go +++ b/core/client.go @@ -121,8 +121,6 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.opts.Credential.Name(), c.opts.Credential.Payload(), ) - // source ID - handshake.SetSourceID(c.clientID) err = c.WriteFrame(handshake) if err != nil { c.state = ConnStateRejected diff --git a/core/connection.go b/core/connection.go index 57f744875..54d69fc99 100644 --- a/core/connection.go +++ b/core/connection.go @@ -31,17 +31,15 @@ type connection struct { metadata Metadata stream io.ReadWriteCloser clientID string - sourceID string observed []byte // observed data tags mu sync.Mutex } -func newConnection(name string, clientID string, clientType ClientType, sourceID string, metadata Metadata, stream io.ReadWriteCloser, observed []byte) Connection { +func newConnection(name string, clientID string, clientType ClientType, metadata Metadata, stream io.ReadWriteCloser, observed []byte) Connection { return &connection{ name: name, clientID: clientID, clientType: clientType, - sourceID: sourceID, observed: observed, metadata: metadata, stream: stream, diff --git a/core/connector.go b/core/connector.go index a1329fec2..035d6d24b 100644 --- a/core/connector.go +++ b/core/connector.go @@ -20,8 +20,6 @@ type Connector interface { GetSnapshot() map[string]string // GetSourceConns gets the connections by source observe tags. GetSourceConns(sourceID string, tags byte) []Connection - // LinkSource links the source and connection. - // LinkSource(connID string, id string, name string, sourceID string, observed []byte) // Clean the connector. Clean() } @@ -72,19 +70,6 @@ func (c *connector) GetSourceConns(sourceID string, tag byte) []Connection { return conns } -// Write a Frame to a connection. -// func (c *connector) Write(f frame.Frame, toID string) error { -// targetStream := c.Get(toID) -// if targetStream == nil { -// logger.Warnf("%swill write to: [%s], target stream is nil", ServerLogPrefix, toID) -// return fmt.Errorf("target[%s] stream is nil", toID) -// } -// c.mu.Lock() -// _, err := targetStream.Write(f.Encode()) -// c.mu.Unlock() -// return err -// } - // GetSnapshot gets the snapshot of all connections. func (c *connector) GetSnapshot() map[string]string { result := make(map[string]string) @@ -97,12 +82,6 @@ func (c *connector) GetSnapshot() map[string]string { return result } -// LinkSource links the source and connection. -// func (c *connector) LinkSource(connID string, id string, name string, sourceID string, observed []byte) { -// logger.Debugf("%sconnector link source: connID[%s] --> source[%s]", ServerLogPrefix, connID, name) -// c.sources.Store(connID, &app{id, name, observed, sourceID}) -// } - // Clean the connector. func (c *connector) Clean() { c.conns = sync.Map{} diff --git a/core/frame/handshake_frame.go b/core/frame/handshake_frame.go index bb414b049..9154e5e29 100644 --- a/core/frame/handshake_frame.go +++ b/core/frame/handshake_frame.go @@ -17,8 +17,6 @@ type HandshakeFrame struct { // auth authName string authPayload string - // sourceID source client ID - sourceID string } // NewHandshakeFrame creates a new HandshakeFrame. @@ -33,16 +31,6 @@ func NewHandshakeFrame(name string, clientID string, clientType byte, observeDat } } -// SetSourceID set the source ID. -func (h *HandshakeFrame) SetSourceID(sourceID string) { - h.sourceID = sourceID -} - -// SourceID returns source ID -func (h *HandshakeFrame) SourceID() string { - return h.sourceID -} - // Type gets the type of Frame. func (h *HandshakeFrame) Type() Type { return TagOfHandshakeFrame @@ -75,10 +63,6 @@ func (h *HandshakeFrame) Encode() []byte { handshake.AddPrimitivePacket(observeDataTagsBlock) handshake.AddPrimitivePacket(authNameBlock) handshake.AddPrimitivePacket(authPayloadBlock) - // metaframe - // if h.metaFrame != nil { - // handshake.AddBytes(h.metaFrame.Encode()) - // } return handshake.Encode() } diff --git a/core/server.go b/core/server.go index 0557b231c..c244c7cf1 100644 --- a/core/server.go +++ b/core/server.go @@ -310,7 +310,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { if err != nil { return err } - conn = newConnection(f.Name, f.ClientID, clientType, f.SourceID(), metadata, stream, f.ObserveDataTags) + conn = newConnection(f.Name, f.ClientID, clientType, metadata, stream, f.ObserveDataTags) if clientType == ClientTypeStreamFunction { // route @@ -336,7 +336,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { } } case ClientTypeUpstreamZipper: - conn = newConnection(f.Name, f.ClientID, clientType, f.SourceID(), nil, stream, f.ObserveDataTags) + conn = newConnection(f.Name, f.ClientID, clientType, nil, stream, f.ObserveDataTags) default: // unknown client type s.connector.Remove(connID) @@ -428,7 +428,7 @@ func (s *Server) handleBackflowFrame(c *Context) error { bf := frame.NewBackflowFrame(tag, carriage) sourceConns := s.connector.GetSourceConns(sourceID, tag) // conn := s.connector.Get(c.connID) - // logger.Printf("%s♻️ handleBackflowFrame %s tag:%#v --> source:%s, result=%s", ServerLogPrefix, conn.ClientType(), tag, sourceID, carriage) + // logger.Printf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%s", ServerLogPrefix, tag, sourceID, carriage) for _, source := range sourceConns { if source != nil { logger.Debugf("%s♻️ handleBackflowFrame tag:%#v --> source:%s, result=%# x", ServerLogPrefix, tag, sourceID, frame.Shortly(carriage)) diff --git a/source.go b/source.go index ea14a5fc7..4883588a0 100644 --- a/source.go +++ b/source.go @@ -25,8 +25,6 @@ type Source interface { WriteWithTag(tag uint8, data []byte) error // SetErrorHandler set the error handler function when server error occurs SetErrorHandler(fn func(err error)) - // WriteFrame writes a frame to the connection - WriteFrame(frm frame.Frame) error // [Experimental] SetReceiveHandler set the observe handler function SetReceiveHandler(fn func(tag byte, data []byte)) } @@ -97,7 +95,7 @@ func (s *yomoSource) WriteWithTag(tag uint8, data []byte) error { f.SetSourceID(s.client.ClientID()) s.client.Logger().Debugf("%sWriteWithTag: tid=%s, source_id=%s, data[%d]=%# x", sourceLogPrefix, f.TransactionID(), f.SourceID(), len(data), frame.Shortly(data)) - return s.WriteFrame(f) + return s.client.WriteFrame(f) } // SetErrorHandler set the error handler function when server error occurs @@ -105,11 +103,6 @@ func (s *yomoSource) SetErrorHandler(fn func(err error)) { s.client.SetErrorHandler(fn) } -// WriteFrame writes a frame to the connection -func (s *yomoSource) WriteFrame(frm frame.Frame) error { - return s.client.WriteFrame(frm) -} - // [Experimental] SetReceiveHandler set the observe handler function func (s *yomoSource) SetReceiveHandler(fn func(byte, []byte)) { s.fn = fn From a7836bae4211b8d6cd0998fdeb37f4ce47935ecb Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 1 Jun 2022 17:00:25 +0800 Subject: [PATCH 17/18] handshake frame test --- core/frame/handshake_frame_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/core/frame/handshake_frame_test.go b/core/frame/handshake_frame_test.go index 17d9a8abd..480ddf51c 100644 --- a/core/frame/handshake_frame_test.go +++ b/core/frame/handshake_frame_test.go @@ -10,10 +10,8 @@ func TestHandshakeFrameEncode(t *testing.T) { expectedName := "1234" var expectedType byte = 0xD3 m := NewHandshakeFrame(expectedName, "", expectedType, []byte{0x01, 0x02}, "token", "a") - m.metaFrame.SetTransactionID("1234") - m.metaFrame.SetSourceID("1") assert.Equal(t, []byte{ - 0x80 | byte(TagOfHandshakeFrame), 0x17 + 13, + 0x80 | byte(TagOfHandshakeFrame), 0x19, byte(TagOfHandshakeName), 0x04, 0x31, 0x32, 0x33, 0x34, byte(TagOfHandshakeID), 0x0, byte(TagOfHandshakeType), 0x01, 0xD3, @@ -21,8 +19,6 @@ func TestHandshakeFrameEncode(t *testing.T) { // byte(TagOfHandshakeAppID), 0x0, byte(TagOfHandshakeAuthName), 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, byte(TagOfHandshakeAuthPayload), 0x01, 0x61, - // metaframe, see meta_frame_test - 0xaf, 0x09, 0x01, 0x04, 0x31, 0x32, 0x33, 0x34, 0x03, 0x01, 0x31, }, m.Encode(), ) @@ -31,5 +27,4 @@ func TestHandshakeFrameEncode(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, expectedName, Handshake.Name) assert.EqualValues(t, expectedType, Handshake.ClientType) - assert.EqualValues(t, "1", Handshake.SourceID()) } From 9a1849cd276af22b4f779b01961e114c8a3f56e6 Mon Sep 17 00:00:00 2001 From: venjiang Date: Wed, 1 Jun 2022 17:19:16 +0800 Subject: [PATCH 18/18] clean up unused code --- metadata.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/metadata.go b/metadata.go index 999a70ac0..5962b9fc7 100644 --- a/metadata.go +++ b/metadata.go @@ -5,13 +5,10 @@ import ( "github.com/yomorun/yomo/core/frame" ) -type metadata struct { - // sourceID string -} +type metadata struct{} func (m *metadata) Encode() []byte { return nil - // return []byte(m.sourceID) } type metadataBuilder struct { @@ -25,7 +22,6 @@ func newMetadataBuilder() core.MetadataBuilder { } func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (core.Metadata, error) { - // builder.m.sourceID = f.ClientID return builder.m, nil }