-
Notifications
You must be signed in to change notification settings - Fork 128
/
connector.go
92 lines (78 loc) 路 2.21 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package core
import (
"sync"
"github.com/yomorun/yomo/core/frame"
"golang.org/x/exp/slog"
)
var _ Connector = &connector{}
// Connector is a interface to manage the connections and applications.
type Connector interface {
// Add a connection.
Add(connID string, conn Connection)
// Remove a connection.
Remove(connID string)
// Get a connection by connection id.
Get(connID string) Connection
// GetSnapshot gets the snapshot of all connections.
GetSnapshot() map[string]string
// GetSourceConns gets the connections by source observe tag.
GetSourceConns(sourceID string, tag frame.Tag) []Connection
// Clean the connector.
Clean()
}
type connector struct {
conns sync.Map
logger *slog.Logger
}
func newConnector(logger *slog.Logger) Connector {
return &connector{conns: sync.Map{}, logger: logger}
}
// Add a connection.
func (c *connector) Add(connID string, conn Connection) {
c.logger.Debug("connector add connection", "conn_id", connID)
c.conns.Store(connID, conn)
}
// Remove a connection.
func (c *connector) Remove(connID string) {
c.logger.Debug("connector remove connection", "conn_id", connID)
c.conns.Delete(connID)
}
// Get a connection by connection id.
func (c *connector) Get(connID string) Connection {
if conn, ok := c.conns.Load(connID); ok {
return conn.(Connection)
}
return nil
}
// GetSourceConns gets the source connection by tag.
func (c *connector) GetSourceConns(sourceID string, tag frame.Tag) []Connection {
conns := make([]Connection, 0)
c.conns.Range(func(key interface{}, val interface{}) bool {
conn := val.(Connection)
for _, v := range conn.ObserveDataTags() {
if v == tag && conn.ClientType() == ClientTypeSource && conn.ClientID() == sourceID {
conns = append(conns, conn)
}
}
return true
})
return conns
}
// GetSnapshot gets the snapshot of all connections.
func (c *connector) GetSnapshot() map[string]string {
result := make(map[string]string)
c.conns.Range(func(key interface{}, val interface{}) bool {
connID := key.(string)
conn := val.(Connection)
result[connID] = conn.Name()
return true
})
return result
}
// Clean the connector.
func (c *connector) Clean() {
c.conns.Range(func(key, value any) bool {
c.conns.Delete(key)
return true
})
}