From 109974f44be16969214db90cfec20b636c7d0bb8 Mon Sep 17 00:00:00 2001 From: woorui Date: Sat, 10 Dec 2022 02:08:09 +0800 Subject: [PATCH 01/16] feat(logger): add slog support --- core/ylog/example_test.go | 42 ++++++++++++ core/ylog/logger.go | 134 +++++++++++++++++++++++++++++++++++++ core/ylog/slog_handler.go | 136 ++++++++++++++++++++++++++++++++++++++ go.mod | 3 +- go.sum | 6 +- 5 files changed, 318 insertions(+), 3 deletions(-) create mode 100644 core/ylog/example_test.go create mode 100644 core/ylog/logger.go create mode 100644 core/ylog/slog_handler.go diff --git a/core/ylog/example_test.go b/core/ylog/example_test.go new file mode 100644 index 000000000..01a86d2db --- /dev/null +++ b/core/ylog/example_test.go @@ -0,0 +1,42 @@ +package ylog_test + +import ( + "io" + "net" + + "github.com/yomorun/yomo/core/ylog" +) + +func Example() { + // text format logger + logger := ylog.NewFromConfig(ylog.Config{ + Level: "warn", + Format: "text", + ErrorOutput: "stdout", + DisableTime: true, + }) + + ylog.SetDefault(logger.With("hello", "yomo").WithGroup("syslog")) + + ylog.Debug("debug", "aaa", "bbb") + ylog.Info("info", "ccc", "ddd") + ylog.Warn("warn", "eee", "fff") + ylog.Error("error", io.EOF, "eee", "fff") + + // json format logger + sysLogger := ylog.NewFromConfig(ylog.Config{ + Level: "error", + Format: "json", + ErrorOutput: "stdout", + DisableTime: true, + }) + + sysLogger = sysLogger.WithGroup("syslog") + + sysLogger.Error("sys error", net.ErrClosed, "ggg", "hhh") + + // Output: + // level=WARN msg=warn hello=yomo syslog.eee=fff + // level=ERROR msg=error hello=yomo syslog.eee=fff syslog.err=EOF + // {"level":"ERROR","msg":"sys error","syslog":{"ggg":"hhh","err":"use of closed network connection"}} +} diff --git a/core/ylog/logger.go b/core/ylog/logger.go new file mode 100644 index 000000000..ac26d45a2 --- /dev/null +++ b/core/ylog/logger.go @@ -0,0 +1,134 @@ +// package ylog provides a slog.Logger instance for logging. +// ylog also provides a default slog.Logger, the default logger is build from environment. +// +// ylog allows to call log api directly, like: +// +// ylog.Debug("test", "name", "yomo") +// ylog.Info("test", "name", "yomo") +// ylog.Warn("test", "name", "yomo") +// ylog.Error("test", "name", "yomo") +package ylog + +import ( + "io" + "log" + "os" + "strconv" + "strings" + + "github.com/caarlos0/env/v6" + "golang.org/x/exp/slog" +) + +var defaultLogger = Default() + +// SetDefault set global logger. +func SetDefault(logger *slog.Logger) { defaultLogger = logger } + +// Debug logs a message at debug level. +func Debug(msg string, keyvals ...interface{}) { + defaultLogger.Debug(msg, keyvals...) +} + +// Info logs a message at info level. +func Info(msg string, keyvals ...interface{}) { + defaultLogger.Info(msg, keyvals...) +} + +// Warn logs a message at warn level. +func Warn(msg string, keyvals ...interface{}) { + defaultLogger.Warn(msg, keyvals...) +} + +// Error logs a message at error level. +func Error(msg string, err error, keyvals ...interface{}) { + defaultLogger.Error(msg, err, keyvals...) +} + +// Config is the config of slog, the config is from environment. +type Config struct { + // Verbose indicates if logger log code line, use false for production. + Verbose bool `env:"YOMO_LOG_VERBOSE" envDefault:"false"` + + // the log level, It can be one of `debug`, `info`, `warn`, `error` + Level string `env:"YOMO_LOG_LEVEL" envDefault:"info"` + + // log output file path, It's stdout if not set. + Output string `env:"YOMO_LOG_OUTPUT"` + + // error log output file path, It's stderr if not set. + ErrorOutput string `env:"YOMO_LOG_ERROR_OUTPUT"` + + // log format, support text and json. + Format string `env:"YOMO_LOG_FORMAT" envDefault:"text"` + + // DisableTime disable time key, It's a pravited field, Just for testing. + DisableTime bool +} + +// DebugFrameSize is use for log dataFrame, +// It means that only logs the first DebugFrameSize bytes if the data is large than DebugFrameSize bytes. +// +// DebugFrameSize is default to 16, +// if env `YOMO_DEBUG_FRAME_SIZE` is setted and It's an int number, Set the env value to DebugFrameSize. +var DebugFrameSize = 16 + +func init() { + if e := os.Getenv("YOMO_DEBUG_FRAME_SIZE"); e != "" { + if val, err := strconv.Atoi(e); err == nil { + DebugFrameSize = val + } + } +} + +// Default returns a slog.Logger according to enviroment. +func Default() *slog.Logger { + var conf Config + if err := env.Parse(&conf); err != nil { + log.Fatalf("%+v\n", err) + } + return NewFromConfig(conf) +} + +// NewFromConfig returns a slog.Logger according to conf. +func NewFromConfig(conf Config) *slog.Logger { + return slog.New(NewHandlerFromConfig(conf)) +} + +func parseToWriter(path string, defaultWriter io.Writer) (io.Writer, error) { + switch strings.ToLower(path) { + case "stdout": + return os.Stdout, nil + case "stderr": + return os.Stderr, nil + default: + if path != "" { + return os.Open(path) + } + return defaultWriter, nil + } +} + +func mustParseToWriter(path string, defaultWriter io.Writer) io.Writer { + w, err := parseToWriter(path, defaultWriter) + if err != nil { + panic(err) + } + return w +} + +func parseToSlogLevel(stringLevel string) slog.Level { + var level = slog.DebugLevel + switch strings.ToLower(stringLevel) { + case "debug": + level = slog.DebugLevel + case "info": + level = slog.InfoLevel + case "warn": + level = slog.WarnLevel + case "error": + level = slog.ErrorLevel + } + + return level +} diff --git a/core/ylog/slog_handler.go b/core/ylog/slog_handler.go new file mode 100644 index 000000000..d8a2a5c00 --- /dev/null +++ b/core/ylog/slog_handler.go @@ -0,0 +1,136 @@ +// package provides handler that supports spliting log stream to common log stream and error log stream. +package ylog + +import ( + "bytes" + "io" + "os" + "strings" + "sync" + + "golang.org/x/exp/slog" +) + +// handler supports spliting log stream to common log stream and error log stream. +type handler struct { + slog.Handler + + buf *asyncBuffer + + writer io.Writer + errWriter io.Writer +} + +type asyncBuffer struct { + sync.Mutex + underlying *bytes.Buffer +} + +func newAsyncBuffer(cap int) *asyncBuffer { + return &asyncBuffer{ + underlying: bytes.NewBuffer(make([]byte, cap)), + } +} + +func (buf *asyncBuffer) Write(b []byte) (int, error) { + buf.Lock() + defer buf.Unlock() + + return buf.underlying.Write(b) +} + +func (buf *asyncBuffer) Read(p []byte) (int, error) { + buf.Lock() + defer buf.Unlock() + + return buf.underlying.Read(p) +} + +func (buf *asyncBuffer) Reset() { + buf.Lock() + defer buf.Unlock() + + buf.underlying.Reset() +} + +// NewHandlerFromConfig creates a slog.Handler from conf +func NewHandlerFromConfig(conf Config) slog.Handler { + buf := newAsyncBuffer(0) + + h := bufferedSlogHandler( + buf, + conf.Format, + parseToSlogLevel(conf.Level), + conf.Verbose, + conf.DisableTime, + ) + + return &handler{ + Handler: h, + buf: buf, + writer: mustParseToWriter(conf.Output, os.Stdout), + errWriter: mustParseToWriter(conf.ErrorOutput, os.Stderr), + } +} + +func (h *handler) Enabled(level slog.Level) bool { + return h.Handler.Enabled(level) +} + +func (h *handler) Handle(r slog.Record) error { + err := h.Handler.Handle(r) + if err != nil { + return err + } + + if r.Level == slog.ErrorLevel { + _, err = io.Copy(h.errWriter, h.buf) + } else { + _, err = io.Copy(h.writer, h.buf) + } + h.buf.Reset() + + return err +} + +func (h *handler) WithAttrs(as []slog.Attr) slog.Handler { + return &handler{ + buf: h.buf, + writer: h.writer, + errWriter: h.errWriter, + Handler: h.Handler.WithAttrs(as), + } +} + +func (h *handler) WithGroup(name string) slog.Handler { + return &handler{ + buf: h.buf, + writer: h.writer, + errWriter: h.errWriter, + Handler: h.Handler.WithGroup(name), + } +} + +func bufferedSlogHandler(buf io.Writer, format string, level slog.Level, verbose, disableTime bool) slog.Handler { + opt := slog.HandlerOptions{ + AddSource: verbose, + Level: level, + } + if disableTime { + opt.ReplaceAttr = func(a slog.Attr) slog.Attr { + if a.Key == "time" { + return slog.Attr{} + } + return a + } + } + + var h slog.Handler + if strings.ToLower(format) == "json" { + h = opt.NewJSONHandler(buf) + } else { + h = opt.NewTextHandler(buf) + } + + return h +} diff --git a/go.mod b/go.mod index 0c7933c36..226b5fd12 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/briandowns/spinner v1.19.0 github.com/bytecodealliance/wasmtime-go v1.0.0 + github.com/caarlos0/env/v6 v6.10.1 github.com/cenkalti/backoff/v4 v4.1.3 github.com/fatih/color v1.13.0 github.com/joho/godotenv v1.4.0 @@ -18,6 +19,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/yomorun/y3 v1.0.5 go.uber.org/zap v1.23.0 + golang.org/x/exp v0.0.0-20221204150635-6dcec336b2bb golang.org/x/tools v0.3.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v3 v3.0.1 @@ -51,7 +53,6 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.1.0 // indirect - golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 // indirect golang.org/x/mod v0.7.0 // indirect golang.org/x/net v0.2.0 // indirect golang.org/x/sys v0.2.0 // indirect diff --git a/go.sum b/go.sum index 67a6f741f..d4292a206 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/briandowns/spinner v1.19.0 h1:s8aq38H+Qju89yhp89b4iIiMzMm8YN3p6vGpwyh github.com/briandowns/spinner v1.19.0/go.mod h1:mQak9GHqbspjC/5iUx3qMlIho8xBS/ppAL/hX5SmPJU= github.com/bytecodealliance/wasmtime-go v1.0.0 h1:9u9gqaUiaJeN5IoD1L7egD8atOnTGyJcNp8BhkL9cUU= github.com/bytecodealliance/wasmtime-go v1.0.0/go.mod h1:jjlqQbWUfVSbehpErw3UoWFndBXRRMvfikYH6KsCwOg= +github.com/caarlos0/env/v6 v6.10.1 h1:t1mPSxNpei6M5yAeu1qtRdPAK29Nbcf/n3G7x+b3/II= +github.com/caarlos0/env/v6 v6.10.1/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc= github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= @@ -274,8 +276,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 h1:QfTh0HpN6hlw6D3vu8DAwC8pBIwikq0AI1evdm+FksE= -golang.org/x/exp v0.0.0-20221031165847-c99f073a8326/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20221204150635-6dcec336b2bb h1:QIsP/NmClBICkqnJ4rSIhnrGiGR7Yv9ZORGGnmmLTPk= +golang.org/x/exp v0.0.0-20221204150635-6dcec336b2bb/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= From ab38a911b3f98cc990f749fc12a7078df79b1217 Mon Sep 17 00:00:00 2001 From: woorui Date: Mon, 12 Dec 2022 14:42:18 +0800 Subject: [PATCH 02/16] refactor(core): use slog --- core/client.go | 43 ++++---- core/client_options.go | 14 +-- core/client_test.go | 4 +- core/connection.go | 15 ++- core/connector.go | 13 +-- core/context.go | 48 ++++++++- core/core.go | 7 -- core/frame/frame.go | 14 +-- core/listener_default.go | 10 +- core/log/logger.go | 57 ----------- core/log/logger_test.go | 15 --- core/server.go | 216 +++++++++++++++++++-------------------- core/server_options.go | 68 ++++++++---- core/server_test.go | 16 ++- 14 files changed, 268 insertions(+), 272 deletions(-) delete mode 100644 core/log/logger.go delete mode 100644 core/log/logger_test.go diff --git a/core/client.go b/core/client.go index a244f4a30..9be98d7d2 100644 --- a/core/client.go +++ b/core/client.go @@ -12,9 +12,9 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/core/log" "github.com/yomorun/yomo/core/yerr" "github.com/yomorun/yomo/pkg/id" + "golang.org/x/exp/slog" ) // ClientOption YoMo client options @@ -37,7 +37,7 @@ type Client struct { mu sync.Mutex opts *clientOptions localAddr string // client local addr, it will be changed on reconnect - logger log.Logger + logger *slog.Logger errc chan error } @@ -48,15 +48,18 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien for _, o := range opts { o(option) } + clientID := id.New() + + logger := slog.With("component", "client", "type", connType.String(), "client_id", clientID, "client_name", appName) return &Client{ name: appName, - clientID: id.New(), + clientID: clientID, clientType: connType, state: ConnStateReady, opts: option, errc: make(chan error), - logger: option.logger, + logger: logger, } } @@ -121,12 +124,14 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.state = ConnStateConnected c.localAddr = c.conn.LocalAddr().String() - c.logger.Printf("%sā¤ļø [%s][%s](%s) is connected to YoMo-Zipper %s", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr) + c.logger = slog.With("local_addr", c.localAddr, "reomote", c.RemoteAddr()) + + c.logger.Debug("connected to YoMo-Zipper") // receiving frames go func() { closeConn, closeClient, err := c.handleFrame() - c.logger.Debugf("%shandleFrame: %v, %v, %T, %v", ClientLogPrefix, closeConn, closeClient, err, err) + c.logger.Debug("connected to YoMo-Zipper", "close_conn", closeConn, "close_client", closeClient, "error", err) c.mu.Lock() defer c.mu.Unlock() @@ -164,7 +169,7 @@ func (c *Client) handleFrame() (bool, bool, error) { if err == io.EOF { return true, false, err } else if strings.HasPrefix(err.Error(), "unknown frame type") { - c.logger.Warnf("%s%v", ClientLogPrefix, err) + c.logger.Warn("unknown frame type", "error", err) continue } else if e, ok := err.(*quic.IdleTimeoutError); ok { return false, false, e @@ -179,7 +184,7 @@ func (c *Client) handleFrame() (bool, bool, error) { // read frame // first, get frame type frameType := f.Type() - c.logger.Debugf("%shandleFrame: %v", ClientLogPrefix, frameType) + c.logger.Debug("handleFrame", "frame_type", frameType) switch frameType { case frame.TagOfRejectedFrame: if v, ok := f.(*frame.RejectedFrame); ok { @@ -192,7 +197,7 @@ func (c *Client) handleFrame() (bool, bool, error) { case frame.TagOfDataFrame: // DataFrame carries user's data if v, ok := f.(*frame.DataFrame); ok { if c.processor == nil { - c.logger.Warnf("%sprocessor is nil", ClientLogPrefix) + c.logger.Warn("processor is nil") } else { c.processor(v) } @@ -200,13 +205,13 @@ func (c *Client) handleFrame() (bool, bool, error) { case frame.TagOfBackflowFrame: if v, ok := f.(*frame.BackflowFrame); ok { if c.receiver == nil { - c.logger.Warnf("%sreceiver is nil", ClientLogPrefix) + c.logger.Warn("receiver is nil") } else { c.receiver(v) } } default: - c.logger.Warnf("%sunknown or unsupported frame %#x", ClientLogPrefix, frameType) + c.logger.Warn("unknown or unsupported frame", "frame_type", frameType.String()) } } } @@ -228,7 +233,7 @@ func (c *Client) Close() error { } func (c *Client) close() error { - c.logger.Printf("%sšŸ’” close the connection, name:%s, id:%s, addr:%s", ClientLogPrefix, c.name, c.clientID, c.addr) + c.logger.Debug("close the connection") // close error channel so that close handler function will be called close(c.errc) @@ -239,7 +244,7 @@ func (c *Client) close() error { // WriteFrame writes a frame to the connection, gurantee threadsafe. func (c *Client) WriteFrame(frm frame.Frame) error { - c.logger.Debugf("%s[%s](%s)@%s WriteFrame() will write frame: %s", ClientLogPrefix, c.name, c.localAddr, c.State(), frm.Type()) + c.logger.Debug("close the connection", "client_state", c.State(), "frame_type", frm.Type().String()) if c.state != ConnStateConnected { return errors.New("client connection isn't connected") @@ -255,13 +260,13 @@ func (c *Client) WriteFrame(frm frame.Frame) error { // SetDataFrameObserver sets the data frame handler. func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) { c.processor = fn - c.logger.Debugf("%sSetDataFrameObserver(%v)", ClientLogPrefix, c.processor) + c.logger.Debug("SetDataFrameObserver", "processor", c.processor) } // SetBackflowFrameObserver sets the backflow frame handler. func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame)) { c.receiver = fn - c.logger.Debugf("%sSetBackflowFrameObserver(%v)", ClientLogPrefix, c.receiver) + c.logger.Debug("SetBackflowFrameObserver", "receiver", c.receiver) } // reconnect the connection between client and server. @@ -272,7 +277,7 @@ func (c *Client) reconnect(ctx context.Context, addr string) { for { select { case <-ctx.Done(): - c.logger.Debugf("%s[%s](%s) context.Done()", ClientLogPrefix, c.name, c.localAddr) + c.logger.Debug("context.Done", "receiver", "error", ctx.Err()) return case err, ok := <-c.errc: if c.errorfn != nil && err != nil { @@ -287,10 +292,10 @@ func (c *Client) reconnect(ctx context.Context, addr string) { state := c.state c.mu.Unlock() if state == ConnStateDisconnected { - c.logger.Printf("%s[%s][%s](%s) is reconnecting to YoMo-Zipper %s...", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr) + c.logger.Debug("reconnecting to YoMo-Zipper") err := c.connect(ctx, addr) if err != nil { - c.logger.Errorf("%s[%s][%s](%s) reconnect error:%v", ClientLogPrefix, c.name, c.clientID, c.localAddr, err) + c.logger.Error("reconnecting to YoMo-Zipper", err) } } } @@ -307,7 +312,7 @@ func (c *Client) SetObserveDataTags(tag ...frame.Tag) { } // Logger get client's logger instance, you can customize this using `yomo.WithLogger` -func (c *Client) Logger() log.Logger { +func (c *Client) Logger() *slog.Logger { return c.logger } diff --git a/core/client_options.go b/core/client_options.go index dc92329b7..fc7d6667b 100644 --- a/core/client_options.go +++ b/core/client_options.go @@ -7,9 +7,9 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/yomorun/yomo/core/auth" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/core/log" - "github.com/yomorun/yomo/pkg/logger" + "github.com/yomorun/yomo/core/ylog" pkgtls "github.com/yomorun/yomo/pkg/tls" + "golang.org/x/exp/slog" ) // clientOptions are the options for YoMo client. @@ -18,11 +18,11 @@ type clientOptions struct { quicConfig *quic.Config tlsConfig *tls.Config credential *auth.Credential - logger log.Logger + logger *slog.Logger } func defaultClientOption() *clientOptions { - logger := logger.Default() + logger := ylog.Default() defalutQuicConfig := &quic.Config{ Versions: []quic.VersionNumber{quic.Version2}, @@ -45,7 +45,7 @@ func defaultClientOption() *clientOptions { } if opts.credential != nil { - logger.Printf("%suse credential: [%s]", ClientLogPrefix, opts.credential.Name()) + logger.Debug("use credential", "component", "client", "credential_name", opts.credential.Name()) } return opts @@ -81,8 +81,8 @@ func WithClientQuicConfig(qc *quic.Config) ClientOption { } } -// WithLogger sets logger for the client. -func WithLogger(logger log.Logger) ClientOption { +// WithClientLogger sets logger for the client. +func WithClientLogger(logger *slog.Logger) ClientOption { return func(o *clientOptions) { o.logger = logger } diff --git a/core/client_test.go b/core/client_test.go index 0173faa93..dedf4d9c3 100644 --- a/core/client_test.go +++ b/core/client_test.go @@ -12,8 +12,8 @@ import ( "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/core/metadata" "github.com/yomorun/yomo/core/router" + "github.com/yomorun/yomo/core/ylog" "github.com/yomorun/yomo/pkg/config" - "github.com/yomorun/yomo/pkg/logger" ) const testaddr = "127.0.0.1:19999" @@ -64,7 +64,7 @@ func TestFrameRoundTrip(t *testing.T) { WithObserveDataTags(obversedTag), WithClientQuicConfig(DefalutQuicConfig), WithClientTLSConfig(nil), - WithLogger(logger.Default()), + WithClientLogger(ylog.Default()), ) source.SetBackflowFrameObserver(func(bf *frame.BackflowFrame) { diff --git a/core/connection.go b/core/connection.go index 9018e1217..3cd7a5754 100644 --- a/core/connection.go +++ b/core/connection.go @@ -6,7 +6,7 @@ import ( "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/core/metadata" - "github.com/yomorun/yomo/pkg/logger" + "golang.org/x/exp/slog" ) // Connection wraps the specific io connections (typically quic.Connection) to transfer y3 frames @@ -36,9 +36,18 @@ type connection struct { observed []frame.Tag // observed data tags mu sync.Mutex closed bool + logger *slog.Logger } -func newConnection(name string, clientID string, clientType ClientType, metadata metadata.Metadata, stream io.ReadWriteCloser, observed []frame.Tag) Connection { +func newConnection( + name string, + clientID string, + clientType ClientType, + metadata metadata.Metadata, + stream io.ReadWriteCloser, + observed []frame.Tag, + logger *slog.Logger, +) Connection { return &connection{ name: name, clientID: clientID, @@ -82,7 +91,7 @@ func (c *connection) Write(f frame.Frame) error { c.mu.Lock() defer c.mu.Unlock() if c.closed { - logger.Warnf("%sclient stream is closed: %s", ServerLogPrefix, c.clientID) + c.logger.Warn("client stream is closed") return nil } _, err := c.stream.Write(f.Encode()) diff --git a/core/connector.go b/core/connector.go index 425fb0308..5c4f60499 100644 --- a/core/connector.go +++ b/core/connector.go @@ -4,7 +4,7 @@ import ( "sync" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" + "golang.org/x/exp/slog" ) var _ Connector = &connector{} @@ -26,22 +26,23 @@ type Connector interface { } type connector struct { - conns sync.Map + conns sync.Map + logger *slog.Logger } -func newConnector() Connector { - return &connector{conns: sync.Map{}} +func newConnector(logger *slog.Logger) Connector { + return &connector{conns: sync.Map{}, logger: logger} } // Add a connection. func (c *connector) Add(connID string, conn Connection) { - logger.Debugf("%sconnector add: connID=%s", ServerLogPrefix, connID) + c.logger.Debug("connector add connection", "conn_id", connID) c.conns.Store(connID, conn) } // Remove a connection. func (c *connector) Remove(connID string) { - logger.Debugf("%sconnector remove: connID=%s", ServerLogPrefix, connID) + c.logger.Debug("connector remove connection", "conn_id", connID) c.conns.Delete(connID) } diff --git a/core/context.go b/core/context.go index 1157793e8..779e678fb 100644 --- a/core/context.go +++ b/core/context.go @@ -8,7 +8,7 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/core/yerr" - "github.com/yomorun/yomo/pkg/logger" + "golang.org/x/exp/slog" ) var ctxPool sync.Pool @@ -26,9 +26,11 @@ type Context struct { Keys map[string]interface{} mu sync.RWMutex + + logger *slog.Logger } -func newContext(conn quic.Connection, stream quic.Stream) (ctx *Context) { +func newContext(conn quic.Connection, stream quic.Stream, logger *slog.Logger) (ctx *Context) { v := ctxPool.Get() if v == nil { ctx = new(Context) @@ -38,18 +40,54 @@ func newContext(conn quic.Connection, stream quic.Stream) (ctx *Context) { ctx.Conn = conn ctx.Stream = stream ctx.connID = conn.RemoteAddr().String() + ctx.logger = logger.With("conn_id", conn.RemoteAddr().String(), "stream_id", stream.StreamID()) return } +const clientInfoKey = "client_info" + +// ClientInfo holds client info, you can use `*Context.ClientInfo()` to get it after handshake. +type ClientInfo struct { + clientID string + clientType byte + clientName string + authName string +} + +// ClientInfo get client info from context. +func (c *Context) ClientInfo() *ClientInfo { + val, ok := c.Get(clientInfoKey) + if !ok { + return &ClientInfo{} + } + return val.(*ClientInfo) +} + // WithFrame sets a frame to context. func (c *Context) WithFrame(f frame.Frame) *Context { + if f.Type() == frame.TagOfHandshakeFrame { + handshakeFrame := f.(*frame.HandshakeFrame) + c.logger.With( + "client_id", handshakeFrame.ClientID, + "client_type", ClientType(handshakeFrame.ClientType).String(), + "client_name", handshakeFrame.Name, + "auth_name", handshakeFrame.AuthName(), + ) + c.Set(clientInfoKey, &ClientInfo{ + clientID: handshakeFrame.ClientID, + clientType: handshakeFrame.ClientType, + clientName: handshakeFrame.Name, + authName: handshakeFrame.AuthName(), + }) + } + c.logger.With("frame_type", f.Type().String()) c.Frame = f return c } // Clean the context. func (c *Context) Clean() { - logger.Debugf("%sconn[%s] context clean", ServerLogPrefix, c.connID) + c.logger.Debug("conn context clean", "conn_id", c.connID) c.reset() ctxPool.Put(c) } @@ -59,6 +97,7 @@ func (c *Context) reset() { c.connID = "" c.Stream = nil c.Frame = nil + c.logger = nil for k := range c.Keys { delete(c.Keys, k) } @@ -66,14 +105,13 @@ func (c *Context) reset() { // CloseWithError closes the stream and cleans the context. func (c *Context) CloseWithError(code yerr.ErrorCode, msg string) { - logger.Debugf("%sconn[%s] context close, errCode=%#x, msg=%s", ServerLogPrefix, c.connID, code, msg) + c.logger.Debug("conn context close, ", "err_code", code, "err_msg", msg) if c.Stream != nil { c.Stream.Close() } if c.Conn != nil { c.Conn.CloseWithError(quic.ApplicationErrorCode(code), msg) } - c.Clean() } // ConnID get quic connection id diff --git a/core/core.go b/core/core.go index c890cab89..701d81e56 100644 --- a/core/core.go +++ b/core/core.go @@ -17,13 +17,6 @@ const ( ConnStateClosed ConnState = "Closed" ) -// Prefix is the prefix for logger. -const ( - ClientLogPrefix = "\033[36m[core:client]\033[0m " - ServerLogPrefix = "\033[32m[core:server]\033[0m " - ParseFrameLogPrefix = "\033[36m[core:stream_parser]\033[0m " -) - func init() { rand.Seed(time.Now().Unix()) } diff --git a/core/frame/frame.go b/core/frame/frame.go index d1f6d80b3..5e9afb443 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -1,9 +1,9 @@ package frame import ( - "os" - "strconv" "time" + + "github.com/yomorun/yomo/core/ylog" ) // ReadWriter is the interface that groups the ReadFrame and WriteFrame methods. @@ -69,7 +69,7 @@ type Writer interface { } // debugFrameSize print frame data size on debug mode -var debugFrameSize = 16 +var debugFrameSize = ylog.DebugFrameSize // Kinds of frames transferable within YoMo const ( @@ -160,11 +160,3 @@ func (f Type) String() string { return "UnknownFrame" } } - -func init() { - if envFrameSize := os.Getenv("YOMO_DEBUG_FRAME_SIZE"); envFrameSize != "" { - if val, err := strconv.Atoi(envFrameSize); err == nil { - debugFrameSize = val - } - } -} diff --git a/core/listener_default.go b/core/listener_default.go index e926cf7a8..2b8d82a70 100644 --- a/core/listener_default.go +++ b/core/listener_default.go @@ -6,15 +6,15 @@ import ( "time" "github.com/lucas-clemente/quic-go" - "github.com/yomorun/yomo/pkg/logger" pkgtls "github.com/yomorun/yomo/pkg/tls" + "golang.org/x/exp/slog" ) var _ Listener = (*defaultListener)(nil) type defaultListener struct { - conf *quic.Config quic.Listener + conf *quic.Config } // DefalutQuicConfig be used when `quicConfig` is nil. @@ -30,11 +30,11 @@ var DefalutQuicConfig = &quic.Config{ // DisablePathMTUDiscovery: true, } -func newListener(conn net.PacketConn, tlsConfig *tls.Config, quicConfig *quic.Config) (*defaultListener, error) { +func newListener(conn net.PacketConn, tlsConfig *tls.Config, quicConfig *quic.Config, logger *slog.Logger) (*defaultListener, error) { if tlsConfig == nil { tc, err := pkgtls.CreateServerTLSConfig(conn.LocalAddr().String()) if err != nil { - logger.Errorf("%sCreateServerTLSConfig: %v", ServerLogPrefix, err) + logger.Error("CreateServerTLSConfig error", err) return &defaultListener{}, err } tlsConfig = tc @@ -46,7 +46,7 @@ func newListener(conn net.PacketConn, tlsConfig *tls.Config, quicConfig *quic.Co quicListener, err := quic.Listen(conn, tlsConfig, quicConfig) if err != nil { - logger.Errorf("%squic Listen: %v", ServerLogPrefix, err) + logger.Error("quic Listen error", err) return &defaultListener{}, err } diff --git a/core/log/logger.go b/core/log/logger.go deleted file mode 100644 index 79134c383..000000000 --- a/core/log/logger.go +++ /dev/null @@ -1,57 +0,0 @@ -package log - -// Level of log -type Level uint8 - -const ( - // DebugLevel defines debug log level. - DebugLevel Level = iota + 1 - // InfoLevel defines info log level. - InfoLevel - // WarnLevel defines warn log level. - WarnLevel - // ErrorLevel defines error log level. - ErrorLevel - // NoLevel defines an absent log level. - NoLevel Level = 254 - // Disabled disables the logger. - Disabled Level = 255 -) - -// Logger is the interface for logger. -type Logger interface { - // SetLevel sets the logger level - SetLevel(Level) - // SetEncoding sets the logger's encoding - SetEncoding(encoding string) - // Printf logs a message without level - Printf(template string, args ...interface{}) - // Debugf logs a message at DebugLevel - Debugf(template string, args ...interface{}) - // Infof logs a message at InfoLevel - Infof(template string, args ...interface{}) - // Warnf logs a message at WarnLevel - Warnf(template string, args ...interface{}) - // Errorf logs a message at ErrorLevel - Errorf(template string, args ...interface{}) - // Output file path to write log message - Output(file string) - // ErrorOutput file path to write error message - ErrorOutput(file string) -} - -// String the logger level -func (l Level) String() string { - switch l { - case DebugLevel: - return "DEBUG" - case ErrorLevel: - return "ERROR" - case WarnLevel: - return "WARN" - case InfoLevel: - return "INFO" - default: - return "" - } -} diff --git a/core/log/logger_test.go b/core/log/logger_test.go deleted file mode 100644 index e290d16bf..000000000 --- a/core/log/logger_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package log - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestLog(t *testing.T) { - assert.Equal(t, Level(88).String(), "") - assert.Equal(t, DebugLevel.String(), "DEBUG") - assert.Equal(t, WarnLevel.String(), "WARN") - assert.Equal(t, ErrorLevel.String(), "ERROR") - assert.Equal(t, InfoLevel.String(), "INFO") -} diff --git a/core/server.go b/core/server.go index 28a63eda0..d3425f159 100644 --- a/core/server.go +++ b/core/server.go @@ -17,20 +17,12 @@ import ( "github.com/yomorun/yomo/core/metadata" "github.com/yomorun/yomo/core/router" "github.com/yomorun/yomo/core/yerr" + "golang.org/x/exp/slog" // authentication implements, Currently, only token authentication is implemented _ "github.com/yomorun/yomo/pkg/auth" - "github.com/yomorun/yomo/pkg/logger" ) -const ( - // DefaultListenAddr is the default address to listen. - DefaultListenAddr = "0.0.0.0:9000" -) - -// ServerOption is the option for server. -type ServerOption func(*ServerOptions) - // FrameHandler is the handler for frame. type FrameHandler func(c *Context) error @@ -43,47 +35,48 @@ type Server struct { connector Connector router router.Router metadataBuilder metadata.Builder - alpnHandler func(proto string) error counterOfDataFrame int64 downstreams map[string]frame.Writer mu sync.Mutex - opts ServerOptions + opts *serverOptions beforeHandlers []FrameHandler afterHandlers []FrameHandler connectionCloseHandlers []ConnectionHandler listener Listener wg *sync.WaitGroup + logger *slog.Logger } // NewServer create a Server instance. func NewServer(name string, opts ...ServerOption) *Server { + options := defaultServerOptions() + + for _, o := range opts { + o(options) + } + + logger := options.logger.With("component", "server", "name", name) + s := &Server{ name: name, - connector: newConnector(), + connector: newConnector(logger), downstreams: make(map[string]frame.Writer), wg: new(sync.WaitGroup), + logger: logger, + opts: options, } - s.Init(opts...) return s } -// Init the options. -func (s *Server) Init(opts ...ServerOption) error { - for _, o := range opts { - o(&s.opts) - } - // options defaults - s.initOptions() - - return nil -} - // ListenAndServe starts the server. func (s *Server) ListenAndServe(ctx context.Context, addr string) error { if addr == "" { addr = DefaultListenAddr } + + s.logger.With("addr", addr) + udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return err @@ -92,6 +85,7 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) error { if err != nil { return err } + return s.Serve(ctx, conn) } @@ -106,14 +100,14 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { } // listen the address - listener, err := newListener(conn, s.opts.TLSConfig, s.opts.QuicConfig) + listener, err := newListener(conn, s.opts.tlsConfig, s.opts.quicConfig, s.logger) if err != nil { - logger.Errorf("%slistener.Listen: err=%v", ServerLogPrefix, err) + s.logger.Error("listener.Listen error", err) return err } s.listener = listener - // defer listener.Close() - logger.Printf("%sāœ… [%s][%d] Listening on: %s, QUIC: %v, AUTH: %s", ServerLogPrefix, s.name, os.Getpid(), listener.Addr(), listener.Versions(), s.authNames()) + + s.logger.Info("Listening", err, "pid", os.Getpid(), "listener_version", listener.Versions(), "auth_name", s.authNames()) for { // create a new connection when new yomo-client connected @@ -122,10 +116,10 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { conn, err := s.listener.Accept(sctx) if err != nil { - logger.Errorf("%slistener accept connections error: %v", ServerLogPrefix, err) + s.logger.Error("listener accept connections error", err) return err } - err = s.alpnHandler(conn.ConnectionState().TLS.NegotiatedProtocol) + err = s.opts.alpnHandler(conn.ConnectionState().TLS.NegotiatedProtocol) if err != nil { conn.CloseWithError(quic.ApplicationErrorCode(yerr.ErrorCodeRejected), err.Error()) continue @@ -135,13 +129,13 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { // defer s.doConnectionCloseHandlers(conn) s.wg.Add(1) connID := GetConnID(conn) - logger.Infof("%sā¤ļø1/ new connection: %s", ServerLogPrefix, connID) + s.logger.Info(" new connection", "conn_id", connID) go func(ctx context.Context, qconn quic.Connection) { // connection close handlers on client connect timeout defer s.doConnectionCloseHandlers(qconn) for { - logger.Infof("%sā¤ļø2/ waiting for new stream", ServerLogPrefix) + s.logger.Debug("waiting for new stream") stream, err := qconn.AcceptStream(ctx) if err != nil { // if client close the connection, then we should close the connection @@ -159,41 +153,47 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { clientID = conn.ClientID() conn.Close() } - logger.Printf("%sšŸ’” [%s][%s](%s) close the connection: %v", ServerLogPrefix, name, clientID, connID, err) + s.logger.Debug("close the connection", "client_name", name, "client_id", clientID, "conn_id", connID, "error", err) break } defer stream.Close() - if ok := s.handshakeWithTimeout(conn, stream, 10*time.Second); !ok { + yctx, ok := s.handshakeWithTimeout(conn, stream, 10*time.Second) + if !ok { return } - logger.Infof("%sā¤ļø3/ [stream:%d] created, connID=%s", ServerLogPrefix, stream.StreamID(), connID) - // process frames on stream - c := newContext(conn, stream) - defer c.Clean() - s.handleConnection(c) - logger.Infof("%sā¤ļø4/ [stream:%d] handleConnection DONE", ServerLogPrefix, stream.StreamID()) + s.logger.Info("stream created", "stream_id", stream.StreamID(), "conn_id", connID) + + s.handleConnection(yctx) + yctx.logger.Info("stream handleConnection DONE") + + yctx.Clean() } }(sctx, conn) } } // handshakeWithTimeout call handshake with a timeout. -func (s *Server) handshakeWithTimeout(conn quic.Connection, stream quic.Stream, timeout time.Duration) bool { - ch := make(chan bool) +func (s *Server) handshakeWithTimeout(conn quic.Connection, stream quic.Stream, timeout time.Duration) (*Context, bool) { + type result struct { + ok bool + yctx *Context + } + ch := make(chan result) fs := NewFrameStream(stream) go func() { - ch <- s.handshake(conn, stream, fs) + ok, yctx := s.handshake(conn, stream, fs) + ch <- result{yctx, ok} }() select { case <-time.After(timeout): - return false - case ok := <-ch: - return ok + return nil, false + case r := <-ch: + return r.yctx, r.ok } } @@ -202,34 +202,38 @@ func (s *Server) handshakeWithTimeout(conn quic.Connection, stream quic.Stream, // It returns true if handshake successful otherwise return false. // It response to client a handshakeAckFrame if the handshake is successful // otherwise response a goawayFrame. -func (s *Server) handshake(conn quic.Connection, stream quic.Stream, fs frame.ReadWriter) bool { +// It returns a context for this stream handler. +func (s *Server) handshake(conn quic.Connection, stream quic.Stream, fs frame.ReadWriter) (*Context, bool) { frm, err := fs.ReadFrame() if err != nil { if err := fs.WriteFrame(frame.NewGoawayFrame(err.Error())); err != nil { - logger.Errorf("%sā›”ļø write to client[%s] GoawayFrame error:%v", ServerLogPrefix, conn.RemoteAddr().String(), err) + s.logger.Error("write to client GoawayFrame error", err, "remote_addr", conn.RemoteAddr().String()) } - return false + return nil, false } if frm.Type() != frame.TagOfHandshakeFrame { if err := fs.WriteFrame(frame.NewGoawayFrame("handshake failed")); err != nil { - logger.Errorf("%sā›”ļø reads first frame from client[%s] is not handshakeFrame, type :%v", ServerLogPrefix, conn.RemoteAddr().String(), frm.Type()) + s.logger.Error("first frame is not handshakeFrame", err, "remote_addr", conn.RemoteAddr().String(), "frame_type", frm.Type().String()) } - return false + return nil, false } - c := newContext(conn, stream).WithFrame(frm) + c := newContext(conn, stream, s.logger).WithFrame(frm) if err := s.handleHandshakeFrame(c); err != nil { if err := fs.WriteFrame(frame.NewGoawayFrame(err.Error())); err != nil { - logger.Errorf("%sā›”ļø write to client[%s] GoawayFrame error:%v", ServerLogPrefix, conn.RemoteAddr().String(), err) + c.logger.Error("write to client GoawayFrame error", err, "remote_addr", conn.RemoteAddr().String()) } - return false + return nil, false } - return true + return c, true } +// Logger returns the logger of server. +func (s *Server) Logger() *slog.Logger { return s.logger } + // Close will shutdown the server. func (s *Server) Close() error { // listener @@ -248,7 +252,8 @@ func (s *Server) Close() error { return nil } -// handle streams on a connection +// handleConnection handles streams on a connection, +// use c.logger in this function scope for more complete logger information. func (s *Server) handleConnection(c *Context) { fs := NewFrameStream(c.Stream) // check update for stream @@ -259,27 +264,27 @@ func (s *Server) handleConnection(c *Context) { if e, ok := err.(*quic.ApplicationError); ok { if yerr.Is(e.ErrorCode, yerr.ErrorCodeClientAbort) { // client abort - logger.Infof("%sclient close the connection", ServerLogPrefix) + c.logger.Info("client close the connection") break } else { ye := yerr.New(yerr.Parse(e.ErrorCode), err) - logger.Errorf("%s[ERR] %s", ServerLogPrefix, ye) + c.logger.Error("read frame error", ye) } } else if err == io.EOF { - logger.Infof("%sthe connection is EOF", ServerLogPrefix) + c.logger.Info("connection EOF") break } if errors.Is(err, net.ErrClosed) { // if client close the connection, net.ErrClosed will be raise // by quic-go IdleTimeoutError after connection's KeepAlive config. - logger.Warnf("%s[ERR] net.ErrClosed on [handleConnection] %v", ServerLogPrefix, net.ErrClosed) + c.logger.Warn("connection error", "error", net.ErrClosed) c.CloseWithError(yerr.ErrorCodeClosed, "net.ErrClosed") break } // any error occurred, we should close the stream // after this, conn.AcceptStream() will raise the error c.CloseWithError(yerr.ErrorCodeUnknown, err.Error()) - logger.Warnf("%sconnection.Close()", ServerLogPrefix) + c.logger.Warn("connection close") break } @@ -289,21 +294,21 @@ func (s *Server) handleConnection(c *Context) { // before frame handlers for _, handler := range s.beforeHandlers { if err := handler(c); err != nil { - logger.Errorf("%sbeforeFrameHandler err: %s", ServerLogPrefix, err) + c.logger.Error("beforeFrameHandler error", err) c.CloseWithError(yerr.ErrorCodeBeforeHandler, err.Error()) return } } // main handler if err := s.mainFrameHandler(c); err != nil { - logger.Errorf("%smainFrameHandler err: %s", ServerLogPrefix, err) + c.logger.Error("mainFrameHandler error", err) c.CloseWithError(yerr.ErrorCodeMainHandler, err.Error()) return } // after frame handler for _, handler := range s.afterHandlers { if err := handler(c); err != nil { - logger.Errorf("%safterFrameHandler err: %s", ServerLogPrefix, err) + c.logger.Error("afterFrameHandler error", err) c.CloseWithError(yerr.ErrorCodeAfterHandler, err.Error()) return } @@ -312,12 +317,11 @@ func (s *Server) handleConnection(c *Context) { } func (s *Server) mainFrameHandler(c *Context) error { - var err error frameType := c.Frame.Type() switch frameType { case frame.TagOfHandshakeFrame: - logger.Errorf("%sreceive a handshakeFrame, ingonre it", ServerLogPrefix) + c.logger.Warn("receive a handshakeFrame, ingonre it") case frame.TagOfDataFrame: if err := s.handleDataFrame(c); err != nil { c.CloseWithError(yerr.ErrorCodeData, fmt.Sprintf("handleDataFrame err: %v", err)) @@ -328,7 +332,7 @@ func (s *Server) mainFrameHandler(c *Context) error { s.handleBackflowFrame(c) } default: - logger.Errorf("%serr=%v, frameType=%v", ServerLogPrefix, err, frameType) + c.logger.Warn("unexpected frame", "unexpected_frame_type", frameType) } return nil } @@ -343,17 +347,17 @@ func (s *Server) handleHandshakeFrame(c *Context) error { clientType := ClientType(f.ClientType) stream := c.Stream // credential - logger.Debugf("%sGOT ā¤ļø HandshakeFrame: ClientType=%# x is %s, ClientID=%s, Credential=%s", ServerLogPrefix, f.ClientType, ClientType(f.ClientType), clientID, authName(f.AuthName())) + c.logger.Debug("GOT HandshakeFrame", "client_type", f.ClientType, "client_id", clientID, "auth_name", authName(f.AuthName())) // authenticate - authed := auth.Authenticate(s.opts.Auths, f) - logger.Debugf("%sauthenticated==%v", ServerLogPrefix, authed) + authed := auth.Authenticate(s.opts.auths, f) + c.logger.Debug("authenticated", "authed", authed) if !authed { err := fmt.Errorf("handshake authentication fails, client credential name is %s", authName(f.AuthName())) // return err - logger.Debugf("%sšŸ”‘ <%s> [%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, connID) + c.logger.Debug("authenticated", "authed", authed) rejectedFrame := frame.NewRejectedFrame(err.Error()) if _, err = stream.Write(rejectedFrame.Encode()); err != nil { - logger.Debugf("%sšŸ”‘ write to <%s> [%s](%s) RejectedFrame error:%v", ServerLogPrefix, clientType, f.Name, connID, err) + c.logger.Error("write to RejectedFrame failed", err, "authed", authed) return err } return nil @@ -368,7 +372,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { if err != nil { return err } - conn = newConnection(f.Name, f.ClientID, clientType, metadata, stream, f.ObserveDataTags) + conn = newConnection(f.Name, f.ClientID, clientType, metadata, stream, f.ObserveDataTags, c.logger) if clientType == ClientTypeStreamFunction { // route @@ -381,10 +385,10 @@ func (s *Server) handleHandshakeFrame(c *Context) error { if e, ok := err.(yerr.DuplicateNameError); ok { existsConnID := e.ConnID() if conn := s.connector.Get(existsConnID); conn != nil { - logger.Debugf("%s%s, write to SFN[%s](%s) GoawayFrame", ServerLogPrefix, e.Error(), f.Name, existsConnID) + c.logger.Debug("write GoawayFrame", "error", e.Error(), "exists_conn_id", existsConnID) goawayFrame := frame.NewGoawayFrame(e.Error()) if err := conn.Write(goawayFrame); err != nil { - logger.Errorf("%sā›”ļø write to SFN[%s] GoawayFrame error:%v", ServerLogPrefix, f.Name, err) + c.logger.Error("write GoawayFrame failed", err) return err } } @@ -394,7 +398,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { } } case ClientTypeUpstreamZipper: - conn = newConnection(f.Name, f.ClientID, clientType, nil, stream, f.ObserveDataTags) + conn = newConnection(f.Name, f.ClientID, clientType, nil, stream, f.ObserveDataTags, c.logger) default: // TODO: There is no need to Remove, // unknown client type is not be add to connector. @@ -405,11 +409,11 @@ func (s *Server) handleHandshakeFrame(c *Context) error { } if _, err := stream.Write(frame.NewHandshakeAckFrame().Encode()); err != nil { - logger.Debugf("%sšŸ”‘ write to <%s> [%s](%s) AckFrame error:%v", ServerLogPrefix, clientType, f.Name, connID, err) + c.logger.Error("write handshakeAckFrame error", err) } s.connector.Add(connID, conn) - logger.Printf("%sā¤ļø <%s> [%s][%s](%s) is connected!", ServerLogPrefix, clientType, f.Name, clientID, connID) + c.logger.Info("client is connected!") return nil } @@ -426,7 +430,7 @@ func (s *Server) handleDataFrame(c *Context) error { fromID := c.ConnID() from := s.connector.Get(fromID) if from == nil { - logger.Warnf("%shandleDataFrame connector cannot find %s", ServerLogPrefix, fromID) + c.logger.Warn("handleDataFrame connector cannot find", "from_conn_id", fromID) return fmt.Errorf("handleDataFrame connector cannot find %s", fromID) } @@ -444,7 +448,7 @@ func (s *Server) handleDataFrame(c *Context) error { // route route := s.router.Route(metadata) if route == nil { - logger.Warnf("%shandleDataFrame route is nil", ServerLogPrefix) + c.logger.Warn("handleDataFrame route is nil") return fmt.Errorf("handleDataFrame route is nil") } @@ -453,16 +457,23 @@ func (s *Server) handleDataFrame(c *Context) error { for _, toID := range connIDs { conn := s.connector.Get(toID) if conn == nil { - logger.Errorf("%sconn is nil: (%s)", ServerLogPrefix, toID) + c.logger.Error("Can't find forward conn", errors.New("conn is nil"), "forward_conn_id", toID) continue } to := conn.Name() - logger.Debugf("%shandleDataFrame [%s](%s) -> [%s](%s): %v", ServerLogPrefix, from.Name(), fromID, to, toID, f) + c.logger.Info( + "handleDataFrame", + "from_conn_name", from.Name(), + "from_conn_id", fromID, + "to_conn_name", to, + "to_conn_id", toID, + "data_frame", f.String(), + ) // write data frame to stream if err := conn.Write(f); err != nil { - logger.Warnf("%shandleDataFrame conn.Write %v", ServerLogPrefix, err) + c.logger.Error("handleDataFrame conn.Write", err) } } @@ -479,9 +490,9 @@ func (s *Server) handleBackflowFrame(c *Context) error { sourceConns := s.connector.GetSourceConns(sourceID, tag) for _, source := range sourceConns { if source != nil { - logger.Debugf("%sā™»ļø handleBackflowFrame --> source:%s, result=%v", ServerLogPrefix, sourceID, f) + c.logger.Info("handleBackflowFrame", "source_conn_id", sourceID, "back_flow_frame", f.String()) if err := source.Write(bf); err != nil { - logger.Errorf("%sā™»ļø handleBackflowFrame --> source:%s, error=%v", ServerLogPrefix, sourceID, err) + c.logger.Error("handleBackflowFrame conn.Write", err) return err } } @@ -508,7 +519,7 @@ func (s *Server) Downstreams() map[string]frame.Writer { func (s *Server) ConfigRouter(router router.Router) { s.mu.Lock() s.router = router - logger.Debugf("%sconfig router is %#v", ServerLogPrefix, router) + s.logger.Debug("config route", "router", router) s.mu.Unlock() } @@ -516,15 +527,15 @@ func (s *Server) ConfigRouter(router router.Router) { func (s *Server) ConfigMetadataBuilder(builder metadata.Builder) { s.mu.Lock() s.metadataBuilder = builder - logger.Debugf("%sconfig metadataBuilder is %#v", ServerLogPrefix, builder) + s.logger.Debug("config metadataBuilder", "metadataBuilder", builder) s.mu.Unlock() } // ConfigAlpnHandler is used to set alpnHandler by zipper func (s *Server) ConfigAlpnHandler(h func(string) error) { s.mu.Lock() - s.alpnHandler = h - logger.Debugf("%sconfig alpnHandler is %#v", ServerLogPrefix, h) + s.opts.alpnHandler = h + s.logger.Debug("config alpnHandler", "alpnHandler", h) s.mu.Unlock() } @@ -540,7 +551,7 @@ func (s *Server) AddDownstreamServer(addr string, c frame.Writer) { func (s *Server) dispatchToDownstreams(c *Context) { conn := s.connector.Get(c.connID) if conn == nil { - logger.Debugf("%sdispatchToDownstreams: s.connector.Get(%s) is nil", ServerLogPrefix, c.connID) + c.logger.Debug("dispatchToDownstreams failed") } else if conn.ClientType() == ClientTypeSource { f := c.Frame.(*frame.DataFrame) if f.IsBroadcast() { @@ -548,11 +559,11 @@ func (s *Server) dispatchToDownstreams(c *Context) { f.GetMetaFrame().SetMetadata(conn.Metadata().Encode()) } for addr, ds := range s.downstreams { - logger.Debugf("%sdispatching to [%s]: %# x", ServerLogPrefix, addr, f.TransactionID()) + c.logger.Info("dispatching to", "dispatch_addr", addr, "tid", f.TransactionID()) ds.WriteFrame(f) } } else { - logger.Debugf("%sdispatchToDownstreams: frame is local only [%s, %s]", ServerLogPrefix, c.connID, f.TransactionID()) + c.logger.Info("do not broadcast", "tid", f.TransactionID()) } } } @@ -562,16 +573,6 @@ func GetConnID(conn quic.Connection) string { return conn.RemoteAddr().String() } -func (s *Server) initOptions() { - // defaults - if s.alpnHandler == nil { - s.alpnHandler = func(proto string) error { - logger.Infof("%sclient alpn proto is: %s", ServerLogPrefix, proto) - return nil - } - } -} - func (s *Server) validateRouter() error { if s.router == nil { return errors.New("server's router is nil") @@ -586,11 +587,6 @@ func (s *Server) validateMetadataBuilder() error { return nil } -// Options returns the options of server. -func (s *Server) Options() ServerOptions { - return s.opts -} - // Connector returns the connector of server. func (s *Server) Connector() Connector { return s.connector @@ -612,11 +608,11 @@ func (s *Server) SetConnectionCloseHandlers(handlers ...ConnectionHandler) { } func (s *Server) authNames() []string { - if len(s.opts.Auths) == 0 { + if len(s.opts.auths) == 0 { return []string{"none"} } result := []string{} - for _, auth := range s.opts.Auths { + for _, auth := range s.opts.auths { result = append(result, auth.Name()) } return result @@ -632,7 +628,7 @@ func authName(name string) string { func (s *Server) doConnectionCloseHandlers(qconn quic.Connection) { defer s.wg.Done() - logger.Debugf("%sšŸ–¤ [%s] quic connection closed", ServerLogPrefix, qconn.RemoteAddr()) + s.logger.Debug("quic connection closed", "remote_addr", qconn.RemoteAddr()) for _, h := range s.connectionCloseHandlers { h(qconn) } diff --git a/core/server_options.go b/core/server_options.go index e284d52c0..e1875d0f3 100644 --- a/core/server_options.go +++ b/core/server_options.go @@ -2,58 +2,84 @@ package core import ( "crypto/tls" - "net" "github.com/lucas-clemente/quic-go" "github.com/yomorun/yomo/core/auth" + "github.com/yomorun/yomo/core/ylog" + "golang.org/x/exp/slog" ) +const ( + // DefaultListenAddr is the default address to listen. + DefaultListenAddr = "0.0.0.0:9000" +) + +// ServerOption is the option for server. +type ServerOption func(*serverOptions) + // ServerOptions are the options for YoMo server. -type ServerOptions struct { - QuicConfig *quic.Config - TLSConfig *tls.Config - Addr string - Auths map[string]auth.Authentication - Conn net.PacketConn +type serverOptions struct { + quicConfig *quic.Config + tlsConfig *tls.Config + addr string + auths map[string]auth.Authentication + logger *slog.Logger + alpnHandler func(proto string) error +} + +func defaultServerOptions() *serverOptions { + logger := ylog.Default() + + return &serverOptions{ + quicConfig: DefalutQuicConfig, + tlsConfig: nil, + addr: DefaultListenAddr, + auths: map[string]auth.Authentication{}, + logger: logger, + alpnHandler: func(proto string) error { + logger.Info("client alpn proto", "component", "server", "proto", proto) + return nil + }, + } } // WithAddr sets the server address. func WithAddr(addr string) ServerOption { - return func(o *ServerOptions) { - o.Addr = addr + return func(o *serverOptions) { + o.addr = addr } } // WithAuth sets the server authentication method. func WithAuth(name string, args ...string) ServerOption { - return func(o *ServerOptions) { + return func(o *serverOptions) { if a, ok := auth.GetAuth(name); ok { a.Init(args...) - if o.Auths == nil { - o.Auths = make(map[string]auth.Authentication) + if o.auths == nil { + o.auths = make(map[string]auth.Authentication) } - o.Auths[a.Name()] = a + o.auths[a.Name()] = a } } } // WithServerTLSConfig sets the TLS configuration for the server. func WithServerTLSConfig(tc *tls.Config) ServerOption { - return func(o *ServerOptions) { - o.TLSConfig = tc + return func(o *serverOptions) { + o.tlsConfig = tc } } // WithServerQuicConfig sets the QUIC configuration for the server. func WithServerQuicConfig(qc *quic.Config) ServerOption { - return func(o *ServerOptions) { - o.QuicConfig = qc + return func(o *serverOptions) { + o.quicConfig = qc } } -// WithConn sets the connection for the server. -func WithConn(conn net.PacketConn) ServerOption { - return func(o *ServerOptions) { - o.Conn = conn +// WithServerLogger sets the logger for the server. +func WithServerLogger(logger *slog.Logger) ServerOption { + return func(o *serverOptions) { + o.logger = logger } } diff --git a/core/server_test.go b/core/server_test.go index effb5f98b..9da04da96 100644 --- a/core/server_test.go +++ b/core/server_test.go @@ -10,6 +10,7 @@ import ( "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/core/metadata" "github.com/yomorun/yomo/core/router" + "github.com/yomorun/yomo/core/ylog" yauth "github.com/yomorun/yomo/pkg/auth" "github.com/yomorun/yomo/pkg/config" ) @@ -33,7 +34,9 @@ type mockConnectorArgs struct { // buildMockConnector build a mock connector according to `args` // for preparing dataFrame testing. func buildMockConnector(router router.Router, metadataBuilder metadata.Builder, args []mockConnectorArgs) Connector { - connector := newConnector() + logger := ylog.Default() + + connector := newConnector(logger) for _, arg := range args { @@ -55,6 +58,7 @@ func buildMockConnector(router router.Router, metadataBuilder metadata.Builder, metadata, arg.stream, handshakeFrame.ObserveDataTags, + logger, ) route := router.Route(conn.Metadata()) @@ -118,7 +122,7 @@ func TestHandleDataFrame(t *testing.T) { }) defer connector.Clean() - server := &Server{connector: connector} + server := &Server{connector: connector, logger: ylog.Default()} server.ConfigRouter(routers) server.ConfigMetadataBuilder(metadataBuilder) @@ -137,6 +141,7 @@ func TestHandleDataFrame(t *testing.T) { connID: sourceConnID, Stream: sourceStream, Frame: dataFrame, + logger: server.logger, } err := server.handleDataFrame(c) @@ -167,6 +172,7 @@ func TestHandleDataFrame(t *testing.T) { connID: zipperConnID, Stream: zipperStream, Frame: dataFrame, + logger: server.logger, } err := server.handleDataFrame(c) @@ -277,13 +283,14 @@ func TestHandShake(t *testing.T) { }, } + logger := ylog.Default() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - server := &Server{connector: newConnector()} + server := &Server{connector: newConnector(logger), logger: logger, opts: defaultServerOptions()} server.ConfigRouter(router.Default([]config.App{{Name: tt.args.clientNameConfigInServer}})) - server.opts.Auths = map[string]auth.Authentication{ + server.opts.auths = map[string]auth.Authentication{ tokenAuth.Name(): tokenAuth, } @@ -301,6 +308,7 @@ func TestHandShake(t *testing.T) { connID: clientID, Stream: stream, Frame: frame.NewHandshakeFrame(clientName, clientID, clientType, []frame.Tag{frame.Tag(1)}, "token", token), + logger: server.logger, } for n := 0; n < tt.handshakeTimes; n++ { From 95bb697755ba599578445832da15ea1c1d1f4d2d Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 13 Dec 2022 00:04:08 +0800 Subject: [PATCH 03/16] fix(core/client): log level --- core/client_options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/client_options.go b/core/client_options.go index fc7d6667b..e1a69cc40 100644 --- a/core/client_options.go +++ b/core/client_options.go @@ -45,7 +45,7 @@ func defaultClientOption() *clientOptions { } if opts.credential != nil { - logger.Debug("use credential", "component", "client", "credential_name", opts.credential.Name()) + logger.Info("use credential", "component", "client", "credential_name", opts.credential.Name()) } return opts From d809f6760e90aede7355dd83ce2c6cd61ff8de89 Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 13 Dec 2022 00:34:11 +0800 Subject: [PATCH 04/16] fix(core/context): log with bug --- core/context.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/context.go b/core/context.go index 779e678fb..f90048c09 100644 --- a/core/context.go +++ b/core/context.go @@ -1,6 +1,7 @@ package core import ( + "fmt" "io" "sync" "time" @@ -67,7 +68,7 @@ func (c *Context) ClientInfo() *ClientInfo { func (c *Context) WithFrame(f frame.Frame) *Context { if f.Type() == frame.TagOfHandshakeFrame { handshakeFrame := f.(*frame.HandshakeFrame) - c.logger.With( + c.logger = c.logger.With( "client_id", handshakeFrame.ClientID, "client_type", ClientType(handshakeFrame.ClientType).String(), "client_name", handshakeFrame.Name, @@ -80,7 +81,7 @@ func (c *Context) WithFrame(f frame.Frame) *Context { authName: handshakeFrame.AuthName(), }) } - c.logger.With("frame_type", f.Type().String()) + c.logger = c.logger.With("frame_type", f.Type().String()) c.Frame = f return c } From 0d240ad75eb2f3e04decb5640e09951144fac2f4 Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 13 Dec 2022 00:36:06 +0800 Subject: [PATCH 05/16] fix(core/server): recycle Context correctly --- core/client.go | 6 +++--- core/connection.go | 1 + core/server.go | 21 +++++++++++++-------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/core/client.go b/core/client.go index 9be98d7d2..78db3a95e 100644 --- a/core/client.go +++ b/core/client.go @@ -260,13 +260,13 @@ func (c *Client) WriteFrame(frm frame.Frame) error { // SetDataFrameObserver sets the data frame handler. func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) { c.processor = fn - c.logger.Debug("SetDataFrameObserver", "processor", c.processor) + c.logger.Debug("SetDataFrameObserver") } // SetBackflowFrameObserver sets the backflow frame handler. func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame)) { c.receiver = fn - c.logger.Debug("SetBackflowFrameObserver", "receiver", c.receiver) + c.logger.Debug("SetBackflowFrameObserver") } // reconnect the connection between client and server. @@ -277,7 +277,7 @@ func (c *Client) reconnect(ctx context.Context, addr string) { for { select { case <-ctx.Done(): - c.logger.Debug("context.Done", "receiver", "error", ctx.Err()) + c.logger.Debug("context.Done", "error", ctx.Err()) return case err, ok := <-c.errc: if c.errorfn != nil && err != nil { diff --git a/core/connection.go b/core/connection.go index 3cd7a5754..39b5941d5 100644 --- a/core/connection.go +++ b/core/connection.go @@ -48,6 +48,7 @@ func newConnection( observed []frame.Tag, logger *slog.Logger, ) Connection { + logger.Debug("new connecton") return &connection{ name: name, clientID: clientID, diff --git a/core/server.go b/core/server.go index d3425f159..e24b9bbb4 100644 --- a/core/server.go +++ b/core/server.go @@ -75,7 +75,7 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) error { addr = DefaultListenAddr } - s.logger.With("addr", addr) + s.logger = s.logger.With("addr", addr) udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { @@ -107,7 +107,7 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { } s.listener = listener - s.logger.Info("Listening", err, "pid", os.Getpid(), "listener_version", listener.Versions(), "auth_name", s.authNames()) + s.logger.Info("Listening", "pid", os.Getpid(), "quic", listener.Versions(), "auth_name", s.authNames()) for { // create a new connection when new yomo-client connected @@ -159,6 +159,13 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { defer stream.Close() yctx, ok := s.handshakeWithTimeout(conn, stream, 10*time.Second) + + defer func() { + if yctx != nil { + yctx.Clean() + } + }() + if !ok { return } @@ -167,8 +174,6 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { s.handleConnection(yctx) yctx.logger.Info("stream handleConnection DONE") - - yctx.Clean() } }(sctx, conn) } @@ -350,7 +355,7 @@ func (s *Server) handleHandshakeFrame(c *Context) error { c.logger.Debug("GOT HandshakeFrame", "client_type", f.ClientType, "client_id", clientID, "auth_name", authName(f.AuthName())) // authenticate authed := auth.Authenticate(s.opts.auths, f) - c.logger.Debug("authenticated", "authed", authed) + c.logger.Debug("authenticate", "authed", authed) if !authed { err := fmt.Errorf("handshake authentication fails, client credential name is %s", authName(f.AuthName())) // return err @@ -519,7 +524,7 @@ func (s *Server) Downstreams() map[string]frame.Writer { func (s *Server) ConfigRouter(router router.Router) { s.mu.Lock() s.router = router - s.logger.Debug("config route", "router", router) + s.logger.Debug("config route") s.mu.Unlock() } @@ -527,7 +532,7 @@ func (s *Server) ConfigRouter(router router.Router) { func (s *Server) ConfigMetadataBuilder(builder metadata.Builder) { s.mu.Lock() s.metadataBuilder = builder - s.logger.Debug("config metadataBuilder", "metadataBuilder", builder) + s.logger.Debug("config metadataBuilder") s.mu.Unlock() } @@ -535,7 +540,7 @@ func (s *Server) ConfigMetadataBuilder(builder metadata.Builder) { func (s *Server) ConfigAlpnHandler(h func(string) error) { s.mu.Lock() s.opts.alpnHandler = h - s.logger.Debug("config alpnHandler", "alpnHandler", h) + s.logger.Debug("config alpnHandler") s.mu.Unlock() } From 9876815478c3c40ed992b205cd4c29a0a895862d Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 13 Dec 2022 00:49:21 +0800 Subject: [PATCH 06/16] fix(core/context): remove unused pkg imports --- core/context.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/context.go b/core/context.go index f90048c09..1ca27afa4 100644 --- a/core/context.go +++ b/core/context.go @@ -1,7 +1,6 @@ package core import ( - "fmt" "io" "sync" "time" From abb1b746ab467fab5d6004e895655bcbb2f48227 Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 13 Dec 2022 17:24:32 +0800 Subject: [PATCH 07/16] fix(client/option): use WithLogger --- core/client_options.go | 4 ++-- core/client_test.go | 2 +- core/server_options.go | 7 ------- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/core/client_options.go b/core/client_options.go index e1a69cc40..2a52593f0 100644 --- a/core/client_options.go +++ b/core/client_options.go @@ -81,8 +81,8 @@ func WithClientQuicConfig(qc *quic.Config) ClientOption { } } -// WithClientLogger sets logger for the client. -func WithClientLogger(logger *slog.Logger) ClientOption { +// WithLogger sets logger for the client. +func WithLogger(logger *slog.Logger) ClientOption { return func(o *clientOptions) { o.logger = logger } diff --git a/core/client_test.go b/core/client_test.go index dedf4d9c3..3e788aab1 100644 --- a/core/client_test.go +++ b/core/client_test.go @@ -64,7 +64,7 @@ func TestFrameRoundTrip(t *testing.T) { WithObserveDataTags(obversedTag), WithClientQuicConfig(DefalutQuicConfig), WithClientTLSConfig(nil), - WithClientLogger(ylog.Default()), + WithLogger(ylog.Default()), ) source.SetBackflowFrameObserver(func(bf *frame.BackflowFrame) { diff --git a/core/server_options.go b/core/server_options.go index e1875d0f3..18b3087ce 100644 --- a/core/server_options.go +++ b/core/server_options.go @@ -76,10 +76,3 @@ func WithServerQuicConfig(qc *quic.Config) ServerOption { o.quicConfig = qc } } - -// WithServerLogger sets the logger for the server. -func WithServerLogger(logger *slog.Logger) ServerOption { - return func(o *serverOptions) { - o.logger = logger - } -} From 1466d4297ab75fb6acdf5e7ac0eafcea1a44613b Mon Sep 17 00:00:00 2001 From: woorui Date: Fri, 16 Dec 2022 19:09:32 +0800 Subject: [PATCH 08/16] fix(client): change log level --- core/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/client.go b/core/client.go index 78db3a95e..dbc687302 100644 --- a/core/client.go +++ b/core/client.go @@ -233,7 +233,7 @@ func (c *Client) Close() error { } func (c *Client) close() error { - c.logger.Debug("close the connection") + c.logger.Info("close the connection") // close error channel so that close handler function will be called close(c.errc) @@ -292,7 +292,7 @@ func (c *Client) reconnect(ctx context.Context, addr string) { state := c.state c.mu.Unlock() if state == ConnStateDisconnected { - c.logger.Debug("reconnecting to YoMo-Zipper") + c.logger.Info("reconnecting to YoMo-Zipper") err := c.connect(ctx, addr) if err != nil { c.logger.Error("reconnecting to YoMo-Zipper", err) From 6d9c33c68ea1166eae10422ed6d68f4b0d9929c1 Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 20 Dec 2022 13:35:09 +0800 Subject: [PATCH 09/16] fix: fix log message --- core/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/client.go b/core/client.go index dbc687302..3bef6be4c 100644 --- a/core/client.go +++ b/core/client.go @@ -124,7 +124,7 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.state = ConnStateConnected c.localAddr = c.conn.LocalAddr().String() - c.logger = slog.With("local_addr", c.localAddr, "reomote", c.RemoteAddr()) + c.logger = slog.With("local_addr", c.localAddr, "remote_addr", c.RemoteAddr()) c.logger.Debug("connected to YoMo-Zipper") From fd99baf816da3b69fce2c5ad7a0fe614eb745b04 Mon Sep 17 00:00:00 2001 From: wurui Date: Tue, 20 Dec 2022 15:00:54 +0800 Subject: [PATCH 10/16] refactor: yomo pkg uses slog (#411) * refactor: use slog for yomo pkg * refactor: yomo use slog * refactor(rx): use slog for rx pkg * feat(sfn): change on data log level * fix(zipper): zipper build * fix(zipper): remove unused pkg imports * merge master * fix(client): option logger * fix(option): make option public * fix(sfn): log info * feat: make NewOption public * refactor: example uses slog (#412) * refactor(example): use slog * build: restore go mod * build: restore go mod * doc: fix example doc * fix: example use slog not ylog * build: go mod --- cli/root.go | 11 +- example/0-basic/README.md | 30 ++-- example/0-basic/sfn/main.go | 10 +- example/0-basic/source/main.go | 102 +---------- example/3-multi-sfn/stream-fn-1/app.go | 10 +- example/3-multi-sfn/stream-fn-2/app.go | 5 +- example/3-multi-sfn/stream-fn-3/app.go | 12 +- example/5-backflow/sfn-1/main.go | 10 +- example/5-backflow/sfn-2/main.go | 10 +- example/5-backflow/source/main.go | 14 +- go.mod | 5 +- go.sum | 17 +- options.go | 31 ++-- pkg/logger/logger.go | 88 ---------- pkg/logger/zap.go | 228 ------------------------- rx/runtime.go | 18 +- rx/stream_operator.go | 4 +- rx/stream_operator_test.go | 5 - sfn.go | 28 ++- source.go | 16 +- zipper.go | 37 ++-- zipper_notwindows.go | 14 +- zipper_windows.go | 8 +- 23 files changed, 135 insertions(+), 578 deletions(-) delete mode 100644 pkg/logger/logger.go delete mode 100644 pkg/logger/zap.go diff --git a/cli/root.go b/cli/root.go index e1c093798..a270ce90d 100644 --- a/cli/root.go +++ b/cli/root.go @@ -23,7 +23,6 @@ import ( "github.com/spf13/cobra" "github.com/yomorun/yomo/cli/serverless" "github.com/yomorun/yomo/pkg/file" - "github.com/yomorun/yomo/pkg/logger" ) var ( @@ -35,13 +34,9 @@ var ( // rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ - Use: "yomo", - Version: GetVersion(), - PersistentPreRun: func(cmd *cobra.Command, args []string) { - if verbose { - logger.EnableDebug() - } - }, + Use: "yomo", + Version: GetVersion(), + PersistentPreRun: func(cmd *cobra.Command, args []string) {}, // Run: func(cmd *cobra.Command, args []string) { }, } diff --git a/example/0-basic/README.md b/example/0-basic/README.md index b7da32f8e..59a7d731e 100644 --- a/example/0-basic/README.md +++ b/example/0-basic/README.md @@ -42,11 +42,9 @@ YoMo CLI Version: v1.0.0 ```bash yomo serve -c ./workflow.yaml -Using config file: ./workflow.yaml -2021/11/11 16:09:54 [yomo:zipper] [AddWorkflow] 0, Noise -ā„¹ļø Running YoMo-Zipper... -2021/11/11 16:09:54 [yomo:zipper] Listening SIGTERM/SIGINT... -2021/11/11 16:09:54 [core:server] āœ… (name:Service) Listening on: 127.0.0.1:9000, QUIC: [v1 draft-29] +time=2022-12-12T18:12:15.735+08:00 level=INFO msg="Using config file" component=server name=Service file_path=../workflow.yaml +time=2022-12-12T18:12:15.735+08:00 level=INFO msg="Listening SIGUSR1, SIGUSR2, SIGTERM/SIGINT..." +time=2022-12-12T18:12:15.738+08:00 level=INFO msg=Listening component=server name=Service local_addr=127.0.0.1:9000 pid=25220 quic="[v2 v1 draft-29]" auth_name=[none] ``` ### Run [stream-function](https://docs.yomo.run/stream-fn) @@ -63,11 +61,13 @@ go run ./sfn/main.go ```bash go run ./source/main.go -2021/11/11 16:12:01 [core:client] use credential: [None] -2021/11/11 16:12:01 [core:client] ā¤ļø [yomo-source] is connected to YoMo-Zipper localhost:9000 -2021/11/11 16:12:01 [source] āœ… Emit {192.13399 1636618321242 localhost} to YoMo-Zipper -2021/11/11 16:12:01 [source] āœ… Emit {132.86566 1636618321547 localhost} to YoMo-Zipper -2021/11/11 16:12:01 [source] āœ… Emit {199.17604 1636618321851 localhost} to YoMo-Zipper +time=2022-12-12T17:56:27.156+08:00 level=INFO msg="use credential" component=client credential_name=none +time=2022-12-12T17:56:27.161+08:00 level=INFO msg="[source] āœ… Emit to YoMo-Zipper" data="{62.31009 1670838987160 localhost}" +time=2022-12-12T17:56:28.162+08:00 level=INFO msg="[source] āœ… Emit to YoMo-Zipper" data="{58.455963 1670838988161 localhost}" +time=2022-12-12T17:56:29.163+08:00 level=INFO msg="[source] āœ… Emit to YoMo-Zipper" data="{158.80386 1670838989162 localhost}" +time=2022-12-12T17:56:30.164+08:00 level=INFO msg="[source] āœ… Emit to YoMo-Zipper" data="{190.63675 1670838990164 localhost}" +time=2022-12-12T17:56:31.166+08:00 level=INFO msg="[source] āœ… Emit to YoMo-Zipper" data="{147.77885 1670838991166 localhost}" +time=2022-12-12T17:56:32.168+08:00 level=INFO msg="[source] āœ… Emit to YoMo-Zipper" data="{83.59812 1670838992168 localhost}" ``` ### Results @@ -77,7 +77,11 @@ go run ./source/main.go The terminal of `stream-function` will print the real-time sound value. ```bash -2021/11/11 16:12:01 >> [sfn] got tag=0x33, data={ 0x1.80449ap+07 0x17d0e0dbd5a 0x6c 0x6f 0x63 0x61 0x6c 0x68 0x6f 0x73 0x74} -2021/11/11 16:12:01 >> [sfn] got tag=0x33, data={ 0x1.09bb38p+07 0x17d0e0dbe8b 0x6c 0x6f 0x63 0x61 0x6c 0x68 0x6f 0x73 0x74} -2021/11/11 16:12:01 >> [sfn] got tag=0x33, data={ 0x1.8e5a22p+07 0x17d0e0dbfbb 0x6c 0x6f 0x63 0x61 0x6c 0x68 0x6f 0x73 0x74} +time=2022-12-12T18:02:08.408+08:00 level=INFO msg="use credential" component=client credential_name=none +time=2022-12-12T18:02:13.895+08:00 level=INFO msg=[sfn] got=51 data="{98.02577 1670839333894 localhost}" +time=2022-12-12T18:02:14.900+08:00 level=INFO msg=[sfn] got=51 data="{71.31387 1670839334895 localhost}" +time=2022-12-12T18:02:15.898+08:00 level=INFO msg=[sfn] got=51 data="{157.18372 1670839335896 localhost}" +time=2022-12-12T18:02:16.900+08:00 level=INFO msg=[sfn] got=51 data="{13.951344 1670839336898 localhost}" +time=2022-12-12T18:02:17.902+08:00 level=INFO msg=[sfn] got=51 data="{99.50129 1670839337899 localhost}" +time=2022-12-12T18:02:18.904+08:00 level=INFO msg=[sfn] got=51 data="{124.94903 1670839338901 localhost}" ``` diff --git a/example/0-basic/sfn/main.go b/example/0-basic/sfn/main.go index 6cc0e12eb..ac755ef44 100644 --- a/example/0-basic/sfn/main.go +++ b/example/0-basic/sfn/main.go @@ -6,7 +6,7 @@ import ( "github.com/yomorun/yomo" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" + "golang.org/x/exp/slog" ) type noiseData struct { @@ -32,12 +32,12 @@ func main() { // start err := sfn.Connect() if err != nil { - logger.Errorf("[sfn1] connect err=%v", err) + slog.Error("[sfn1] connect", err) os.Exit(1) } // set the error handler function when server error occurs sfn.SetErrorHandler(func(err error) { - logger.Errorf("[sfn1] receive server error: %v", err) + slog.Error("[sfn1] receive server error", err) sfn.Close() os.Exit(1) }) @@ -49,10 +49,10 @@ func handler(data []byte) (frame.Tag, []byte) { var model noiseData err := json.Unmarshal(data, &model) if err != nil { - logger.Errorf("[sfn] json.Marshal err=%v", err) + slog.Error("[sfn] json.Marshal error", err) os.Exit(-2) } else { - logger.Printf(">> [sfn] got tag=0x33, data=%+v", model) + slog.Info("[sfn]", "got", 0x33, "data", model) } return 0x0, nil } diff --git a/example/0-basic/source/main.go b/example/0-basic/source/main.go index 27d476e64..7bc76d159 100644 --- a/example/0-basic/source/main.go +++ b/example/0-basic/source/main.go @@ -2,17 +2,16 @@ package main import ( "encoding/json" - stdlog "log" "math/rand" "os" - "strings" "time" "github.com/yomorun/yomo" - "github.com/yomorun/yomo/core/log" + "golang.org/x/exp/slog" ) -var logger = NewCustomLogger() +// custom logger +var logger = slog.New(slog.NewTextHandler(os.Stdout)) type noiseData struct { Noise float32 `json:"noise"` // Noise value @@ -33,7 +32,7 @@ func main() { ) err := source.Connect() if err != nil { - logger.Printf("[source] āŒ Emit the data to YoMo-Zipper failure with err: %v", err) + logger.Error("[source] āŒ Emit the data to YoMo-Zipper failure with err", err) return } @@ -42,13 +41,13 @@ func main() { source.SetDataTag(0x33) // set the error handler function when server error occurs source.SetErrorHandler(func(err error) { - logger.Printf("[source] receive server error: %v", err) + logger.Error("[source] receive server error", err) os.Exit(1) }) // generate mock data and send it to YoMo-Zipper in every 100 ms. err = generateAndSendData(source) - logger.Printf("[source] >>>> ERR >>>> %v", err) + logger.Error("[source] >>>> ERR", err) os.Exit(0) } @@ -64,7 +63,7 @@ func generateAndSendData(stream yomo.Source) error { sendingBuf, err := json.Marshal(&data) if err != nil { - logger.Errorf("json.Marshal err:%v", err) + logger.Error("json.Marshal error", err) os.Exit(-1) } @@ -76,97 +75,14 @@ func generateAndSendData(stream yomo.Source) error { return nil } if err != nil { - logger.Errorf("[source] āŒ Emit %v to YoMo-Zipper failure with err: %v", data, err) + logger.Error("[source] āŒ Emit to YoMo-Zipper failure with err", err, "data", data) time.Sleep(500 * time.Millisecond) continue } else { - logger.Printf("[source] āœ… Emit %v to YoMo-Zipper", data) + logger.Info("[source] āœ… Emit to YoMo-Zipper", "data", data) } time.Sleep(1000 * time.Millisecond) } } - -// custom logger - -var _ = log.Logger(&CustomLogger{}) - -type CustomLogger struct { - level log.Level -} - -func NewCustomLogger() log.Logger { - envLevel := strings.ToLower(os.Getenv("YOMO_LOG_LEVEL")) - level := log.ErrorLevel - switch envLevel { - case "debug": - level = log.DebugLevel - case "info": - level = log.InfoLevel - case "warn": - level = log.WarnLevel - case "error": - level = log.ErrorLevel - } - - return &CustomLogger{ - level: level, - } -} - -func (c *CustomLogger) SetLevel(level log.Level) { - c.level = level -} - -func (c *CustomLogger) SetEncoding(encoding string) { -} - -// Printf prints a formated message at LevelNo -func (c *CustomLogger) Printf(template string, args ...interface{}) { - c.log(log.NoLevel, template, args...) -} - -// Debugf logs a message at LevelDebug. -func (c *CustomLogger) Debugf(template string, args ...interface{}) { - c.log(log.DebugLevel, template, args...) -} - -// Infof logs a message at LevelInfo. -func (c *CustomLogger) Infof(template string, args ...interface{}) { - c.log(log.InfoLevel, template, args...) -} - -// Warnf logs a message at LevelWarn. -func (c *CustomLogger) Warnf(template string, args ...interface{}) { - c.log(log.WarnLevel, template, args...) -} - -// Errorf logs a message at LevelError. -func (c *CustomLogger) Errorf(template string, args ...interface{}) { - c.log(log.ErrorLevel, template, args...) -} - -// Output file path to write log message output to -func (c *CustomLogger) Output(file string) { -} - -// ErrorOutput file path to write error message output to -func (c *CustomLogger) ErrorOutput(file string) { -} - -func (c *CustomLogger) log(level log.Level, template string, args ...interface{}) { - if c.level == log.Disabled { - return - } - - v := []interface{}{level} - v = append(v, args...) - if c.level == log.NoLevel { - stdlog.Printf("%s "+template, v...) - return - } - if level >= c.level { - stdlog.Printf("%s "+template, v...) - } -} diff --git a/example/3-multi-sfn/stream-fn-1/app.go b/example/3-multi-sfn/stream-fn-1/app.go index c91ea2361..fe7000a78 100644 --- a/example/3-multi-sfn/stream-fn-1/app.go +++ b/example/3-multi-sfn/stream-fn-1/app.go @@ -5,12 +5,12 @@ import ( "context" "encoding/binary" "encoding/json" + "fmt" "os" "time" "github.com/yomorun/yomo" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" ) // NoiseData represents the structure of data @@ -34,7 +34,7 @@ func main() { err := sfn.Connect() if err != nil { - logger.Errorf("[fn1] connect err=%v", err) + fmt.Printf("[fn1] connect err=%v", err) os.Exit(1) } @@ -45,7 +45,7 @@ func handler(data []byte) (frame.Tag, []byte) { var mold noiseData err := json.Unmarshal(data, &mold) if err != nil { - logger.Errorf("[fn1] y3.ToObject err=%v", err) + fmt.Printf("[fn1] y3.ToObject err=%v", err) return 0x0, nil } mold.Noise = mold.Noise / 10 @@ -53,7 +53,7 @@ func handler(data []byte) (frame.Tag, []byte) { // Print every value and return noise value to downstream. result, err := printExtract(context.Background(), &mold) if err != nil { - logger.Errorf("[fn1] to downstream err=%v", err) + fmt.Printf("[fn1] to downstream err=%v", err) return 0x0, nil } @@ -64,7 +64,7 @@ func handler(data []byte) (frame.Tag, []byte) { // Print every value and return noise value to downstream. var printExtract = func(_ context.Context, value *noiseData) (float32, error) { rightNow := time.Now().UnixNano() / int64(time.Millisecond) - logger.Printf("āœ… [%s] %d > value: %f āš”ļø=%dms", value.From, value.Time, value.Noise, rightNow-value.Time) + fmt.Printf("āœ… [%s] %d > value: %f āš”ļø=%dms", value.From, value.Time, value.Noise, rightNow-value.Time) return value.Noise, nil } diff --git a/example/3-multi-sfn/stream-fn-2/app.go b/example/3-multi-sfn/stream-fn-2/app.go index beff9ebf7..a2af783ce 100644 --- a/example/3-multi-sfn/stream-fn-2/app.go +++ b/example/3-multi-sfn/stream-fn-2/app.go @@ -10,7 +10,6 @@ import ( "github.com/yomorun/yomo" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" ) // ThresholdSingleValue is the threshold of a single value. @@ -42,7 +41,7 @@ func main() { err := sfn.Connect() if err != nil { - logger.Errorf("[fn2] connect err=%v", err) + fmt.Printf("[fn2] connect err=%v", err) os.Exit(1) } @@ -53,7 +52,7 @@ func handler(data []byte) (frame.Tag, []byte) { v := Float32frombytes(data) result, err := computePeek(context.Background(), v) if err != nil { - logger.Errorf("[fn2] computePeek err=%v", err) + fmt.Printf("[fn2] computePeek err=%v", err) return 0x0, nil } diff --git a/example/3-multi-sfn/stream-fn-3/app.go b/example/3-multi-sfn/stream-fn-3/app.go index ec15a65a1..7235e66f3 100644 --- a/example/3-multi-sfn/stream-fn-3/app.go +++ b/example/3-multi-sfn/stream-fn-3/app.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/binary" + "fmt" "math" "os" "sync" @@ -10,7 +11,6 @@ import ( "github.com/yomorun/yomo" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" ) // ThresholdAverageValue is the threshold of the average value after a sliding window. @@ -31,9 +31,9 @@ var slidingAvg = func(i interface{}) error { total += value.(float32) } avg := total / float32(len(values)) - logger.Printf("šŸ§© average value in last %d ms: %f!", SlidingWindowInMS, avg) + fmt.Printf("šŸ§© average value in last %d ms: %f!", SlidingWindowInMS, avg) if avg >= ThresholdAverageValue { - logger.Printf("ā—ā— average value in last %d ms: %f reaches the threshold %d!", SlidingWindowInMS, avg, ThresholdAverageValue) + fmt.Printf("ā—ā— average value in last %d ms: %f reaches the threshold %d!", SlidingWindowInMS, avg, ThresholdAverageValue) } } return nil @@ -55,7 +55,7 @@ func main() { err := sfn.Connect() if err != nil { - logger.Errorf("[fn3] connect err=%v", err) + fmt.Printf("[fn3] connect err=%v", err) os.Exit(1) } @@ -66,7 +66,7 @@ func main() { func handler(data []byte) (frame.Tag, []byte) { v := Float32frombytes(data) - logger.Printf("āœ… [fn3] observe <- %v", v) + fmt.Printf("āœ… [fn3] observe <- %v", v) observe <- v return 0x16, nil // no more processing, return nil @@ -107,7 +107,7 @@ func SlidingWindowWithTime(observe <-chan float32, windowTimeInMS uint32, slideT if len(availableItems) != 0 { err := handler(availableItems) if err != nil { - logger.Errorf("[fn3] SlidingWindowWithTime err=%v", err) + fmt.Printf("[fn3] SlidingWindowWithTime err=%v", err) return } } diff --git a/example/5-backflow/sfn-1/main.go b/example/5-backflow/sfn-1/main.go index 9b572d45f..839620819 100644 --- a/example/5-backflow/sfn-1/main.go +++ b/example/5-backflow/sfn-1/main.go @@ -1,12 +1,12 @@ package main import ( + "fmt" "os" "strconv" "github.com/yomorun/yomo" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" ) type noiseData struct { @@ -32,12 +32,12 @@ func main() { // start err := sfn.Connect() if err != nil { - logger.Errorf("[sfn-1] connect err=%v", err) + fmt.Printf("[sfn-1] connect err=%v", err) os.Exit(1) } // set the error handler function when server error occurs sfn.SetErrorHandler(func(err error) { - logger.Errorf("[sfn-1] receive server error: %v", err) + fmt.Printf("[sfn-1] receive server error: %v", err) sfn.Close() os.Exit(1) }) @@ -49,12 +49,12 @@ func handler(data []byte) (frame.Tag, []byte) { // got noise, err := strconv.ParseFloat(string(data), 10) if err != nil { - logger.Errorf("[sfn-1] got err=%v", err) + fmt.Printf("[sfn-1] got err=%v", err) return 0x0, nil } // result result := int(noise) - logger.Printf("[sfn-1] got: tag=0x33, data=%v, return: tag=0x34, data=%v", noise, result) + fmt.Printf("[sfn-1] got: tag=0x33, data=%v, return: tag=0x34, data=%v", noise, result) return 0x34, []byte(strconv.Itoa(result)) } diff --git a/example/5-backflow/sfn-2/main.go b/example/5-backflow/sfn-2/main.go index fab7fab8d..01eb45ed8 100644 --- a/example/5-backflow/sfn-2/main.go +++ b/example/5-backflow/sfn-2/main.go @@ -1,12 +1,12 @@ package main import ( + "fmt" "os" "strconv" "github.com/yomorun/yomo" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" ) func main() { @@ -26,12 +26,12 @@ func main() { // start err := sfn.Connect() if err != nil { - logger.Errorf("[sfn-2] connect err=%v", err) + fmt.Printf("[sfn-2] connect err=%v", err) os.Exit(1) } // set the error handler function when server error occurs sfn.SetErrorHandler(func(err error) { - logger.Errorf("[sfn-2] receive server error: %v", err) + fmt.Printf("[sfn-2] receive server error: %v", err) sfn.Close() os.Exit(1) }) @@ -43,12 +43,12 @@ func handler(data []byte) (frame.Tag, []byte) { // got noise, err := strconv.Atoi(string(data)) if err != nil { - logger.Errorf("[sfn-2] got err=%v", err) + fmt.Printf("[sfn-2] got err=%v", err) return 0x0, nil } // result result := noise * 10 - logger.Printf("[sfn-2] got: tag=0x34, data=%v, return: tag=0x35, data=%v", noise, result) + fmt.Printf("[sfn-2] got: tag=0x34, data=%v, return: tag=0x35, data=%v", noise, result) return 0x35, []byte(strconv.Itoa(result)) } diff --git a/example/5-backflow/source/main.go b/example/5-backflow/source/main.go index f561cd332..ba46e1d9f 100644 --- a/example/5-backflow/source/main.go +++ b/example/5-backflow/source/main.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "math/rand" "os" "strconv" @@ -8,7 +9,6 @@ import ( "github.com/yomorun/yomo" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" ) func main() { @@ -24,7 +24,7 @@ func main() { ) err := source.Connect() if err != nil { - logger.Printf("[source] āŒ Emit the data to YoMo-Zipper failure with err: %v", err) + fmt.Printf("[source] āŒ Emit the data to YoMo-Zipper failure with err: %v", err) return } @@ -33,17 +33,17 @@ func main() { source.SetDataTag(0x33) // set the error handler function when server error occurs source.SetErrorHandler(func(err error) { - logger.Printf("[source] receive server error: %v", err) + fmt.Printf("[source] receive server error: %v", err) os.Exit(1) }) // set receive handler for the observe datatags source.SetReceiveHandler(func(tag frame.Tag, data []byte) { - logger.Printf("[source] ā™»ļø receive backflow: tag=%#v, data=%s", tag, data) + fmt.Printf("[source] ā™»ļø receive backflow: tag=%#v, data=%s", tag, data) }) // generate mock data and send it to YoMo-Zipper in every 100 ms. err = generateAndSendData(source) - logger.Printf("[source] >>>> ERR >>>> %v", err) + fmt.Printf("[source] >>>> ERR >>>> %v", err) os.Exit(0) } @@ -55,12 +55,12 @@ func generateAndSendData(stream yomo.Source) error { // send data via QUIC stream. _, err := stream.Write(data) if err != nil { - logger.Errorf("[source] āŒ Emit %.2f to YoMo-Zipper failure with err: %v", noise, err) + fmt.Printf("[source] āŒ Emit %.2f to YoMo-Zipper failure with err: %v", noise, err) time.Sleep(500 * time.Millisecond) continue } else { - logger.Printf("[source] āœ… Emit %.2f to YoMo-Zipper", noise) + fmt.Printf("[source] āœ… Emit %.2f to YoMo-Zipper", noise) } time.Sleep(1000 * time.Millisecond) diff --git a/go.mod b/go.mod index 9150c5748..55375d18b 100644 --- a/go.mod +++ b/go.mod @@ -18,10 +18,8 @@ require ( github.com/spf13/viper v1.13.0 github.com/stretchr/testify v1.8.1 github.com/yomorun/y3 v1.0.5 - go.uber.org/zap v1.23.0 golang.org/x/exp v0.0.0-20221212164502-fae10dda9338 golang.org/x/tools v0.3.0 - gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -50,8 +48,7 @@ require ( github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.4.1 // indirect github.com/teivah/onecontext v1.3.0 // indirect - go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.8.0 // indirect + go.uber.org/goleak v1.1.11 // indirect golang.org/x/crypto v0.1.0 // indirect golang.org/x/mod v0.7.0 // indirect golang.org/x/net v0.2.0 // indirect diff --git a/go.sum b/go.sum index 5ebd40ed3..fa01e876f 100644 --- a/go.sum +++ b/go.sum @@ -36,10 +36,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/briandowns/spinner v1.19.0 h1:s8aq38H+Qju89yhp89b4iIiMzMm8YN3p6vGpwyh/a8E= github.com/briandowns/spinner v1.19.0/go.mod h1:mQak9GHqbspjC/5iUx3qMlIho8xBS/ppAL/hX5SmPJU= github.com/bytecodealliance/wasmtime-go v1.0.0 h1:9u9gqaUiaJeN5IoD1L7egD8atOnTGyJcNp8BhkL9cUU= @@ -191,7 +189,6 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwbMiyQg= github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -248,15 +245,9 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= -go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= -go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= -go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= -go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -276,8 +267,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20221204150635-6dcec336b2bb h1:QIsP/NmClBICkqnJ4rSIhnrGiGR7Yv9ZORGGnmmLTPk= -golang.org/x/exp v0.0.0-20221204150635-6dcec336b2bb/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/exp v0.0.0-20221212164502-fae10dda9338 h1:OvjRkcNHnf6/W5FZXSxODbxwD+X7fspczG7Jn/xQVD4= golang.org/x/exp v0.0.0-20221212164502-fae10dda9338/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= @@ -465,6 +454,7 @@ golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.3.0 h1:SrNbZl6ECOS1qFzgTdQfWXZM9XBkiA6tkFrH9YSTPHM= golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -567,13 +557,10 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= -gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/options.go b/options.go index bca5b04fa..fddb7e4dc 100644 --- a/options.go +++ b/options.go @@ -6,7 +6,7 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/yomorun/yomo/core" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/core/log" + "golang.org/x/exp/slog" ) const ( @@ -19,15 +19,15 @@ type Option func(o *Options) // Options are the options for YoMo type Options struct { - ZipperAddr string // target Zipper endpoint address - // ZipperListenAddr string // Zipper endpoint address - ZipperWorkflowConfig string // Zipper workflow file - MeshConfigURL string // meshConfigURL is the URL of edge-mesh config - ServerOptions []core.ServerOption - ClientOptions []core.ClientOption - QuicConfig *quic.Config - TLSConfig *tls.Config - Logger log.Logger + ZipperAddr string // target Zipper endpoint address + MeshConfigURL string // meshConfigURL is the URL of edge-mesh config + ServerOptions []core.ServerOption + ClientOptions []core.ClientOption + QuicConfig *quic.Config + TLSConfig *tls.Config + + // TODO: WithWorkflowConfig + // zipperWorkflowConfig string // Zipper workflow file } // WithZipperAddr return a new options with ZipperAddr set to addr. @@ -37,15 +37,6 @@ func WithZipperAddr(addr string) Option { } } -// // WithZipperListenAddr return a new options with ZipperListenAddr set to addr. -// func WithZipperListenAddr(addr string) Option { -// return func(o *options) { -// o.ZipperListenAddr = addr -// } -// } - -// TODO: WithWorkflowConfig - // WithMeshConfigURL sets the initial edge-mesh config URL for the YoMo-Zipper. func WithMeshConfigURL(url string) Option { return func(o *Options) { @@ -112,7 +103,7 @@ func WithObserveDataTags(tags ...frame.Tag) Option { } // WithLogger sets the client logger -func WithLogger(logger log.Logger) Option { +func WithLogger(logger *slog.Logger) Option { return func(o *Options) { o.ClientOptions = append( o.ClientOptions, diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go deleted file mode 100644 index 3f4ad1de8..000000000 --- a/pkg/logger/logger.go +++ /dev/null @@ -1,88 +0,0 @@ -package logger - -import ( - "os" - "strings" - - "github.com/yomorun/yomo/core/log" -) - -var logger log.Logger - -func init() { - logger = Default(isEnableDebug()) -} - -// SetLogger allows developers to customize the logger instance. -func SetLogger(l log.Logger) { - logger = l -} - -// EnableDebug enables the development model for logging. -// Deprecated -func EnableDebug() { - logger = Default(true) -} - -// Printf prints a formated message without a specified level. -func Printf(format string, v ...interface{}) { - logger.Printf(format, v...) -} - -// Debugf logs a message at DebugLevel. -func Debugf(template string, args ...interface{}) { - logger.Debugf(template, args...) -} - -// Infof logs a message at InfoLevel. -func Infof(template string, args ...interface{}) { - logger.Infof(template, args...) -} - -// Warnf logs a message at WarnLevel. -func Warnf(template string, args ...interface{}) { - logger.Warnf(template, args...) -} - -// Errorf logs a message at ErrorLevel. -func Errorf(template string, args ...interface{}) { - logger.Errorf(template, args...) -} - -// isEnableDebug indicates whether the debug is enabled. -func isEnableDebug() bool { - return os.Getenv("YOMO_ENABLE_DEBUG") == "true" -} - -// isJSONFormat indicates whether the log is in JSON format. -func isJSONFormat() bool { - return os.Getenv("YOMO_LOG_FORMAT") == "json" -} - -func logFormat() string { - return os.Getenv("YOMO_LOG_FORMAT") -} - -func logLevel() log.Level { - envLevel := strings.ToLower(os.Getenv("YOMO_LOG_LEVEL")) - level := log.ErrorLevel - switch envLevel { - case "debug": - return log.DebugLevel - case "info": - return log.InfoLevel - case "warn": - return log.WarnLevel - case "error": - return log.ErrorLevel - } - return level -} - -func output() string { - return strings.ToLower(os.Getenv("YOMO_LOG_OUTPUT")) -} - -func errorOutput() string { - return strings.ToLower(os.Getenv("YOMO_LOG_ERROR_OUTPUT")) -} diff --git a/pkg/logger/zap.go b/pkg/logger/zap.go deleted file mode 100644 index 7d866b0cc..000000000 --- a/pkg/logger/zap.go +++ /dev/null @@ -1,228 +0,0 @@ -package logger - -import ( - stdlog "log" - "os" - "time" - - "github.com/yomorun/yomo/core/log" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "gopkg.in/natefinch/lumberjack.v2" -) - -const ( - timeFormat = "2006-01-02 15:04:05.000" -) - -// zapLogger is the logger implementation in go.uber.org/zap -type zapLogger struct { - level zapcore.Level - debug bool - encoding string - opts []zap.Option - logger *zap.Logger - instance *zap.SugaredLogger - output string - errorOutput string -} - -// Default the default logger instance -func Default(debug ...bool) log.Logger { - z := New() - z.SetLevel(logLevel()) - if isJSONFormat() { - z.SetEncoding("json") - } - // env debug - if isEnableDebug() { - z.SetLevel(log.DebugLevel) - } - if len(debug) > 0 { - if debug[0] { - z.SetLevel(log.DebugLevel) - } - } - z.Output(output()) - z.ErrorOutput(errorOutput()) - - return z -} - -// New create new logger instance -func New(opts ...zap.Option) log.Logger { - // std logger - stdlog.Default().SetFlags(0) - stdlog.Default().SetOutput(new(logWriter)) - - z := zapLogger{ - level: zap.ErrorLevel, - debug: false, - encoding: "console", - opts: opts, - } - - return &z -} - -func openSinks(cfg zap.Config) (zapcore.WriteSyncer, zapcore.WriteSyncer, error) { - sink, closeOut, err := zap.Open(cfg.OutputPaths...) - if err != nil { - return nil, nil, err - } - errSink, _, err := zap.Open(cfg.ErrorOutputPaths...) - if err != nil { - closeOut() - return nil, nil, err - } - return sink, errSink, nil -} - -// SetEncoding set logger message coding -func (z *zapLogger) SetEncoding(enc string) { - z.encoding = enc -} - -// SetLevel set logger level -func (z *zapLogger) SetLevel(lvl log.Level) { - isDebug := lvl == log.DebugLevel - level := zap.ErrorLevel - switch lvl { - case log.DebugLevel: - level = zap.DebugLevel - case log.InfoLevel: - level = zap.InfoLevel - case log.WarnLevel: - level = zap.WarnLevel - case log.ErrorLevel: - level = zap.ErrorLevel - } - z.level = level - z.debug = isDebug -} - -// Output file path to write log message -func (z *zapLogger) Output(file string) { - if file != "" { - z.output = file - } -} - -// ErrorOutput file path to write log message -func (z *zapLogger) ErrorOutput(file string) { - if file != "" { - z.errorOutput = file - } -} - -// Printf logs a message wihout level -func (z *zapLogger) Printf(format string, v ...interface{}) { - stdlog.Printf(format, v...) -} - -// Debugf logs a message at DebugLevel -func (z *zapLogger) Debugf(template string, args ...interface{}) { - z.Instance().Debugf(template, args...) -} - -// Infof logs a message at InfoLevel -func (z *zapLogger) Infof(template string, args ...interface{}) { - z.Instance().Infof(template, args...) -} - -// Warnf logs a message at WarnLevel -func (z zapLogger) Warnf(template string, args ...interface{}) { - z.Instance().Warnf(template, args...) -} - -// Errorf logs a message at ErrorLevel -func (z zapLogger) Errorf(template string, args ...interface{}) { - z.Instance().Errorf(template, args...) -} - -func (z *zapLogger) Instance() *zap.SugaredLogger { - if z.instance == nil { - // zap - encoderConfig := zapcore.EncoderConfig{ - TimeKey: "ts", - LevelKey: "level", - NameKey: "logger", - CallerKey: "caller", - FunctionKey: zapcore.OmitKey, - MessageKey: "msg", - StacktraceKey: "stacktrace", - LineEnding: zapcore.DefaultLineEnding, - EncodeLevel: zapcore.CapitalColorLevelEncoder, - EncodeTime: timeEncoder, - EncodeDuration: zapcore.SecondsDurationEncoder, - EncodeCaller: zapcore.ShortCallerEncoder, - } - cfg := zap.Config{ - Level: zap.NewAtomicLevelAt(zap.ErrorLevel), - Development: z.debug, - DisableCaller: true, - DisableStacktrace: true, - Encoding: z.encoding, - EncoderConfig: encoderConfig, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, - } - cfg.Level.SetLevel(z.level) - if z.debug { - // set the minimal level to debug - cfg.Level.SetLevel(zap.DebugLevel) - } - // output - if z.output != "" { - cfg.OutputPaths = append(cfg.OutputPaths, z.output) - } - encoder := zapcore.NewConsoleEncoder(encoderConfig) - sink, _, err := openSinks(cfg) - if err != nil { - panic(err) - } - core := zapcore.NewCore(encoder, sink, cfg.Level) - // error output - if z.errorOutput != "" { - rotatedLogger := errorRotatedLogger(z.errorOutput, 10, 30, 7) - errorOutputOption := zap.Hooks(func(entry zapcore.Entry) error { - if entry.Level == zap.ErrorLevel { - msg, err := encoder.EncodeEntry(entry, nil) - if err != nil { - return err - } - rotatedLogger.Write(msg.Bytes()) - } - return nil - }) - z.opts = append(z.opts, errorOutputOption) - } - logger := zap.New(core, z.opts...) - - z.logger = logger - z.instance = z.logger.Sugar() - } - return z.instance -} - -func errorRotatedLogger(file string, maxSize, maxBacukups, maxAge int) *lumberjack.Logger { - return &lumberjack.Logger{ - Filename: file, - MaxSize: maxSize, - MaxBackups: maxBacukups, - MaxAge: maxAge, - Compress: false, - } -} - -func timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) { - enc.AppendString(t.Format(timeFormat)) -} - -type logWriter struct{} - -func (l logWriter) Write(bytes []byte) (int, error) { - os.Stderr.WriteString(time.Now().Format(timeFormat)) - os.Stderr.Write([]byte("\t")) - return os.Stderr.Write(bytes) -} diff --git a/rx/runtime.go b/rx/runtime.go index e85231bd6..777aae48d 100644 --- a/rx/runtime.go +++ b/rx/runtime.go @@ -5,7 +5,7 @@ import ( "github.com/yomorun/yomo" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" + "github.com/yomorun/yomo/core/ylog" ) // Runtime is the Stream Serverless Runtime for RxStream. @@ -32,22 +32,22 @@ func (r *Runtime) RawByteHandler(req []byte) (frame.Tag, []byte) { // observe the data from RxStream. for item := range r.stream.Observe() { if item.Error() { - logger.Errorf("[Rx Handler] Handler got an error, err=%v", item.E) + ylog.Error("[Rx Handler] Handler got an error", item.E) continue } if item.V == nil { - logger.Warnf("[Rx Handler] the returned data is nil.") + ylog.Warn("[Rx Handler] the returned data is nil.") continue } res, ok := (item.V).(frame.PayloadFrame) if !ok { - logger.Warnf("[Rx Handler] the data is not a frame.PayloadFrame, won't send it to YoMo-Zipper.") + ylog.Warn("[Rx Handler] the data is not a frame.PayloadFrame, won't send it to YoMo-Zipper.") continue } - logger.Infof("[RawByteHandler] Send data with [tag=%#x] to YoMo-Zipper.", res.Tag) + ylog.Info("[RawByteHandler] Send data to YoMo-Zipper.", "tag", res.Tag) return res.Tag, res.Carriage } @@ -63,22 +63,22 @@ func (r *Runtime) PipeHandler(in <-chan []byte, out chan<- *frame.PayloadFrame) r.rawBytesChan <- req case item := <-r.stream.Observe(): if item.Error() { - logger.Errorf("[rx PipeHandler] Handler got an error, err=%v", item.E) + ylog.Error("[rx PipeHandler] Handler got an error", item.E) continue } if item.V == nil { - logger.Warnf("[rx PipeHandler] the returned data is nil.") + ylog.Warn("[rx PipeHandler] the returned data is nil.") continue } res, ok := (item.V).(frame.PayloadFrame) if !ok { - logger.Warnf("[rx PipeHandler] the data is not a frame.PayloadFrame, won't send it to YoMo-Zipper.") + ylog.Warn("[rx PipeHandler] the data is not a frame.PayloadFrame, won't send it to YoMo-Zipper.") continue } - logger.Infof("[rx PipeHandler] Send data with [tag=%#x] to YoMo-Zipper.", res.Tag) + ylog.Info("[rx PipeHandler] Send data with [tag=%#x] to YoMo-Zipper.", res.Tag) out <- &res } } diff --git a/rx/stream_operator.go b/rx/stream_operator.go index c7c7a7588..a7945e300 100644 --- a/rx/stream_operator.go +++ b/rx/stream_operator.go @@ -10,7 +10,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/reactivex/rxgo/v2" "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/logger" + "github.com/yomorun/yomo/core/ylog" ) // Of creates an item from a value. @@ -790,7 +790,7 @@ func (s *StreamImpl) PipeBackToZipper(dataTag frame.Tag) Stream { buf, ok := (item.V).([]byte) if !ok { - logger.Errorf("[PipeBackToZipper] the data is not a []byte, won't send pass it to next.") + ylog.Error("[PipeBackToZipper] the data is not a []byte, won't send pass it to next.", errors.New("item.V is not []byte type")) continue } diff --git a/rx/stream_operator_test.go b/rx/stream_operator_test.go index 165dc48ab..913ce1870 100644 --- a/rx/stream_operator_test.go +++ b/rx/stream_operator_test.go @@ -110,11 +110,6 @@ func Test_AuditTime(t *testing.T) { }) } -type testStruct struct { - ID uint32 `y3:"0x11"` - Name string `y3:"0x12"` -} - func Test_SlidingWindowWithCount(t *testing.T) { t.Run("window size = 1, slide size = 1, handler does nothing", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/sfn.go b/sfn.go index a11fb10de..bd63e5097 100644 --- a/sfn.go +++ b/sfn.go @@ -7,10 +7,6 @@ import ( "github.com/yomorun/yomo/core/frame" ) -const ( - streamFunctionLogPrefix = "\033[31m[yomo:sfn]\033[0m " -) - // StreamFunction defines serverless streaming functions. type StreamFunction interface { // SetObserveDataTags set the data tag list that will be observed @@ -62,29 +58,29 @@ type streamFunction struct { // Deprecated: use yomo.WithObserveDataTags instead func (s *streamFunction) SetObserveDataTags(tag ...frame.Tag) { s.client.SetObserveDataTags(tag...) - s.client.Logger().Debugf("%sSetObserveDataTag(%v)", streamFunctionLogPrefix, s.observeDataTags) + s.client.Logger().Debug("sSetObserveDataTag", "tags", s.observeDataTags) } // SetHandler set the handler function, which accept the raw bytes data and return the tag & response. func (s *streamFunction) SetHandler(fn core.AsyncHandler) error { s.fn = fn - s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.fn) + s.client.Logger().Debug("SetHandler") return nil } func (s *streamFunction) SetPipeHandler(fn core.PipeHandler) error { s.pfn = fn - s.client.Logger().Debugf("%sSetHandler(%v)", streamFunctionLogPrefix, s.pfn) + s.client.Logger().Debug("SetPipeHandler") return nil } // Connect create a connection to the zipper, when data arrvied, the data will be passed to the // handler which setted by SetHandler method. func (s *streamFunction) Connect() error { - s.client.Logger().Debugf("%s Connect()", streamFunctionLogPrefix) + s.client.Logger().Debug("Connect") // notify underlying network operations, when data with tag we observed arrived, invoke the func s.client.SetDataFrameObserver(func(data *frame.DataFrame) { - s.client.Logger().Debugf("%sreceive DataFrame: %v", streamFunctionLogPrefix, data) + s.client.Logger().Debug("receive DataFrame", "data_frame", data.String()) s.onDataFrame(data.GetCarriage(), data.GetMetaFrame()) }) @@ -102,7 +98,7 @@ func (s *streamFunction) Connect() error { for { data := <-s.pOut if data != nil { - s.client.Logger().Debugf("%spipe fn send: %v", streamFunctionLogPrefix, data) + s.client.Logger().Debug("pipe fn send", "payload_frame", data) frame := frame.NewDataFrame() // todo: frame.SetTransactionID frame.SetCarriage(data.Tag, data.Carriage) @@ -114,7 +110,7 @@ func (s *streamFunction) Connect() error { err := s.client.Connect(context.Background(), s.zipperEndpoint) if err != nil { - s.client.Logger().Errorf("%sConnect() error: %s", streamFunctionLogPrefix, err) + s.client.Logger().Error("Connect error", err) } return err } @@ -131,7 +127,7 @@ func (s *streamFunction) Close() error { if s.client != nil { if err := s.client.Close(); err != nil { - s.client.Logger().Errorf("%sClose(): %v", err) + s.client.Logger().Error("Close error", err) return err } } @@ -141,7 +137,7 @@ func (s *streamFunction) Close() error { // when DataFrame we observed arrived, invoke the user's function func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { - s.client.Logger().Infof("%sonDataFrame ->[%s]", streamFunctionLogPrefix, s.name) + s.client.Logger().Debug("onDataFrame") if s.fn != nil { go func() { @@ -157,15 +153,15 @@ func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { // reuse sourceID frame.SetSourceID(metaFrame.SourceID()) frame.SetCarriage(tag, resp) - s.client.Logger().Debugf("%sstart WriteFrame(): %v", streamFunctionLogPrefix, resp) + s.client.Logger().Debug("start WriteFrame()", "resp", resp) s.client.WriteFrame(frame) } }() } else if s.pfn != nil { - s.client.Logger().Debugf("%spipe fn receive: data[%d]=%# x", streamFunctionLogPrefix, len(data), data) + s.client.Logger().Debug("pipe fn receive", "data_len", len(data), "data", data) s.pIn <- data } else { - s.client.Logger().Warnf("%sStreamFunction is nil", streamFunctionLogPrefix) + s.client.Logger().Warn("StreamFunction is nil") } } diff --git a/source.go b/source.go index a0ec158f8..9ce3f1930 100644 --- a/source.go +++ b/source.go @@ -7,10 +7,6 @@ import ( "github.com/yomorun/yomo/core/frame" ) -const ( - sourceLogPrefix = "\033[32m[yomo:source]\033[0m " -) - // Source is responsible for sending data to yomo. type Source interface { // Close will close the connection to YoMo-Zipper. @@ -71,10 +67,10 @@ func (s *yomoSource) SetDataTag(tag frame.Tag) { // Close will close the connection to YoMo-Zipper. func (s *yomoSource) Close() error { if err := s.client.Close(); err != nil { - s.client.Logger().Errorf("%sClose(): %v", sourceLogPrefix, err) + s.client.Logger().Error("Close error", err) return err } - s.client.Logger().Debugf("%s is closed", sourceLogPrefix) + s.client.Logger().Debug("source is closed") return nil } @@ -89,7 +85,7 @@ func (s *yomoSource) Connect() error { err := s.client.Connect(context.Background(), s.zipperEndpoint) if err != nil { - s.client.Logger().Errorf("%sConnect() error: %s", sourceLogPrefix, err) + s.client.Logger().Error("Connect error", err) } return err } @@ -99,7 +95,7 @@ func (s *yomoSource) WriteWithTag(tag frame.Tag, data []byte) error { f := frame.NewDataFrame() f.SetCarriage(tag, data) f.SetSourceID(s.client.ClientID()) - s.client.Logger().Debugf("%sWriteWithTag: %v", sourceLogPrefix, f) + s.client.Logger().Debug("WriteWithTag", "data_frame", f.String()) return s.client.WriteFrame(f) } @@ -111,7 +107,7 @@ func (s *yomoSource) SetErrorHandler(fn func(err error)) { // [Experimental] SetReceiveHandler set the observe handler function func (s *yomoSource) SetReceiveHandler(fn func(frame.Tag, []byte)) { s.fn = fn - s.client.Logger().Debugf("%sSetReceiveHandler(%v)", sourceLogPrefix, s.fn) + s.client.Logger().Debug("SetReceiveHandler") } // Broadcast Write the data to all downstream @@ -120,6 +116,6 @@ func (s *yomoSource) Broadcast(data []byte) error { f.SetCarriage(s.tag, data) f.SetSourceID(s.client.ClientID()) f.SetBroadcast(true) - s.client.Logger().Debugf("%sBroadcast: %v", sourceLogPrefix, f) + s.client.Logger().Debug("Broadcast", "data_frame", f.String()) return s.client.WriteFrame(f) } diff --git a/zipper.go b/zipper.go index 0b25a210a..878575bc1 100644 --- a/zipper.go +++ b/zipper.go @@ -10,12 +10,9 @@ import ( "github.com/yomorun/yomo/core" "github.com/yomorun/yomo/core/metadata" "github.com/yomorun/yomo/core/router" + "github.com/yomorun/yomo/core/ylog" "github.com/yomorun/yomo/pkg/config" - "github.com/yomorun/yomo/pkg/logger" -) - -const ( - zipperLogPrefix = "\033[33m[yomo:zipper]\033[0m " + "golang.org/x/exp/slog" ) // Zipper is the orchestrator of yomo. There are two types of zipper: @@ -81,7 +78,6 @@ func NewZipperWithOptions(name string, opts ...Option) Zipper { func NewZipper(conf string) (Zipper, error) { config, err := config.ParseWorkflowConfig(conf) if err != nil { - logger.Errorf("%s[ERR] %v", zipperLogPrefix, err) return nil, err } // listening address @@ -92,6 +88,7 @@ func NewZipper(conf string) (Zipper, error) { zipper := createZipperServer(config.Name, options, config) // zipper workflow err = zipper.configWorkflow(config) + zipper.server.Logger().Info("Using config file", "file_path", conf) return zipper, err } @@ -124,14 +121,22 @@ func createZipperServer(name string, options *Options, cfg *config.WorkflowConfi return z } +func (z *zipper) Logger() *slog.Logger { + logger := z.server.Logger() + if logger == nil { + return ylog.Default() + } + + return logger +} + // ConfigWorkflow will read workflows from config files and register them to zipper. func (z *zipper) ConfigWorkflow(conf string) error { config, err := config.ParseWorkflowConfig(conf) if err != nil { - logger.Errorf("%s[ERR] %v", zipperLogPrefix, err) return err } - logger.Debugf("%sConfigWorkflow config=%+v", zipperLogPrefix, config) + z.Logger().Debug("ConfigWorkflow", "work_flow", config) return z.configWorkflow(config) } @@ -147,7 +152,7 @@ func (z *zipper) ConfigMesh(url string) error { return nil } - logger.Printf("%sDownloading mesh config...", zipperLogPrefix) + z.Logger().Debug("Downloading mesh config...") // download mesh conf res, err := http.Get(url) if err != nil { @@ -159,11 +164,10 @@ func (z *zipper) ConfigMesh(url string) error { var configs []config.MeshZipper err = decoder.Decode(&configs) if err != nil { - logger.Errorf("%sāœ… downloaded the Mesh config with err=%v", zipperLogPrefix, err) + z.Logger().Error("download Mesh config", err) return err } - - logger.Printf("%sāœ… Successfully downloaded the Mesh config. ", zipperLogPrefix) + z.Logger().Debug("download Mesh config successfully") if len(configs) == 0 { return nil @@ -186,7 +190,7 @@ func (z *zipper) ConfigMesh(url string) error { // ListenAndServe will start zipper service. func (z *zipper) ListenAndServe() error { - logger.Debugf("%sCreating Zipper Server ...", zipperLogPrefix) + z.Logger().Debug("Creating Zipper Server") // check downstream zippers for _, ds := range z.downstreamZippers { if dsZipper, ok := ds.(*zipper); ok { @@ -201,10 +205,9 @@ func (z *zipper) ListenAndServe() error { // AddDownstreamZipper will add downstream zipper. func (z *zipper) AddDownstreamZipper(downstream Zipper) error { - logger.Debugf("%sAddDownstreamZipper: %v", zipperLogPrefix, downstream) z.downstreamZippers = append(z.downstreamZippers, downstream) z.hasDownstreams = true - logger.Debugf("%scurrent downstreams: %d", zipperLogPrefix, len(z.downstreamZippers)) + z.Logger().Debug("add downstreams", "zipper", downstream.Addr(), "add_num_after", len(z.downstreamZippers)) return nil } @@ -231,16 +234,12 @@ func (z *zipper) Addr() string { // Close will close a connection. If zipper is Server, close the server. If zipper is Client, close the client. func (z *zipper) Close() error { if z.server != nil { - logger.Debugf("%sserver close()", zipperLogPrefix) if err := z.server.Close(); err != nil { - logger.Errorf("%sserver close(): %v", zipperLogPrefix, err) return err } } if z.client != nil { - logger.Debugf("%sclient close()", zipperLogPrefix) if err := z.client.Close(); err != nil { - logger.Errorf("%sclient close(): %v", zipperLogPrefix, err) return err } } diff --git a/zipper_notwindows.go b/zipper_notwindows.go index f195cf5de..b3853d259 100644 --- a/zipper_notwindows.go +++ b/zipper_notwindows.go @@ -4,13 +4,12 @@ package yomo import ( - "fmt" "os" "os/signal" "runtime" "syscall" - "github.com/yomorun/yomo/pkg/logger" + "github.com/yomorun/yomo/core/ylog" ) // initialize when zipper running as server. support inspection: @@ -21,21 +20,20 @@ func (z *zipper) init() { go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGUSR1, syscall.SIGINT) - logger.Printf("%sListening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...", zipperLogPrefix) + ylog.Info("Listening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...") for p1 := range c { - logger.Printf("Received signal: %s", p1) + ylog.Debug("Received signal", "signal", p1) if p1 == syscall.SIGTERM || p1 == syscall.SIGINT { - logger.Printf("graceful shutting down ... %s", p1) + ylog.Debug("graceful shutting down ...", "sign", p1) // waiting for the server to finish processing the current request z.Close() os.Exit(0) - // close(sgnl) } else if p1 == syscall.SIGUSR2 { var m runtime.MemStats runtime.ReadMemStats(&m) - fmt.Printf("\tNumGC = %v\n", m.NumGC) + ylog.Debug("runtime stats", "gc_nums", m.NumGC) } else if p1 == syscall.SIGUSR1 { - logger.Printf("print zipper stats(): %d", z.Stats()) + ylog.Debug("zipper stats", "zipper_stats", z.Stats()) } } }() diff --git a/zipper_windows.go b/zipper_windows.go index 26c9e41c3..a9482874a 100644 --- a/zipper_windows.go +++ b/zipper_windows.go @@ -8,7 +8,7 @@ import ( "os/signal" "syscall" - "github.com/yomorun/yomo/pkg/logger" + "github.com/yomorun/yomo/core/ylog" ) // initialize when zipper running as server. support inspection: @@ -17,11 +17,11 @@ func (z *zipper) init() { go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) - logger.Printf("%sListening SIGTERM/SIGINT...", zipperLogPrefix) + ylog.Info("Listening SIGTERM/SIGINT...") for p1 := range c { - logger.Printf("Received signal: %s", p1) + ylog.Debug("Received signal", "signal", p1) if p1 == syscall.SIGTERM || p1 == syscall.SIGINT { - logger.Printf("graceful shutting down ... %s", p1) + ylog.Debug("graceful shutting down ...", "sign", p1) os.Exit(0) } } From 52772ccb0a3c2219dfa19300539e515da3b2776e Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 20 Dec 2022 15:42:42 +0800 Subject: [PATCH 11/16] fix(context): error log with --- core/context.go | 1 - core/server.go | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/context.go b/core/context.go index 1ca27afa4..009771d63 100644 --- a/core/context.go +++ b/core/context.go @@ -80,7 +80,6 @@ func (c *Context) WithFrame(f frame.Frame) *Context { authName: handshakeFrame.AuthName(), }) } - c.logger = c.logger.With("frame_type", f.Type().String()) c.Frame = f return c } diff --git a/core/server.go b/core/server.go index e24b9bbb4..56b84770a 100644 --- a/core/server.go +++ b/core/server.go @@ -210,16 +210,17 @@ func (s *Server) handshakeWithTimeout(conn quic.Connection, stream quic.Stream, // It returns a context for this stream handler. func (s *Server) handshake(conn quic.Connection, stream quic.Stream, fs frame.ReadWriter) (*Context, bool) { frm, err := fs.ReadFrame() + if err != nil { if err := fs.WriteFrame(frame.NewGoawayFrame(err.Error())); err != nil { - s.logger.Error("write to client GoawayFrame error", err, "remote_addr", conn.RemoteAddr().String()) + s.logger.Error("write to client GoawayFrame error", err) } return nil, false } if frm.Type() != frame.TagOfHandshakeFrame { if err := fs.WriteFrame(frame.NewGoawayFrame("handshake failed")); err != nil { - s.logger.Error("first frame is not handshakeFrame", err, "remote_addr", conn.RemoteAddr().String(), "frame_type", frm.Type().String()) + s.logger.Error("first frame is not handshakeFrame", err) } return nil, false } @@ -228,7 +229,7 @@ func (s *Server) handshake(conn quic.Connection, stream quic.Stream, fs frame.Re if err := s.handleHandshakeFrame(c); err != nil { if err := fs.WriteFrame(frame.NewGoawayFrame(err.Error())); err != nil { - c.logger.Error("write to client GoawayFrame error", err, "remote_addr", conn.RemoteAddr().String()) + s.logger.Error("write to client GoawayFrame error", err) } return nil, false } From 5be43f0fbaa672b656f274858bb4ddc2c286de6a Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 20 Dec 2022 16:00:28 +0800 Subject: [PATCH 12/16] fix: logger with usage --- core/client.go | 6 ++---- core/server.go | 2 +- example/0-basic/README.md | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/client.go b/core/client.go index 3bef6be4c..291184ea9 100644 --- a/core/client.go +++ b/core/client.go @@ -50,7 +50,7 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien } clientID := id.New() - logger := slog.With("component", "client", "type", connType.String(), "client_id", clientID, "client_name", appName) + logger := option.logger.With("component", "client", "client_type", connType.String(), "client_id", clientID, "client_name", appName) return &Client{ name: appName, @@ -124,8 +124,6 @@ func (c *Client) connect(ctx context.Context, addr string) error { c.state = ConnStateConnected c.localAddr = c.conn.LocalAddr().String() - c.logger = slog.With("local_addr", c.localAddr, "remote_addr", c.RemoteAddr()) - c.logger.Debug("connected to YoMo-Zipper") // receiving frames @@ -211,7 +209,7 @@ func (c *Client) handleFrame() (bool, bool, error) { } } default: - c.logger.Warn("unknown or unsupported frame", "frame_type", frameType.String()) + c.logger.Warn("unknown or unsupported frame", "frame_type", frameType) } } } diff --git a/core/server.go b/core/server.go index 56b84770a..2e7d880e9 100644 --- a/core/server.go +++ b/core/server.go @@ -634,7 +634,7 @@ func authName(name string) string { func (s *Server) doConnectionCloseHandlers(qconn quic.Connection) { defer s.wg.Done() - s.logger.Debug("quic connection closed", "remote_addr", qconn.RemoteAddr()) + s.logger.Debug("quic connection closed") for _, h := range s.connectionCloseHandlers { h(qconn) } diff --git a/example/0-basic/README.md b/example/0-basic/README.md index 59a7d731e..6bb5f65b2 100644 --- a/example/0-basic/README.md +++ b/example/0-basic/README.md @@ -44,7 +44,7 @@ yomo serve -c ./workflow.yaml time=2022-12-12T18:12:15.735+08:00 level=INFO msg="Using config file" component=server name=Service file_path=../workflow.yaml time=2022-12-12T18:12:15.735+08:00 level=INFO msg="Listening SIGUSR1, SIGUSR2, SIGTERM/SIGINT..." -time=2022-12-12T18:12:15.738+08:00 level=INFO msg=Listening component=server name=Service local_addr=127.0.0.1:9000 pid=25220 quic="[v2 v1 draft-29]" auth_name=[none] +time=2022-12-12T18:12:15.738+08:00 level=INFO msg=Listening component=server name=Service pid=25220 quic="[v2 v1 draft-29]" auth_name=[none] ``` ### Run [stream-function](https://docs.yomo.run/stream-fn) From 017c7dafd27af3e7d73ec90ead9daab727c07197 Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 20 Dec 2022 16:32:43 +0800 Subject: [PATCH 13/16] fix: server log msg if handshake failed --- core/server.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/core/server.go b/core/server.go index 2e7d880e9..70a24ad75 100644 --- a/core/server.go +++ b/core/server.go @@ -211,27 +211,29 @@ func (s *Server) handshakeWithTimeout(conn quic.Connection, stream quic.Stream, func (s *Server) handshake(conn quic.Connection, stream quic.Stream, fs frame.ReadWriter) (*Context, bool) { frm, err := fs.ReadFrame() + c := newContext(conn, stream, s.logger).WithFrame(frm) + if err != nil { if err := fs.WriteFrame(frame.NewGoawayFrame(err.Error())); err != nil { s.logger.Error("write to client GoawayFrame error", err) } - return nil, false + return c, false } if frm.Type() != frame.TagOfHandshakeFrame { + c.logger.Info("client not do handshake right off") if err := fs.WriteFrame(frame.NewGoawayFrame("handshake failed")); err != nil { - s.logger.Error("first frame is not handshakeFrame", err) + s.logger.Error("write to client GoawayFrame error", err) } - return nil, false + return c, false } - c := newContext(conn, stream, s.logger).WithFrame(frm) - if err := s.handleHandshakeFrame(c); err != nil { + c.logger.Info("handshake failed", "error", err) if err := fs.WriteFrame(frame.NewGoawayFrame(err.Error())); err != nil { s.logger.Error("write to client GoawayFrame error", err) } - return nil, false + return c, false } return c, true From fea245ff253152320906434cf1eb8f345f44d758 Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 20 Dec 2022 17:01:43 +0800 Subject: [PATCH 14/16] fix(frame): read until if goaway bug --- core/frame/frame.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/core/frame/frame.go b/core/frame/frame.go index 5e9afb443..9faee658f 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -1,6 +1,7 @@ package frame import ( + "errors" "time" "github.com/yomorun/yomo/core/ylog" @@ -42,6 +43,10 @@ func ReadUntil(reader Reader, t Type, timeout time.Duration) (Frame, error) { errch <- err return } + if f.Type() == TagOfGoawayFrame { + errch <- errors.New(f.(*GoawayFrame).message) + return + } if f.Type() == t { frmch <- f return @@ -49,15 +54,13 @@ func ReadUntil(reader Reader, t Type, timeout time.Duration) (Frame, error) { } }() - for { - select { - case <-time.After(timeout): - return nil, ErrReadUntilTimeout{t: t} - case err := <-errch: - return nil, err - case frm := <-frmch: - return frm, nil - } + select { + case <-time.After(timeout): + return nil, ErrReadUntilTimeout{t: t} + case err := <-errch: + return nil, err + case frm := <-frmch: + return frm, nil } } From 3d984cfb1a19cc3f247275ef247b2308997d61b2 Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 20 Dec 2022 17:17:32 +0800 Subject: [PATCH 15/16] test: fix read until test --- core/frame/frame.go | 1 + core/frame/frame_test.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/frame/frame.go b/core/frame/frame.go index 9faee658f..414b25c4f 100644 --- a/core/frame/frame.go +++ b/core/frame/frame.go @@ -30,6 +30,7 @@ func (err ErrReadUntilTimeout) Error() string { // ReadUntil reads frame from reader, until the frame of the specified type is returned. // It returns ErrReadUntilTimeout error if frame not be returned after timeout duration. +// If read a goawayFrame, use goawayFrame.message as error and return it. func ReadUntil(reader Reader, t Type, timeout time.Duration) (Frame, error) { var ( errch = make(chan error) diff --git a/core/frame/frame_test.go b/core/frame/frame_test.go index b93a904d9..690ae5bb4 100644 --- a/core/frame/frame_test.go +++ b/core/frame/frame_test.go @@ -28,7 +28,7 @@ func TestReadUntil(t *testing.T) { { name: "read until timeout", fields: fields{ - frames: []Frame{NewGoawayFrame("goaway"), NewHandshakeAckFrame()}, + frames: []Frame{NewDataFrame(), NewHandshakeAckFrame()}, intervals: time.Second, }, args: args{ @@ -41,7 +41,7 @@ func TestReadUntil(t *testing.T) { { name: "read until success", fields: fields{ - frames: []Frame{NewGoawayFrame("goaway"), NewHandshakeAckFrame()}, + frames: []Frame{NewDataFrame(), NewHandshakeAckFrame()}, intervals: time.Microsecond, }, args: args{ From 0254d92ed660a98e672518a16c6962c1a0af43ad Mon Sep 17 00:00:00 2001 From: woorui Date: Tue, 20 Dec 2022 17:22:21 +0800 Subject: [PATCH 16/16] fix: frame test --- core/frame/frame_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/frame/frame_test.go b/core/frame/frame_test.go index 690ae5bb4..535d92fe8 100644 --- a/core/frame/frame_test.go +++ b/core/frame/frame_test.go @@ -42,11 +42,11 @@ func TestReadUntil(t *testing.T) { name: "read until success", fields: fields{ frames: []Frame{NewDataFrame(), NewHandshakeAckFrame()}, - intervals: time.Microsecond, + intervals: time.Millisecond, }, args: args{ t: TagOfHandshakeAckFrame, - timeout: time.Millisecond, + timeout: time.Second, }, wantErr: nil, wantFrame: NewHandshakeAckFrame(),