Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: backflow(update) #330

Merged
merged 19 commits into from Jun 7, 2022
Merged
50 changes: 38 additions & 12 deletions core/client.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/log"
"github.com/yomorun/yomo/core/yerr"
"github.com/yomorun/yomo/pkg/id"
"github.com/yomorun/yomo/pkg/logger"
pkgtls "github.com/yomorun/yomo/pkg/tls"
)
Expand All @@ -27,13 +28,15 @@ type ConnState = string
// Client is the abstraction of a YoMo-Client. a YoMo-Client can be
// Source, Upstream Zipper or StreamFunction.
type Client struct {
name string // name of the client
clientType ClientType // type of the connection
conn quic.Connection // quic connection
stream quic.Stream // quic stream
state ConnState // state of the connection
processor func(*frame.DataFrame) // functions to invoke when data arrived
addr string // the address of server connected to
name string // name of the client
clientID string // id of the client
clientType ClientType // type of the connection
conn quic.Connection // quic connection
stream quic.Stream // quic stream
state ConnState // state of the connection
processor func(*frame.DataFrame) // functions to invoke when data arrived
receiver func(*frame.BackflowFrame) // functions to invoke when data is processed
addr string // the address of server connected to
mu sync.Mutex
opts ClientOptions
localAddr string // client local addr, it will be changed on reconnect
Expand All @@ -47,6 +50,7 @@ type Client struct {
func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client {
c := &Client{
name: appName,
clientID: id.New(),
clientType: connType,
state: ConnStateReady,
opts: ClientOptions{},
Expand Down Expand Up @@ -111,6 +115,7 @@ func (c *Client) connect(ctx context.Context, addr string) error {
// send handshake
handshake := frame.NewHandshakeFrame(
c.name,
c.clientID,
byte(c.clientType),
c.opts.ObserveDataTags,
c.opts.Credential.Name(),
Expand All @@ -124,7 +129,7 @@ func (c *Client) connect(ctx context.Context, addr string) error {
c.state = ConnStateConnected
c.localAddr = c.conn.LocalAddr().String()

c.logger.Printf("%s❤️ [%s](%s) is connected to YoMo-Zipper %s", ClientLogPrefix, c.name, c.localAddr, addr)
c.logger.Printf("%s❤️ [%s][%s](%s) is connected to YoMo-Zipper %s", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr)

// receiving frames
go c.handleFrame()
Expand Down Expand Up @@ -221,7 +226,7 @@ func (c *Client) handleFrame() {
case frame.TagOfDataFrame: // DataFrame carries user's data
if v, ok := f.(*frame.DataFrame); ok {
c.setState(ConnStateTransportData)
c.logger.Debugf("%sreceive DataFrame, tag=%# x, tid=%s, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.TransactionID(), v.GetCarriage())
c.logger.Debugf("%sreceive DataFrame, tag=%#x, tid=%s, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.TransactionID(), v.GetCarriage())
if c.processor == nil {
c.logger.Warnf("%sprocessor is nil", ClientLogPrefix)
} else {
Expand All @@ -230,6 +235,16 @@ func (c *Client) handleFrame() {
c.processor(v)
}
}
case frame.TagOfBackflowFrame:
if v, ok := f.(*frame.BackflowFrame); ok {
c.logger.Debugf("%sreceive BackflowFrame, tag=%#x, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.GetCarriage())
if c.receiver == nil {
c.logger.Warnf("%sreceiver is nil", ClientLogPrefix)
} else {
c.setState(ConnStateBackflow)
c.receiver(v)
}
}
default:
c.logger.Errorf("%sunknown signal", ClientLogPrefix)
}
Expand All @@ -239,7 +254,7 @@ func (c *Client) handleFrame() {
// Close the client.
func (c *Client) Close() (err error) {
if c.conn != nil {
c.logger.Printf("%sclose the connection, name:%s, addr:%s", ClientLogPrefix, c.name, c.conn.RemoteAddr().String())
c.logger.Printf("%sclose the connection, name:%s, id:%s, addr:%s", ClientLogPrefix, c.name, c.clientID, c.conn.RemoteAddr().String())
}
if c.stream != nil {
err = c.stream.Close()
Expand Down Expand Up @@ -333,6 +348,12 @@ func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame)) {
c.logger.Debugf("%sSetDataFrameObserver(%v)", ClientLogPrefix, c.processor)
}

// SetBackflowFrameObserver sets the backflow frame handler.
func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame)) {
c.receiver = fn
c.logger.Debugf("%sSetBackflowFrameObserver(%v)", ClientLogPrefix, c.receiver)
}

// reconnect the connection between client and server.
func (c *Client) reconnect(ctx context.Context, addr string) {
t := time.NewTicker(1 * time.Second)
Expand All @@ -348,10 +369,10 @@ func (c *Client) reconnect(ctx context.Context, addr string) {
return
case <-t.C:
if c.getState() == ConnStateDisconnected {
c.logger.Printf("%s[%s](%s) is reconnecting to YoMo-Zipper %s...", ClientLogPrefix, c.name, c.localAddr, addr)
c.logger.Printf("%s[%s][%s](%s) is reconnecting to YoMo-Zipper %s...", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr)
err := c.connect(ctx, addr)
if err != nil {
c.logger.Errorf("%s[%s](%s) reconnect error:%v", ClientLogPrefix, c.name, c.localAddr, err)
c.logger.Errorf("%s[%s][%s](%s) reconnect error:%v", ClientLogPrefix, c.name, c.clientID, c.localAddr, err)
}
}
}
Expand Down Expand Up @@ -443,3 +464,8 @@ func (c *Client) SetErrorHandler(fn func(err error)) {
}()
}
}

// ClientID return the client ID
func (c *Client) ClientID() string {
return c.clientID
}
20 changes: 19 additions & 1 deletion core/connection.go
Expand Up @@ -13,26 +13,34 @@ type Connection interface {

// Name returns the name of the connection, which is set by clients
Name() string
// ClientID connection client ID
ClientID() string
// ClientType returns the type of the client (Source | SFN | UpstreamZipper)
ClientType() ClientType
// Metadata returns the extra info of the application
Metadata() Metadata
// Write should goroutine-safely send y3 frames to peer side
Write(f frame.Frame) error
// ObserveDataTags observed data tags
ObserveDataTags() []byte
}

type connection struct {
name string
clientType ClientType
metadata Metadata
stream io.ReadWriteCloser
clientID string
observed []byte // observed data tags
mu sync.Mutex
}

func newConnection(name string, clientType ClientType, metadata Metadata, stream io.ReadWriteCloser) Connection {
func newConnection(name string, clientID string, clientType ClientType, metadata Metadata, stream io.ReadWriteCloser, observed []byte) Connection {
return &connection{
name: name,
clientID: clientID,
clientType: clientType,
observed: observed,
metadata: metadata,
stream: stream,
}
Expand Down Expand Up @@ -65,3 +73,13 @@ func (c *connection) Write(f frame.Frame) error {
_, err := c.stream.Write(f.Encode())
return err
}

// ObserveDataTags observed data tags
func (c *connection) ObserveDataTags() []byte {
return c.observed
}

// ClientID connection client ID
func (c *connection) ClientID() string {
return c.clientID
}
19 changes: 19 additions & 0 deletions core/connector.go
Expand Up @@ -18,6 +18,8 @@ type Connector interface {
Get(connID string) Connection
// GetSnapshot gets the snapshot of all connections.
GetSnapshot() map[string]string
// GetSourceConns gets the connections by source observe tags.
GetSourceConns(sourceID string, tags byte) []Connection
// Clean the connector.
Clean()
}
Expand Down Expand Up @@ -51,6 +53,23 @@ func (c *connector) Get(connID string) Connection {
return nil
}

// GetSourceConns gets the source connection by tag.
func (c *connector) GetSourceConns(sourceID string, tag byte) []Connection {
conns := make([]Connection, 0)

c.conns.Range(func(key interface{}, val interface{}) bool {
conn := val.(Connection)
for _, v := range conn.ObserveDataTags() {
if v == tag && conn.ClientType() == ClientTypeSource && conn.ClientID() == sourceID {
conns = append(conns, conn)
}
}
return true
})

return conns
}

// GetSnapshot gets the snapshot of all connections.
func (c *connector) GetSnapshot() map[string]string {
result := make(map[string]string)
Expand Down
1 change: 1 addition & 0 deletions core/core.go
Expand Up @@ -25,6 +25,7 @@ const (
ConnStateAborted ConnState = "Aborted"
ConnStateClosed ConnState = "Closed" // close connection by server
ConnStateGoaway ConnState = "Goaway"
ConnStateBackflow ConnState = "Backflow"
)

// Prefix is the prefix for logger.
Expand Down
70 changes: 70 additions & 0 deletions core/frame/backflow_frame.go
@@ -0,0 +1,70 @@
package frame

import (
"github.com/yomorun/y3"
)

// BackflowFrame is a Y3 encoded bytes
// It's used to receive stream function processed result
type BackflowFrame struct {
Tag byte
Carriage []byte
}

// NewBackflowFrame creates a new BackflowFrame with a given tag and carriage
func NewBackflowFrame(tag byte, carriage []byte) *BackflowFrame {
return &BackflowFrame{
Tag: tag,
Carriage: carriage,
}
}

// Type gets the type of Frame.
func (f *BackflowFrame) Type() Type {
return TagOfBackflowFrame
}

// SetCarriage sets the user's raw data
func (f *BackflowFrame) SetCarriage(buf []byte) *BackflowFrame {
f.Carriage = buf
return f
}

// Encode to Y3 encoded bytes
func (f *BackflowFrame) Encode() []byte {
carriage := y3.NewPrimitivePacketEncoder(f.Tag)
carriage.SetBytesValue(f.Carriage)

node := y3.NewNodePacketEncoder(byte(TagOfBackflowFrame))
node.AddPrimitivePacket(carriage)

return node.Encode()
}

// GetDataTag return the Tag of user's data
func (f *BackflowFrame) GetDataTag() byte {
return f.Tag
}

// GetCarriage return data
func (f *BackflowFrame) GetCarriage() []byte {
return f.Carriage
}

// DecodeToBackflowFrame decodes Y3 encoded bytes to BackflowFrame
func DecodeToBackflowFrame(buf []byte) (*BackflowFrame, error) {
nodeBlock := y3.NodePacket{}
_, err := y3.DecodeToNodePacket(buf, &nodeBlock)
if err != nil {
return nil, err
}

payload := &BackflowFrame{}
for _, v := range nodeBlock.PrimitivePackets {
payload.Tag = v.SeqID()
payload.Carriage = v.GetValBuf()
break
}

return payload, nil
}
10 changes: 10 additions & 0 deletions core/frame/data_frame.go
Expand Up @@ -60,6 +60,16 @@ func (d *DataFrame) GetDataTag() byte {
return d.payloadFrame.Tag
}

// SetSourceID set the source ID.
func (d *DataFrame) SetSourceID(sourceID string) {
d.metaFrame.SetSourceID(sourceID)
}

// SourceID returns source ID
func (d *DataFrame) SourceID() string {
return d.metaFrame.SourceID()
}

// Encode return Y3 encoded bytes of `DataFrame`
func (d *DataFrame) Encode() []byte {
data := y3.NewNodePacketEncoder(byte(d.Type()))
Expand Down
5 changes: 3 additions & 2 deletions core/frame/data_frame_test.go
Expand Up @@ -13,10 +13,11 @@ func TestDataFrameEncode(t *testing.T) {

tidbuf := []byte(d.TransactionID())
result := []byte{
0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8),
0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2),
0x80 | byte(TagOfDataFrame), byte(len(tidbuf) + 4 + 8 + 2),
0x80 | byte(TagOfMetaFrame), byte(len(tidbuf) + 2 + 2),
byte(TagOfTransactionID), byte(len(tidbuf))}
result = append(result, tidbuf...)
result = append(result, byte(TagOfSourceID), 0x0)
result = append(result, 0x80|byte(TagOfPayloadFrame), 0x06,
userDataTag, 0x04, 0x79, 0x6F, 0x6D, 0x6F)
assert.Equal(t, result, d.Encode())
Expand Down
15 changes: 9 additions & 6 deletions core/frame/frame.go
Expand Up @@ -16,16 +16,17 @@ const (
TagOfMetaFrame Type = 0x2F
TagOfMetadata Type = 0x03
TagOfTransactionID Type = 0x01
TagOfSourceID Type = 0x02
// PayloadFrame of DataFrame
TagOfPayloadFrame Type = 0x2E
TagOfPayloadFrame Type = 0x2E
TagOfBackflowFrame Type = 0x2D

TagOfTokenFrame Type = 0x3E
// HandshakeFrame
TagOfHandshakeFrame Type = 0x3D
TagOfHandshakeName Type = 0x01
TagOfHandshakeType Type = 0x02
// TagOfHandshakeAppID Type = 0x03
// TagOfHandshakeAuthType Type = 0x04
TagOfHandshakeFrame Type = 0x3D
TagOfHandshakeName Type = 0x01
TagOfHandshakeType Type = 0x02
TagOfHandshakeID Type = 0x03
TagOfHandshakeAuthName Type = 0x04
TagOfHandshakeAuthPayload Type = 0x05
TagOfHandshakeObserveDataTags Type = 0x06
Expand Down Expand Up @@ -71,6 +72,8 @@ func (f Type) String() string {
return "RejectedFrame"
case TagOfGoawayFrame:
return "GoawayFrame"
case TagOfBackflowFrame:
return "BackflowFrame"
case TagOfMetaFrame:
return "MetaFrame"
case TagOfPayloadFrame:
Expand Down