-
Notifications
You must be signed in to change notification settings - Fork 0
/
server_peers.go
331 lines (302 loc) · 8.32 KB
/
server_peers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
package copper
import (
"fmt"
"io"
"net"
"time"
)
// serverPeerKey uniquely identifies a peer by the network/address pair used
// to dial it (passed verbatim to net.Dial in connectloop).
type serverPeerKey struct {
	network string // network argument for net.Dial (e.g. "tcp")
	address string // address argument for net.Dial
}
// serverPeerRemote is a single remote service instance published by a peer,
// addressed by targetID on that peer's client connection. It implements
// endpointReference so server subscriptions can route requests to it.
type serverPeerRemote struct {
	peer          *serverPeer     // owning peer; nil once removeLocked has detached this remote
	client        *clientConn     // connection this remote was discovered on
	targetID      int64           // target id on the remote peer
	name          string          // published service name
	distance      uint32          // distance of the owning peer
	settings      PublishSettings // settings reported by the peer (priority, max distance, ...)
	subscriptions map[*serverSubscription]struct{} // subscriptions currently routed through this remote
}

// Compile-time check that serverPeerRemote satisfies endpointReference.
var _ endpointReference = &serverPeerRemote{}
// getEndpointsRLocked reports the single endpoint backing this remote, or nil
// when the remote has already been detached from its peer.
func (remote *serverPeerRemote) getEndpointsRLocked() []Endpoint {
	peer := remote.peer
	if peer == nil {
		return nil
	}
	endpoint := Endpoint{
		Network:  peer.key.network,
		Address:  peer.key.address,
		TargetID: remote.targetID,
	}
	return []Endpoint{endpoint}
}
// handleRequestRLocked forwards a request to the remote peer by opening a new
// stream to remote.targetID and handing it to callback. It is entered with
// peer.owner.mu held for reading; the lock is released around the network
// work and re-acquired before returning.
func (remote *serverPeerRemote) handleRequestRLocked(callback handleRequestCallback, cancel <-chan struct{}) handleRequestStatus {
	peer := remote.peer
	if peer == nil {
		// Already detached: there is no longer a route to this remote.
		return handleRequestStatusNoRoute
	}
	// Drop the read lock while establishing the stream; the deferred RLock
	// restores the caller's locking expectations on every return path.
	peer.owner.mu.RUnlock()
	defer peer.owner.mu.RLock()
	if isCancelled(cancel) {
		// The request is cancelled, so don't bother wasting a stream
		return handleRequestStatusDone
	}
	stream, err := rpcNewStream(remote.client, remote.targetID)
	if err != nil {
		// If there was any error during establishing the stream it means that
		// the client either disconnected or cannot accept new streams. In
		// either case it gets forcefully removed.
		// Defers run LIFO: Unlock releases the write lock first, then the
		// outer deferred RLock re-takes the read lock.
		peer.owner.mu.Lock()
		defer peer.owner.mu.Unlock()
		remote.removeLocked()
		return handleRequestStatusRetry
	}
	defer stream.Close()
	return callback(stream)
}
// serverPeer tracks one configured upstream peer: its dial target, the
// current client connection (if any), and every remote service discovered
// over that connection. All mutable fields are guarded by owner.mu.
type serverPeer struct {
	owner           *server
	key             serverPeerKey // dial target; also the key in owner's peers map
	distance        uint32        // distance assigned to services found on this peer
	failure         error         // first error set by closeWithErrorLocked; never cleared
	failed          chan struct{} // closed exactly once, when failure is set
	client          *clientConn   // active connection; nil while disconnected
	remotesByTarget map[int64]*serverPeerRemote
	remotesByName   map[string]map[int64]*serverPeerRemote // name -> targetID -> remote
}
// addPeerLocked registers a new peer for the given network/address pair and
// starts its connect loop. It fails if a peer with the same key already
// exists. Must be called with s.mu held for writing.
func (s *server) addPeerLocked(network, address string, distance uint32) error {
	key := serverPeerKey{network: network, address: address}
	if _, exists := s.peers[key]; exists {
		return fmt.Errorf("peer for %s:%s already exists", network, address)
	}
	p := &serverPeer{
		owner:           s,
		key:             key,
		distance:        distance,
		failed:          make(chan struct{}),
		remotesByTarget: make(map[int64]*serverPeerRemote),
		remotesByName:   make(map[string]map[int64]*serverPeerRemote),
	}
	s.peers[key] = p
	go p.connectloop()
	return nil
}
// closeWithErrorLocked records the first failure for this peer and signals
// the failed channel. Later calls are no-ops, preserving the original error.
// Caller holds owner.mu for writing.
func (peer *serverPeer) closeWithErrorLocked(err error) {
	if peer.failure != nil {
		return
	}
	peer.failure = err
	close(peer.failed)
}
// sleep blocks for the duration d. It returns true after a full sleep and
// false if the peer failed before the timer fired.
func (peer *serverPeer) sleep(d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-peer.failed:
		return false
	case <-timer.C:
		return true
	}
}
// connectloop repeatedly dials the peer and services each resulting
// connection until the peer is marked failed. It runs in its own goroutine,
// started by addPeerLocked.
func (peer *serverPeer) connectloop() {
	for {
		// Stop dialing once the peer has failed.
		select {
		case <-peer.failed:
			return
		default:
		}
		conn, err := net.Dial(peer.key.network, peer.key.address)
		if err != nil {
			if log := DebugLog(); log != nil {
				log.Printf("Peer %s: connect failed: %s", peer.key.address, err)
			}
			// Back off before retrying; sleep returns false (and we exit)
			// if the peer fails during the backoff.
			if peer.sleep(5 * time.Second) {
				continue
			}
			return
		}
		client := newClient(conn)
		peer.attachClient(client)
		// Watcher goroutine: closes the client promptly if the peer fails
		// while this connection is active. Closing stop (after the client is
		// done and detached) lets the watcher exit without acting.
		stop := make(chan struct{})
		go func() {
			select {
			case <-peer.failed:
				client.Close()
			case <-stop:
			}
		}()
		<-client.Done()
		peer.detachClient(client)
		client.Close()
		close(stop)
	}
}
// attachClient records client as the peer's active connection and starts a
// goroutine that consumes service changes from it.
func (peer *serverPeer) attachClient(client *clientConn) {
	peer.owner.mu.Lock()
	defer peer.owner.mu.Unlock()
	if log := DebugLog(); log != nil {
		log.Printf("Peer %s: connected to %s", peer.key.address, client.RemoteAddr())
	}
	peer.client = client
	go peer.serveClient(client)
}
// detachClient clears client as the peer's active connection and removes
// every remote discovered over it. A stale client (one that has already been
// replaced or detached) is ignored.
func (peer *serverPeer) detachClient(client *clientConn) {
	peer.owner.mu.Lock()
	defer peer.owner.mu.Unlock()
	if peer.client != client {
		return
	}
	if log := DebugLog(); log != nil {
		log.Printf("Peer %s: disconnected from %s", peer.key.address, client.RemoteAddr())
	}
	for _, remote := range peer.remotesByTarget {
		remote.removeLocked()
	}
	peer.client = nil
}
// serveClient consumes service changes from client until the connection is
// replaced, the peer fails, or the changes stream stops, then detaches and
// closes the client.
func (peer *serverPeer) serveClient(client *clientConn) {
	defer client.Close()
	defer peer.detachClient(client)
	for peer.listenChanges(client) {
		peer.owner.mu.RLock()
		stillActive := peer.client == client && peer.failure == nil
		peer.owner.mu.RUnlock()
		if !stillActive {
			return
		}
	}
}
// listenChanges opens the service-changes stream on client and consumes it,
// applying each batch via processChanges. The boolean result tells the caller
// whether to try listening again; note that every current path returns false.
// Expected shutdown errors (ECONNCLOSED/ECONNSHUTDOWN) are logged at debug
// level only; other errors are logged as errors.
func (peer *serverPeer) listenChanges(client *clientConn) bool {
	stream, err := client.ServiceChanges()
	if err != nil {
		if err != ECONNCLOSED && err != ECONNSHUTDOWN {
			if log := ErrorLog(); log != nil {
				log.Printf("Peer %s: changes stream: %s: %s", peer.key.address, client.RemoteAddr(), err)
			}
		} else {
			if log := DebugLog(); log != nil {
				log.Printf("Peer %s: changes stream: %s: %s", peer.key.address, client.RemoteAddr(), err)
			}
		}
		return false
	}
	defer stream.Stop()
	for {
		changes, err := stream.Read()
		if err != nil {
			if err == ECONNCLOSED || err == ECONNSHUTDOWN {
				if log := DebugLog(); log != nil {
					log.Printf("Peer %s: changes stream: %s: %s", peer.key.address, client.RemoteAddr(), err)
				}
				return false
			}
			if err != io.EOF {
				if log := ErrorLog(); log != nil {
					log.Printf("Peer %s: changes stream: %s: %s", peer.key.address, client.RemoteAddr(), err)
				}
			}
			// TODO: Theoretically we may receive an error if we are not reading
			// changes fast enough, in which case we may try to reconnect, but
			// for that we need to forget all currently active services first.
			// NOTE(review): a non-EOF error is logged twice (once above via
			// ErrorLog, once here via DebugLog) — confirm the duplication is
			// intentional.
			if log := DebugLog(); log != nil {
				log.Printf("Peer %s: changes stream: %s: %s", peer.key.address, client.RemoteAddr(), err)
			}
			return false
		}
		if !peer.processChanges(client, changes) {
			// Changes were for a stale client or a failed peer.
			if log := DebugLog(); log != nil {
				log.Printf("Peer %s: changes stream: %s: stopping", peer.key.address, client.RemoteAddr())
			}
			return false
		}
	}
}
// processChanges applies one batch of service changes received from client.
// It returns false when the batch came from a stale connection or the peer
// has failed, telling the caller to stop listening.
func (peer *serverPeer) processChanges(client *clientConn, changes ServiceChanges) bool {
	peer.owner.mu.Lock()
	defer peer.owner.mu.Unlock()
	if peer.failure != nil || peer.client != client {
		// Batch belongs to the wrong (replaced) client, or the peer failed.
		return false
	}
	for _, targetID := range changes.Removed {
		remote := peer.remotesByTarget[targetID]
		if remote == nil {
			continue
		}
		remote.removeLocked()
		if log := DebugLog(); log != nil {
			log.Printf("Service %s(priority=%d): removed from %s", remote.name, remote.settings.Priority, peer.key.address)
		}
	}
	for _, change := range changes.Changed {
		peer.addRemoteLocked(change)
	}
	return true
}
// addRemoteLocked registers or refreshes the remote described by change and
// wires it into matching subscriptions. A service whose MaxDistance is below
// the peer's distance is unreachable and gets removed instead. Caller holds
// owner.mu for writing.
func (peer *serverPeer) addRemoteLocked(change ServiceChange) {
	prev := peer.remotesByTarget[change.TargetID]
	if change.Settings.MaxDistance < peer.distance {
		// We are not allowed to reach this service; forget any existing
		// remote and detach it from all subscriptions.
		if prev != nil {
			prev.removeLocked()
			if log := DebugLog(); log != nil {
				log.Printf("Service %s(priority=%d): removed from %s", prev.name, prev.settings.Priority, peer.key.address)
			}
		}
		return
	}
	if prev != nil {
		if prev.name == change.Name && prev.settings.Priority == change.Settings.Priority {
			// Neither name nor priority changed: refresh settings in place.
			prev.settings = change.Settings
			return
		}
		// Either the target id was reused for a different name (shouldn't
		// happen in practice) or the priority changed. The simplest way to
		// rebuild subscriptions is to drop the remote and re-add it below.
		prev.removeLocked()
	}
	remote := &serverPeerRemote{
		peer:          peer,
		client:        peer.client,
		targetID:      change.TargetID,
		name:          change.Name,
		distance:      peer.distance,
		settings:      change.Settings,
		subscriptions: make(map[*serverSubscription]struct{}),
	}
	byTarget := peer.remotesByName[remote.name]
	if byTarget == nil {
		byTarget = make(map[int64]*serverPeerRemote)
		peer.remotesByName[remote.name] = byTarget
	}
	byTarget[remote.targetID] = remote
	peer.remotesByTarget[remote.targetID] = remote
	// Route every subscription interested in this name through the remote.
	for sub := range peer.owner.subsByName[remote.name] {
		sub.addRemoteLocked(remote)
	}
	if log := DebugLog(); log != nil {
		log.Printf("Service %s(priority=%d): discovered at %s", remote.name, remote.settings.Priority, peer.key.address)
	}
}
// removeLocked detaches the remote from its peer and from every subscription
// it was routed to. Calls on already-removed remotes, or remotes belonging to
// an older connection, are no-ops. Caller holds owner.mu for writing.
func (remote *serverPeerRemote) removeLocked() {
	peer := remote.peer
	if peer == nil {
		return
	}
	if peer.client != remote.client {
		// Stale remote from a previous connection; nothing to undo.
		return
	}
	remote.peer = nil
	for sub := range remote.subscriptions {
		sub.removeRemoteLocked(remote)
	}
	if byTarget := peer.remotesByName[remote.name]; byTarget != nil {
		delete(byTarget, remote.targetID)
		if len(byTarget) == 0 {
			delete(peer.remotesByName, remote.name)
		}
	}
	delete(peer.remotesByTarget, remote.targetID)
}