-
Notifications
You must be signed in to change notification settings - Fork 0
/
passthru.go
76 lines (73 loc) · 1.65 KB
/
passthru.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package copper
import (
"sync"
)
func passthru(dst, src Stream, waitack bool) {
go func() {
<-src.Acknowledged()
if src.IsAcknowledged() {
dst.Acknowledge()
}
}()
var write sync.Mutex
go func() {
<-dst.WriteClosed()
write.Lock()
defer write.Unlock()
src.CloseReadWithError(dst.WriteErr())
}()
for {
buf, err := src.Peek()
if len(buf) > 0 {
write.Lock()
n, werr := dst.Write(buf)
if n > 0 {
if waitack {
<-dst.Acknowledged()
if werr == nil {
// Recheck if write side was closed
werr = dst.WriteErr()
}
if dst.IsAcknowledged() || werr == nil {
// Discard input if acknowledged, or write side is not
// yet closed. Remember, that acknowledgements are sent
// on the write side, but received on the read side, so
// the above tells us that if CloseWrite was called by
// the remote, then CloseRead was not, which may
// indicate it is reading our data. Otherwise the write
// side is done, we assume the request was lost and src
// may be reused for another attempt.
// TODO: implement src reuse
src.Discard(n)
}
waitack = false
} else {
src.Discard(n)
}
}
write.Unlock()
if werr != nil {
src.CloseReadWithError(werr)
return
}
}
if err != nil {
if src.IsAcknowledged() {
dst.Acknowledge()
}
dst.CloseWriteWithError(err)
return
}
}
}
func passthruBoth(local, remote Stream) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// TODO: set waitack to true when local reuse support is implemented
passthru(remote, local, false)
}()
passthru(local, remote, false)
wg.Wait()
}