diff --git a/core/connection.go b/core/connection.go new file mode 100644 index 000000000..55ac28c32 --- /dev/null +++ b/core/connection.go @@ -0,0 +1,67 @@ +package core + +import ( + "io" + "sync" + + "github.com/yomorun/yomo/core/frame" +) + +// Connection wraps the specific io connections (typically quic.Connection) to transfer y3 frames +type Connection interface { + io.Closer + + // Name returns the name of the connection, which is set by clients + Name() string + // ClientType returns the type of the client (Source | SFN | UpstreamZipper) + ClientType() ClientType + // Metadata returns the extra info of the application + Metadata() Metadata + // Write should goroutine-safely send y3 frames to peer side + Write(f frame.Frame) error +} + +type connection struct { + name string + clientType ClientType + metadata Metadata + stream io.ReadWriteCloser + mu sync.Mutex +} + +func newConnection(name string, clientType ClientType, metadata Metadata, stream io.ReadWriteCloser) Connection { + return &connection{ + name: name, + clientType: clientType, + metadata: metadata, + stream: stream, + } +} + +// Close implements io.Close interface +func (c *connection) Close() error { + return c.stream.Close() +} + +// Name returns the name of the connection, which is set by clients +func (c *connection) Name() string { + return c.name +} + +// ClientType returns the type of the connection (Source | SFN | UpstreamZipper) +func (c *connection) ClientType() ClientType { + return c.clientType +} + +// Metadata returns the extra info of the application +func (c *connection) Metadata() Metadata { + return c.metadata +} + +// Write should goroutine-safely send y3 frames to peer side +func (c *connection) Write(f frame.Frame) error { + c.mu.Lock() + defer c.mu.Unlock() + _, err := c.stream.Write(f.Encode()) + return err +} diff --git a/core/connector.go b/core/connector.go index 245d7a4c6..aa180a0a7 100644 --- a/core/connector.go +++ b/core/connector.go @@ -1,217 +1,69 @@ package core import ( - "fmt" - "io" - "math/rand" "sync" - "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/pkg/logger" ) -type app struct { - name string // app name - observed []byte // data tags -} - -// func (a *app) ID() string { -// return a.id -// } - -func (a *app) Name() string { - return a.name -} - var _ Connector = &connector{} // Connector is a interface to manage the connections and applications. type Connector interface { // Add a connection. - Add(connID string, stream io.ReadWriteCloser) + Add(connID string, conn Connection) // Remove a connection. Remove(connID string) // Get a connection by connection id. - Get(connID string) io.ReadWriteCloser - // GetConnIDs gets the connection ids by name and tag. - GetConnIDs(name string, tags byte) []string - // Write a Frame to a connection. - Write(f frame.Frame, toID string) error + Get(connID string) Connection // GetSnapshot gets the snapshot of all connections. - GetSnapshot() map[string]io.ReadWriteCloser - - // App gets the app by connID. - App(connID string) (*app, bool) - // AppID gets the ID of app by connID. - // AppID(connID string) (string, bool) - // AppName gets the name of app by connID. - AppName(connID string) (string, bool) - // LinkApp links the app and connection. - LinkApp(connID string, name string, observed []byte) - // UnlinkApp removes the app by connID. - UnlinkApp(connID string, name string) - // ExistsApp check app exists - ExistsApp(name string) bool - + GetSnapshot() map[string]string // Clean the connector. Clean() } type connector struct { conns sync.Map - apps sync.Map - mu sync.Mutex } func newConnector() Connector { - return &connector{ - conns: sync.Map{}, - apps: sync.Map{}, - mu: sync.Mutex{}, - } + return &connector{conns: sync.Map{}} } // Add a connection. -func (c *connector) Add(connID string, stream io.ReadWriteCloser) { +func (c *connector) Add(connID string, conn Connection) { logger.Debugf("%sconnector add: connID=%s", ServerLogPrefix, connID) - c.conns.Store(connID, stream) + c.conns.Store(connID, conn) } // Remove a connection. func (c *connector) Remove(connID string) { logger.Debugf("%sconnector remove: connID=%s", ServerLogPrefix, connID) c.conns.Delete(connID) - // c.funcs.Delete(connID) - c.apps.Delete(connID) } // Get a connection by connection id. -func (c *connector) Get(connID string) io.ReadWriteCloser { +func (c *connector) Get(connID string) Connection { logger.Debugf("%sconnector get connection: connID=%s", ServerLogPrefix, connID) - if stream, ok := c.conns.Load(connID); ok { - return stream.(io.ReadWriteCloser) + if conn, ok := c.conns.Load(connID); ok { + return conn.(Connection) } return nil } -// App gets the app by connID. -func (c *connector) App(connID string) (*app, bool) { - if result, found := c.apps.Load(connID); found { - app, ok := result.(*app) - if ok { - logger.Debugf("%sconnector get app=%s, connID=%s", ServerLogPrefix, app.name, connID) - return app, true - } - logger.Warnf("%sconnector get app convert fails, connID=%s", ServerLogPrefix, connID) - return nil, false - } - logger.Warnf("%sconnector get app is nil, connID=%s", ServerLogPrefix, connID) - return nil, false -} - -// AppName gets the name of app by connID. -func (c *connector) AppName(connID string) (string, bool) { - if app, ok := c.App(connID); ok { - return app.name, true - } - return "", false -} - -// GetConnIDs gets the connection ids by name and tag. -func (c *connector) GetConnIDs(name string, tag byte) []string { - connIDs := make([]string, 0) - - c.apps.Range(func(key interface{}, val interface{}) bool { - app := val.(*app) - if app.name == name { - for _, v := range app.observed { - if v == tag { - connIDs = append(connIDs, key.(string)) - break - } - } - } - return true - }) - - if n := len(connIDs); n > 1 { - index := rand.Intn(n) - return connIDs[index : index+1] - } - - return connIDs -} - -// Write a DataFrame to a connection. -func (c *connector) Write(f frame.Frame, toID string) error { - targetStream := c.Get(toID) - if targetStream == nil { - logger.Warnf("%swill write to: [%s], target stream is nil", ServerLogPrefix, toID) - return fmt.Errorf("target[%s] stream is nil", toID) - } - c.mu.Lock() - _, err := targetStream.Write(f.Encode()) - c.mu.Unlock() - return err -} - // GetSnapshot gets the snapshot of all connections. -func (c *connector) GetSnapshot() map[string]io.ReadWriteCloser { - result := make(map[string]io.ReadWriteCloser) +func (c *connector) GetSnapshot() map[string]string { + result := make(map[string]string) c.conns.Range(func(key interface{}, val interface{}) bool { - result[key.(string)] = val.(io.ReadWriteCloser) + connID := key.(string) + conn := val.(Connection) + result[connID] = conn.Name() return true }) return result } -// LinkApp links the app and connection. -func (c *connector) LinkApp(connID string, name string, observed []byte) { - logger.Debugf("%sconnector link application: connID[%s] --> app[%s]", ServerLogPrefix, connID, name) - c.apps.Store(connID, &app{name, observed}) -} - -// UnlinkApp removes the app by connID. -func (c *connector) UnlinkApp(connID string, name string) { - logger.Debugf("%sconnector unlink application: connID[%s] x-> app[%s]", ServerLogPrefix, connID, name) - c.apps.Delete(connID) -} - -// ExistsApp check app exists -func (c *connector) ExistsApp(name string) bool { - var found bool - c.apps.Range(func(key interface{}, val interface{}) bool { - app := val.(*app) - if app.name == name { - found = true - return false - } - return true - }) - - return found -} - -// func (c *connector) RemoveApp(appID string) { -// logger.Debugf("%sconnector unlink application: connID[%s] x-> app[%s]", ServerLogPrefix, connID, appID) -// c.apps.Range(func(key interface{},val interface{})bool{ -// return true -// }) -// c.rapps.Delete(appID) -// } - -// func (c *connector) AppConns(appID string) []string { -// conns := make([]string, 0) -// c.apps.Range(func(key interface{},val interface{})bool{ -// if val.(string)==appID{ -// conns=append(conns,key.(string)) -// } -// }) -// return conns -// } - // Clean the connector. func (c *connector) Clean() { c.conns = sync.Map{} - c.apps = sync.Map{} } diff --git a/core/frame/frame.go b/core/frame/frame.go index f97db283b..63b3df642 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -16,7 +16,6 @@ const ( TagOfMetaFrame Type = 0x2F TagOfMetadata Type = 0x03 TagOfTransactionID Type = 0x01 - TagOfIssuer Type = 0x02 // PayloadFrame of DataFrame TagOfPayloadFrame Type = 0x2E @@ -31,13 +30,13 @@ const ( TagOfHandshakeAuthPayload Type = 0x05 TagOfHandshakeObserveDataTags Type = 0x06 - TagOfPingFrame Type = 0x3C - TagOfPongFrame Type = 0x3B - TagOfAcceptedFrame Type = 0x3A - TagOfRejectedFrame Type = 0x39 + TagOfPingFrame Type = 0x3C + TagOfPongFrame Type = 0x3B + TagOfAcceptedFrame Type = 0x3A + TagOfRejectedFrame Type = 0x39 TagOfRejectedMessage Type = 0x02 // GoawayFrame - TagOfGoawayFrame Type = 0x30 + TagOfGoawayFrame Type = 0x30 TagOfGoawayCode Type = 0x01 TagOfGoawayMessage Type = 0x02 ) diff --git a/core/frame/meta_frame.go b/core/frame/meta_frame.go index a5a75379b..534c6b855 100644 --- a/core/frame/meta_frame.go +++ b/core/frame/meta_frame.go @@ -4,20 +4,24 @@ import ( "strconv" "time" + gonanoid "github.com/matoous/go-nanoid/v2" "github.com/yomorun/y3" ) // MetaFrame is a Y3 encoded bytes, SeqID is a fixed value of TYPE_ID_TRANSACTION. // used for describes metadata for a DataFrame. type MetaFrame struct { - tid string + tid string + metadata []byte } // NewMetaFrame creates a new MetaFrame instance. func NewMetaFrame() *MetaFrame { - return &MetaFrame{ - tid: strconv.FormatInt(time.Now().Unix(), 10), + tid, err := gonanoid.New() + if err != nil { + tid = strconv.FormatInt(time.Now().Unix(), 10) // todo: UnixMicro since go 1.17 } + return &MetaFrame{tid: tid} } // SetTransactinID set the transaction ID. @@ -30,14 +34,30 @@ func (m *MetaFrame) TransactionID() string { return m.tid } +// SetMetadata set the extra info of the application +func (m *MetaFrame) SetMetadata(metadata []byte) { + m.metadata = metadata +} + +// Metadata returns the extra info of the application +func (m *MetaFrame) Metadata() []byte { + return m.metadata +} + // Encode implements Frame.Encode method. func (m *MetaFrame) Encode() []byte { meta := y3.NewNodePacketEncoder(byte(TagOfMetaFrame)) transactionID := y3.NewPrimitivePacketEncoder(byte(TagOfTransactionID)) transactionID.SetStringValue(m.tid) - meta.AddPrimitivePacket(transactionID) + + if m.metadata != nil { + metadata := y3.NewPrimitivePacketEncoder(byte(TagOfMetadata)) + metadata.SetBytesValue(m.metadata) + meta.AddPrimitivePacket(metadata) + } + return meta.Encode() } @@ -50,10 +70,19 @@ func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { } meta := &MetaFrame{} - for _, v := range nodeBlock.PrimitivePackets { - val, _ := v.ToUTF8String() - meta.tid = val - break + for k, v := range nodeBlock.PrimitivePackets { + switch k { + case byte(TagOfTransactionID): + val, err := v.ToUTF8String() + if err != nil { + return nil, err + } + meta.tid = val + break + case byte(TagOfMetadata): + meta.metadata = v.ToBytes() + break + } } return meta, nil diff --git a/core/metadata.go b/core/metadata.go new file mode 100644 index 000000000..c9d9a0476 --- /dev/null +++ b/core/metadata.go @@ -0,0 +1,17 @@ +package core + +import "github.com/yomorun/yomo/core/frame" + +// Metadata is used for storing extra info of the application +type Metadata interface { + // Encode is the serialize method + Encode() []byte +} + +// MetadataBuilder is the builder of Metadata +type MetadataBuilder interface { + // Build will return an Metadata instance according to the handshake frame passed in + Build(f *frame.HandshakeFrame) (Metadata, error) + // Decode is the deserialize method + Decode(buf []byte) (Metadata, error) +} diff --git a/core/router.go b/core/router.go index 7ab37707f..46dc26906 100644 --- a/core/router.go +++ b/core/router.go @@ -3,17 +3,17 @@ package core // Router is the interface to manage the routes for applications. type Router interface { // Route gets the route - Route() Route + Route(metadata Metadata) Route // Clean the routes. Clean() } -// Route is the interface for route. +// Route manages data subscribers according to their observed data tags. type Route interface { // Add a route. - Add(index int, name string) - // GetForwardRoutes returns all the forward routes from current node. - GetForwardRoutes(current string) []string - // Exists indicates whether the route exists or not. - Exists(name string) bool + Add(connID string, name string, observeDataTags []byte) error + // Remove a route. + Remove(connID string) error + // GetForwardRoutes returns all the subscribers by the given data tag. + GetForwardRoutes(tag byte) []string } diff --git a/core/server.go b/core/server.go index 26ed52249..918b1efca 100644 --- a/core/server.go +++ b/core/server.go @@ -12,7 +12,6 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/core/store" "github.com/yomorun/yomo/core/yerr" // authentication implements, Currently, only token authentication is implemented @@ -32,11 +31,11 @@ type FrameHandler func(c *Context) error // Server is the underlining server of Zipper type Server struct { - name string - // stream quic.Stream + name string state string connector Connector router Router + metadataBuilder MetadataBuilder counterOfDataFrame int64 downstreams map[string]*Client mu sync.Mutex @@ -86,6 +85,14 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) error { // Serve the server with a net.PacketConn. func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { + if err := s.validateMetadataBuilder(); err != nil { + return err + } + + if err := s.validateRouter(); err != nil { + return err + } + listener := newListener() // listen the address err := listener.Listen(conn, s.opts.TLSConfig, s.opts.QuicConfig) @@ -118,13 +125,14 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { if err != nil { // if client close the connection, then we should close the connection // @CC: when Source close the connection, it won't affect connectors - appName, ok := s.connector.AppName(connID) - if ok { + if conn := s.connector.Get(connID); conn != nil { // connector s.connector.Remove(connID) - // store - // remove app route from store? let me think... - logger.Printf("%s💔 [%s](%s) close the connection", ServerLogPrefix, appName, connID) + route := s.router.Route(conn.Metadata()) + if !reflect.ValueOf(route).IsNil() { + route.Remove(connID) + } + logger.Printf("%s💔 [%s](%s) close the connection", ServerLogPrefix, conn.Name(), connID) } else { logger.Errorf("%s❤️3/ [unknown](%s) on stream %v", ServerLogPrefix, connID, err) } @@ -160,10 +168,6 @@ func (s *Server) Close() error { if s.connector != nil { s.connector.Clean() } - // store - if s.opts.Store != nil { - s.opts.Store.Clean() - } return nil } @@ -259,7 +263,12 @@ func (s *Server) mainFrameHandler(c *Context) error { if err := s.handleDataFrame(c); err != nil { c.CloseWithError(yerr.ErrorCodeData, fmt.Sprintf("handleDataFrame err: %v", err)) } else { - s.dispatchToDownstreams(c.Frame.(*frame.DataFrame)) + conn := s.connector.Get(c.connID) + if conn != nil && conn.ClientType() == ClientTypeSource { + f := c.Frame.(*frame.DataFrame) + f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) + s.dispatchToDownstreams(f) + } } default: logger.Errorf("%serr=%v, frame=%v", ServerLogPrefix, err, c.Frame.Encode()) @@ -274,7 +283,6 @@ func (s *Server) handleHandshakeFrame(c *Context) error { logger.Debugf("%sGOT ❤️ HandshakeFrame : %# x", ServerLogPrefix, f) // basic info connID := c.ConnID() - name := f.Name clientType := ClientType(f.ClientType) stream := c.Stream // credential @@ -283,68 +291,43 @@ func (s *Server) handleHandshakeFrame(c *Context) error { if !s.authenticate(f) { err := fmt.Errorf("handshake authentication fails, client credential name is %s", authName(f.AuthName())) // return err - logger.Debugf("%s🔑 <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, name, connID) + logger.Debugf("%s🔑 <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, connID) rejectedFrame := frame.NewRejectedFrame(err.Error()) if _, err = stream.Write(rejectedFrame.Encode()); err != nil { - logger.Debugf("%s🔑 write to <%s> [%s](%s) RejectedFrame error:%v", ServerLogPrefix, clientType, name, connID, err) + logger.Debugf("%s🔑 write to <%s> [%s](%s) RejectedFrame error:%v", ServerLogPrefix, clientType, f.Name, connID, err) return err } return nil } - // route - if err := s.validateRouter(); err != nil { + // metadata + metadata, err := s.metadataBuilder.Build(f) + if err != nil { return err } - route := s.router.Route() + + // route + route := s.router.Route(metadata) if reflect.ValueOf(route).IsNil() { err := errors.New("handleHandshakeFrame route is nil") return err } - // store - s.opts.Store.Set(name, route) // client type + conn := newConnection(f.Name, clientType, metadata, stream) switch clientType { - case ClientTypeSource: - s.connector.Add(connID, stream) - s.connector.LinkApp(connID, name, nil) + case ClientTypeSource, ClientTypeUpstreamZipper: + s.connector.Add(connID, conn) case ClientTypeStreamFunction: - // when sfn connect, it will provide its name to the server. server will check if this client - // has permission connected to. - if !route.Exists(name) { - // unexpected client connected, close the connection - s.connector.Remove(connID) - // SFN: stream function - err := fmt.Errorf("handshake router validation faild, illegal SFN[%s]", f.Name) - c.CloseWithError(yerr.ErrorCodeRejected, err.Error()) - // break - return err - } - // check app exists in connection list - // if s.connector.ExistsApp(name) { - // err := fmt.Errorf("SFN[%s] connection already exists", f.Name) - // c.CloseWithError(0xCC, err.Error()) - // return err - // } - // check app exists in connection list - // logger.Printf("%sSFN[%s] write GoawayFrame to client", ServerLogPrefix, f.Name) - if s.connector.ExistsApp(name) { + if err := route.Add(connID, f.Name, f.ObserveDataTags); err != nil { logger.Debugf("%swrite to SFN[%s] GoawayFrame", ServerLogPrefix, f.Name) - err := fmt.Errorf("SFN[%s] connection already exists", f.Name) goawayFrame := frame.NewGoawayFrame(err.Error()) if _, err = stream.Write(goawayFrame.Encode()); err != nil { logger.Errorf("%s⛔️ write to SFN[%s] GoawayFrame error:%v", ServerLogPrefix, f.Name, err) return err } - // c.CloseWithError(goawayFrame.Code(), err.Error()) } - s.connector.Add(connID, stream) - // link connection to stream function - s.connector.LinkApp(connID, name, f.ObserveDataTags) - case ClientTypeUpstreamZipper: - s.connector.Add(connID, stream) - s.connector.LinkApp(connID, name, nil) + s.connector.Add(connID, conn) default: // unknown client type s.connector.Remove(connID) @@ -352,7 +335,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { c.CloseWithError(yerr.ErrorCodeUnknownClient, "Unknown ClientType, illegal!") return errors.New("core.server: Unknown ClientType, illegal") } - logger.Printf("%s❤️ <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, name, connID) + logger.Printf("%s❤️ <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, connID) return nil } @@ -377,48 +360,54 @@ func (s *Server) handleDataFrame(c *Context) error { atomic.AddInt64(&s.counterOfDataFrame, 1) // currentIssuer := f.GetIssuer() fromID := c.ConnID() - from, ok := s.connector.AppName(fromID) - if !ok { - logger.Warnf("%shandleDataFrame have connection[%s], but not have function", ServerLogPrefix, fromID) - return nil + from := s.connector.Get(fromID) + if from == nil { + logger.Warnf("%shandleDataFrame connector cannot find %s", ServerLogPrefix, fromID) + return fmt.Errorf("handleDataFrame connector cannot find %s", fromID) } f := c.Frame.(*frame.DataFrame) - // route - name, _ := s.connector.AppName(fromID) - cacheRoute, ok := s.opts.Store.Get(name) - if !ok { - err := fmt.Errorf("get route failure, appName=%s, connID=%s", name, fromID) - logger.Errorf("%shandleDataFrame %s", ServerLogPrefix, err.Error()) - return err + metadata := from.Metadata() + if reflect.ValueOf(metadata).IsNil() && from.ClientType() == ClientTypeUpstreamZipper { + m, err := s.metadataBuilder.Decode(f.GetMetaFrame().Metadata()) + if err != nil { + return err + } + metadata = m } - route := cacheRoute.(Route) - if route == nil { + + // route + route := s.router.Route(metadata) + if reflect.ValueOf(route).IsNil() { logger.Warnf("%shandleDataFrame route is nil", ServerLogPrefix) return fmt.Errorf("handleDataFrame route is nil") } - // get stream function names from route - routes := route.GetForwardRoutes(from) - for _, to := range routes { - toIDs := s.connector.GetConnIDs(to, f.GetDataTag()) - for _, toID := range toIDs { - logger.Debugf("%shandleDataFrame tag=%#x tid=%s, counter=%d, from=[%s](%s), to=[%s](%s)", ServerLogPrefix, f.Tag(), f.TransactionID(), s.counterOfDataFrame, from, fromID, to, toID) - - // write data frame to stream - logger.Infof("%swrite data: [%s](%s) --> [%s](%s)", ServerLogPrefix, from, fromID, to, toID) - if err := s.connector.Write(f, toID); err != nil { - logger.Errorf("%swrite data: [%s](%s) --> [%s](%s), err=%v", ServerLogPrefix, from, fromID, to, toID, err) - continue - } + + // get stream function connection ids from route + connIDs := route.GetForwardRoutes(f.GetDataTag()) + for _, toID := range connIDs { + conn := s.connector.Get(toID) + if conn == nil { + logger.Errorf("%sconn is nil: (%s)", ServerLogPrefix, toID) + continue + } + + to := conn.Name() + logger.Debugf("%shandleDataFrame tag=%#x tid=%s, counter=%d, from=[%s](%s), to=[%s](%s)", ServerLogPrefix, f.Tag(), f.TransactionID(), s.counterOfDataFrame, from.Name(), fromID, to, toID) + + // write data frame to stream + logger.Infof("%swrite data: [%s](%s) --> [%s](%s)", ServerLogPrefix, from, fromID, to, toID) + if err := conn.Write(f); err != nil { + logger.Errorf("%swrite data: [%s](%s) --> [%s](%s), err=%v", ServerLogPrefix, from, fromID, to, toID, err) + continue } } return nil } // StatsFunctions returns the sfn stats of server. -// func (s *Server) StatsFunctions() map[string][]*quic.Stream { -func (s *Server) StatsFunctions() map[string]io.ReadWriteCloser { +func (s *Server) StatsFunctions() map[string]string { return s.connector.GetSnapshot() } @@ -432,26 +421,20 @@ func (s *Server) Downstreams() map[string]*Client { return s.downstreams } -// AddWorkflow register sfn to this server. -// func (s *Server) AddWorkflow(wfs ...Workflow) error { -// for _, wf := range wfs { -// s.router.Add(wf.Seq, wf.Name) -// } -// return nil -// } - -func (s *Server) ConfigRouter(router Router) error { +// ConfigRouter is used to set router by zipper +func (s *Server) ConfigRouter(router Router) { s.mu.Lock() s.router = router logger.Debugf("%sconfig router is %#v", ServerLogPrefix, router) s.mu.Unlock() - return nil } -func (s *Server) Router() Router { +// ConfigMetadataBuilder is used to set metadataBuilder by zipper +func (s *Server) ConfigMetadataBuilder(builder MetadataBuilder) { s.mu.Lock() - defer s.mu.Unlock() - return s.router + s.metadataBuilder = builder + logger.Debugf("%sconfig metadataBuilder is %#v", ServerLogPrefix, builder) + s.mu.Unlock() } // AddDownstreamServer add a downstream server to this server. all the DataFrames will be @@ -477,14 +460,6 @@ func GetConnID(conn quic.Connection) string { func (s *Server) initOptions() { // defaults - // store - if s.opts.Store == nil { - s.opts.Store = store.NewMemoryStore() - } - // auth - // if s.opts.Auths == nil { - // s.opts.Auths = append(s.opts.Auths, auth.NewNoneAuth()) - // } } func (s *Server) validateRouter() error { @@ -494,6 +469,13 @@ func (s *Server) validateRouter() error { return nil } +func (s *Server) validateMetadataBuilder() error { + if s.metadataBuilder == nil { + return errors.New("server's metadataBuilder is nil") + } + return nil +} + func (s *Server) Options() ServerOptions { return s.opts } @@ -502,10 +484,6 @@ func (s *Server) Connector() Connector { return s.connector } -func (s *Server) Store() store.Store { - return s.opts.Store -} - func (s *Server) SetBeforeHandlers(handlers ...FrameHandler) { s.beforeHandlers = append(s.beforeHandlers, handlers...) } diff --git a/core/server_options.go b/core/server_options.go index 726aa30e7..bce8a7bfd 100644 --- a/core/server_options.go +++ b/core/server_options.go @@ -6,7 +6,6 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/yomorun/yomo/core/auth" - "github.com/yomorun/yomo/core/store" ) type ServerOptions struct { @@ -14,7 +13,6 @@ type ServerOptions struct { TLSConfig *tls.Config Addr string Auths []auth.Authentication - Store store.Store Conn net.PacketConn } @@ -34,12 +32,6 @@ func WithAuth(name string, args ...string) ServerOption { } } -func WithStore(store store.Store) ServerOption { - return func(o *ServerOptions) { - o.Store = store - } -} - func WithServerTLSConfig(tc *tls.Config) ServerOption { return func(o *ServerOptions) { o.TLSConfig = tc diff --git a/core/store/memory.go b/core/store/memory.go deleted file mode 100644 index 7a1e7aa11..000000000 --- a/core/store/memory.go +++ /dev/null @@ -1,31 +0,0 @@ -package store - -import ( - "sync" -) - -type MemoryStore struct { - m sync.Map -} - -func NewMemoryStore() *MemoryStore { - return &MemoryStore{ - m: sync.Map{}, - } -} - -func (s *MemoryStore) Set(key interface{}, val interface{}) { - s.m.Store(key, val) -} - -func (s *MemoryStore) Get(key interface{}) (interface{}, bool) { - return s.m.Load(key) -} - -func (s *MemoryStore) Remove(key interface{}) { - s.m.Delete(key) -} - -func (s *MemoryStore) Clean() { - s.m = sync.Map{} -} diff --git a/core/store/store.go b/core/store/store.go deleted file mode 100644 index afa14e3f5..000000000 --- a/core/store/store.go +++ /dev/null @@ -1,8 +0,0 @@ -package store - -type Store interface { - Set(key interface{}, val interface{}) - Get(key interface{}) (interface{}, bool) - Remove(key interface{}) - Clean() -} diff --git a/go.mod b/go.mod index a0296cc08..458296b4e 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/emirpasic/gods v1.15.0 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/lucas-clemente/quic-go v0.27.0 + github.com/matoous/go-nanoid/v2 v2.0.0 github.com/onsi/ginkgo v1.16.5 // indirect github.com/reactivex/rxgo/v2 v2.5.0 github.com/stretchr/objx v0.3.0 // indirect diff --git a/go.sum b/go.sum index 5fc1beb0c..4f6196776 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,10 @@ github.com/marten-seemann/qtls-go1-17 v0.1.1 h1:DQjHPq+aOzUeh9/lixAGunn6rIOQyWCh github.com/marten-seemann/qtls-go1-17 v0.1.1/go.mod h1:C2ekUKcDdz9SDWxec1N/MvcXBpaX9l3Nx67XaR84L5s= github.com/marten-seemann/qtls-go1-18 v0.1.1 h1:qp7p7XXUFL7fpBvSS1sWD+uSqPvzNQK43DH+/qEkj0Y= github.com/marten-seemann/qtls-go1-18 v0.1.1/go.mod h1:mJttiymBAByA49mhlNZZGrH5u1uXYZJ+RW28Py7f4m4= +github.com/matoous/go-nanoid v1.5.0 h1:VRorl6uCngneC4oUQqOYtO3S0H5QKFtKuKycFG3euek= +github.com/matoous/go-nanoid v1.5.0/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= +github.com/matoous/go-nanoid/v2 v2.0.0 h1:d19kur2QuLeHmJBkvYkFdhFBzLoo1XVm2GgTpL+9Tj0= +github.com/matoous/go-nanoid/v2 v2.0.0/go.mod h1:FtS4aGPVfEkxKxhdWPAspZpZSh1cOjtM7Ej/So3hR0g= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -160,6 +164,7 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/metadata.go b/metadata.go new file mode 100644 index 000000000..5962b9fc7 --- /dev/null +++ b/metadata.go @@ -0,0 +1,30 @@ +package yomo + +import ( + "github.com/yomorun/yomo/core" + "github.com/yomorun/yomo/core/frame" +) + +type metadata struct{} + +func (m *metadata) Encode() []byte { + return nil +} + +type metadataBuilder struct { + m *metadata +} + +func newMetadataBuilder() core.MetadataBuilder { + return &metadataBuilder{ + m: &metadata{}, + } +} + +func (builder *metadataBuilder) Build(f *frame.HandshakeFrame) (core.Metadata, error) { + return builder.m, nil +} + +func (builder *metadataBuilder) Decode(buf []byte) (core.Metadata, error) { + return builder.m, nil +} diff --git a/router.go b/router.go index 37b621b22..7cbfc8f94 100644 --- a/router.go +++ b/router.go @@ -1,93 +1,97 @@ package yomo import ( + "fmt" "sync" "github.com/yomorun/yomo/core" "github.com/yomorun/yomo/pkg/config" - "github.com/yomorun/yomo/pkg/logger" ) -// router type router struct { - config *config.WorkflowConfig + r *route } -func newRouter(config *config.WorkflowConfig) core.Router { - return &router{config: config} +func newRouter(functions []config.App) core.Router { + return &router{r: newRoute(functions)} } -// router interface -func (r *router) Route() core.Route { - logger.Debugf("%sworkflowconfig is %#v", zipperLogPrefix, r.config) - return newRoute(r.config) +func (r *router) Route(metadata core.Metadata) core.Route { + return r.r } func (r *router) Clean() { - r.config = nil + r.r = nil } -// route interface type route struct { - data sync.Map + functions []config.App + data map[byte]map[string]string + mu sync.RWMutex } -func newRoute(config *config.WorkflowConfig) *route { - if config == nil { - logger.Errorf("%sworkflowconfig is nil", zipperLogPrefix) - return nil +func newRoute(functions []config.App) *route { + return &route{ + functions: functions, + data: make(map[byte]map[string]string), } - r := route{ - data: sync.Map{}, +} + +func (r *route) Add(connID string, name string, observeDataTags []byte) error { + r.mu.Lock() + defer r.mu.Unlock() + + ok := false + for _, v := range r.functions { + if v.Name == name { + ok = true + break + } } - logger.Debugf("%sworkflowconfig %+v", zipperLogPrefix, *config) - for i, app := range config.Functions { - r.Add(i, app.Name) + if !ok { + return fmt.Errorf("SFN[%s] does not exist in config functions", name) } - return &r -} + for _, conns := range r.data { + for _, n := range conns { + if n == name { + return fmt.Errorf("SFN[%s] is already linked to another connection", name) + } + } + } -func (r *route) Add(index int, name string) { - if r.Exists(name) { - logger.Warnf("%sapp[%s] already exists in workflow and will not be added", zipperLogPrefix, name) - return + for _, tag := range observeDataTags { + conns := r.data[tag] + if conns == nil { + conns = make(map[string]string) + r.data[tag] = conns + } + r.data[tag][connID] = name } - logger.Debugf("%sroute add: %s", zipperLogPrefix, name) - r.data.Store(index, name) + + return nil } -func (r *route) Exists(name string) bool { - var ok bool - logger.Debugf("%srouter[%v] exists name: %s", zipperLogPrefix, r, name) - r.data.Range(func(key interface{}, val interface{}) bool { - if val.(string) == name { - ok = true - return false - } - return true - }) +func (r *route) Remove(connID string) error { + r.mu.Lock() + defer r.mu.Unlock() + + for _, conns := range r.data { + delete(conns, connID) + } - return ok + return nil } -func (r *route) GetForwardRoutes(current string) []string { - idx := -1 - r.data.Range(func(key interface{}, val interface{}) bool { - if val.(string) == current { - idx = key.(int) - return false - } - return true - }) +func (r *route) GetForwardRoutes(tag byte) []string { + r.mu.RLock() + defer r.mu.RUnlock() - routes := make([]string, 0) - r.data.Range(func(key interface{}, val interface{}) bool { - if key.(int) > idx { - routes = append(routes, val.(string)) + var keys []string + if conns := r.data[tag]; conns != nil { + for k := range conns { + keys = append(keys, k) } - return true - }) - - return routes + } + return keys } diff --git a/zipper.go b/zipper.go index 1876f1fff..4b932a449 100644 --- a/zipper.go +++ b/zipper.go @@ -135,8 +135,9 @@ func (z *zipper) ConfigWorkflow(conf string) error { func (z *zipper) configWorkflow(config *config.WorkflowConfig) error { z.wfc = config - // router - return z.server.ConfigRouter(newRouter(config)) + z.server.ConfigMetadataBuilder(newMetadataBuilder()) + z.server.ConfigRouter(newRouter(config.Functions)) + return nil } func (z *zipper) ConfigMesh(url string) error { @@ -244,9 +245,9 @@ func (z *zipper) Close() error { // Stats inspects current server. func (z *zipper) Stats() int { - log.Printf("[%s] all sfn connected: %d", z.name, len(z.server.StatsFunctions())) - for k := range z.server.StatsFunctions() { - log.Printf("[%s] -> ConnID=%v", z.name, k) + log.Printf("[%s] all connections: %d", z.name, len(z.server.StatsFunctions())) + for connID, name := range z.server.StatsFunctions() { + log.Printf("[%s] -> ConnID=%s, Name=%s", z.name, connID, name) } log.Printf("[%s] all downstream zippers connected: %d", z.name, len(z.server.Downstreams()))