-
Notifications
You must be signed in to change notification settings - Fork 1k
/
connectedness_event_emitter.go
145 lines (131 loc) · 4.18 KB
/
connectedness_event_emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package swarm
import (
"context"
"sync"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)
// connectednessEventEmitter emits PeerConnectednessChanged events.
// We ensure that for any peer we connected to we always sent atleast 1 NotConnected Event after
// the peer disconnects. This is because peers can observe a connection before they are notified
// of the connection by a peer connectedness changed event.
type connectednessEventEmitter struct {
mx sync.RWMutex
// newConns is the channel that holds the peerIDs we recently connected to
newConns chan peer.ID
removeConnsMx sync.Mutex
// removeConns is a slice of peerIDs we have recently closed connections to
removeConns []peer.ID
// lastEvent is the last connectedness event sent for a particular peer.
lastEvent map[peer.ID]network.Connectedness
// connectedness is the function that gives the peers current connectedness state
connectedness func(peer.ID) network.Connectedness
// emitter is the PeerConnectednessChanged event emitter
emitter event.Emitter
wg sync.WaitGroup
removeConnNotif chan struct{}
ctx context.Context
cancel context.CancelFunc
}
func newConnectednessEventEmitter(connectedness func(peer.ID) network.Connectedness, emitter event.Emitter) *connectednessEventEmitter {
ctx, cancel := context.WithCancel(context.Background())
c := &connectednessEventEmitter{
newConns: make(chan peer.ID, 32),
lastEvent: make(map[peer.ID]network.Connectedness),
removeConnNotif: make(chan struct{}, 1),
connectedness: connectedness,
emitter: emitter,
ctx: ctx,
cancel: cancel,
}
c.wg.Add(1)
go c.runEmitter()
return c
}
func (c *connectednessEventEmitter) AddConn(p peer.ID) {
c.mx.RLock()
defer c.mx.RUnlock()
if c.ctx.Err() != nil {
return
}
c.newConns <- p
}
func (c *connectednessEventEmitter) RemoveConn(p peer.ID) {
c.mx.RLock()
defer c.mx.RUnlock()
if c.ctx.Err() != nil {
return
}
c.removeConnsMx.Lock()
// This queue is roughly bounded by the total number of added connections we
// have. If consumers of connectedness events are slow, we apply
// backpressure to AddConn operations.
//
// We purposefully don't block/backpressure here to avoid deadlocks, since it's
// reasonable for a consumer of the event to want to remove a connection.
c.removeConns = append(c.removeConns, p)
c.removeConnsMx.Unlock()
select {
case c.removeConnNotif <- struct{}{}:
default:
}
}
func (c *connectednessEventEmitter) Close() {
c.cancel()
c.wg.Wait()
}
func (c *connectednessEventEmitter) runEmitter() {
defer c.wg.Done()
for {
select {
case p := <-c.newConns:
c.notifyPeer(p, true)
case <-c.removeConnNotif:
c.sendConnRemovedNotifications()
case <-c.ctx.Done():
c.mx.Lock() // Wait for all pending AddConn & RemoveConn operations to complete
defer c.mx.Unlock()
for {
select {
case p := <-c.newConns:
c.notifyPeer(p, true)
case <-c.removeConnNotif:
c.sendConnRemovedNotifications()
default:
return
}
}
}
}
}
// notifyPeer sends the peer connectedness event using the emitter.
// Use forceNotConnectedEvent = true to send a NotConnected event even if
// no Connected event was sent for this peer.
// In case a peer is disconnected before we sent the Connected event, we still
// send the Disconnected event because a connection to the peer can be observed
// in such cases.
func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent bool) {
oldState := c.lastEvent[p]
c.lastEvent[p] = c.connectedness(p)
if c.lastEvent[p] == network.NotConnected {
delete(c.lastEvent, p)
}
if (forceNotConnectedEvent && c.lastEvent[p] == network.NotConnected) || c.lastEvent[p] != oldState {
c.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: c.lastEvent[p],
})
}
}
func (c *connectednessEventEmitter) sendConnRemovedNotifications() {
c.removeConnsMx.Lock()
defer c.removeConnsMx.Unlock()
c.removeConnsMx.Lock()
removeConns := c.removeConns
c.removeConns = nil
c.removeConnsMx.Unlock()
for _, p := range removeConns {
c.notifyPeer(p, false)
}
}