forked from capnproto/go-capnp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
answer.go
329 lines (292 loc) · 9.45 KB
/
answer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
package rpc
import (
"context"
"fmt"
"sync"
"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/exc"
"capnproto.org/go/capnp/v3/internal/rc"
"capnproto.org/go/capnp/v3/internal/syncutil"
rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc"
)
// An answerID is an index into the answers table.
type answerID uint32
// answer is an entry in a Conn's answer table.
type answer struct {
// c and id must be set before any answer methods are called.
c *Conn
id answerID
// cancel cancels the Context used in the received method call.
// May be nil.
cancel context.CancelFunc
// ret is the outgoing Return struct. ret is valid iff there was no
// error creating the message. If ret is invalid, then this answer
// entry is a placeholder until the remote vat cancels the call.
ret rpccp.Return
// sendMsg sends the return message. The caller MUST NOT hold ans.c.lk.
sendMsg func()
// msgReleaser releases the return message when its refcount hits zero.
// The caller MUST NOT hold ans.c.lk.
msgReleaser *rc.Releaser
// results is the memoized answer to ret.Results().
// Set by AllocResults and setBootstrap, but contents can only be read
// if flags has resultsReady but not finishReceived set.
results rpccp.Payload
// All fields below are protected by s.c.mu.
// flags is a bitmask of events that have occurred in an answer's
// lifetime.
flags answerFlags
// exportRefs is the number of references to exports placed in the
// results.
exportRefs map[exportID]uint32
// pcall is the PipelineCaller returned by RecvCall. It will be set
// to nil once results are ready.
pcall capnp.PipelineCaller
// promise is a promise wrapping pcall. It will be resolved, and
// subsequently set to nil, once the results are ready.
promise *capnp.Promise
// pcalls is added to for every pending RecvCall and subtracted from
// for every RecvCall return (delivery acknowledgement). This is used
// to satisfy the Returner.Return contract.
pcalls sync.WaitGroup
// err is the error passed to (*answer).sendException or from creating
// the Return message. Can only be read after resultsReady is set in
// flags.
err error
}
type answerFlags uint8
const (
returnSent answerFlags = 1 << iota
finishReceived
resultsReady
releaseResultCapsFlag
)
// flags.Contains(flag) Returns true iff flags contains flag, which must
// be a single flag.
func (flags answerFlags) Contains(flag answerFlags) bool {
return flags&flag != 0
}
// errorAnswer returns a placeholder answer with an error result already set.
func errorAnswer(c *Conn, id answerID, err error) *answer {
return &answer{
c: c,
id: id,
err: err,
flags: resultsReady | returnSent,
}
}
// newReturn creates a new Return message. The returned Releaser will release the message when
// all references to it are dropped; the caller is responsible for one reference. This will not
// happen before the message is sent, as the returned send function retains a reference.
func (c *Conn) newReturn() (_ rpccp.Return, sendMsg func(), _ *rc.Releaser, _ error) {
outMsg, err := c.transport.NewMessage()
if err != nil {
return rpccp.Return{}, nil, nil, rpcerr.Failedf("create return: %w", err)
}
ret, err := outMsg.Message.NewReturn()
if err != nil {
outMsg.Release()
return rpccp.Return{}, nil, nil, rpcerr.Failedf("create return: %w", err)
}
// Before releasing the message, we need to wait both until it is sent and
// until the local vat is done with it. We therefore implement a simple
// ref-counting mechanism such that 'release' must be called twice before
// 'releaseMsg' is called.
releaser := rc.NewReleaser(2, outMsg.Release)
return ret, func() {
c.sender.Send(asyncSend{
send: outMsg.Send,
release: releaser.Decr,
onSent: func(err error) {
if err != nil {
c.er.ReportError(fmt.Errorf("send return: %w", err))
}
},
})
}, releaser, nil
}
// setPipelineCaller sets ans.pcall to pcall if the answer has not
// already returned. The caller MUST NOT hold ans.c.lk.
//
// This also sets ans.promise to a new promise, wrapping pcall.
func (ans *answer) setPipelineCaller(m capnp.Method, pcall capnp.PipelineCaller) {
syncutil.With(&ans.c.lk, func() {
if !ans.flags.Contains(resultsReady) {
ans.pcall = pcall
ans.promise = capnp.NewPromise(m, pcall)
}
})
}
// AllocResults allocates the results struct.
func (ans *answer) AllocResults(sz capnp.ObjectSize) (capnp.Struct, error) {
var err error
ans.results, err = ans.ret.NewResults()
if err != nil {
return capnp.Struct{}, rpcerr.Failedf("alloc results: %w", err)
}
s, err := capnp.NewStruct(ans.results.Segment(), sz)
if err != nil {
return capnp.Struct{}, rpcerr.Failedf("alloc results: %w", err)
}
if err := ans.results.SetContent(s.ToPtr()); err != nil {
return capnp.Struct{}, rpcerr.Failedf("alloc results: %w", err)
}
return s, nil
}
// setBootstrap sets the results to an interface pointer, stealing the
// reference.
func (ans *answer) setBootstrap(c capnp.Client) error {
if ans.ret.HasResults() || len(ans.ret.Message().CapTable) > 0 {
panic("setBootstrap called after creating results")
}
// Add the capability to the table early to avoid leaks if setBootstrap fails.
ans.ret.Message().CapTable = []capnp.Client{c}
var err error
ans.results, err = ans.ret.NewResults()
if err != nil {
return rpcerr.Failedf("alloc bootstrap results: %w", err)
}
iface := capnp.NewInterface(ans.results.Segment(), 0)
if err := ans.results.SetContent(iface.ToPtr()); err != nil {
return rpcerr.Failedf("alloc bootstrap results: %w", err)
}
return nil
}
// Return sends the return message.
//
// The caller MUST NOT hold ans.c.lk.
func (ans *answer) Return(e error) {
rl := &releaseList{}
defer rl.Release()
ans.c.lk.Lock()
if e != nil {
ans.sendException(rl, e)
ans.c.lk.Unlock()
ans.pcalls.Wait()
ans.c.tasks.Done() // added by handleCall
return
}
if err := ans.sendReturn(rl); err != nil {
select {
case <-ans.c.bgctx.Done():
default:
ans.c.tasks.Done() // added by handleCall
if err := ans.c.shutdown(err); err != nil {
ans.c.er.ReportError(err)
}
ans.c.lk.Unlock()
ans.pcalls.Wait()
return
}
}
ans.c.lk.Unlock()
ans.pcalls.Wait()
ans.c.tasks.Done() // added by handleCall
}
// sendReturn sends the return message with results allocated by a
// previous call to AllocResults. If the answer already received a
// Finish with releaseResultCaps set to true, then sendReturn returns
// the number of references to be subtracted from each export.
//
// The caller MUST be holding onto ans.c.lk. sendReturn MUST NOT be
// called if sendException was previously called.
func (ans *answer) sendReturn(rl *releaseList) error {
ans.pcall = nil
ans.flags |= resultsReady
var err error
ans.exportRefs, err = ans.c.fillPayloadCapTable(ans.results)
if err != nil {
// We're not going to send the message after all, so don't forget to release it.
ans.msgReleaser.Decr()
ans.c.er.ReportError(rpcerr.Annotate(err, "send return"))
}
// Continue. Don't fail to send return if cap table isn't fully filled.
select {
case <-ans.c.bgctx.Done():
// We're not going to send the message after all, so don't forget to release it.
ans.msgReleaser.Decr()
default:
fin := ans.flags.Contains(finishReceived)
if ans.promise != nil {
if fin {
// Can't use ans.result after a finish, but it's
// ok to return an error if the finish comes in
// before the return. Possible enhancement: use
// the cancel variant of return.
ans.promise.Reject(rpcerr.Failedf("received finish before return"))
} else {
ans.promise.Resolve(ans.results.Content())
}
ans.promise = nil
}
ans.c.lk.Unlock()
ans.sendMsg()
if fin {
ans.c.lk.Lock()
return ans.destroy(rl)
}
ans.c.lk.Lock()
}
ans.flags |= returnSent
if !ans.flags.Contains(finishReceived) {
return nil
}
return ans.destroy(rl)
}
// sendException sends an exception on the answer's return message.
//
// The caller MUST be holding onto ans.c.lk. sendException MUST NOT
// be called if sendReturn was previously called.
func (ans *answer) sendException(rl *releaseList, ex error) {
ans.err = ex
ans.pcall = nil
ans.flags |= resultsReady
if ans.promise != nil {
ans.promise.Reject(ex)
ans.promise = nil
}
select {
case <-ans.c.bgctx.Done():
default:
// Send exception.
fin := ans.flags.Contains(finishReceived)
ans.c.lk.Unlock()
if e, err := ans.ret.NewException(); err != nil {
ans.c.er.ReportError(fmt.Errorf("send exception: %w", err))
} else {
e.SetType(rpccp.Exception_Type(exc.TypeOf(ex)))
if err := e.SetReason(ex.Error()); err != nil {
ans.c.er.ReportError(fmt.Errorf("send exception: %w", err))
} else {
ans.sendMsg()
}
}
if fin {
ans.c.lk.Lock()
// destroy will never return an error because sendException does
// create any exports.
_ = ans.destroy(rl)
}
ans.c.lk.Lock()
}
ans.flags |= returnSent
if !ans.flags.Contains(finishReceived) {
return
}
// destroy will never return an error because sendException does
// create any exports.
_ = ans.destroy(rl)
}
// destroy removes the answer from the table and returns ReleaseFuncs to
// run. The answer must have sent a return and received a finish.
// The caller must be holding onto ans.c.lk.
//
// shutdown has its own strategy for cleaning up an answer.
func (ans *answer) destroy(rl *releaseList) error {
rl.Add(ans.msgReleaser.Decr)
delete(ans.c.lk.answers, ans.id)
if !ans.flags.Contains(releaseResultCapsFlag) || len(ans.exportRefs) == 0 {
return nil
}
return ans.c.releaseExportRefs(rl, ans.exportRefs)
}