From dadd423a99a9345c7539776e71623fa4dc72b837 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Mon, 15 Aug 2022 14:47:35 +0800 Subject: [PATCH 1/3] refactor: better client connection state management --- core/client.go | 276 +++++++++++++++----------------------------- core/core.go | 22 ++-- core/server.go | 3 - core/yerr/errors.go | 4 +- 4 files changed, 106 insertions(+), 199 deletions(-) diff --git a/core/client.go b/core/client.go index 86e655d4e..9b56a9d83 100644 --- a/core/client.go +++ b/core/client.go @@ -3,9 +3,9 @@ package core import ( "context" "errors" - "fmt" + "io" "net" - + "strings" "sync" "time" @@ -22,9 +22,6 @@ import ( // ClientOption YoMo client options type ClientOption func(*ClientOptions) -// ConnState describes the state of the connection. -type ConnState = string - // Client is the abstraction of a YoMo-Client. a YoMo-Client can be // Source, Upstream Zipper or StreamFunction. type Client struct { @@ -32,18 +29,18 @@ type Client struct { clientID string // id of the client clientType ClientType // type of the connection conn quic.Connection // quic connection - stream quic.Stream // quic stream + fs *FrameStream // yomo abstract stream state ConnState // state of the connection - processor func(*frame.DataFrame) // functions to invoke when data arrived - receiver func(*frame.BackflowFrame) // functions to invoke when data is processed + processor func(*frame.DataFrame) // function to invoke when data arrived + receiver func(*frame.BackflowFrame) // function to invoke when data is processed + errorfn func(error) // function to invoke when error occured + closefn func() // function to invoke when client closed addr string // the address of server connected to mu sync.Mutex opts ClientOptions localAddr string // client local addr, it will be changed on reconnect logger log.Logger errc chan error - closec chan bool - closed bool } // NewClient creates a new YoMo-Client. @@ -55,7 +52,6 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien state: ConnStateReady, opts: ClientOptions{}, errc: make(chan error), - closec: make(chan bool), } c.Init(opts...) once.Do(func() { @@ -75,7 +71,6 @@ func (c *Client) Init(opts ...ClientOption) error { // Connect connects to YoMo-Zipper. func (c *Client) Connect(ctx context.Context, addr string) error { - // TODO: refactor this later as a Connection Manager // reconnect // for download zipper @@ -83,14 +78,17 @@ func (c *Client) Connect(ctx context.Context, addr string) error { go c.reconnect(ctx, addr) // connect - if err := c.connect(ctx, addr); err != nil { - return err - } - - return nil + return c.connect(ctx, addr) } func (c *Client) connect(ctx context.Context, addr string) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.state != ConnStateReady && c.state != ConnStateDisconnected { + return nil + } + c.addr = addr c.state = ConnStateConnecting @@ -100,6 +98,7 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.state = ConnStateDisconnected return err } + c.conn = conn // quic stream stream, err := conn.OpenStreamSync(ctx) @@ -107,11 +106,8 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.state = ConnStateDisconnected return err } + c.fs = NewFrameStream(stream) - c.stream = stream - c.conn = conn - - c.state = ConnStateAuthenticating // send handshake handshake := frame.NewHandshakeFrame( c.name, @@ -121,118 +117,84 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.opts.Credential.Name(), c.opts.Credential.Payload(), ) - err = c.WriteFrame(handshake) - if err != nil { - c.state = ConnStateRejected + if _, err := c.fs.WriteFrame(handshake); err != nil { + c.state = ConnStateDisconnected return err } + c.state = ConnStateConnected c.localAddr = c.conn.LocalAddr().String() c.logger.Printf("%s❤️ [%s][%s](%s) is connected to YoMo-Zipper %s", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr) // receiving frames - go c.handleFrame() + go func() { + closeConn, closeClient, err := c.handleFrame() + c.logger.Debugf("%shandleFrame: %v, %v, %T, %v", ClientLogPrefix, closeConn, closeClient, err, err) + + c.mu.Lock() + defer c.mu.Unlock() + + if c.state == ConnStateClosed { + return + } + + c.state = ConnStateDisconnected + c.errc <- err + + stream.Close() + if closeConn { + c.conn.CloseWithError(yerr.ErrorCodeClientAbort.To(), err.Error()) + } + + if closeClient { + c.close() + } + }() return nil } // handleFrame handles the logic when receiving frame from server. -func (c *Client) handleFrame() { - // transform raw QUIC stream to wire format - fs := NewFrameStream(c.stream) +func (c *Client) handleFrame() (bool, bool, error) { for { - c.logger.Debugf("%shandleFrame connection state=%v", ClientLogPrefix, c.state) // this will block until a frame is received - f, err := fs.ReadFrame() + f, err := c.fs.ReadFrame() if err != nil { - defer c.stream.Close() - // defer c.conn.CloseWithError(0xD0, err.Error()) - - c.logger.Debugf("%shandleFrame(): %T | %v", ClientLogPrefix, err, err) - if e, ok := err.(*quic.IdleTimeoutError); ok { - c.logger.Errorf("%sconnection timeout, err=%v, zipper=%s", ClientLogPrefix, e, c.addr) - c.setState(ConnStateDisconnected) + if err == io.EOF { + return true, false, err + } else if strings.HasPrefix(err.Error(), "unknown frame type") { + c.logger.Warnf("%s%v", ClientLogPrefix, err) + continue + } else if e, ok := err.(*quic.IdleTimeoutError); ok { + return false, false, e } else if e, ok := err.(*quic.ApplicationError); ok { - c.setState(ConnStateDisconnected) - c.logger.Infof("%sapplication error, err=%v, errcode=%v", ClientLogPrefix, e, e.ErrorCode) - if yerr.Is(e.ErrorCode, yerr.ErrorCodeRejected) { - // if connection is rejected(eg: authenticate fails) from server - c.logger.Errorf("%sIllegal client, server rejected.", ClientLogPrefix) - c.setState(ConnStateRejected) - break - } else if yerr.Is(e.ErrorCode, yerr.ErrorCodeClientAbort) { - // client abort - c.logger.Infof("%sclient close the connection", ClientLogPrefix) - c.setState(ConnStateAborted) - break - } else if yerr.Is(e.ErrorCode, yerr.ErrorCodeGoaway) { - // server goaway - c.logger.Infof("%sserver goaway the connection", ClientLogPrefix) - c.setState(ConnStateGoaway) - break - } else if yerr.Is(e.ErrorCode, yerr.ErrorCodeHandshake) { - // handshake - c.logger.Errorf("%shandshake fails", ClientLogPrefix) - c.setState(ConnStateRejected) - break - } + return false, e.ErrorCode == yerr.ErrorCodeGoaway.To() || e.ErrorCode == yerr.ErrorCodeRejected.To(), e } else if errors.Is(err, net.ErrClosed) { - // if client close the connection, net.ErrClosed will be raise - c.logger.Errorf("%sconnection is closed, err=%v", ClientLogPrefix, err) - c.setState(ConnStateDisconnected) - // by quic-go IdleTimeoutError after connection's KeepAlive config. - break - } else { - // any error occurred, we should close the stream - // after this, conn.AcceptStream() will raise the error - c.setState(ConnStateClosed) - c.conn.CloseWithError(yerr.To(yerr.ErrorCodeUnknown), err.Error()) - c.logger.Errorf("%sunknown error occurred, err=%v, state=%v", ClientLogPrefix, err, c.getState()) - break + return false, false, err } + return true, false, err } - if f == nil { - break - } + // read frame // first, get frame type frameType := f.Type() c.logger.Debugf("%stype=%s, frame=%# x", ClientLogPrefix, frameType, frame.Shortly(f.Encode())) switch frameType { - case frame.TagOfHandshakeFrame: - if v, ok := f.(*frame.HandshakeFrame); ok { - c.logger.Debugf("%sreceive HandshakeFrame, name=%v", ClientLogPrefix, v.Name) - } - case frame.TagOfPongFrame: - c.setState(ConnStatePong) - case frame.TagOfAcceptedFrame: - c.setState(ConnStateAccepted) case frame.TagOfRejectedFrame: - c.setState(ConnStateRejected) if v, ok := f.(*frame.RejectedFrame); ok { - c.logger.Errorf("%s🔑 receive RejectedFrame, message=%s", ClientLogPrefix, v.Message()) - c.conn.CloseWithError(yerr.To(yerr.ErrorCodeRejected), v.Message()) - c.errc <- errors.New(v.Message()) - break + return true, true, errors.New(v.Message()) } case frame.TagOfGoawayFrame: - c.setState(ConnStateGoaway) if v, ok := f.(*frame.GoawayFrame); ok { - c.logger.Errorf("%s⛔️ receive GoawayFrame, message=%s", ClientLogPrefix, v.Message()) - c.conn.CloseWithError(yerr.To(yerr.ErrorCodeGoaway), v.Message()) - c.errc <- errors.New(v.Message()) - break + return true, true, errors.New(v.Message()) } case frame.TagOfDataFrame: // DataFrame carries user's data if v, ok := f.(*frame.DataFrame); ok { - c.setState(ConnStateTransportData) c.logger.Debugf("%sreceive DataFrame, tag=%#x, tid=%s, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.TransactionID(), v.GetCarriage()) if c.processor == nil { c.logger.Warnf("%sprocessor is nil", ClientLogPrefix) } else { - // TODO: should c.processor accept a DataFrame as parameter? - // c.processor(v.GetDataTagID(), v.GetCarriage(), v.GetMetaFrame()) c.processor(v) } } @@ -242,100 +204,51 @@ func (c *Client) handleFrame() { if c.receiver == nil { c.logger.Warnf("%sreceiver is nil", ClientLogPrefix) } else { - c.setState(ConnStateBackflow) c.receiver(v) } } default: - c.logger.Errorf("%sunknown signal", ClientLogPrefix) + c.logger.Warnf("%sunknown or unsupported frame %#x", ClientLogPrefix, frameType) } } } // Close the client. -func (c *Client) Close() (err error) { - if c.conn != nil { - c.logger.Printf("%sclose the connection, name:%s, id:%s, addr:%s", ClientLogPrefix, c.name, c.clientID, c.conn.RemoteAddr().String()) - } - if c.stream != nil { - err = c.stream.Close() - if err != nil { - c.logger.Errorf("%s stream.Close(): %v", ClientLogPrefix, err) - } - } - if c.conn != nil { - err = c.conn.CloseWithError(0, "client-ask-to-close-this-connection") - if err != nil { - c.logger.Errorf("%s connection.Close(): %v", ClientLogPrefix, err) - } - } - // close channel +func (c *Client) Close() error { c.mu.Lock() - if !c.closed { - close(c.errc) - close(c.closec) - c.closed = true + defer c.mu.Unlock() + + if c.state == ConnStateClosed { + return nil } - c.mu.Unlock() - return err + c.conn.CloseWithError(yerr.ErrorCodeClientAbort.To(), "client ask to close") + return c.close() +} + +func (c *Client) close() error { + c.logger.Printf("%s💔 close the connection, name:%s, id:%s, addr:%s", ClientLogPrefix, c.name, c.clientID, c.addr) + + // close error channel so that close handler function will be called + close(c.errc) + + c.state = ConnStateClosed + return nil } // WriteFrame writes a frame to the connection, gurantee threadsafe. func (c *Client) WriteFrame(frm frame.Frame) error { - // write on QUIC stream - if c.stream == nil { - return errors.New("stream is nil") - } - if c.state == ConnStateDisconnected || c.state == ConnStateRejected { - return fmt.Errorf("client connection state is %s", c.state) - } c.logger.Debugf("%s[%s](%s)@%s WriteFrame() will write frame: %s", ClientLogPrefix, c.name, c.localAddr, c.state, frm.Type()) - data := frm.Encode() - // emit raw bytes of Frame - c.mu.Lock() - n, err := c.stream.Write(data) - c.mu.Unlock() - c.logger.Debugf("%sWriteFrame() wrote n=%d, data=%# x", ClientLogPrefix, n, frame.Shortly(data)) - if err != nil { - c.setState(ConnStateDisconnected) - // c.state = ConnStateDisconnected - if e, ok := err.(*quic.IdleTimeoutError); ok { - c.logger.Errorf("%sWriteFrame() connection timeout, err=%v", ClientLogPrefix, e) - } else { - c.logger.Errorf("%sWriteFrame() wrote error=%v", ClientLogPrefix, err) - return err - } + if c.state != ConnStateConnected { + return errors.New("client connection isn't connected") } - if n != len(data) { - err := errors.New("[client] yomo Client .Write() wroten error") - c.logger.Errorf("%s error:%v", ClientLogPrefix, err) + + if _, err := c.fs.WriteFrame(frm); err != nil { return err } - return err -} - -// update connection state -func (c *Client) setState(state ConnState) { - c.logger.Debugf("setState to:%s", state) - c.mu.Lock() - c.state = state - c.mu.Unlock() -} - -// getState get connection state -func (c *Client) getState() ConnState { - c.mu.Lock() - defer c.mu.Unlock() - return c.state -} -// update connection local addr -func (c *Client) setLocalAddr(addr string) { - c.mu.Lock() - c.localAddr = addr - c.mu.Unlock() + return nil } // SetDataFrameObserver sets the data frame handler. @@ -360,11 +273,16 @@ func (c *Client) reconnect(ctx context.Context, addr string) { case <-ctx.Done(): c.logger.Debugf("%s[%s](%s) context.Done()", ClientLogPrefix, c.name, c.localAddr) return - case <-c.closec: - c.logger.Debugf("%s[%s](%s) close channel", ClientLogPrefix, c.name, c.localAddr) - return + case err, ok := <-c.errc: + if c.errorfn != nil && err != nil { + c.errorfn(err) + } + if !ok && c.closefn != nil { + c.closefn() + return + } case <-t.C: - if c.getState() == ConnStateDisconnected { + if c.state == ConnStateDisconnected { c.logger.Printf("%s[%s][%s](%s) is reconnecting to YoMo-Zipper %s...", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr) err := c.connect(ctx, addr) if err != nil { @@ -451,14 +369,12 @@ func (c *Client) Logger() log.Logger { // SetErrorHandler set error handler func (c *Client) SetErrorHandler(fn func(err error)) { - if fn != nil { - go func() { - err := <-c.errc - if err != nil { - fn(err) - } - }() - } + c.errorfn = fn +} + +// SetCloseHandler set close handler +func (c *Client) SetCloseHandler(fn func()) { + c.closefn = fn } // ClientID return the client ID diff --git a/core/core.go b/core/core.go index 4dc282fa6..3b00cc885 100644 --- a/core/core.go +++ b/core/core.go @@ -10,22 +10,16 @@ var ( once sync.Once ) +// ConnState represents the state of the connection. +type ConnState = string + // ConnState represents the state of a connection. const ( - ConnStateReady ConnState = "Ready" - ConnStateDisconnected ConnState = "Disconnected" - ConnStateConnecting ConnState = "Connecting" - ConnStateConnected ConnState = "Connected" - ConnStateAuthenticating ConnState = "Authenticating" - ConnStateAccepted ConnState = "Accepted" - ConnStateRejected ConnState = "Rejected" - ConnStatePing ConnState = "Ping" - ConnStatePong ConnState = "Pong" - ConnStateTransportData ConnState = "TransportData" - ConnStateAborted ConnState = "Aborted" - ConnStateClosed ConnState = "Closed" // close connection by server - ConnStateGoaway ConnState = "Goaway" - ConnStateBackflow ConnState = "Backflow" + ConnStateReady ConnState = "Ready" + ConnStateDisconnected ConnState = "Disconnected" + ConnStateConnecting ConnState = "Connecting" + ConnStateConnected ConnState = "Connected" + ConnStateClosed ConnState = "Closed" ) // Prefix is the prefix for logger. diff --git a/core/server.go b/core/server.go index 8e4445c48..69daff318 100644 --- a/core/server.go +++ b/core/server.go @@ -37,7 +37,6 @@ type ConnectionHandler func(conn quic.Connection) // Server is the underlining server of Zipper type Server struct { name string - state string connector Connector router Router metadataBuilder MetadataBuilder @@ -112,8 +111,6 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { // defer listener.Close() logger.Printf("%s✅ [%s][%d] Listening on: %s, MODE: %s, QUIC: %v, AUTH: %s", ServerLogPrefix, s.name, os.Getpid(), listener.Addr(), mode(), listener.Versions(), s.authNames()) - s.state = ConnStateConnected - // loop for { // create a new connection when new yomo-client connected sctx, cancel := context.WithCancel(ctx) diff --git a/core/yerr/errors.go b/core/yerr/errors.go index 3adda7897..2b9228b85 100644 --- a/core/yerr/errors.go +++ b/core/yerr/errors.go @@ -96,8 +96,8 @@ func Parse(qerr quic.ApplicationErrorCode) ErrorCode { } // To convert yomo ErrorCode to quic ApplicationErrorCode -func To(code ErrorCode) quic.ApplicationErrorCode { - return quic.ApplicationErrorCode(code) +func (e ErrorCode) To() quic.ApplicationErrorCode { + return quic.ApplicationErrorCode(e) } // DuplicateNameError duplicate name(sfn) From d42ef6051670195f6eb6b67813f329eb608a11d0 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Fri, 26 Aug 2022 16:27:44 +0800 Subject: [PATCH 2/3] return error code to server --- core/client.go | 6 +++++- core/yerr/errors.go | 25 +++++++++++++++++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/core/client.go b/core/client.go index 9b56a9d83..fc9f33ede 100644 --- a/core/client.go +++ b/core/client.go @@ -144,7 +144,11 @@ func (c *Client) connect(ctx context.Context, addr string) error { stream.Close() if closeConn { - c.conn.CloseWithError(yerr.ErrorCodeClientAbort.To(), err.Error()) + code := yerr.ErrorCodeClientAbort + if e, ok := err.(yerr.YomoError); ok { + code = e.ErrorCode() + } + c.conn.CloseWithError(code.To(), err.Error()) } if closeClient { diff --git a/core/yerr/errors.go b/core/yerr/errors.go index 2b9228b85..9aa963a90 100644 --- a/core/yerr/errors.go +++ b/core/yerr/errors.go @@ -7,23 +7,35 @@ import ( ) // YomoError yomo error -type YomoError struct { +type YomoError interface { + error + // Errorcode getter method + ErrorCode() ErrorCode +} + +type yomoError struct { errorCode ErrorCode err error } // New create yomo error -func New(code ErrorCode, err error) *YomoError { - return &YomoError{ +func New(code ErrorCode, err error) YomoError { + return &yomoError{ errorCode: code, err: err, } } -func (e *YomoError) Error() string { +// Error is the built-in error interface +func (e *yomoError) Error() string { return fmt.Sprintf("%s error: message=%s", e.errorCode, e.err.Error()) } +// Errorcode getter method +func (e *yomoError) ErrorCode() ErrorCode { + return e.errorCode +} + // ErrorCode error code type ErrorCode uint64 @@ -119,6 +131,11 @@ func (e DuplicateNameError) Error() string { return e.err.Error() } +// Errorcode getter method +func (e DuplicateNameError) ErrorCode() ErrorCode { + return ErrorCodeDuplicateName +} + // ConnID duplicate connection ID func (e DuplicateNameError) ConnID() string { return e.connID From eb85ab560ac2f624e6e4e341644218e2bd08b758 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Fri, 26 Aug 2022 16:49:35 +0800 Subject: [PATCH 3/3] codacy --- core/yerr/errors.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/yerr/errors.go b/core/yerr/errors.go index 9aa963a90..bd5c51247 100644 --- a/core/yerr/errors.go +++ b/core/yerr/errors.go @@ -9,7 +9,7 @@ import ( // YomoError yomo error type YomoError interface { error - // Errorcode getter method + // ErrorCode getter method ErrorCode() ErrorCode } @@ -31,7 +31,7 @@ func (e *yomoError) Error() string { return fmt.Sprintf("%s error: message=%s", e.errorCode, e.err.Error()) } -// Errorcode getter method +// ErrorCode getter method func (e *yomoError) ErrorCode() ErrorCode { return e.errorCode } @@ -131,7 +131,7 @@ func (e DuplicateNameError) Error() string { return e.err.Error() } -// Errorcode getter method +// ErrorCode getter method func (e DuplicateNameError) ErrorCode() ErrorCode { return ErrorCodeDuplicateName }