-
Notifications
You must be signed in to change notification settings - Fork 0
/
websocket.go
148 lines (134 loc) · 3.52 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package guiapi
import (
"context"
"encoding/json"
"log"
"math/rand"
"nhooyr.io/websocket"
)
// StreamFunc is the type of a stream handler function. The initial arguments
// from the client side are passed as JSON in args. Any time an update is
// ready to be sent, it needs to be sent to the res channel. The stream can
// be closed by returning from the function. If the client side closes the
// connection, the context will be canceled. Because of this it is important
// to check ctx.Done() regularly.
type StreamFunc func(ctx context.Context, args json.RawMessage, res chan<- *Update) error
type websocketMessage struct {
Name string `json:"name"`
Args json.RawMessage `json:"args"`
}
func (s *Server) websocketHandler(c *PageCtx) {
streamID := rand.Intn(10000)
conn, err := websocket.Accept(c.Writer, c.Request, &websocket.AcceptOptions{
Subprotocols: []string{"guiapi"},
})
if err != nil {
log.Println("websocket accept error:", err)
return
}
defer conn.Close(websocket.StatusInternalError, "exit")
if conn.Subprotocol() != "guiapi" {
log.Printf("websocket accept error: invalid subprotocol %q", conn.Subprotocol())
return
}
ctx := c.Request.Context()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ch := make(chan *Update, 1)
defer close(ch)
log.Println("start websocket", streamID)
go func() {
defer log.Println("exit websocket writer", streamID)
for {
select {
case <-ctx.Done():
err := conn.Close(websocket.StatusNormalClosure, "done")
if err != nil {
log.Println("websocket close error:", err)
return
}
return
case resp, ok := <-ch:
if !ok {
log.Println("websocket writer not ok", streamID)
return
}
buf, err := json.Marshal(resp)
if err != nil {
log.Println("json marshal error:", err)
return
}
err = conn.Write(ctx, websocket.MessageText, buf)
if err != nil {
log.Println("websocket write error:", err)
return
}
}
}
}()
messages := make(chan []byte)
defer close(messages)
go func() {
defer log.Println("exit websocket reader", streamID)
for {
msgType, buf, err := conn.Read(ctx)
if err != nil {
if websocket.CloseStatus(err) == websocket.StatusGoingAway {
log.Println("websocket going away")
} else {
log.Println("websocket read error:", err, "CloseStatus:", websocket.CloseStatus(err))
}
cancel()
return
}
if msgType != websocket.MessageText {
log.Println("websocket read error: invalid message type", msgType)
return
}
select {
case messages <- buf:
case <-ctx.Done():
log.Println("websocket reader blocked", streamID)
}
}
}()
var previousCancel context.CancelFunc
defer log.Println("exit websocketHandler", streamID)
for {
select {
case <-ctx.Done():
return
case buf, ok := <-messages:
if previousCancel != nil {
previousCancel()
}
if !ok {
log.Println("websocket router not ok", streamID)
return
}
var msg websocketMessage
err := json.Unmarshal(buf, &msg)
if err != nil {
log.Println("json unmarshal error:", err)
cancel()
break
}
log.Printf("websocket message %q %s", msg.Name, msg.Args)
subCtx, subCancel := context.WithCancel(ctx)
previousCancel = subCancel
go func() {
fn := s.streams[msg.Name]
if fn == nil {
log.Println("StreamRouter error: unknown stream", msg.Name, string(buf))
cancel()
return
}
err := fn(subCtx, msg.Args, ch)
if err != nil {
log.Println("StreamRouter error:", err)
cancel()
}
}()
}
}
}