From 4e7f3502a3ea6d12ed1ff4c58e11e5aba193d37c Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Thu, 12 May 2022 17:09:04 +0800 Subject: [PATCH 01/14] refactor: support customized extensions for allegro and advanced developers --- app_info.go | 22 ++++++ core/app_info.go | 12 +++ core/connection.go | 63 +++++++++++++++ core/connector.go | 174 +++-------------------------------------- core/router.go | 14 ++-- core/server.go | 159 ++++++++++++++++--------------------- core/server_options.go | 8 -- core/store/memory.go | 31 -------- core/store/store.go | 8 -- router.go | 113 +++++++++++++------------- zipper.go | 5 +- 11 files changed, 241 insertions(+), 368 deletions(-) create mode 100644 app_info.go create mode 100644 core/app_info.go create mode 100644 core/connection.go delete mode 100644 core/store/memory.go delete mode 100644 core/store/store.go diff --git a/app_info.go b/app_info.go new file mode 100644 index 000000000..b15aedfac --- /dev/null +++ b/app_info.go @@ -0,0 +1,22 @@ +package yomo + +import ( + "github.com/yomorun/yomo/core" + "github.com/yomorun/yomo/core/frame" +) + +type appInfo struct{} + +func (a *appInfo) Key() string { + return "" +} + +type appInfoBuilder struct{} + +func newAppInfoBuilder() core.AppInfoBuilder { + return &appInfoBuilder{} +} + +func (a *appInfoBuilder) Build(f *frame.HandshakeFrame) (core.AppInfo, error) { + return &appInfo{}, nil +} diff --git a/core/app_info.go b/core/app_info.go new file mode 100644 index 000000000..08fff1a58 --- /dev/null +++ b/core/app_info.go @@ -0,0 +1,12 @@ +package core + +import "github.com/yomorun/yomo/core/frame" + +// AppInfo is used for customizing extensions of an application. +type AppInfo interface { + Key() string +} + +type AppInfoBuilder interface { + Build(f *frame.HandshakeFrame) (AppInfo, error) +} diff --git a/core/connection.go b/core/connection.go new file mode 100644 index 000000000..add76de50 --- /dev/null +++ b/core/connection.go @@ -0,0 +1,63 @@ +package core + +import ( + "io" + "sync" + + "github.com/yomorun/yomo/core/frame" +) + +type Connection interface { + io.Closer + ConnID() string + Name() string + GetClientType() ClientType + GetAppInfo() AppInfo + Write(f frame.Frame) error +} + +type connection struct { + connID string + name string + clientType ClientType + appInfo AppInfo + stream io.ReadWriteCloser + mu sync.Mutex +} + +func NewConnection(connID string, name string, clientType ClientType, appInfo AppInfo, stream io.ReadWriteCloser) Connection { + return &connection{ + connID: connID, + name: name, + clientType: clientType, + appInfo: appInfo, + stream: stream, + } +} + +func (c *connection) Write(f frame.Frame) error { + c.mu.Lock() + defer c.mu.Unlock() + _, err := c.stream.Write(f.Encode()) + return err +} + +func (c *connection) Close() error { + return c.stream.Close() +} + +func (c *connection) ConnID() string { + return c.connID +} + +func (c *connection) Name() string { + return c.name +} + +func (c *connection) GetClientType() ClientType { + return c.clientType +} + +func (c *connection) GetAppInfo() AppInfo { + return c.appInfo +} diff --git a/core/connector.go b/core/connector.go index 245d7a4c6..221d1f2d4 100644 --- a/core/connector.go +++ b/core/connector.go @@ -1,217 +1,67 @@ package core import ( - "fmt" - "io" - "math/rand" "sync" - "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/pkg/logger" ) -type app struct { - name string // app name - observed []byte // data tags -} - -// func (a *app) ID() string { -// return a.id -// } - -func (a *app) Name() string { - return a.name -} - var _ Connector = &connector{} // Connector is a interface to manage the connections and applications. type Connector interface { // Add a connection. - Add(connID string, stream io.ReadWriteCloser) + Add(connID string, conn Connection) // Remove a connection. Remove(connID string) // Get a connection by connection id. - Get(connID string) io.ReadWriteCloser - // GetConnIDs gets the connection ids by name and tag. - GetConnIDs(name string, tags byte) []string - // Write a Frame to a connection. - Write(f frame.Frame, toID string) error + Get(connID string) Connection // GetSnapshot gets the snapshot of all connections. - GetSnapshot() map[string]io.ReadWriteCloser - - // App gets the app by connID. - App(connID string) (*app, bool) - // AppID gets the ID of app by connID. - // AppID(connID string) (string, bool) - // AppName gets the name of app by connID. - AppName(connID string) (string, bool) - // LinkApp links the app and connection. - LinkApp(connID string, name string, observed []byte) - // UnlinkApp removes the app by connID. - UnlinkApp(connID string, name string) - // ExistsApp check app exists - ExistsApp(name string) bool - + GetSnapshot() map[string]Connection // Clean the connector. Clean() } type connector struct { conns sync.Map - apps sync.Map - mu sync.Mutex } func newConnector() Connector { - return &connector{ - conns: sync.Map{}, - apps: sync.Map{}, - mu: sync.Mutex{}, - } + return &connector{conns: sync.Map{}} } // Add a connection. -func (c *connector) Add(connID string, stream io.ReadWriteCloser) { +func (c *connector) Add(connID string, conn Connection) { logger.Debugf("%sconnector add: connID=%s", ServerLogPrefix, connID) - c.conns.Store(connID, stream) + c.conns.Store(connID, conn) } // Remove a connection. func (c *connector) Remove(connID string) { logger.Debugf("%sconnector remove: connID=%s", ServerLogPrefix, connID) c.conns.Delete(connID) - // c.funcs.Delete(connID) - c.apps.Delete(connID) } // Get a connection by connection id. -func (c *connector) Get(connID string) io.ReadWriteCloser { +func (c *connector) Get(connID string) Connection { logger.Debugf("%sconnector get connection: connID=%s", ServerLogPrefix, connID) - if stream, ok := c.conns.Load(connID); ok { - return stream.(io.ReadWriteCloser) + if conn, ok := c.conns.Load(connID); ok { + return conn.(Connection) } return nil } -// App gets the app by connID. -func (c *connector) App(connID string) (*app, bool) { - if result, found := c.apps.Load(connID); found { - app, ok := result.(*app) - if ok { - logger.Debugf("%sconnector get app=%s, connID=%s", ServerLogPrefix, app.name, connID) - return app, true - } - logger.Warnf("%sconnector get app convert fails, connID=%s", ServerLogPrefix, connID) - return nil, false - } - logger.Warnf("%sconnector get app is nil, connID=%s", ServerLogPrefix, connID) - return nil, false -} - -// AppName gets the name of app by connID. -func (c *connector) AppName(connID string) (string, bool) { - if app, ok := c.App(connID); ok { - return app.name, true - } - return "", false -} - -// GetConnIDs gets the connection ids by name and tag. -func (c *connector) GetConnIDs(name string, tag byte) []string { - connIDs := make([]string, 0) - - c.apps.Range(func(key interface{}, val interface{}) bool { - app := val.(*app) - if app.name == name { - for _, v := range app.observed { - if v == tag { - connIDs = append(connIDs, key.(string)) - break - } - } - } - return true - }) - - if n := len(connIDs); n > 1 { - index := rand.Intn(n) - return connIDs[index : index+1] - } - - return connIDs -} - -// Write a DataFrame to a connection. -func (c *connector) Write(f frame.Frame, toID string) error { - targetStream := c.Get(toID) - if targetStream == nil { - logger.Warnf("%swill write to: [%s], target stream is nil", ServerLogPrefix, toID) - return fmt.Errorf("target[%s] stream is nil", toID) - } - c.mu.Lock() - _, err := targetStream.Write(f.Encode()) - c.mu.Unlock() - return err -} - // GetSnapshot gets the snapshot of all connections. -func (c *connector) GetSnapshot() map[string]io.ReadWriteCloser { - result := make(map[string]io.ReadWriteCloser) +func (c *connector) GetSnapshot() map[string]Connection { + result := make(map[string]Connection) c.conns.Range(func(key interface{}, val interface{}) bool { - result[key.(string)] = val.(io.ReadWriteCloser) + result[key.(string)] = val.(Connection) return true }) return result } -// LinkApp links the app and connection. -func (c *connector) LinkApp(connID string, name string, observed []byte) { - logger.Debugf("%sconnector link application: connID[%s] --> app[%s]", ServerLogPrefix, connID, name) - c.apps.Store(connID, &app{name, observed}) -} - -// UnlinkApp removes the app by connID. -func (c *connector) UnlinkApp(connID string, name string) { - logger.Debugf("%sconnector unlink application: connID[%s] x-> app[%s]", ServerLogPrefix, connID, name) - c.apps.Delete(connID) -} - -// ExistsApp check app exists -func (c *connector) ExistsApp(name string) bool { - var found bool - c.apps.Range(func(key interface{}, val interface{}) bool { - app := val.(*app) - if app.name == name { - found = true - return false - } - return true - }) - - return found -} - -// func (c *connector) RemoveApp(appID string) { -// logger.Debugf("%sconnector unlink application: connID[%s] x-> app[%s]", ServerLogPrefix, connID, appID) -// c.apps.Range(func(key interface{},val interface{})bool{ -// return true -// }) -// c.rapps.Delete(appID) -// } - -// func (c *connector) AppConns(appID string) []string { -// conns := make([]string, 0) -// c.apps.Range(func(key interface{},val interface{})bool{ -// if val.(string)==appID{ -// conns=append(conns,key.(string)) -// } -// }) -// return conns -// } - // Clean the connector. func (c *connector) Clean() { c.conns = sync.Map{} - c.apps = sync.Map{} } diff --git a/core/router.go b/core/router.go index 7ab37707f..261727af7 100644 --- a/core/router.go +++ b/core/router.go @@ -3,17 +3,17 @@ package core // Router is the interface to manage the routes for applications. type Router interface { // Route gets the route - Route() Route + Route(info AppInfo) Route // Clean the routes. Clean() } -// Route is the interface for route. +// Route manages data subscribers according to their observed data tags. type Route interface { // Add a route. - Add(index int, name string) - // GetForwardRoutes returns all the forward routes from current node. - GetForwardRoutes(current string) []string - // Exists indicates whether the route exists or not. - Exists(name string) bool + Add(connID string, name string, observeDataTags []byte) error + // Remove a route. + Remove(connID string) error + // GetForwardRoutes returns all the subscribers by the given data tag. + GetForwardRoutes(tag byte) []string } diff --git a/core/server.go b/core/server.go index 26ed52249..ee99a1357 100644 --- a/core/server.go +++ b/core/server.go @@ -6,13 +6,11 @@ import ( "fmt" "io" "net" - "reflect" "sync" "sync/atomic" "github.com/lucas-clemente/quic-go" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/core/store" "github.com/yomorun/yomo/core/yerr" // authentication implements, Currently, only token authentication is implemented @@ -32,11 +30,11 @@ type FrameHandler func(c *Context) error // Server is the underlining server of Zipper type Server struct { - name string - // stream quic.Stream + name string state string connector Connector router Router + appInfoBuilder AppInfoBuilder counterOfDataFrame int64 downstreams map[string]*Client mu sync.Mutex @@ -118,13 +116,10 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { if err != nil { // if client close the connection, then we should close the connection // @CC: when Source close the connection, it won't affect connectors - appName, ok := s.connector.AppName(connID) - if ok { + if conn := s.connector.Get(connID); conn != nil { // connector s.connector.Remove(connID) - // store - // remove app route from store? let me think... - logger.Printf("%s💔 [%s](%s) close the connection", ServerLogPrefix, appName, connID) + logger.Printf("%s💔 [%s](%s) close the connection", ServerLogPrefix, conn.Name(), connID) } else { logger.Errorf("%s❤️3/ [unknown](%s) on stream %v", ServerLogPrefix, connID, err) } @@ -160,10 +155,6 @@ func (s *Server) Close() error { if s.connector != nil { s.connector.Clean() } - // store - if s.opts.Store != nil { - s.opts.Store.Clean() - } return nil } @@ -259,7 +250,10 @@ func (s *Server) mainFrameHandler(c *Context) error { if err := s.handleDataFrame(c); err != nil { c.CloseWithError(yerr.ErrorCodeData, fmt.Sprintf("handleDataFrame err: %v", err)) } else { - s.dispatchToDownstreams(c.Frame.(*frame.DataFrame)) + conn := s.connector.Get(c.connID) + if conn != nil && conn.GetClientType() == ClientTypeSource { + s.dispatchToDownstreams(c.Frame.(*frame.DataFrame)) + } } default: logger.Errorf("%serr=%v, frame=%v", ServerLogPrefix, err, c.Frame.Encode()) @@ -274,7 +268,6 @@ func (s *Server) handleHandshakeFrame(c *Context) error { logger.Debugf("%sGOT ❤️ HandshakeFrame : %# x", ServerLogPrefix, f) // basic info connID := c.ConnID() - name := f.Name clientType := ClientType(f.ClientType) stream := c.Stream // credential @@ -283,68 +276,49 @@ func (s *Server) handleHandshakeFrame(c *Context) error { if !s.authenticate(f) { err := fmt.Errorf("handshake authentication fails, client credential name is %s", authName(f.AuthName())) // return err - logger.Debugf("%s🔑 <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, name, connID) + logger.Debugf("%s🔑 <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, connID) rejectedFrame := frame.NewRejectedFrame(err.Error()) if _, err = stream.Write(rejectedFrame.Encode()); err != nil { - logger.Debugf("%s🔑 write to <%s> [%s](%s) RejectedFrame error:%v", ServerLogPrefix, clientType, name, connID, err) + logger.Debugf("%s🔑 write to <%s> [%s](%s) RejectedFrame error:%v", ServerLogPrefix, clientType, f.Name, connID, err) return err } return nil } + // appInfo + if err := s.validateAppInfoBuilder(); err != nil { + return err + } + appInfo, err := s.appInfoBuilder.Build(f) + if err != nil { + return err + } + // route if err := s.validateRouter(); err != nil { return err } - route := s.router.Route() - if reflect.ValueOf(route).IsNil() { + route := s.router.Route(appInfo) + if route == nil { err := errors.New("handleHandshakeFrame route is nil") return err } - // store - s.opts.Store.Set(name, route) // client type + conn := NewConnection(connID, f.Name, clientType, appInfo, stream) switch clientType { - case ClientTypeSource: - s.connector.Add(connID, stream) - s.connector.LinkApp(connID, name, nil) + case ClientTypeSource, ClientTypeUpstreamZipper: + s.connector.Add(connID, conn) case ClientTypeStreamFunction: - // when sfn connect, it will provide its name to the server. server will check if this client - // has permission connected to. - if !route.Exists(name) { - // unexpected client connected, close the connection - s.connector.Remove(connID) - // SFN: stream function - err := fmt.Errorf("handshake router validation faild, illegal SFN[%s]", f.Name) - c.CloseWithError(yerr.ErrorCodeRejected, err.Error()) - // break - return err - } - // check app exists in connection list - // if s.connector.ExistsApp(name) { - // err := fmt.Errorf("SFN[%s] connection already exists", f.Name) - // c.CloseWithError(0xCC, err.Error()) - // return err - // } - // check app exists in connection list - // logger.Printf("%sSFN[%s] write GoawayFrame to client", ServerLogPrefix, f.Name) - if s.connector.ExistsApp(name) { + if err := route.Add(connID, f.Name, f.ObserveDataTags); err != nil { logger.Debugf("%swrite to SFN[%s] GoawayFrame", ServerLogPrefix, f.Name) - err := fmt.Errorf("SFN[%s] connection already exists", f.Name) goawayFrame := frame.NewGoawayFrame(err.Error()) if _, err = stream.Write(goawayFrame.Encode()); err != nil { logger.Errorf("%s⛔️ write to SFN[%s] GoawayFrame error:%v", ServerLogPrefix, f.Name, err) return err } - // c.CloseWithError(goawayFrame.Code(), err.Error()) } - s.connector.Add(connID, stream) - // link connection to stream function - s.connector.LinkApp(connID, name, f.ObserveDataTags) - case ClientTypeUpstreamZipper: - s.connector.Add(connID, stream) - s.connector.LinkApp(connID, name, nil) + s.connector.Add(connID, conn) default: // unknown client type s.connector.Remove(connID) @@ -352,7 +326,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { c.CloseWithError(yerr.ErrorCodeUnknownClient, "Unknown ClientType, illegal!") return errors.New("core.server: Unknown ClientType, illegal") } - logger.Printf("%s❤️ <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, name, connID) + logger.Printf("%s❤️ <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, connID) return nil } @@ -377,48 +351,48 @@ func (s *Server) handleDataFrame(c *Context) error { atomic.AddInt64(&s.counterOfDataFrame, 1) // currentIssuer := f.GetIssuer() fromID := c.ConnID() - from, ok := s.connector.AppName(fromID) - if !ok { - logger.Warnf("%shandleDataFrame have connection[%s], but not have function", ServerLogPrefix, fromID) - return nil + from := s.connector.Get(fromID) + if from == nil { + logger.Warnf("%shandleDataFrame connector cannot find %s", ServerLogPrefix, fromID) + return fmt.Errorf("handleDataFrame connector cannot find %s", fromID) } f := c.Frame.(*frame.DataFrame) // route - name, _ := s.connector.AppName(fromID) - cacheRoute, ok := s.opts.Store.Get(name) - if !ok { - err := fmt.Errorf("get route failure, appName=%s, connID=%s", name, fromID) - logger.Errorf("%shandleDataFrame %s", ServerLogPrefix, err.Error()) + if err := s.validateRouter(); err != nil { return err } - route := cacheRoute.(Route) + route := s.router.Route(from.GetAppInfo()) if route == nil { logger.Warnf("%shandleDataFrame route is nil", ServerLogPrefix) return fmt.Errorf("handleDataFrame route is nil") } - // get stream function names from route - routes := route.GetForwardRoutes(from) - for _, to := range routes { - toIDs := s.connector.GetConnIDs(to, f.GetDataTag()) - for _, toID := range toIDs { - logger.Debugf("%shandleDataFrame tag=%#x tid=%s, counter=%d, from=[%s](%s), to=[%s](%s)", ServerLogPrefix, f.Tag(), f.TransactionID(), s.counterOfDataFrame, from, fromID, to, toID) - - // write data frame to stream - logger.Infof("%swrite data: [%s](%s) --> [%s](%s)", ServerLogPrefix, from, fromID, to, toID) - if err := s.connector.Write(f, toID); err != nil { - logger.Errorf("%swrite data: [%s](%s) --> [%s](%s), err=%v", ServerLogPrefix, from, fromID, to, toID, err) - continue - } + + // get stream function connection ids from route + connIDs := route.GetForwardRoutes(f.GetDataTag()) + for _, toID := range connIDs { + conn := s.connector.Get(toID) + if conn == nil { + logger.Errorf("%sconn is nil: (%s)", ServerLogPrefix, toID) + continue + } + + to := conn.Name() + logger.Debugf("%shandleDataFrame tag=%#x tid=%s, counter=%d, from=[%s](%s), to=[%s](%s)", ServerLogPrefix, f.Tag(), f.TransactionID(), s.counterOfDataFrame, from.Name(), fromID, to, toID) + + // write data frame to stream + logger.Infof("%swrite data: [%s](%s) --> [%s](%s)", ServerLogPrefix, from, fromID, to, toID) + if err := conn.Write(f); err != nil { + logger.Errorf("%swrite data: [%s](%s) --> [%s](%s), err=%v", ServerLogPrefix, from, fromID, to, toID, err) + continue } } return nil } // StatsFunctions returns the sfn stats of server. -// func (s *Server) StatsFunctions() map[string][]*quic.Stream { -func (s *Server) StatsFunctions() map[string]io.ReadWriteCloser { +func (s *Server) StatsFunctions() map[string]Connection { return s.connector.GetSnapshot() } @@ -440,12 +414,18 @@ func (s *Server) Downstreams() map[string]*Client { // return nil // } -func (s *Server) ConfigRouter(router Router) error { +func (s *Server) ConfigRouter(router Router) { s.mu.Lock() s.router = router logger.Debugf("%sconfig router is %#v", ServerLogPrefix, router) s.mu.Unlock() - return nil +} + +func (s *Server) ConfigAppInfoBuilder(builder AppInfoBuilder) { + s.mu.Lock() + s.appInfoBuilder = builder + logger.Debugf("%sconfig appInfoBuilder is %#v", ServerLogPrefix, builder) + s.mu.Unlock() } func (s *Server) Router() Router { @@ -477,14 +457,6 @@ func GetConnID(conn quic.Connection) string { func (s *Server) initOptions() { // defaults - // store - if s.opts.Store == nil { - s.opts.Store = store.NewMemoryStore() - } - // auth - // if s.opts.Auths == nil { - // s.opts.Auths = append(s.opts.Auths, auth.NewNoneAuth()) - // } } func (s *Server) validateRouter() error { @@ -494,6 +466,13 @@ func (s *Server) validateRouter() error { return nil } +func (s *Server) validateAppInfoBuilder() error { + if s.appInfoBuilder == nil { + return errors.New("server's appInfoBuilder is nil") + } + return nil +} + func (s *Server) Options() ServerOptions { return s.opts } @@ -502,10 +481,6 @@ func (s *Server) Connector() Connector { return s.connector } -func (s *Server) Store() store.Store { - return s.opts.Store -} - func (s *Server) SetBeforeHandlers(handlers ...FrameHandler) { s.beforeHandlers = append(s.beforeHandlers, handlers...) } diff --git a/core/server_options.go b/core/server_options.go index 726aa30e7..bce8a7bfd 100644 --- a/core/server_options.go +++ b/core/server_options.go @@ -6,7 +6,6 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/yomorun/yomo/core/auth" - "github.com/yomorun/yomo/core/store" ) type ServerOptions struct { @@ -14,7 +13,6 @@ type ServerOptions struct { TLSConfig *tls.Config Addr string Auths []auth.Authentication - Store store.Store Conn net.PacketConn } @@ -34,12 +32,6 @@ func WithAuth(name string, args ...string) ServerOption { } } -func WithStore(store store.Store) ServerOption { - return func(o *ServerOptions) { - o.Store = store - } -} - func WithServerTLSConfig(tc *tls.Config) ServerOption { return func(o *ServerOptions) { o.TLSConfig = tc diff --git a/core/store/memory.go b/core/store/memory.go deleted file mode 100644 index 7a1e7aa11..000000000 --- a/core/store/memory.go +++ /dev/null @@ -1,31 +0,0 @@ -package store - -import ( - "sync" -) - -type MemoryStore struct { - m sync.Map -} - -func NewMemoryStore() *MemoryStore { - return &MemoryStore{ - m: sync.Map{}, - } -} - -func (s *MemoryStore) Set(key interface{}, val interface{}) { - s.m.Store(key, val) -} - -func (s *MemoryStore) Get(key interface{}) (interface{}, bool) { - return s.m.Load(key) -} - -func (s *MemoryStore) Remove(key interface{}) { - s.m.Delete(key) -} - -func (s *MemoryStore) Clean() { - s.m = sync.Map{} -} diff --git a/core/store/store.go b/core/store/store.go deleted file mode 100644 index afa14e3f5..000000000 --- a/core/store/store.go +++ /dev/null @@ -1,8 +0,0 @@ -package store - -type Store interface { - Set(key interface{}, val interface{}) - Get(key interface{}) (interface{}, bool) - Remove(key interface{}) - Clean() -} diff --git a/router.go b/router.go index 37b621b22..78f73eba4 100644 --- a/router.go +++ b/router.go @@ -1,93 +1,90 @@ package yomo import ( + "fmt" "sync" "github.com/yomorun/yomo/core" "github.com/yomorun/yomo/pkg/config" - "github.com/yomorun/yomo/pkg/logger" ) -// router type router struct { - config *config.WorkflowConfig + r *route } -func newRouter(config *config.WorkflowConfig) core.Router { - return &router{config: config} +func newRouter(functions []config.App) core.Router { + return &router{r: newRoute(functions)} } -// router interface -func (r *router) Route() core.Route { - logger.Debugf("%sworkflowconfig is %#v", zipperLogPrefix, r.config) - return newRoute(r.config) +func (r *router) Route(info core.AppInfo) core.Route { + return r.r } func (r *router) Clean() { - r.config = nil + r.r = nil } -// route interface type route struct { - data sync.Map + functions []config.App + data map[byte]map[string]struct{} + mu sync.RWMutex } -func newRoute(config *config.WorkflowConfig) *route { - if config == nil { - logger.Errorf("%sworkflowconfig is nil", zipperLogPrefix) - return nil +func newRoute(functions []config.App) *route { + return &route{ + functions: functions, + data: make(map[byte]map[string]struct{}), } - r := route{ - data: sync.Map{}, +} + +func (r *route) Add(connID string, name string, observeDataTags []byte) error { + r.mu.Lock() + defer r.mu.Unlock() + + ok := false + for _, v := range r.functions { + if v.Name == name { + ok = true + break + } } - logger.Debugf("%sworkflowconfig %+v", zipperLogPrefix, *config) - for i, app := range config.Functions { - r.Add(i, app.Name) + if !ok { + return fmt.Errorf("SFN name %s does not exist in config functions", name) } - return &r -} - -func (r *route) Add(index int, name string) { - if r.Exists(name) { - logger.Warnf("%sapp[%s] already exists in workflow and will not be added", zipperLogPrefix, name) - return + for _, tag := range observeDataTags { + conns := r.data[tag] + if conns == nil { + conns = make(map[string]struct{}) + r.data[tag] = conns + } + r.data[tag][connID] = struct{}{} } - logger.Debugf("%sroute add: %s", zipperLogPrefix, name) - r.data.Store(index, name) + + return nil } -func (r *route) Exists(name string) bool { - var ok bool - logger.Debugf("%srouter[%v] exists name: %s", zipperLogPrefix, r, name) - r.data.Range(func(key interface{}, val interface{}) bool { - if val.(string) == name { - ok = true - return false - } - return true - }) +func (r *route) Remove(connID string) error { + r.mu.Lock() + defer r.mu.Unlock() + + for _, conns := range r.data { + delete(conns, connID) + } - return ok + return nil } -func (r *route) GetForwardRoutes(current string) []string { - idx := -1 - r.data.Range(func(key interface{}, val interface{}) bool { - if val.(string) == current { - idx = key.(int) - return false - } - return true - }) +func (r *route) GetForwardRoutes(tag byte) []string { + r.mu.RLock() + defer r.mu.RUnlock() - routes := make([]string, 0) - r.data.Range(func(key interface{}, val interface{}) bool { - if key.(int) > idx { - routes = append(routes, val.(string)) + if conns := r.data[tag]; conns != nil { + keys := make([]string, 0, len(conns)) + for k := range conns { + keys = append(keys, k) } - return true - }) - - return routes + return keys + } + return make([]string, 0) } diff --git a/zipper.go b/zipper.go index 1876f1fff..02434355d 100644 --- a/zipper.go +++ b/zipper.go @@ -135,8 +135,9 @@ func (z *zipper) ConfigWorkflow(conf string) error { func (z *zipper) configWorkflow(config *config.WorkflowConfig) error { z.wfc = config - // router - return z.server.ConfigRouter(newRouter(config)) + z.server.ConfigAppInfoBuilder(newAppInfoBuilder()) + z.server.ConfigRouter(newRouter(config.Functions)) + return nil } func (z *zipper) ConfigMesh(url string) error { From 0b2b4d773bc364a6bd939644252bb095df0f1482 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Fri, 13 May 2022 10:35:49 +0800 Subject: [PATCH 02/14] add comments --- app_info.go | 2 +- core/app_info.go | 5 ++++- core/connection.go | 42 +++++++++++++++++++++++------------------- core/server.go | 22 +++++----------------- 4 files changed, 33 insertions(+), 38 deletions(-) diff --git a/app_info.go b/app_info.go index b15aedfac..9c97560a0 100644 --- a/app_info.go +++ b/app_info.go @@ -17,6 +17,6 @@ func newAppInfoBuilder() core.AppInfoBuilder { return &appInfoBuilder{} } -func (a *appInfoBuilder) Build(f *frame.HandshakeFrame) (core.AppInfo, error) { +func (a *appInfoBuilder) Build(f frame.Frame) (core.AppInfo, error) { return &appInfo{}, nil } diff --git a/core/app_info.go b/core/app_info.go index 08fff1a58..5d05b3b57 100644 --- a/core/app_info.go +++ b/core/app_info.go @@ -4,9 +4,12 @@ import "github.com/yomorun/yomo/core/frame" // AppInfo is used for customizing extensions of an application. type AppInfo interface { + // Key must be globally unique between applications Key() string } +// AppInfoBuilder is the builder for AppInfo type AppInfoBuilder interface { - Build(f *frame.HandshakeFrame) (AppInfo, error) + // Build will return an AppInfo instance according to the frame passed in + Build(f frame.Frame) (AppInfo, error) } diff --git a/core/connection.go b/core/connection.go index add76de50..331e31123 100644 --- a/core/connection.go +++ b/core/connection.go @@ -7,17 +7,21 @@ import ( "github.com/yomorun/yomo/core/frame" ) +// Connection wraps the specific io connections (typically quic.Connection) to transfer y3 frames type Connection interface { io.Closer - ConnID() string + + // Name returns the name of the connection, which is set by clients Name() string - GetClientType() ClientType - GetAppInfo() AppInfo + // ClientType returns the type of the client (Source | SFN | UpstreamZipper) + ClientType() ClientType + // AppInfo returns the extra application info + AppInfo() AppInfo + // Write should goroutine-safely send y3 frames to peer side Write(f frame.Frame) error } type connection struct { - connID string name string clientType ClientType appInfo AppInfo @@ -25,9 +29,8 @@ type connection struct { mu sync.Mutex } -func NewConnection(connID string, name string, clientType ClientType, appInfo AppInfo, stream io.ReadWriteCloser) Connection { +func newConnection(name string, clientType ClientType, appInfo AppInfo, stream io.ReadWriteCloser) Connection { return &connection{ - connID: connID, name: name, clientType: clientType, appInfo: appInfo, @@ -35,29 +38,30 @@ func NewConnection(connID string, name string, clientType ClientType, appInfo Ap } } -func (c *connection) Write(f frame.Frame) error { - c.mu.Lock() - defer c.mu.Unlock() - _, err := c.stream.Write(f.Encode()) - return err -} - +// Close implements io.Close interface func (c *connection) Close() error { return c.stream.Close() } -func (c *connection) ConnID() string { - return c.connID -} - +// Name returns the name of the connection, which is set by clients func (c *connection) Name() string { return c.name } -func (c *connection) GetClientType() ClientType { +// ClientType returns the type of the connection (Source | SFN | UpstreamZipper) +func (c *connection) ClientType() ClientType { return c.clientType } -func (c *connection) GetAppInfo() AppInfo { +// AppInfo returns the extra application info +func (c *connection) AppInfo() AppInfo { return c.appInfo } + +// Write should goroutine-safely send y3 frames to peer side +func (c *connection) Write(f frame.Frame) error { + c.mu.Lock() + defer c.mu.Unlock() + _, err := c.stream.Write(f.Encode()) + return err +} diff --git a/core/server.go b/core/server.go index ee99a1357..ba26fae3b 100644 --- a/core/server.go +++ b/core/server.go @@ -251,7 +251,7 @@ func (s *Server) mainFrameHandler(c *Context) error { c.CloseWithError(yerr.ErrorCodeData, fmt.Sprintf("handleDataFrame err: %v", err)) } else { conn := s.connector.Get(c.connID) - if conn != nil && conn.GetClientType() == ClientTypeSource { + if conn != nil && conn.ClientType() == ClientTypeSource { s.dispatchToDownstreams(c.Frame.(*frame.DataFrame)) } } @@ -305,7 +305,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { } // client type - conn := NewConnection(connID, f.Name, clientType, appInfo, stream) + conn := newConnection(f.Name, clientType, appInfo, stream) switch clientType { case ClientTypeSource, ClientTypeUpstreamZipper: s.connector.Add(connID, conn) @@ -363,7 +363,7 @@ func (s *Server) handleDataFrame(c *Context) error { if err := s.validateRouter(); err != nil { return err } - route := s.router.Route(from.GetAppInfo()) + route := s.router.Route(from.AppInfo()) if route == nil { logger.Warnf("%shandleDataFrame route is nil", ServerLogPrefix) return fmt.Errorf("handleDataFrame route is nil") @@ -406,14 +406,7 @@ func (s *Server) Downstreams() map[string]*Client { return s.downstreams } -// AddWorkflow register sfn to this server. -// func (s *Server) AddWorkflow(wfs ...Workflow) error { -// for _, wf := range wfs { -// s.router.Add(wf.Seq, wf.Name) -// } -// return nil -// } - +// ConfigRouter is used to set router by zipper func (s *Server) ConfigRouter(router Router) { s.mu.Lock() s.router = router @@ -421,6 +414,7 @@ func (s *Server) ConfigRouter(router Router) { s.mu.Unlock() } +// ConfigAppInfoBuilder is used to set appInfoBuilder by zipper func (s *Server) ConfigAppInfoBuilder(builder AppInfoBuilder) { s.mu.Lock() s.appInfoBuilder = builder @@ -428,12 +422,6 @@ func (s *Server) ConfigAppInfoBuilder(builder AppInfoBuilder) { s.mu.Unlock() } -func (s *Server) Router() Router { - s.mu.Lock() - defer s.mu.Unlock() - return s.router -} - // AddDownstreamServer add a downstream server to this server. all the DataFrames will be // dispatch to all the downstreams. func (s *Server) AddDownstreamServer(addr string, c *Client) { From 8a12374e3cf09b6c1420207dd9b3b56686dce9eb Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Thu, 19 May 2022 12:13:11 +0800 Subject: [PATCH 03/14] support appInfo when handling DataFrame --- core/frame/frame.go | 12 ++++++------ core/frame/meta_frame.go | 42 ++++++++++++++++++++++++++++++---------- core/server.go | 14 +++++++++++++- go.mod | 1 + go.sum | 2 ++ 5 files changed, 54 insertions(+), 17 deletions(-) diff --git a/core/frame/frame.go b/core/frame/frame.go index f97db283b..15a8e784f 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -16,7 +16,7 @@ const ( TagOfMetaFrame Type = 0x2F TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 - TagOfIssuer Type = 0x02 + TagOfExtInfo Type = 0x02 // PayloadFrame of DataFrame TagOfPayloadFrame Type = 0x2E @@ -31,13 +31,13 @@ const ( TagOfHandshakeAuthPayload Type = 0x05 TagOfHandshakeObserveDataTags Type = 0x06 - TagOfPingFrame Type = 0x3C - TagOfPongFrame Type = 0x3B - TagOfAcceptedFrame Type = 0x3A - TagOfRejectedFrame Type = 0x39 + TagOfPingFrame Type = 0x3C + TagOfPongFrame Type = 0x3B + TagOfAcceptedFrame Type = 0x3A + TagOfRejectedFrame Type = 0x39 TagOfRejectedMessage Type = 0x02 // GoawayFrame - TagOfGoawayFrame Type = 0x30 + TagOfGoawayFrame Type = 0x30 TagOfGoawayCode Type = 0x01 TagOfGoawayMessage Type = 0x02 ) diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index a5a75379b..cabb11a31 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -1,22 +1,21 @@ package frame import ( - "strconv" - "time" - + "github.com/google/uuid" "github.com/yomorun/y3" ) // MetaFrame is a Y3 encoded bytes, SeqID is a fixed value of TYPE_ID_TRANSACTION. // used for describes metadata for a DataFrame. type MetaFrame struct { - tid string + tid string + extInfo []byte } // NewMetaFrame creates a new MetaFrame instance. func NewMetaFrame() *MetaFrame { return &MetaFrame{ - tid: strconv.FormatInt(time.Now().Unix(), 10), + tid: uuid.NewString(), } } @@ -30,14 +29,28 @@ func (m *MetaFrame) TransactionID() string { return m.tid } +// SetExtInfo set the extended information. +func (m *MetaFrame) SetExtInfo(extInfo []byte) { + m.extInfo = extInfo +} + +// TransactionID returns the extended information +func (m *MetaFrame) ExtInfo() []byte { + return m.extInfo +} + // Encode implements Frame.Encode method. func (m *MetaFrame) Encode() []byte { meta := y3.NewNodePacketEncoder(byte(TagOfMetaFrame)) transactionID := y3.NewPrimitivePacketEncoder(byte(TagOfTransactionID)) transactionID.SetStringValue(m.tid) - meta.AddPrimitivePacket(transactionID) + + extInfo := y3.NewPrimitivePacketEncoder(byte(TagOfExtInfo)) + extInfo.SetBytesValue(m.extInfo) + meta.AddPrimitivePacket(extInfo) + return meta.Encode() } @@ -50,10 +63,19 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { } meta := &MetaFrame{} - for _, v := range nodeBlock.PrimitivePackets { - val, _ := v.ToUTF8String() - meta.tid = val - break + for k, v := range nodeBlock.PrimitivePackets { + switch k { + case byte(TagOfTransactionID): + val, err := v.ToUTF8String() + if err != nil { + return nil, err + } + meta.tid = val + break + case byte(TagOfExtInfo): + meta.extInfo = v.ToBytes() + break + } } return meta, nil diff --git a/core/server.go b/core/server.go index ba26fae3b..7a0b10df7 100644 --- a/core/server.go +++ b/core/server.go @@ -359,11 +359,23 @@ func (s *Server) handleDataFrame(c *Context) error { f := c.Frame.(*frame.DataFrame) + appInfo := from.AppInfo() + if appInfo == nil { + err := s.validateAppInfoBuilder() + if err != nil { + return err + } + appInfo, err = s.appInfoBuilder.Build(f) + if err != nil { + return err + } + } + // route if err := s.validateRouter(); err != nil { return err } - route := s.router.Route(from.AppInfo()) + route := s.router.Route(appInfo) if route == nil { logger.Warnf("%shandleDataFrame route is nil", ServerLogPrefix) return fmt.Errorf("handleDataFrame route is nil") diff --git a/go.mod b/go.mod index a0296cc08..f2434e39c 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.3 github.com/emirpasic/gods v1.15.0 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect + github.com/google/uuid v1.3.0 github.com/lucas-clemente/quic-go v0.27.0 github.com/onsi/ginkgo v1.16.5 // indirect github.com/reactivex/rxgo/v2 v2.5.0 diff --git a/go.sum b/go.sum index 5fc1beb0c..1f80e449f 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+u github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= From 16b192b119026dfdf9c1a10923a6644007eab1aa Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Thu, 19 May 2022 16:14:04 +0800 Subject: [PATCH 04/14] fix comment --- core/frame/meta_frame.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index cabb11a31..64dbf8f14 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -34,7 +34,7 @@ func (m *MetaFrame) SetExtInfo(extInfo []byte) { m.extInfo = extInfo } -// TransactionID returns the extended information +// ExtInfo returns the extended information func (m *MetaFrame) ExtInfo() []byte { return m.extInfo } From e7d28505405d904c7ba92131b08706de12e75de7 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Thu, 19 May 2022 16:22:06 +0800 Subject: [PATCH 05/14] fix unit test --- core/frame/meta_frame.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 64dbf8f14..7a7452864 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -47,9 +47,11 @@ func (m *MetaFrame) Encode() []byte { transactionID.SetStringValue(m.tid) meta.AddPrimitivePacket(transactionID) - extInfo := y3.NewPrimitivePacketEncoder(byte(TagOfExtInfo)) - extInfo.SetBytesValue(m.extInfo) - meta.AddPrimitivePacket(extInfo) + if m.extInfo != nil { + extInfo := y3.NewPrimitivePacketEncoder(byte(TagOfExtInfo)) + extInfo.SetBytesValue(m.extInfo) + meta.AddPrimitivePacket(extInfo) + } return meta.Encode() } From 597a36bd680d0cf2321a12b9bea582b79cdfc839 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Thu, 19 May 2022 17:36:58 +0800 Subject: [PATCH 06/14] AppInfo enc-dec --- app_info.go | 20 ++++++++++++++++---- core/app_info.go | 6 +++++- core/frame/frame.go | 2 +- core/frame/meta_frame.go | 26 +++++++++++++------------- core/server.go | 8 +++++--- 5 files changed, 40 insertions(+), 22 deletions(-) diff --git a/app_info.go b/app_info.go index 9c97560a0..2a3a7a176 100644 --- a/app_info.go +++ b/app_info.go @@ -11,12 +11,24 @@ func (a *appInfo) Key() string { return "" } -type appInfoBuilder struct{} +func (a *appInfo) Encode() []byte { + return nil +} + +type appInfoBuilder struct { + a *appInfo +} func newAppInfoBuilder() core.AppInfoBuilder { - return &appInfoBuilder{} + return &appInfoBuilder{ + a: &appInfo{}, + } +} + +func (builder *appInfoBuilder) Build(f *frame.HandshakeFrame) (core.AppInfo, error) { + return builder.a, nil } -func (a *appInfoBuilder) Build(f frame.Frame) (core.AppInfo, error) { - return &appInfo{}, nil +func (builder *appInfoBuilder) Decode(buf []byte) (core.AppInfo, error) { + return builder.a, nil } diff --git a/core/app_info.go b/core/app_info.go index 5d05b3b57..fa3256eb2 100644 --- a/core/app_info.go +++ b/core/app_info.go @@ -6,10 +6,14 @@ import "github.com/yomorun/yomo/core/frame" type AppInfo interface { // Key must be globally unique between applications Key() string + // Encode is the serializer method + Encode() []byte } // AppInfoBuilder is the builder for AppInfo type AppInfoBuilder interface { // Build will return an AppInfo instance according to the frame passed in - Build(f frame.Frame) (AppInfo, error) + Build(f *frame.HandshakeFrame) (AppInfo, error) + // Decode is the deserializer method + Decode(buf []byte) (AppInfo, error) } diff --git a/core/frame/frame.go b/core/frame/frame.go index 15a8e784f..6bb9c91db 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -16,7 +16,7 @@ const ( TagOfMetaFrame Type = 0x2F TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 - TagOfExtInfo Type = 0x02 + TagOfAppInfo Type = 0x02 // PayloadFrame of DataFrame TagOfPayloadFrame Type = 0x2E diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 7a7452864..be5365942 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -9,7 +9,7 @@ import ( // used for describes metadata for a DataFrame. type MetaFrame struct { tid string - extInfo []byte + appInfo []byte } // NewMetaFrame creates a new MetaFrame instance. @@ -29,14 +29,14 @@ func (m *MetaFrame) TransactionID() string { return m.tid } -// SetExtInfo set the extended information. -func (m *MetaFrame) SetExtInfo(extInfo []byte) { - m.extInfo = extInfo +// SetAppInfo set the extra application information. +func (m *MetaFrame) SetAppInfo(appInfo []byte) { + m.appInfo = appInfo } -// ExtInfo returns the extended information -func (m *MetaFrame) ExtInfo() []byte { - return m.extInfo +// AppInfo returns the extra application information. +func (m *MetaFrame) AppInfo() []byte { + return m.appInfo } // Encode implements Frame.Encode method. @@ -47,10 +47,10 @@ func (m *MetaFrame) Encode() []byte { transactionID.SetStringValue(m.tid) meta.AddPrimitivePacket(transactionID) - if m.extInfo != nil { - extInfo := y3.NewPrimitivePacketEncoder(byte(TagOfExtInfo)) - extInfo.SetBytesValue(m.extInfo) - meta.AddPrimitivePacket(extInfo) + if m.appInfo != nil { + appInfo := y3.NewPrimitivePacketEncoder(byte(TagOfAppInfo)) + appInfo.SetBytesValue(m.appInfo) + meta.AddPrimitivePacket(appInfo) } return meta.Encode() @@ -74,8 +74,8 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { } meta.tid = val break - case byte(TagOfExtInfo): - meta.extInfo = v.ToBytes() + case byte(TagOfAppInfo): + meta.appInfo = v.ToBytes() break } } diff --git a/core/server.go b/core/server.go index 7a0b10df7..69623257d 100644 --- a/core/server.go +++ b/core/server.go @@ -252,7 +252,9 @@ func (s *Server) mainFrameHandler(c *Context) error { } else { conn := s.connector.Get(c.connID) if conn != nil && conn.ClientType() == ClientTypeSource { - s.dispatchToDownstreams(c.Frame.(*frame.DataFrame)) + f := c.Frame.(*frame.DataFrame) + f.GetMetaFrame().SetAppInfo(conn.AppInfo().Encode()) + s.dispatchToDownstreams(f) } } default: @@ -360,12 +362,12 @@ func (s *Server) handleDataFrame(c *Context) error { f := c.Frame.(*frame.DataFrame) appInfo := from.AppInfo() - if appInfo == nil { + if appInfo == nil && from.ClientType() == ClientTypeUpstreamZipper { err := s.validateAppInfoBuilder() if err != nil { return err } - appInfo, err = s.appInfoBuilder.Build(f) + appInfo, err = s.appInfoBuilder.Decode(f.GetMetaFrame().AppInfo()) if err != nil { return err } From ad9e8894311a08c9ab6ffece9d91aec0fd6fee4c Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Thu, 19 May 2022 17:46:09 +0800 Subject: [PATCH 07/14] remove redundant code --- app_info.go | 4 ---- core/app_info.go | 6 ++---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/app_info.go b/app_info.go index 2a3a7a176..b0d72fd75 100644 --- a/app_info.go +++ b/app_info.go @@ -7,10 +7,6 @@ import ( type appInfo struct{} -func (a *appInfo) Key() string { - return "" -} - func (a *appInfo) Encode() []byte { return nil } diff --git a/core/app_info.go b/core/app_info.go index fa3256eb2..2d5998bf5 100644 --- a/core/app_info.go +++ b/core/app_info.go @@ -4,9 +4,7 @@ import "github.com/yomorun/yomo/core/frame" // AppInfo is used for customizing extensions of an application. type AppInfo interface { - // Key must be globally unique between applications - Key() string - // Encode is the serializer method + // Encode is the serialize method Encode() []byte } @@ -14,6 +12,6 @@ type AppInfo interface { type AppInfoBuilder interface { // Build will return an AppInfo instance according to the frame passed in Build(f *frame.HandshakeFrame) (AppInfo, error) - // Decode is the deserializer method + // Decode is the deserialize method Decode(buf []byte) (AppInfo, error) } From 5d1c9a638b5187a4f6822d054ec000860f6a0c68 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Sat, 21 May 2022 09:58:25 +0800 Subject: [PATCH 08/14] minor refactor --- router.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/router.go b/router.go index 78f73eba4..dcfc86ba6 100644 --- a/router.go +++ b/router.go @@ -79,12 +79,11 @@ func (r *route) GetForwardRoutes(tag byte) []string { r.mu.RLock() defer r.mu.RUnlock() + var keys []string if conns := r.data[tag]; conns != nil { - keys := make([]string, 0, len(conns)) for k := range conns { keys = append(keys, k) } - return keys } - return make([]string, 0) + return keys } From 7a068783ac2dcae57ec8a8a9fd8eefffb7334627 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Sun, 22 May 2022 15:49:21 +0800 Subject: [PATCH 09/14] check sfn name --- router.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/router.go b/router.go index dcfc86ba6..764a423d3 100644 --- a/router.go +++ b/router.go @@ -26,14 +26,14 @@ func (r *router) Clean() { type route struct { functions []config.App - data map[byte]map[string]struct{} + data map[byte]map[string]string mu sync.RWMutex } func newRoute(functions []config.App) *route { return &route{ functions: functions, - data: make(map[byte]map[string]struct{}), + data: make(map[byte]map[string]string), } } @@ -49,16 +49,24 @@ func (r *route) Add(connID string, name string, observeDataTags []byte) error { } } if !ok { - return fmt.Errorf("SFN name %s does not exist in config functions", name) + return fmt.Errorf("SFN[%s] does not exist in config functions", name) + } + + for _, conns := range r.data { + for _, n := range conns { + if n == name { + return fmt.Errorf("SFN[%s] is already linked to another connection", name) + } + } } for _, tag := range observeDataTags { conns := r.data[tag] if conns == nil { - conns = make(map[string]struct{}) + conns = make(map[string]string) r.data[tag] = conns } - r.data[tag][connID] = struct{}{} + r.data[tag][connID] = name } return nil From 0691cc40000123737c2296f0faecf0f2d184e6a5 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Sun, 22 May 2022 15:55:21 +0800 Subject: [PATCH 10/14] revert interface nil if statement --- core/server.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/server.go b/core/server.go index 69623257d..3a1cc4a13 100644 --- a/core/server.go +++ b/core/server.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "reflect" "sync" "sync/atomic" @@ -301,7 +302,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { return err } route := s.router.Route(appInfo) - if route == nil { + if reflect.ValueOf(route).IsNil() { err := errors.New("handleHandshakeFrame route is nil") return err } @@ -378,7 +379,7 @@ func (s *Server) handleDataFrame(c *Context) error { return err } route := s.router.Route(appInfo) - if route == nil { + if reflect.ValueOf(route).IsNil() { logger.Warnf("%shandleDataFrame route is nil", ServerLogPrefix) return fmt.Errorf("handleDataFrame route is nil") } From 8ec1ccecc1ca69ef88e9b39caeacf90a3970ea93 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Mon, 23 May 2022 10:07:22 +0800 Subject: [PATCH 11/14] rename AppInfo to MetaData --- app_info.go | 30 ------------------------------ core/app_info.go | 17 ----------------- core/connection.go | 16 ++++++++-------- core/frame/frame.go | 2 +- core/frame/meta_frame.go | 36 +++++++++++++++++++----------------- core/meta_data.go | 17 +++++++++++++++++ core/router.go | 2 +- core/server.go | 38 +++++++++++++++++++------------------- go.mod | 2 +- go.sum | 7 +++++-- meta_data.go | 30 ++++++++++++++++++++++++++++++ router.go | 2 +- zipper.go | 2 +- 13 files changed, 103 insertions(+), 98 deletions(-) delete mode 100644 app_info.go delete mode 100644 core/app_info.go create mode 100644 core/meta_data.go create mode 100644 meta_data.go diff --git a/app_info.go b/app_info.go deleted file mode 100644 index b0d72fd75..000000000 --- a/app_info.go +++ /dev/null @@ -1,30 +0,0 @@ -package yomo - -import ( - "github.com/yomorun/yomo/core" - "github.com/yomorun/yomo/core/frame" -) - -type appInfo struct{} - -func (a *appInfo) Encode() []byte { - return nil -} - -type appInfoBuilder struct { - a *appInfo -} - -func newAppInfoBuilder() core.AppInfoBuilder { - return &appInfoBuilder{ - a: &appInfo{}, - } -} - -func (builder *appInfoBuilder) Build(f *frame.HandshakeFrame) (core.AppInfo, error) { - return builder.a, nil -} - -func (builder *appInfoBuilder) Decode(buf []byte) (core.AppInfo, error) { - return builder.a, nil -} diff --git a/core/app_info.go b/core/app_info.go deleted file mode 100644 index 2d5998bf5..000000000 --- a/core/app_info.go +++ /dev/null @@ -1,17 +0,0 @@ -package core - -import "github.com/yomorun/yomo/core/frame" - -// AppInfo is used for customizing extensions of an application. -type AppInfo interface { - // Encode is the serialize method - Encode() []byte -} - -// AppInfoBuilder is the builder for AppInfo -type AppInfoBuilder interface { - // Build will return an AppInfo instance according to the frame passed in - Build(f *frame.HandshakeFrame) (AppInfo, error) - // Decode is the deserialize method - Decode(buf []byte) (AppInfo, error) -} diff --git a/core/connection.go b/core/connection.go index 331e31123..d38e9e892 100644 --- a/core/connection.go +++ b/core/connection.go @@ -15,8 +15,8 @@ type Connection interface { Name() string // ClientType returns the type of the client (Source | SFN | UpstreamZipper) ClientType() ClientType - // AppInfo returns the extra application info - AppInfo() AppInfo + // MetaData returns the extra info of the application + MetaData() MetaData // Write should goroutine-safely send y3 frames to peer side Write(f frame.Frame) error } @@ -24,16 +24,16 @@ type Connection interface { type connection struct { name string clientType ClientType - appInfo AppInfo + metaData MetaData stream io.ReadWriteCloser mu sync.Mutex } -func newConnection(name string, clientType ClientType, appInfo AppInfo, stream io.ReadWriteCloser) Connection { +func newConnection(name string, clientType ClientType, metaData MetaData, stream io.ReadWriteCloser) Connection { return &connection{ name: name, clientType: clientType, - appInfo: appInfo, + metaData: metaData, stream: stream, } } @@ -53,9 +53,9 @@ func (c *connection) ClientType() ClientType { return c.clientType } -// AppInfo returns the extra application info -func (c *connection) AppInfo() AppInfo { - return c.appInfo +// MetaData returns the extra info of the application +func (c *connection) MetaData() MetaData { + return c.metaData } // Write should goroutine-safely send y3 frames to peer side diff --git a/core/frame/frame.go b/core/frame/frame.go index 6bb9c91db..6772fa5fb 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -16,7 +16,7 @@ const ( TagOfMetaFrame Type = 0x2F TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 - TagOfAppInfo Type = 0x02 + TagOfMetaData Type = 0x02 // PayloadFrame of DataFrame TagOfPayloadFrame Type = 0x2E diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index be5365942..3ea9aba35 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -1,22 +1,24 @@ package frame import ( - "github.com/google/uuid" + gonanoid "github.com/matoous/go-nanoid/v2" "github.com/yomorun/y3" ) // MetaFrame is a Y3 encoded bytes, SeqID is a fixed value of TYPE_ID_TRANSACTION. // used for describes metadata for a DataFrame. type MetaFrame struct { - tid string - appInfo []byte + tid string + data []byte } // NewMetaFrame creates a new MetaFrame instance. func NewMetaFrame() *MetaFrame { - return &MetaFrame{ - tid: uuid.NewString(), + tid, err := gonanoid.New() + if err != nil { + return nil } + return &MetaFrame{tid: tid} } // SetTransactinID set the transaction ID. @@ -29,14 +31,14 @@ func (m *MetaFrame) TransactionID() string { return m.tid } -// SetAppInfo set the extra application information. -func (m *MetaFrame) SetAppInfo(appInfo []byte) { - m.appInfo = appInfo +// SetMetaData set the extra info of the application +func (m *MetaFrame) SetMetaData(data []byte) { + m.data = data } -// AppInfo returns the extra application information. -func (m *MetaFrame) AppInfo() []byte { - return m.appInfo +// MetaData returns the extra info of the application +func (m *MetaFrame) MetaData() []byte { + return m.data } // Encode implements Frame.Encode method. @@ -47,10 +49,10 @@ func (m *MetaFrame) Encode() []byte { transactionID.SetStringValue(m.tid) meta.AddPrimitivePacket(transactionID) - if m.appInfo != nil { - appInfo := y3.NewPrimitivePacketEncoder(byte(TagOfAppInfo)) - appInfo.SetBytesValue(m.appInfo) - meta.AddPrimitivePacket(appInfo) + if m.data != nil { + data := y3.NewPrimitivePacketEncoder(byte(TagOfMetaData)) + data.SetBytesValue(m.data) + meta.AddPrimitivePacket(data) } return meta.Encode() @@ -74,8 +76,8 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { } meta.tid = val break - case byte(TagOfAppInfo): - meta.appInfo = v.ToBytes() + case byte(TagOfMetaData): + meta.data = v.ToBytes() break } } diff --git a/core/meta_data.go b/core/meta_data.go new file mode 100644 index 000000000..00b4e8679 --- /dev/null +++ b/core/meta_data.go @@ -0,0 +1,17 @@ +package core + +import "github.com/yomorun/yomo/core/frame" + +// MetaData is used for storing extra info of the application +type MetaData interface { + // Encode is the serialize method + Encode() []byte +} + +// MetaDataBuilder is the builder of MetaData +type MetaDataBuilder interface { + // Build will return an MetaData instance according to the handshake frame passed in + Build(f *frame.HandshakeFrame) (MetaData, error) + // Decode is the deserialize method + Decode(buf []byte) (MetaData, error) +} diff --git a/core/router.go b/core/router.go index 261727af7..7866ceb83 100644 --- a/core/router.go +++ b/core/router.go @@ -3,7 +3,7 @@ package core // Router is the interface to manage the routes for applications. type Router interface { // Route gets the route - Route(info AppInfo) Route + Route(metaData MetaData) Route // Clean the routes. Clean() } diff --git a/core/server.go b/core/server.go index 3a1cc4a13..bf222013f 100644 --- a/core/server.go +++ b/core/server.go @@ -35,7 +35,7 @@ type Server struct { state string connector Connector router Router - appInfoBuilder AppInfoBuilder + metaDataBuilder MetaDataBuilder counterOfDataFrame int64 downstreams map[string]*Client mu sync.Mutex @@ -254,7 +254,7 @@ func (s *Server) mainFrameHandler(c *Context) error { conn := s.connector.Get(c.connID) if conn != nil && conn.ClientType() == ClientTypeSource { f := c.Frame.(*frame.DataFrame) - f.GetMetaFrame().SetAppInfo(conn.AppInfo().Encode()) + f.GetMetaFrame().SetMetaData(conn.MetaData().Encode()) s.dispatchToDownstreams(f) } } @@ -288,11 +288,11 @@ func (s *Server) handleHandshakeFrame(c *Context) error { return nil } - // appInfo - if err := s.validateAppInfoBuilder(); err != nil { + // metaData + if err := s.validateMetaDataBuilder(); err != nil { return err } - appInfo, err := s.appInfoBuilder.Build(f) + metaData, err := s.metaDataBuilder.Build(f) if err != nil { return err } @@ -301,14 +301,14 @@ func (s *Server) handleHandshakeFrame(c *Context) error { if err := s.validateRouter(); err != nil { return err } - route := s.router.Route(appInfo) + route := s.router.Route(metaData) if reflect.ValueOf(route).IsNil() { err := errors.New("handleHandshakeFrame route is nil") return err } // client type - conn := newConnection(f.Name, clientType, appInfo, stream) + conn := newConnection(f.Name, clientType, metaData, stream) switch clientType { case ClientTypeSource, ClientTypeUpstreamZipper: s.connector.Add(connID, conn) @@ -362,13 +362,13 @@ func (s *Server) handleDataFrame(c *Context) error { f := c.Frame.(*frame.DataFrame) - appInfo := from.AppInfo() - if appInfo == nil && from.ClientType() == ClientTypeUpstreamZipper { - err := s.validateAppInfoBuilder() + metaData := from.MetaData() + if metaData == nil && from.ClientType() == ClientTypeUpstreamZipper { + err := s.validateMetaDataBuilder() if err != nil { return err } - appInfo, err = s.appInfoBuilder.Decode(f.GetMetaFrame().AppInfo()) + metaData, err = s.metaDataBuilder.Decode(f.GetMetaFrame().MetaData()) if err != nil { return err } @@ -378,7 +378,7 @@ func (s *Server) handleDataFrame(c *Context) error { if err := s.validateRouter(); err != nil { return err } - route := s.router.Route(appInfo) + route := s.router.Route(metaData) if reflect.ValueOf(route).IsNil() { logger.Warnf("%shandleDataFrame route is nil", ServerLogPrefix) return fmt.Errorf("handleDataFrame route is nil") @@ -429,11 +429,11 @@ func (s *Server) ConfigRouter(router Router) { s.mu.Unlock() } -// ConfigAppInfoBuilder is used to set appInfoBuilder by zipper -func (s *Server) ConfigAppInfoBuilder(builder AppInfoBuilder) { +// ConfigMetaDataBuilder is used to set metaDataBuilder by zipper +func (s *Server) ConfigMetaDataBuilder(builder MetaDataBuilder) { s.mu.Lock() - s.appInfoBuilder = builder - logger.Debugf("%sconfig appInfoBuilder is %#v", ServerLogPrefix, builder) + s.metaDataBuilder = builder + logger.Debugf("%sconfig metaDataBuilder is %#v", ServerLogPrefix, builder) s.mu.Unlock() } @@ -469,9 +469,9 @@ func (s *Server) validateRouter() error { return nil } -func (s *Server) validateAppInfoBuilder() error { - if s.appInfoBuilder == nil { - return errors.New("server's appInfoBuilder is nil") +func (s *Server) validateMetaDataBuilder() error { + if s.metaDataBuilder == nil { + return errors.New("server's metaDataBuilder is nil") } return nil } diff --git a/go.mod b/go.mod index f2434e39c..458296b4e 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/cenkalti/backoff/v4 v4.1.3 github.com/emirpasic/gods v1.15.0 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect - github.com/google/uuid v1.3.0 github.com/lucas-clemente/quic-go v0.27.0 + github.com/matoous/go-nanoid/v2 v2.0.0 github.com/onsi/ginkgo v1.16.5 // indirect github.com/reactivex/rxgo/v2 v2.5.0 github.com/stretchr/objx v0.3.0 // indirect diff --git a/go.sum b/go.sum index 1f80e449f..4f6196776 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,6 @@ github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+u github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -98,6 +96,10 @@ github.com/marten-seemann/qtls-go1-17 v0.1.1 h1:DQjHPq+aOzUeh9/lixAGunn6rIOQyWCh github.com/marten-seemann/qtls-go1-17 v0.1.1/go.mod h1:C2ekUKcDdz9SDWxec1N/MvcXBpaX9l3Nx67XaR84L5s= github.com/marten-seemann/qtls-go1-18 v0.1.1 h1:qp7p7XXUFL7fpBvSS1sWD+uSqPvzNQK43DH+/qEkj0Y= github.com/marten-seemann/qtls-go1-18 v0.1.1/go.mod h1:mJttiymBAByA49mhlNZZGrH5u1uXYZJ+RW28Py7f4m4= +github.com/matoous/go-nanoid v1.5.0 h1:VRorl6uCngneC4oUQqOYtO3S0H5QKFtKuKycFG3euek= +github.com/matoous/go-nanoid v1.5.0/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= +github.com/matoous/go-nanoid/v2 v2.0.0 h1:d19kur2QuLeHmJBkvYkFdhFBzLoo1XVm2GgTpL+9Tj0= +github.com/matoous/go-nanoid/v2 v2.0.0/go.mod h1:FtS4aGPVfEkxKxhdWPAspZpZSh1cOjtM7Ej/So3hR0g= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -162,6 +164,7 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/meta_data.go b/meta_data.go new file mode 100644 index 000000000..e7fa44ad2 --- /dev/null +++ b/meta_data.go @@ -0,0 +1,30 @@ +package yomo + +import ( + "github.com/yomorun/yomo/core" + "github.com/yomorun/yomo/core/frame" +) + +type metaData struct{} + +func (m *metaData) Encode() []byte { + return nil +} + +type metaDataBuilder struct { + m *metaData +} + +func newMetaDataBuilder() core.MetaDataBuilder { + return &metaDataBuilder{ + m: &metaData{}, + } +} + +func (builder *metaDataBuilder) Build(f *frame.HandshakeFrame) (core.MetaData, error) { + return builder.m, nil +} + +func (builder *metaDataBuilder) Decode(buf []byte) (core.MetaData, error) { + return builder.m, nil +} diff --git a/router.go b/router.go index 764a423d3..a93929e77 100644 --- a/router.go +++ b/router.go @@ -16,7 +16,7 @@ func newRouter(functions []config.App) core.Router { return &router{r: newRoute(functions)} } -func (r *router) Route(info core.AppInfo) core.Route { +func (r *router) Route(metaData core.MetaData) core.Route { return r.r } diff --git a/zipper.go b/zipper.go index 02434355d..6745e529c 100644 --- a/zipper.go +++ b/zipper.go @@ -135,7 +135,7 @@ func (z *zipper) ConfigWorkflow(conf string) error { func (z *zipper) configWorkflow(config *config.WorkflowConfig) error { z.wfc = config - z.server.ConfigAppInfoBuilder(newAppInfoBuilder()) + z.server.ConfigMetaDataBuilder(newMetaDataBuilder()) z.server.ConfigRouter(newRouter(config.Functions)) return nil } From 72855ee64ccab022486f25e3a2bcf9da2fd43447 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Mon, 23 May 2022 13:00:47 +0800 Subject: [PATCH 12/14] remove connection from route --- core/connector.go | 10 ++++++---- core/frame/meta_frame.go | 5 ++++- core/server.go | 32 ++++++++++++++++---------------- zipper.go | 6 +++--- 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/core/connector.go b/core/connector.go index 221d1f2d4..aa180a0a7 100644 --- a/core/connector.go +++ b/core/connector.go @@ -17,7 +17,7 @@ type Connector interface { // Get a connection by connection id. Get(connID string) Connection // GetSnapshot gets the snapshot of all connections. - GetSnapshot() map[string]Connection + GetSnapshot() map[string]string // Clean the connector. Clean() } @@ -52,10 +52,12 @@ func (c *connector) Get(connID string) Connection { } // GetSnapshot gets the snapshot of all connections. -func (c *connector) GetSnapshot() map[string]Connection { - result := make(map[string]Connection) +func (c *connector) GetSnapshot() map[string]string { + result := make(map[string]string) c.conns.Range(func(key interface{}, val interface{}) bool { - result[key.(string)] = val.(Connection) + connID := key.(string) + conn := val.(Connection) + result[connID] = conn.Name() return true }) return result diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 3ea9aba35..877f7068d 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -1,6 +1,9 @@ package frame import ( + "strconv" + "time" + gonanoid "github.com/matoous/go-nanoid/v2" "github.com/yomorun/y3" ) @@ -16,7 +19,7 @@ type MetaFrame struct { func NewMetaFrame() *MetaFrame { tid, err := gonanoid.New() if err != nil { - return nil + tid = strconv.FormatInt(time.Now().UnixMicro(), 10) } return &MetaFrame{tid: tid} } diff --git a/core/server.go b/core/server.go index bf222013f..ccfa067b3 100644 --- a/core/server.go +++ b/core/server.go @@ -85,6 +85,14 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) error { // Serve the server with a net.PacketConn. func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { + if err := s.validateMetaDataBuilder(); err != nil { + return err + } + + if err := s.validateRouter(); err != nil { + return err + } + listener := newListener() // listen the address err := listener.Listen(conn, s.opts.TLSConfig, s.opts.QuicConfig) @@ -120,6 +128,10 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { if conn := s.connector.Get(connID); conn != nil { // connector s.connector.Remove(connID) + route := s.router.Route(conn.MetaData()) + if !reflect.ValueOf(route).IsNil() { + route.Remove(connID) + } logger.Printf("%s💔 [%s](%s) close the connection", ServerLogPrefix, conn.Name(), connID) } else { logger.Errorf("%s❤️3/ [unknown](%s) on stream %v", ServerLogPrefix, connID, err) @@ -289,18 +301,12 @@ func (s *Server) handleHandshakeFrame(c *Context) error { } // metaData - if err := s.validateMetaDataBuilder(); err != nil { - return err - } metaData, err := s.metaDataBuilder.Build(f) if err != nil { return err } // route - if err := s.validateRouter(); err != nil { - return err - } route := s.router.Route(metaData) if reflect.ValueOf(route).IsNil() { err := errors.New("handleHandshakeFrame route is nil") @@ -363,21 +369,15 @@ func (s *Server) handleDataFrame(c *Context) error { f := c.Frame.(*frame.DataFrame) metaData := from.MetaData() - if metaData == nil && from.ClientType() == ClientTypeUpstreamZipper { - err := s.validateMetaDataBuilder() - if err != nil { - return err - } - metaData, err = s.metaDataBuilder.Decode(f.GetMetaFrame().MetaData()) + if reflect.ValueOf(metaData).IsNil() && from.ClientType() == ClientTypeUpstreamZipper { + m, err := s.metaDataBuilder.Decode(f.GetMetaFrame().MetaData()) if err != nil { return err } + metaData = m } // route - if err := s.validateRouter(); err != nil { - return err - } route := s.router.Route(metaData) if reflect.ValueOf(route).IsNil() { logger.Warnf("%shandleDataFrame route is nil", ServerLogPrefix) @@ -407,7 +407,7 @@ func (s *Server) handleDataFrame(c *Context) error { } // StatsFunctions returns the sfn stats of server. -func (s *Server) StatsFunctions() map[string]Connection { +func (s *Server) StatsFunctions() map[string]string { return s.connector.GetSnapshot() } diff --git a/zipper.go b/zipper.go index 6745e529c..1bca56755 100644 --- a/zipper.go +++ b/zipper.go @@ -245,9 +245,9 @@ func (z *zipper) Close() error { // Stats inspects current server. func (z *zipper) Stats() int { - log.Printf("[%s] all sfn connected: %d", z.name, len(z.server.StatsFunctions())) - for k := range z.server.StatsFunctions() { - log.Printf("[%s] -> ConnID=%v", z.name, k) + log.Printf("[%s] all connections: %d", z.name, len(z.server.StatsFunctions())) + for connID, name := range z.server.StatsFunctions() { + log.Printf("[%s] -> ConnID=%s, Name=%s", z.name, connID, name) } log.Printf("[%s] all downstream zippers connected: %d", z.name, len(z.server.Downstreams())) From 02a5779c415bfbe601e80e974847b1efeb6744b7 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Mon, 23 May 2022 13:25:23 +0800 Subject: [PATCH 13/14] fix UnixMicro go 1.16 --- core/frame/meta_frame.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 877f7068d..65513c7fa 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -19,7 +19,7 @@ type MetaFrame struct { func NewMetaFrame() *MetaFrame { tid, err := gonanoid.New() if err != nil { - tid = strconv.FormatInt(time.Now().UnixMicro(), 10) + tid = strconv.FormatInt(time.Now().Unix(), 10) // todo: UnixMicro since go 1.17 } return &MetaFrame{tid: tid} } From 3a77cde9fee9da7ce196905ea68b0bc639e452fb Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Mon, 23 May 2022 15:35:58 +0800 Subject: [PATCH 14/14] rename MetaData to Metadata --- core/connection.go | 16 ++++++++-------- core/frame/frame.go | 1 - core/frame/meta_frame.go | 28 ++++++++++++++-------------- core/meta_data.go | 17 ----------------- core/metadata.go | 17 +++++++++++++++++ core/router.go | 2 +- core/server.go | 40 ++++++++++++++++++++-------------------- meta_data.go | 30 ------------------------------ metadata.go | 30 ++++++++++++++++++++++++++++++ router.go | 2 +- zipper.go | 2 +- 11 files changed, 92 insertions(+), 93 deletions(-) delete mode 100644 core/meta_data.go create mode 100644 core/metadata.go delete mode 100644 meta_data.go create mode 100644 metadata.go diff --git a/core/connection.go b/core/connection.go index d38e9e892..55ac28c32 100644 --- a/core/connection.go +++ b/core/connection.go @@ -15,8 +15,8 @@ type Connection interface { Name() string // ClientType returns the type of the client (Source | SFN | UpstreamZipper) ClientType() ClientType - // MetaData returns the extra info of the application - MetaData() MetaData + // Metadata returns the extra info of the application + Metadata() Metadata // Write should goroutine-safely send y3 frames to peer side Write(f frame.Frame) error } @@ -24,16 +24,16 @@ type Connection interface { type connection struct { name string clientType ClientType - metaData MetaData + metadata Metadata stream io.ReadWriteCloser mu sync.Mutex } -func newConnection(name string, clientType ClientType, metaData MetaData, stream io.ReadWriteCloser) Connection { +func newConnection(name string, clientType ClientType, metadata Metadata, stream io.ReadWriteCloser) Connection { return &connection{ name: name, clientType: clientType, - metaData: metaData, + metadata: metadata, stream: stream, } } @@ -53,9 +53,9 @@ func (c *connection) ClientType() ClientType { return c.clientType } -// MetaData returns the extra info of the application -func (c *connection) MetaData() MetaData { - return c.metaData +// Metadata returns the extra info of the application +func (c *connection) Metadata() Metadata { + return c.metadata } // Write should goroutine-safely send y3 frames to peer side diff --git a/core/frame/frame.go b/core/frame/frame.go index 6772fa5fb..63b3df642 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -16,7 +16,6 @@ const ( TagOfMetaFrame Type = 0x2F TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 - TagOfMetaData Type = 0x02 // PayloadFrame of DataFrame TagOfPayloadFrame Type = 0x2E diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index 65513c7fa..534c6b855 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -11,8 +11,8 @@ import ( // MetaFrame is a Y3 encoded bytes, SeqID is a fixed value of TYPE_ID_TRANSACTION. // used for describes metadata for a DataFrame. type MetaFrame struct { - tid string - data []byte + tid string + metadata []byte } // NewMetaFrame creates a new MetaFrame instance. @@ -34,14 +34,14 @@ func (m *MetaFrame) TransactionID() string { return m.tid } -// SetMetaData set the extra info of the application -func (m *MetaFrame) SetMetaData(data []byte) { - m.data = data +// SetMetadata set the extra info of the application +func (m *MetaFrame) SetMetadata(metadata []byte) { + m.metadata = metadata } -// MetaData returns the extra info of the application -func (m *MetaFrame) MetaData() []byte { - return m.data +// Metadata returns the extra info of the application +func (m *MetaFrame) Metadata() []byte { + return m.metadata } // Encode implements Frame.Encode method. @@ -52,10 +52,10 @@ func (m *MetaFrame) Encode() []byte { transactionID.SetStringValue(m.tid) meta.AddPrimitivePacket(transactionID) - if m.data != nil { - data := y3.NewPrimitivePacketEncoder(byte(TagOfMetaData)) - data.SetBytesValue(m.data) - meta.AddPrimitivePacket(data) + if m.metadata != nil { + metadata := y3.NewPrimitivePacketEncoder(byte(TagOfMetadata)) + metadata.SetBytesValue(m.metadata) + meta.AddPrimitivePacket(metadata) } return meta.Encode() @@ -79,8 +79,8 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { } meta.tid = val break - case byte(TagOfMetaData): - meta.data = v.ToBytes() + case byte(TagOfMetadata): + meta.metadata = v.ToBytes() break } } diff --git a/core/meta_data.go b/core/meta_data.go deleted file mode 100644 index 00b4e8679..000000000 --- a/core/meta_data.go +++ /dev/null @@ -1,17 +0,0 @@ -package core - -import "github.com/yomorun/yomo/core/frame" - -// MetaData is used for storing extra info of the application -type MetaData interface { - // Encode is the serialize method - Encode() []byte -} - -// MetaDataBuilder is the builder of MetaData -type MetaDataBuilder interface { - // Build will return an MetaData instance according to the handshake frame passed in - Build(f *frame.HandshakeFrame) (MetaData, error) - // Decode is the deserialize method - Decode(buf []byte) (MetaData, error) -} diff --git a/core/metadata.go b/core/metadata.go new file mode 100644 index 000000000..c9d9a0476 --- /dev/null +++ b/core/metadata.go @@ -0,0 +1,17 @@ +package core + +import "github.com/yomorun/yomo/core/frame" + +// Metadata is used for storing extra info of the application +type Metadata interface { + // Encode is the serialize method + Encode() []byte +} + +// MetadataBuilder is the builder of Metadata +type MetadataBuilder interface { + // Build will return an Metadata instance according to the handshake frame passed in + Build(f *frame.HandshakeFrame) (Metadata, error) + // Decode is the deserialize method + Decode(buf []byte) (Metadata, error) +} diff --git a/core/router.go b/core/router.go index 7866ceb83..46dc26906 100644 --- a/core/router.go +++ b/core/router.go @@ -3,7 +3,7 @@ package core // Router is the interface to manage the routes for applications. type Router interface { // Route gets the route - Route(metaData MetaData) Route + Route(metadata Metadata) Route // Clean the routes. Clean() } diff --git a/core/server.go b/core/server.go index ccfa067b3..918b1efca 100644 --- a/core/server.go +++ b/core/server.go @@ -35,7 +35,7 @@ type Server struct { state string connector Connector router Router - metaDataBuilder MetaDataBuilder + metadataBuilder MetadataBuilder counterOfDataFrame int64 downstreams map[string]*Client mu sync.Mutex @@ -85,7 +85,7 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) error { // Serve the server with a net.PacketConn. func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { - if err := s.validateMetaDataBuilder(); err != nil { + if err := s.validateMetadataBuilder(); err != nil { return err } @@ -128,7 +128,7 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { if conn := s.connector.Get(connID); conn != nil { // connector s.connector.Remove(connID) - route := s.router.Route(conn.MetaData()) + route := s.router.Route(conn.Metadata()) if !reflect.ValueOf(route).IsNil() { route.Remove(connID) } @@ -266,7 +266,7 @@ func (s *Server) mainFrameHandler(c *Context) error { conn := s.connector.Get(c.connID) if conn != nil && conn.ClientType() == ClientTypeSource { f := c.Frame.(*frame.DataFrame) - f.GetMetaFrame().SetMetaData(conn.MetaData().Encode()) + f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) s.dispatchToDownstreams(f) } } @@ -300,21 +300,21 @@ func (s *Server) handleHandshakeFrame(c *Context) error { return nil } - // metaData - metaData, err := s.metaDataBuilder.Build(f) + // metadata + metadata, err := s.metadataBuilder.Build(f) if err != nil { return err } // route - route := s.router.Route(metaData) + route := s.router.Route(metadata) if reflect.ValueOf(route).IsNil() { err := errors.New("handleHandshakeFrame route is nil") return err } // client type - conn := newConnection(f.Name, clientType, metaData, stream) + conn := newConnection(f.Name, clientType, metadata, stream) switch clientType { case ClientTypeSource, ClientTypeUpstreamZipper: s.connector.Add(connID, conn) @@ -368,17 +368,17 @@ func (s *Server) handleDataFrame(c *Context) error { f := c.Frame.(*frame.DataFrame) - metaData := from.MetaData() - if reflect.ValueOf(metaData).IsNil() && from.ClientType() == ClientTypeUpstreamZipper { - m, err := s.metaDataBuilder.Decode(f.GetMetaFrame().MetaData()) + metadata := from.Metadata() + if reflect.ValueOf(metadata).IsNil() && from.ClientType() == ClientTypeUpstreamZipper { + m, err := s.metadataBuilder.Decode(f.GetMetaFrame().Metadata()) if err != nil { return err } - metaData = m + metadata = m } // route - route := s.router.Route(metaData) + route := s.router.Route(metadata) if reflect.ValueOf(route).IsNil() { logger.Warnf("%shandleDataFrame route is nil", ServerLogPrefix) return fmt.Errorf("handleDataFrame route is nil") @@ -429,11 +429,11 @@ func (s *Server) ConfigRouter(router Router) { s.mu.Unlock() } -// ConfigMetaDataBuilder is used to set metaDataBuilder by zipper -func (s *Server) ConfigMetaDataBuilder(builder MetaDataBuilder) { +// ConfigMetadataBuilder is used to set metadataBuilder by zipper +func (s *Server) ConfigMetadataBuilder(builder MetadataBuilder) { s.mu.Lock() - s.metaDataBuilder = builder - logger.Debugf("%sconfig metaDataBuilder is %#v", ServerLogPrefix, builder) + s.metadataBuilder = builder + logger.Debugf("%sconfig metadataBuilder is %#v", ServerLogPrefix, builder) s.mu.Unlock() } @@ -469,9 +469,9 @@ func (s *Server) validateRouter() error { return nil } -func (s *Server) validateMetaDataBuilder() error { - if s.metaDataBuilder == nil { - return errors.New("server's metaDataBuilder is nil") +func (s *Server) validateMetadataBuilder() error { + if s.metadataBuilder == nil { + return errors.New("server's metadataBuilder is nil") } return nil } diff --git a/meta_data.go b/meta_data.go deleted file mode 100644 index e7fa44ad2..000000000 --- a/meta_data.go +++ /dev/null @@ -1,30 +0,0 @@ -package yomo - -import ( - "github.com/yomorun/yomo/core" - "github.com/yomorun/yomo/core/frame" -) - -type metaData struct{} - -func (m *metaData) Encode() []byte { - return nil -} - -type metaDataBuilder struct { - m *metaData -} - -func newMetaDataBuilder() core.MetaDataBuilder { - return &metaDataBuilder{ - m: &metaData{}, - } -} - -func (builder *metaDataBuilder) Build(f *frame.HandshakeFrame) (core.MetaData, error) { - return builder.m, nil -} - -func (builder *metaDataBuilder) Decode(buf []byte) (core.MetaData, error) { - return builder.m, nil -} diff --git a/metadata.go b/metadata.go new file mode 100644 index 000000000..5962b9fc7 --- /dev/null +++ b/metadata.go @@ -0,0 +1,30 @@ +package yomo + +import ( + "github.com/yomorun/yomo/core" + "github.com/yomorun/yomo/core/frame" +) + +type metadata struct{} + +func (m *metadata) Encode() []byte { + return nil +} + +type metadataBuilder struct { + m *metadata +} + +func newMetadataBuilder() core.MetadataBuilder { + return &metadataBuilder{ + m: &metadata{}, + } +} + +func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (core.Metadata, error) { + return builder.m, nil +} + +func (builder *metadataBuilder) Decode(buf []byte) (core.Metadata, error) { + return builder.m, nil +} diff --git a/router.go b/router.go index a93929e77..7cbfc8f94 100644 --- a/router.go +++ b/router.go @@ -16,7 +16,7 @@ func newRouter(functions []config.App) core.Router { return &router{r: newRoute(functions)} } -func (r *router) Route(metaData core.MetaData) core.Route { +func (r *router) Route(metadata core.Metadata) core.Route { return r.r } diff --git a/zipper.go b/zipper.go index 1bca56755..4b932a449 100644 --- a/zipper.go +++ b/zipper.go @@ -135,7 +135,7 @@ func (z *zipper) ConfigWorkflow(conf string) error { func (z *zipper) configWorkflow(config *config.WorkflowConfig) error { z.wfc = config - z.server.ConfigMetaDataBuilder(newMetaDataBuilder()) + z.server.ConfigMetadataBuilder(newMetadataBuilder()) z.server.ConfigRouter(newRouter(config.Functions)) return nil }