Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor Connector to make code more expressive #329

Merged
merged 14 commits into from May 26, 2022
30 changes: 30 additions & 0 deletions app_info.go
@@ -0,0 +1,30 @@
package yomo

import (
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
)

type appInfo struct{}
wujunzhuo marked this conversation as resolved.
Show resolved Hide resolved

func (a *appInfo) Encode() []byte {
return nil
}

type appInfoBuilder struct {
a *appInfo
}

func newAppInfoBuilder() core.AppInfoBuilder {
return &appInfoBuilder{
a: &appInfo{},
}
}

func (builder *appInfoBuilder) Build(f *frame.HandshakeFrame) (core.AppInfo, error) {
return builder.a, nil
}

func (builder *appInfoBuilder) Decode(buf []byte) (core.AppInfo, error) {
return builder.a, nil
}
17 changes: 17 additions & 0 deletions core/app_info.go
@@ -0,0 +1,17 @@
package core

import "github.com/yomorun/yomo/core/frame"

// AppInfo is used for customizing extensions of an application.
type AppInfo interface {
// Encode is the serialize method
Encode() []byte
}

// AppInfoBuilder is the builder for AppInfo
type AppInfoBuilder interface {
// Build will return an AppInfo instance according to the frame passed in
Build(f *frame.HandshakeFrame) (AppInfo, error)
// Decode is the deserialize method
Decode(buf []byte) (AppInfo, error)
}
67 changes: 67 additions & 0 deletions core/connection.go
@@ -0,0 +1,67 @@
package core

import (
"io"
"sync"

"github.com/yomorun/yomo/core/frame"
)

// Connection wraps the specific io connections (typically quic.Connection) to transfer y3 frames
type Connection interface {
io.Closer

// Name returns the name of the connection, which is set by clients
Name() string
// ClientType returns the type of the client (Source | SFN | UpstreamZipper)
ClientType() ClientType
// AppInfo returns the extra application info
AppInfo() AppInfo
// Write should goroutine-safely send y3 frames to peer side
Write(f frame.Frame) error
}

type connection struct {
name string
clientType ClientType
appInfo AppInfo
stream io.ReadWriteCloser
mu sync.Mutex
}

func newConnection(name string, clientType ClientType, appInfo AppInfo, stream io.ReadWriteCloser) Connection {
return &connection{
name: name,
clientType: clientType,
appInfo: appInfo,
stream: stream,
}
}

// Close implements io.Close interface
func (c *connection) Close() error {
return c.stream.Close()
}

// Name returns the name of the connection, which is set by clients
func (c *connection) Name() string {
return c.name
}

// ClientType returns the type of the connection (Source | SFN | UpstreamZipper)
func (c *connection) ClientType() ClientType {
return c.clientType
}

// AppInfo returns the extra application info
func (c *connection) AppInfo() AppInfo {
return c.appInfo
}

// Write should goroutine-safely send y3 frames to peer side
func (c *connection) Write(f frame.Frame) error {
c.mu.Lock()
defer c.mu.Unlock()
_, err := c.stream.Write(f.Encode())
return err
}
174 changes: 12 additions & 162 deletions core/connector.go
@@ -1,217 +1,67 @@
package core

import (
"fmt"
"io"
"math/rand"
"sync"

"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/pkg/logger"
)

type app struct {
name string // app name
observed []byte // data tags
}

// func (a *app) ID() string {
// return a.id
// }

func (a *app) Name() string {
return a.name
}

var _ Connector = &connector{}

// Connector is a interface to manage the connections and applications.
type Connector interface {
// Add a connection.
Add(connID string, stream io.ReadWriteCloser)
Add(connID string, conn Connection)
// Remove a connection.
Remove(connID string)
// Get a connection by connection id.
Get(connID string) io.ReadWriteCloser
// GetConnIDs gets the connection ids by name and tag.
GetConnIDs(name string, tags byte) []string
// Write a Frame to a connection.
Write(f frame.Frame, toID string) error
Get(connID string) Connection
// GetSnapshot gets the snapshot of all connections.
GetSnapshot() map[string]io.ReadWriteCloser

// App gets the app by connID.
App(connID string) (*app, bool)
// AppID gets the ID of app by connID.
// AppID(connID string) (string, bool)
// AppName gets the name of app by connID.
AppName(connID string) (string, bool)
// LinkApp links the app and connection.
LinkApp(connID string, name string, observed []byte)
// UnlinkApp removes the app by connID.
UnlinkApp(connID string, name string)
// ExistsApp check app exists
ExistsApp(name string) bool

GetSnapshot() map[string]Connection
// Clean the connector.
Clean()
}

type connector struct {
conns sync.Map
apps sync.Map
mu sync.Mutex
}

func newConnector() Connector {
return &connector{
conns: sync.Map{},
apps: sync.Map{},
mu: sync.Mutex{},
}
return &connector{conns: sync.Map{}}
}

// Add a connection.
func (c *connector) Add(connID string, stream io.ReadWriteCloser) {
func (c *connector) Add(connID string, conn Connection) {
logger.Debugf("%sconnector add: connID=%s", ServerLogPrefix, connID)
c.conns.Store(connID, stream)
c.conns.Store(connID, conn)
}

// Remove a connection.
func (c *connector) Remove(connID string) {
logger.Debugf("%sconnector remove: connID=%s", ServerLogPrefix, connID)
c.conns.Delete(connID)
// c.funcs.Delete(connID)
c.apps.Delete(connID)
}

// Get a connection by connection id.
func (c *connector) Get(connID string) io.ReadWriteCloser {
func (c *connector) Get(connID string) Connection {
logger.Debugf("%sconnector get connection: connID=%s", ServerLogPrefix, connID)
if stream, ok := c.conns.Load(connID); ok {
return stream.(io.ReadWriteCloser)
if conn, ok := c.conns.Load(connID); ok {
return conn.(Connection)
}
return nil
}

// App gets the app by connID.
func (c *connector) App(connID string) (*app, bool) {
if result, found := c.apps.Load(connID); found {
app, ok := result.(*app)
if ok {
logger.Debugf("%sconnector get app=%s, connID=%s", ServerLogPrefix, app.name, connID)
return app, true
}
logger.Warnf("%sconnector get app convert fails, connID=%s", ServerLogPrefix, connID)
return nil, false
}
logger.Warnf("%sconnector get app is nil, connID=%s", ServerLogPrefix, connID)
return nil, false
}

// AppName gets the name of app by connID.
func (c *connector) AppName(connID string) (string, bool) {
if app, ok := c.App(connID); ok {
return app.name, true
}
return "", false
}

// GetConnIDs gets the connection ids by name and tag.
func (c *connector) GetConnIDs(name string, tag byte) []string {
connIDs := make([]string, 0)

c.apps.Range(func(key interface{}, val interface{}) bool {
app := val.(*app)
if app.name == name {
for _, v := range app.observed {
if v == tag {
connIDs = append(connIDs, key.(string))
break
}
}
}
return true
})

if n := len(connIDs); n > 1 {
index := rand.Intn(n)
return connIDs[index : index+1]
}

return connIDs
}

// Write a DataFrame to a connection.
func (c *connector) Write(f frame.Frame, toID string) error {
targetStream := c.Get(toID)
if targetStream == nil {
logger.Warnf("%swill write to: [%s], target stream is nil", ServerLogPrefix, toID)
return fmt.Errorf("target[%s] stream is nil", toID)
}
c.mu.Lock()
_, err := targetStream.Write(f.Encode())
c.mu.Unlock()
return err
}

// GetSnapshot gets the snapshot of all connections.
func (c *connector) GetSnapshot() map[string]io.ReadWriteCloser {
result := make(map[string]io.ReadWriteCloser)
func (c *connector) GetSnapshot() map[string]Connection {
result := make(map[string]Connection)
c.conns.Range(func(key interface{}, val interface{}) bool {
result[key.(string)] = val.(io.ReadWriteCloser)
result[key.(string)] = val.(Connection)
return true
})
return result
}

// LinkApp links the app and connection.
func (c *connector) LinkApp(connID string, name string, observed []byte) {
logger.Debugf("%sconnector link application: connID[%s] --> app[%s]", ServerLogPrefix, connID, name)
c.apps.Store(connID, &app{name, observed})
}

// UnlinkApp removes the app by connID.
func (c *connector) UnlinkApp(connID string, name string) {
logger.Debugf("%sconnector unlink application: connID[%s] x-> app[%s]", ServerLogPrefix, connID, name)
c.apps.Delete(connID)
}

// ExistsApp check app exists
func (c *connector) ExistsApp(name string) bool {
var found bool
c.apps.Range(func(key interface{}, val interface{}) bool {
app := val.(*app)
if app.name == name {
found = true
return false
}
return true
})

return found
}

// func (c *connector) RemoveApp(appID string) {
// logger.Debugf("%sconnector unlink application: connID[%s] x-> app[%s]", ServerLogPrefix, connID, appID)
// c.apps.Range(func(key interface{},val interface{})bool{
// return true
// })
// c.rapps.Delete(appID)
// }

// func (c *connector) AppConns(appID string) []string {
// conns := make([]string, 0)
// c.apps.Range(func(key interface{},val interface{})bool{
// if val.(string)==appID{
// conns=append(conns,key.(string))
// }
// })
// return conns
// }

// Clean the connector.
func (c *connector) Clean() {
c.conns = sync.Map{}
c.apps = sync.Map{}
}
12 changes: 6 additions & 6 deletions core/frame/frame.go
Expand Up @@ -16,7 +16,7 @@ const (
TagOfMetaFrame Type = 0x2F
TagOfMetadata Type = 0x03
TagOfTransactionID Type = 0x01
TagOfIssuer Type = 0x02
TagOfAppInfo Type = 0x02
// PayloadFrame of DataFrame
TagOfPayloadFrame Type = 0x2E

Expand All @@ -31,13 +31,13 @@ const (
TagOfHandshakeAuthPayload Type = 0x05
TagOfHandshakeObserveDataTags Type = 0x06

TagOfPingFrame Type = 0x3C
TagOfPongFrame Type = 0x3B
TagOfAcceptedFrame Type = 0x3A
TagOfRejectedFrame Type = 0x39
TagOfPingFrame Type = 0x3C
TagOfPongFrame Type = 0x3B
TagOfAcceptedFrame Type = 0x3A
TagOfRejectedFrame Type = 0x39
TagOfRejectedMessage Type = 0x02
// GoawayFrame
TagOfGoawayFrame Type = 0x30
TagOfGoawayFrame Type = 0x30
TagOfGoawayCode Type = 0x01
TagOfGoawayMessage Type = 0x02
)
Expand Down