Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor client option #400

Merged
merged 10 commits into from Dec 5, 2022
108 changes: 19 additions & 89 deletions core/client.go
Expand Up @@ -11,17 +11,14 @@ import (
"time"

"github.com/lucas-clemente/quic-go"
"github.com/yomorun/yomo/core/auth"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/log"
"github.com/yomorun/yomo/core/yerr"
"github.com/yomorun/yomo/pkg/id"
"github.com/yomorun/yomo/pkg/logger"
pkgtls "github.com/yomorun/yomo/pkg/tls"
)

// ClientOption YoMo client options
type ClientOption func(*ClientOptions)
type ClientOption func(*clientOptions)

// Client is the abstraction of a YoMo-Client. a YoMo-Client can be
// Source, Upstream Zipper or StreamFunction.
Expand All @@ -38,36 +35,29 @@ type Client struct {
closefn func() // function to invoke when client closed
addr string // the address of server connected to
mu sync.Mutex
opts ClientOptions
opts *clientOptions
localAddr string // client local addr, it will be changed on reconnect
logger log.Logger
errc chan error
}

// NewClient creates a new YoMo-Client.
func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client {
c := &Client{
option := defaultClientOption()

for _, o := range opts {
o(option)
}

return &Client{
name: appName,
clientID: id.New(),
clientType: connType,
state: ConnStateReady,
opts: ClientOptions{},
opts: option,
errc: make(chan error),
logger: option.logger,
}
c.Init(opts...)
once.Do(func() {
c.init()
})

return c
}

// Init the options.
func (c *Client) Init(opts ...ClientOption) error {
for _, o := range opts {
o(&c.opts)
}
return c.initOptions()
}

// Connect connects to YoMo-Zipper.
Expand All @@ -94,7 +84,7 @@ func (c *Client) connect(ctx context.Context, addr string) error {
c.state = ConnStateConnecting

// create quic connection
conn, err := quic.DialAddrContext(ctx, addr, c.opts.TLSConfig, c.opts.QuicConfig)
conn, err := quic.DialAddrContext(ctx, addr, c.opts.tlsConfig, c.opts.quicConfig)
if err != nil {
c.state = ConnStateDisconnected
return err
Expand All @@ -114,9 +104,9 @@ func (c *Client) connect(ctx context.Context, addr string) error {
c.name,
c.clientID,
byte(c.clientType),
c.opts.ObserveDataTags,
c.opts.Credential.Name(),
c.opts.Credential.Payload(),
c.opts.observeDataTags,
c.opts.credential.Name(),
c.opts.credential.Payload(),
)
if _, err := c.fs.WriteFrame(handshake); err != nil {
c.state = ConnStateDisconnected
Expand Down Expand Up @@ -302,73 +292,13 @@ func (c *Client) reconnect(ctx context.Context, addr string) {
}
}

func (c *Client) init() {
// // tracing
// _, _, err := tracing.NewTracerProvider(c.name)
// if err != nil {
// logger.Errorf("tracing: %v", err)
// }
}

// ServerAddr returns the address of the server.
func (c *Client) ServerAddr() string {
return c.addr
}

// initOptions init options defaults
func (c *Client) initOptions() error {
// logger
if c.logger == nil {
if c.opts.Logger != nil {
c.logger = c.opts.Logger
} else {
c.logger = logger.Default()
}
}
// observe tag list
if c.opts.ObserveDataTags == nil {
c.opts.ObserveDataTags = make([]frame.Tag, 0)
}
// credential
if c.opts.Credential == nil {
c.opts.Credential = auth.NewCredential("")
}
// tls config
if c.opts.TLSConfig == nil {
tc, err := pkgtls.CreateClientTLSConfig()
if err != nil {
c.logger.Errorf("%sCreateClientTLSConfig: %v", ClientLogPrefix, err)
return err
}
c.opts.TLSConfig = tc
}
// quic config
if c.opts.QuicConfig == nil {
c.opts.QuicConfig = &quic.Config{
Versions: []quic.VersionNumber{quic.Version2},
MaxIdleTimeout: time.Second * 40,
KeepAlivePeriod: time.Second * 20,
MaxIncomingStreams: 1000,
MaxIncomingUniStreams: 1000,
HandshakeIdleTimeout: time.Second * 3,
InitialStreamReceiveWindow: 1024 * 1024 * 2,
InitialConnectionReceiveWindow: 1024 * 1024 * 2,
TokenStore: quic.NewLRUTokenStore(10, 5),
// DisablePathMTUDiscovery: true,
}
}
// credential
if c.opts.Credential != nil {
c.logger.Printf("%suse credential: [%s]", ClientLogPrefix, c.opts.Credential.Name())
}

return nil
}
// RemoteAddr returns the address of the client.
func (c *Client) RemoteAddr() string { return c.addr }

// SetObserveDataTags set the data tag list that will be observed.
// Deprecated: use yomo.WithObserveDataTags instead
func (c *Client) SetObserveDataTags(tag ...frame.Tag) {
c.opts.ObserveDataTags = append(c.opts.ObserveDataTags, tag...)
c.opts.observeDataTags = append(c.opts.observeDataTags, tag...)
}

// Logger get client's logger instance, you can customize this using `yomo.WithLogger`
Expand Down Expand Up @@ -403,4 +333,4 @@ func (c *Client) State() ConnState {
}

// String returns client's name and addr format as a string.
func (c *Client) String() string { return fmt.Sprintf("name:%s, addr: %s", c.name, c.ServerAddr()) }
func (c *Client) String() string { return fmt.Sprintf("name:%s, addr: %s", c.name, c.RemoteAddr()) }
69 changes: 52 additions & 17 deletions core/client_options.go
Expand Up @@ -2,53 +2,88 @@ package core

import (
"crypto/tls"
"time"

"github.com/lucas-clemente/quic-go"
"github.com/yomorun/yomo/core/auth"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/log"
"github.com/yomorun/yomo/pkg/logger"
pkgtls "github.com/yomorun/yomo/pkg/tls"
)

// ClientOptions are the options for YoMo client.
type ClientOptions struct {
ObserveDataTags []frame.Tag
QuicConfig *quic.Config
TLSConfig *tls.Config
Credential *auth.Credential
Logger log.Logger
// clientOptions are the options for YoMo client.
type clientOptions struct {
observeDataTags []frame.Tag
quicConfig *quic.Config
tlsConfig *tls.Config
credential *auth.Credential
logger log.Logger
}

func defaultClientOption() *clientOptions {
logger := logger.Default()

defalutQuicConfig := &quic.Config{
Versions: []quic.VersionNumber{quic.Version2},
MaxIdleTimeout: time.Second * 40,
KeepAlivePeriod: time.Second * 20,
MaxIncomingStreams: 1000,
MaxIncomingUniStreams: 1000,
HandshakeIdleTimeout: time.Second * 3,
InitialStreamReceiveWindow: 1024 * 1024 * 2,
InitialConnectionReceiveWindow: 1024 * 1024 * 2,
TokenStore: quic.NewLRUTokenStore(10, 5),
}

opts := &clientOptions{
observeDataTags: make([]frame.Tag, 0),
quicConfig: defalutQuicConfig,
tlsConfig: pkgtls.MustCreateClientTLSConfig(),
credential: auth.NewCredential(""),
logger: logger,
}

if opts.credential != nil {
logger.Printf("%suse credential: [%s]", ClientLogPrefix, opts.credential.Name())
}

return opts
}

// WithObserveDataTags sets data tag list for the client.
func WithObserveDataTags(tags ...frame.Tag) ClientOption {
return func(o *ClientOptions) {
o.ObserveDataTags = tags
return func(o *clientOptions) {
o.observeDataTags = tags
}
}

// WithCredential sets the client credential method (used by client).
func WithCredential(payload string) ClientOption {
return func(o *ClientOptions) {
o.Credential = auth.NewCredential(payload)
return func(o *clientOptions) {
o.credential = auth.NewCredential(payload)
}
}

// WithClientTLSConfig sets tls config for the client.
func WithClientTLSConfig(tc *tls.Config) ClientOption {
return func(o *ClientOptions) {
o.TLSConfig = tc
return func(o *clientOptions) {
if tc != nil {
o.tlsConfig = tc
}
}
}

// WithClientQuicConfig sets quic config for the client.
func WithClientQuicConfig(qc *quic.Config) ClientOption {
return func(o *ClientOptions) {
o.QuicConfig = qc
return func(o *clientOptions) {
o.quicConfig = qc
}
}

// WithLogger sets logger for the client.
func WithLogger(logger log.Logger) ClientOption {
return func(o *ClientOptions) {
o.Logger = logger
return func(o *clientOptions) {
o.logger = logger
}
}
5 changes: 0 additions & 5 deletions core/core.go
Expand Up @@ -2,14 +2,9 @@ package core

import (
"math/rand"
"sync"
"time"
)

var (
once sync.Once
)

// ConnState represents the state of the connection.
type ConnState = string

Expand Down
36 changes: 2 additions & 34 deletions core/stream_parser.go
Expand Up @@ -14,23 +14,13 @@ func ParseFrame(stream io.Reader) (frame.Frame, error) {
if err != nil {
return nil, err
}
// if len(buf) > 512 {
// logger.Debugf("%s馃敆 parsed out total %d bytes: \n\thead 64 bytes are: [%# x], \n\ttail 64 bytes are: [%#x]", ParseFrameLogPrefix, len(buf), buf[0:64], buf[len(buf)-64:])
// } else {
// logger.Debugf("%s馃敆 parsed out: [%# x]", ParseFrameLogPrefix, buf)
// }

frameType := buf[0]
// determine the frame type
switch frameType {
case 0x80 | byte(frame.TagOfHandshakeFrame):
handshakeFrame, err := readHandshakeFrame(buf)
// logger.Debugf("%sHandshakeFrame: name=%s, type=%s", ParseFrameLogPrefix, handshakeFrame.Name, handshakeFrame.Type())
return handshakeFrame, err
return frame.DecodeToHandshakeFrame(buf)
case 0x80 | byte(frame.TagOfDataFrame):
data, err := readDataFrame(buf)
// logger.Debugf("%sDataFrame: tid=%s, tag=%#x, len(carriage)=%d", ParseFrameLogPrefix, data.TransactionID(), data.GetDataTag(), len(data.GetCarriage()))
return data, err
return frame.DecodeToDataFrame(buf)
case 0x80 | byte(frame.TagOfAcceptedFrame):
return frame.DecodeToAcceptedFrame(buf)
case 0x80 | byte(frame.TagOfRejectedFrame):
Expand All @@ -43,25 +33,3 @@ func ParseFrame(stream io.Reader) (frame.Frame, error) {
return nil, fmt.Errorf("unknown frame type, buf[0]=%#x", buf[0])
}
}

func readHandshakeFrame(buf []byte) (*frame.HandshakeFrame, error) {
// parse to HandshakeFrame
// handshake, err := frame.DecodeToHandshakeFrame(buf)
// if err != nil {
// logger.Errorf("%sreadHandshakeFrame: err=%v", ParseFrameLogPrefix, err)
// return nil
// }
// return handshake
return frame.DecodeToHandshakeFrame(buf)
}

func readDataFrame(buf []byte) (*frame.DataFrame, error) {
// parse to DataFrame
// data, err := frame.DecodeToDataFrame(buf)
// if err != nil {
// logger.Errorf("%sreadDataFrame: err=%v", ParseFrameLogPrefix, err)
// return err
// }
// return data
return frame.DecodeToDataFrame(buf)
}
9 changes: 9 additions & 0 deletions pkg/tls/tls.go
Expand Up @@ -51,6 +51,15 @@ func CreateServerTLSConfig(host string) (*tls.Config, error) {
}, nil
}

// MustCreateClientTLSConfig creates client tls config, It is panic If error here.
func MustCreateClientTLSConfig() *tls.Config {
conf, err := CreateClientTLSConfig()
if err != nil {
panic(err)
}
return conf
}

// CreateClientTLSConfig creates client tls config.
func CreateClientTLSConfig() (*tls.Config, error) {
// ca pool
Expand Down