-
Notifications
You must be signed in to change notification settings - Fork 249
/
io-context.h
1586 lines (1321 loc) · 63.8 KB
/
io-context.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2017-2022 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
#pragma once
#include <kj/async-io.h>
#include <kj/compat/http.h>
#include <kj/mutex.h>
#include <kj/function.h>
#include <kj/map.h>
#include "trace.h"
#include "worker.h"
#include <workerd/jsg/jsg.h>
#include <v8.h>
#include "io-gate.h"
#include <workerd/api/util.h>
#include <capnp/dynamic.h>
#include "limit-enforcer.h"
#include "io-channels.h"
namespace capnp { class HttpOverCapnpFactory; }
namespace workerd {
template <typename T> class IoOwn;
template <typename T> class IoPtr;

template <typename T>
struct RemoveIoOwn_ {
  // Primary template: T is not an IoOwn<...>, so it is passed through unchanged.
  using Type = T;
  static constexpr bool is = false;
};

template <typename T>
struct RemoveIoOwn_<IoOwn<T>> {
  // Partial specialization: IoOwn<T> unwraps (one level) to T.
  using Type = T;
  static constexpr bool is = true;
};

template <typename T>
constexpr bool isIoOwn() {
  // True exactly when T is IoOwn<U> for some U.
  return RemoveIoOwn_<T>::is;
}

template <typename T>
using RemoveIoOwn = typename RemoveIoOwn_<T>::Type;
// Strips a single outer IoOwn<> from T, if there is one; otherwise yields T itself.

[[noreturn]] void throwExceededMemoryLimit(bool isActor);

class ThreadContext;
class IoContext;
class ThreadContext {
  // Thread-level stuff needed to construct an IoContext. One of these is created for each
  // request-handling thread.
public:
  struct HeaderIdBundle {
    // An HTTP header table together with the IDs of specific headers that API implementations
    // look up, constructed from a kj::HttpHeaderTable::Builder.
    HeaderIdBundle(kj::HttpHeaderTable::Builder& builder);

    const kj::HttpHeaderTable& table;

    const kj::HttpHeaderId contentEncoding;
    const kj::HttpHeaderId cfCacheStatus;       // used by cache API implementation
    const kj::HttpHeaderId cacheControl;
    const kj::HttpHeaderId cfKvMetadata;        // used by KV binding implementation
    const kj::HttpHeaderId cfR2ErrorHeader;     // used by R2 binding implementation
    const kj::HttpHeaderId cfBlobMetadataSize;  // used by R2 binding implementation
    const kj::HttpHeaderId cfBlobRequest;       // used by R2 binding implementation
    const kj::HttpHeaderId secWebSocketProtocol;
  };

  ThreadContext(
      kj::Timer& timer, kj::EntropySource& entropySource,
      HeaderIdBundle headerIds, capnp::HttpOverCapnpFactory& httpOverCapnpFactory, bool isFiddle);

  kj::Timer& getUnsafeTimer() { return timer; }
  // This should only be used to construct TimerChannel. Everything else should use TimerChannel.

  kj::EntropySource& getEntropySource() { return entropySource; }

  // The following accessors expose only read-only data, so they are callable on a const
  // ThreadContext.
  const kj::HttpHeaderTable& getHeaderTable() const { return headerIds.table; }
  const HeaderIdBundle& getHeaderIds() const { return headerIds; }

  capnp::HttpOverCapnpFactory& getHttpOverCapnpFactory() { return httpOverCapnpFactory; }

  bool isFiddle() const { return fiddle; }

private:
  // Declaration order is kept as-is; the constructor (defined elsewhere) initializes members in
  // this order.
  kj::Timer& timer;
  // NOTE: This timer only updates when entering the event loop!
  kj::EntropySource& entropySource;
  HeaderIdBundle headerIds;
  capnp::HttpOverCapnpFactory& httpOverCapnpFactory;
  bool fiddle;
};
class TimeoutId {
  // A TimeoutId is a positive non-zero integer value that explicitly identifies a timeout set on an
  // isolate.
  //
  // Timeout ids can experience integer roll over. It is expected that the
  // setTimeout/clearTimeout implementation will enforce id uniqueness for *active* timeouts. This
  // does not mean that an external user cannot have cached a timeout id for a long expired timeout.
  // However, clearTimeout implementations are expected to only have access to timeouts set via that
  // same implementation.
public:
  using NumberType = double;
  // Use a double so that we can exceed the maximum value for uint32_t.
  using ValueType = uint64_t;
  // Store as a uint64_t so that we treat this id as an integer.

  class Generator;

  static TimeoutId fromNumber(NumberType id) {
    // Convert an externally provided double into a TimeoutId. If you are making a new TimeoutId,
    // use a Generator instead.
    //
    // Because `id` is externally provided, it may be negative, NaN, infinite, or too large for
    // ValueType. Converting a double whose truncated value is not representable in ValueType is
    // undefined behavior in C++, so every such input is mapped to 0 instead. A Generator never
    // hands out 0, so the result cannot match a live timeout. In-range values are truncated
    // toward zero.
    constexpr NumberType kValueTypeLimit = 18446744073709551616.0;  // 2^64, exact in a double.
    // Written as a negated range check so that NaN (which fails every comparison) is rejected too.
    if (!(id >= 0 && id < kValueTypeLimit)) {
      return TimeoutId(0);
    }
    return TimeoutId(static_cast<ValueType>(id));
  }

  NumberType toNumber() const {
    // Convert a TimeoutId to an integer-convertible double for external consumption.
    // Note that this is expected to be less than or equal to JavaScript Number.MAX_SAFE_INTEGER
    // (2^53 - 1). To reach greater than that value in normal operation, we'd need a Generator to
    // live far far longer than our normal release/restart cycle, be initialized with a large
    // starting value, or experience active concurrency _somehow_.
    return static_cast<NumberType>(value);
  }

  bool operator<(TimeoutId id) const {
    return value < id.value;
  }

private:
  constexpr explicit TimeoutId(ValueType value): value(value) {}

  ValueType value;
};
class TimeoutId::Generator {
  // Issues fresh TimeoutIds. A Generator is pinned in place: it can be neither copied nor moved.
public:
  Generator() = default;

  Generator(Generator&&) = delete;
  Generator& operator=(Generator&&) = delete;
  Generator(const Generator&) = delete;
  Generator& operator=(const Generator&) = delete;

  TimeoutId getNext();
  // Returns the next TimeoutId from this generator. The returned id is never <= 0.

private:
  TimeoutId::ValueType nextId{1};
  // Starts at 1 because 0 is always skipped, per the spec:
  // https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#timers.
};
class TimeoutManager {
  // Abstract interface used by an IoContext to implement setTimeout()/clearTimeout(). The
  // concrete implementation is defined elsewhere (presumably IoContext::TimeoutManagerImpl).
public:
  constexpr static auto MAX_TIMEOUTS = 10'000;
  // Upper bound on the number of timeouts a user can *ever* have active.

  struct TimeoutParameters {
    TimeoutParameters(bool repeat, int64_t msDelay, jsg::Function<void()> function)
        : repeat(repeat), msDelay(msDelay), function(kj::mv(function)) {
      // Don't allow pushing Date.now() backwards! This should be checked before TimeoutParameters
      // is created but just in case...
      if (msDelay < 0) {
        this->msDelay = 0;
      }
    }

    bool repeat;
    // Whether the timer re-arms after firing (presumably setInterval() vs setTimeout()).

    int64_t msDelay;
    // Delay in milliseconds; the constructor clamps negative values to 0.

    kj::Maybe<jsg::Function<void()>> function;
    // This is a maybe to allow cancel to clear it and free the reference
    // when it is no longer needed.
  };

  virtual ~TimeoutManager() noexcept(false) {}
  // This class is a purely abstract interface, so destruction through a TimeoutManager pointer
  // must dispatch to the implementation's destructor. Declared noexcept(false), per KJ
  // convention, so implementations may hold members whose destructors are allowed to throw.

  virtual TimeoutId setTimeout(
      IoContext& context, TimeoutId::Generator& generator, TimeoutParameters params) = 0;
  virtual void clearTimeout(IoContext& context, TimeoutId id) = 0;
  virtual size_t getTimeoutCount() const = 0;
  virtual kj::Maybe<kj::Date> getNextTimeout() const = 0;
};
class IoContext_IncomingRequest {
// Represents one incoming request being handled by an IoContext. In non-actor scenarios,
// there is only ever one IncomingRequest per IoContext, but with actors there could be many.
//
// This should normally be referenced as IoContext::IncomingRequest, but it has been pulled
// out of the nested scope to allow forward-declaration.
//
// The purpose of tracking IncomingRequests at all is so that we can perform metrics, logging,
// and tracing on a "per-request basis", e.g. we can log that a particular incoming request
// generated N subrequests, and traces can trace through them. But this concept falls apart
// a bit when actors are in play, because we can't really say which incoming request "caused"
// any particular subrequest, especially when multiple incoming requests overlap. As a
// heuristic approximation, we attribute each subrequest (and all other forms of resource
// usage) to the "current" incoming request, which is defined as the newest request that hasn't
// already completed.
public:
IoContext_IncomingRequest(kj::Own<IoContext> context,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::Own<Tracer>> tracer);
// Takes ownership of a reference to the IoContext handling this request, plus the request's
// channel factory, metrics observer, and (optional) tracers.
KJ_DISALLOW_COPY(IoContext_IncomingRequest);
~IoContext_IncomingRequest() noexcept(false);
IoContext& getContext() { return *context; }
// Returns the IoContext this request runs in.
void delivered();
// Invoked when the request is actually delivered.
//
// If, for some reason, this is not invoked before the object is destroyed, this indicates that
// the event was canceled for some reason before delivery. No JavaScript was invoked.
//
// This method invokes metrics->delivered() and also makes this IncomingRequest "current" for
// the IoContext.
//
// If delivered() is never called, then drain() need not be called.
kj::Promise<void> drain();
// Waits until the request is "done". For non-actor requests this means waiting until
// all "waitUntil" tasks finish, applying the "soft timeout" time limit from WorkerLimits.
//
// For actor requests, this means waiting until either all tasks have finished (not just
// waitUntil, all tasks), or a new incoming request has been received (which then takes over
// responsibility for waiting for tasks), or the actor is shut down.
kj::Promise<bool> finishScheduled();
// Waits for all "waitUntil" tasks to finish, up to the time limit for scheduled events, as
// defined by `scheduledTimeoutMs` in `WorkerLimits`. Returns a bool indicating `true` if the
// event completed successfully, or `false`, if it was canceled early.
//
// Note that, while this is similar in some ways to `drain()`, `finishScheduled()` is intended
// to be called synchronously during request handling, i.e. where a client is waiting for the
// result, and the operation will be canceled if the client disconnects. `drain()` is intended
// to be called after the client has received a response or disconnected.
//
// This method is also used by some custom event handlers (see WorkerInterface::CustomEvent) that
// need similar behavior.
RequestObserver& getMetrics() { return *metrics; }
kj::Maybe<WorkerTracer&> getWorkerTracer() { return workerTracer; }
kj::Maybe<Tracer&> getTracer() { return tracer; }
// Accessors for the per-request observers passed to the constructor. The tracer accessors
// return null when no tracer was supplied.
private:
// NOTE(review): members are destroyed in reverse declaration order, and the declaration order
// below differs from the constructor's parameter order: ioChannelFactory is declared after the
// tracers, so it is destroyed before tracer, workerTracer, metrics and context. This may be
// deliberate -- confirm before reordering.
kj::Own<IoContext> context;
kj::Own<RequestObserver> metrics;
kj::Maybe<kj::Own<WorkerTracer>> workerTracer;
kj::Maybe<kj::Own<Tracer>> tracer;
kj::Own<IoChannelFactory> ioChannelFactory;
bool wasDelivered = false;
// Presumably set by delivered(); TODO confirm against the implementation.
bool waitedForWaitUntil = false;
// Used for debugging, tracks whether we properly called drain() or some other mechanism to
// wait for waitUntil tasks.
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> drainFulfiller;
// If drain() was already called, this is non-null and fulfilling it will cancel the drain.
// This is used in particular when a new IncomingRequest starts while the drain is being
// awaited.
kj::ListLink<IoContext_IncomingRequest> link;
// Used by IoContext::incomingRequests.
friend class IoContext;
};
class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler {
// IoContext holds state associated with a single I/O context. For stateless requests, each
// incoming request runs in a unique I/O context. For actors, each actor runs in a unique I/O
// context (but all requests received by that actor run in the same context).
//
// The IoContext serves as a bridge between JavaScript objects and I/O objects. I/O
// objects are strongly tied to the KJ event loop, and thus must live on a single thread. The
// JS isolate, however, can move between threads, bringing all garbage-collected heap objects
// with it. So, when a GC'd object holds a reference to I/O objects or tasks (KJ promises), it
// needs help from IoContext manage this.
//
// Whenever JavaScript is executing, the current IoContext can be obtained via
// `IoContext::current()`, and this can then be used to manage I/O, such as outgoing
// subrequests. When the IoContext is destroyed, all outstanding I/O objects and tasks
// created through it are destroyed immediately, even if objects on the JS heap still refer to
// them. Any attempt to access an I/O object from the wrong context will throw.
//
// This has an observable side-effect for workers: if a worker saves the request objects
// associated with one request into its global state and then attempts to access those objects
// within callbacks associated with some other request, an exception will be thrown. We actually
// like this. We don't want people leaking heavy objects or allowing simultaneous requests to
// interfere with each other.
public:
class TimeoutManagerImpl;
IoContext(ThreadContext& thread,
kj::Own<const Worker> worker, kj::Maybe<Worker::Actor&> actor,
kj::Own<LimitEnforcer> limitEnforcer);
// Construct a new IoContext. Before using it, you must also create an IncomingRequest.
~IoContext() noexcept(false);
// On destruction, all outstanding tasks associated with this request are canceled.
using IncomingRequest = IoContext_IncomingRequest;
const Worker& getWorker() { return *worker; }
Worker::Lock& getCurrentLock() { return KJ_REQUIRE_NONNULL(currentLock); }
uint32_t getAnalyticsEngineWriteBudget() { return analyticsEngineWriteBudget; }
// Returns how many more Analytics Engine writes this context is currently allowed to make.

void decrementAnalyticsEngineWriteBudget() {
  // Consumes one unit of the Analytics Engine write budget. Throws a JavaScript Error (via
  // JSG_REQUIRE) once the budget is exhausted, leaving the counter at zero rather than letting
  // the unsigned value wrap around.
  JSG_REQUIRE(analyticsEngineWriteBudget > 0, Error, "writeDataPoint(dataPoint): Write limit reached.");
  --analyticsEngineWriteBudget;
}
kj::Maybe<Worker::Actor&> getActor() {
return actor;
}
Worker::Actor& getActorOrThrow();
// Gets the actor, throwing if there isn't one.
RequestObserver& getMetrics() {
return *getCurrentIncomingRequest().metrics;
}
kj::Maybe<WorkerTracer&> getWorkerTracer() {
  // Returns the WorkerTracer of the current incoming request, or null if there is no incoming
  // request or the current request has no WorkerTracer.
  //
  // Returned by plain (non-const) value: a const-qualified by-value return adds nothing for the
  // caller and only prevents moving from the result.
  if (incomingRequests.empty()) return nullptr;
  return getCurrentIncomingRequest().getWorkerTracer();
}
kj::Maybe<Tracer&> getTracer() {
  // Returns the Tracer of the current incoming request, or null if there is no incoming request
  // or the current request has no Tracer.
  //
  // Goes through IncomingRequest::getTracer(), mirroring getWorkerTracer(), instead of unwrapping
  // the request's kj::Maybe<kj::Own<Tracer>> by hand.
  if (incomingRequests.empty()) return nullptr;
  return getCurrentIncomingRequest().getTracer();
}
LimitEnforcer& getLimitEnforcer() { return *limitEnforcer; }
InputGate::Lock getInputLock();
// Get the current input lock. Throws an exception if no input lock is held (e.g. because this is
// not an actor request).
kj::Maybe<kj::Own<InputGate::CriticalSection>> getCriticalSection();
// Get the current CriticalSection, if there is one, or returns null if not.
template <typename Func>
jsg::PromiseForResult<Func, void, true> blockConcurrencyWhile(jsg::Lock& js, Func&& callback);
// Runs `callback` within its own critical section, returning its final result. If `callback`
// throws, the input lock will break, resetting the actor.
//
// This can only be called when I/O gates are active, i.e. in an actor.
kj::Promise<void> waitForOutputLocks();
// Wait until all outstanding output locks have been unlocked. Does not wait for future output
// locks, even if they are created before past locks are unlocked.
//
// This is used in actors to block output while some storage writes are uncommitted. For
// non-actor requests, this always completes immediately.
kj::Maybe<kj::Promise<void>> waitForOutputLocksIfNecessary();
kj::Maybe<IoOwn<kj::Promise<void>>> waitForOutputLocksIfNecessaryIoOwn();
// Like waitForOutputLocks() but, as an optimization, returns null in (some) cases where no
// wait is needed, such as when the request is not an actor request.
//
// Use the ...IoOwn() overload if you need to store this promise in a JS API object.
template <typename T>
kj::Promise<T> lockOutputWhile(kj::Promise<T> promise);
// Lock output until the given promise completes.
//
// It is an error to call this outside of actors.
bool isInspectorEnabled();
bool isFiddle();
void logWarning(kj::StringPtr description);
// Log a warning to the inspector. This is a no-op if the inspector is not enabled.
void logWarningOnce(kj::StringPtr description);
// Log a warning to the inspector. This is a no-op if the inspector is not enabled. Deduplicates
// warning messages such that a single unique message will only be logged once for the lifetime of
// an isolate.
void logErrorOnce(kj::StringPtr description);
// Log an internal error message. Deduplicates log messages such that a single unique message will
// only be logged once for the lifetime of an isolate.
void logUncaughtException(kj::StringPtr description);
void logUncaughtException(UncaughtExceptionSource source, v8::Local<v8::Value> exception,
v8::Local<v8::Message> message = {});
void logUncaughtExceptionAsync(UncaughtExceptionSource source, kj::Exception&& e);
// Log an uncaught exception from an asynchronous context, i.e. when the IoContext is not
// "current".
void reportPromiseRejectEvent(v8::PromiseRejectMessage& message);
kj::Promise<void> onAbort() { return abortPromise.addBranch(); }
// Returns a promise that will reject with an exception if and when the request should be
// aborted, e.g. because its CPU time expired. This should be joined with any promises for
// incoming tasks.
bool isFailOpen() { return failOpen; }
// Has event.passThroughOnException() been called?
void setFailOpen() { failOpen = true; }
// Called by event.passThroughOnException().
// -----------------------------------------------------------------
// Tracking thread-local request
template <typename Func>
kj::PromiseForResult<Func, Worker::Lock&> run(
Func&& func, kj::Maybe<InputGate::Lock> inputLock = nullptr)
KJ_WARN_UNUSED_RESULT;
// Asynchronously execute a callback inside the context.
//
// We don't use a "scope" class because this might actually switch to a larger stack for the
// duration of the callback.
//
// If `inputLock` is not provided, and this is an actor context, an input lock will be obtained
// before executing the callback.
template <typename Func>
kj::PromiseForResult<Func, Worker::Lock&> run(
Func&& func, kj::Maybe<kj::Own<InputGate::CriticalSection>> criticalSection)
KJ_WARN_UNUSED_RESULT;
// Like run() but executes within the given critical section, if it is non-null. If
// `criticalSection` is null, then this just forwards to the other run() (with null inputLock).
static IoContext& current();
// Returns the current IoContext for the thread.
// Throws an exception if there is no current context (see hasCurrent() below).
static bool hasCurrent();
// True if there is a current IoContext for the thread (current() will not throw).
bool isCurrent();
// True if this is the IoContext for the current thread (same as `hasCurrent() && tcx == current()`).
class WeakRef : public kj::Refcounted {
  // A WeakRef is a weak reference to an IoContext. Unlike a conventional weak pointer, a WeakRef
  // cannot be upgraded to a strong reference; it only lets you run code against the context if
  // the context is still alive at that moment. Intended usage looks like so:
  // ```
  // auto& context = IoContext::current();
  // return canOutliveContext().then([contextWeakRef = context.getWeakRef()]() mutable {
  //   auto hadContext = contextWeakRef->runIfAlive([&](IoContext& context){
  //     useContextFinally(context);
  //   });
  //   if (!hadContext) {
  //     doWhatMustBeDone();
  //   }
  // });
  // ```
  //
  // IoContext (a friend) severs the reference by calling kill() -- presumably during its own
  // teardown -- after which runIfAlive() no longer invokes its functor and returns false.
public:
  WeakRef(kj::Badge<IoContext>, IoContext& context) : maybeContext(context) {}
  // Only IoContext can construct a WeakRef (enforced by kj::Badge). It starts out referring to
  // `context`.

  template<typename F>
  bool runIfAlive(F&& f) {
    // Run the functor and return true if the context is alive, otherwise return false. Note that
    // since the `IoContext` might not be alive for any async continuation, we do not provide
    // a `kj::Maybe<IoContext&> tryGet()` function. You are expected to invoke this function
    // again in the next continuation to re-check if the `IoContext` is still around.
    KJ_IF_MAYBE(context, maybeContext) {
      kj::fwd<F>(f)(*context);
      return true;
    }
    return false;
  }

private:
  friend class IoContext;

  void kill() {
    // Drops the reference; subsequent runIfAlive() calls return false without invoking `f`.
    maybeContext = nullptr;
  }

  kj::Maybe<IoContext&> maybeContext;
};
kj::Own<WeakRef> getWeakRef() {
  // Hands out an additional reference to this context's shared WeakRef object.
  auto& shared = *selfRef;
  return kj::addRef(shared);
}
static kj::Maybe<kj::Own<WeakRef>> tryGetWeakRefForCurrent();
// If there is a current IoContext, return its WeakRef.
// -----------------------------------------------------------------
// Task scheduling and object storage
void addTask(kj::Promise<void> promise);
// Arrange for the given promise to execute as part of this request. It will be canceled if the
// request is canceled.
template <typename T, typename Func>
jsg::PromiseForResult<Func, T, false> awaitIo(kj::Promise<T> promise, Func&& func);
template <typename T, typename Func>
jsg::PromiseForResult<Func, T, true> awaitIo(
jsg::Lock& js, kj::Promise<T> promise, Func&& func);
template <typename T>
jsg::Promise<T> awaitIo(kj::Promise<T> promise);
template <typename T>
jsg::Promise<T> awaitIo(jsg::Lock& js, kj::Promise<T> promise);
// Waits for some background I/O to complete, then executes `func` on the result, returning a
// JavaScript promise for the result of that. If no `func` is provided, no transformation is
// applied.
//
// If the IoContext is canceled, the I/O promise will be canceled, `func` will be destroyed
// without being called, and the JS promise will never resolve.
//
// You might wonder why this function takes a continuation function as a parameter, rather than
// taking a single `kj::Promise<T>`, returning `jsg::Promise<T>`, and leaving it up to you to
// call `.then()` on the result. The answer is that `func` provides stronger guarantees about the
// context where it runs, which avoids the need for `IoOwn`s:
// - `func` itself can safely capture I/O objects without IoOwn, because the function itself
//   is attached to the IoContext. (If the IoContext is canceled, `func` is destroyed.)
// - Similarly, the result of `promise` can be an I/O object without needing to be wrapped in
//   IoOwn, because `func` is guaranteed to be called in this IoContext.
//
// Conversely, you might wonder why you wouldn't use `awaitIo(promise.then(func))` instead, which
// would also avoid the need for `IoOwn` since `func` would run as part of the KJ event loop.
// But in that version, `func` cannot access any JavaScript objects, because it would not run
// with the isolate lock.
//
// Historically, we solved this with something called `capctx`. You would write something like:
// `awaitIo(promise.then(capctx(func)))`. This provided both properties: `func()` ran both in
// the KJ event loop and with the isolate lock held. However, this had the problem that it
// required returning to the KJ event loop between running func() and running whatever
// JavaScript code was waiting on it. This implies releasing the isolate lock just to
// immediately acquire it again, which was wasteful. Passing `func` as a parameter to `awaitIo()`
// allows it to run under the same isolate lock that then runs the awaiting JavaScript.
//
// Note that awaitIo() automatically implies registering a pending event while waiting for the
// promise (no need to call registerPendingEvent()).
template <typename T, typename Func>
jsg::PromiseForResult<Func, T, false> awaitIoWithInputLock(kj::Promise<T> promise, Func&& func);
template <typename T>
jsg::Promise<T> awaitIoWithInputLock(kj::Promise<T> promise);
// Waits for the given I/O while holding the input lock, so that all other I/O is blocked from
// completing in the meantime (unless it is also holding the same input lock).
template <typename T>
jsg::Promise<T> awaitIoLegacy(kj::Promise<T> promise);
template <typename T>
jsg::Promise<T> awaitIoLegacyWithInputLock(kj::Promise<T> promise);
// DEPRECATED: Like awaitIo() but:
// - Does not have a continuation function, so suffers from the problems described in
// `awaitIo()`'s doc comment.
// - Does not automatically register a pending event.
//
// This is used to implement the historical KJ-oriented PromiseWrapper behavior in terms of the
// new `awaitIo()` implementation. This should go away once all API implementations are
// refactored to use `awaitIo()`.
template <typename T>
kj::_::ReducePromises<RemoveIoOwn<T>> awaitJs(jsg::Promise<T> promise);
// Returns a KJ promise that resolves when a particular JavaScript promise completes.
//
// The JS promise must complete within this IoContext. The KJ promise will reject
// immediately if any of these happen:
// - The JS promise is GC'd without resolving.
// - The JS promise is resolved from the wrong context.
// - The system detects that no further process will be made in this context (because there is no
// more JavaScript to run, and there is no outstanding I/O scheduled with awaitIo()).
//
// If `T` is `IoOwn<U>`, it will be unwrapped to just `U` in the result. If `U` is in turn
// `kj::Promise<V>`, then the promises will be chained as usual, so the final result is
// `kj::Promise<V>`.
uint taskCount() { return addTaskCounter; }
// Returns the number of times addTask() has been called (even if the tasks have completed).
void addWaitUntil(kj::Promise<void> promise);
// Indicates that the script has requested that it stay active until the given promise resolves.
// drain() waits until all such promises have completed.
EventOutcome waitUntilStatus() const { return waitUntilStatusValue; }
// Returns the status of waitUntil promises. If a promise fails, this sets the status to the
// one corresponding to the exception type.
kj::TaskSet& getWaitUntilTasks() {
// DO NOT USE, use `addWaitUntil()` instead.
//
// TODO(cleanup): This is only needed for use with RpcWorkerInterface, but we can eliminate
// that class's need for waitUntilTasks if we change the signature of sendTraces() to return
// a promise, I think.
return waitUntilTasks;
}
template <typename T> IoOwn<T> addObject(kj::Own<T> obj);
template <typename T> IoPtr<T> addObject(T& obj);
// Wraps a reference in a wrapper which:
// 1. Will throw an exception if dereferenced while the IoContext is not current for the
// thread.
// 2. Can be safely destroyed from any thread.
// 3. Invalidates itself when the request ends (such that dereferencing throws).
template <typename Func>
auto addFunctor(Func&& func);
// Like addObject() but takes a functor, returning a functor with the same signature but which
// holds the original functor under a `IoOwn`, and so will stop working if the IoContext
// is no longer valid. This is particularly useful for passing to `jsg::Promise::then()` when
// you need the continuation to run in the correct context.
class Finalizeable {
// If an object passed to addObject(Own<T>) implements Finalizeable, then once it is known to
// be the case that no code will ever run in the context of this IoContext again,
// finalize() will be called.
//
// This is primarily used to proactively fail out hanging promises once we know they can never
// be fulfilled, so that requests fail fast rather than hang forever.
//
// Finalizers should NOT call into JavaScript or really do much of anything except for calling
// reject() on some Fulfiller object. It can optionally return a warning which should be
// logged if the inspector is attached.
//
// NOTE(review): ~Finalizeable() is never declared virtual (in KJ_DEBUG builds it is
// user-declared but non-virtual; otherwise it is implicit), even though finalize() is virtual.
// Deleting a derived object through a Finalizeable* would therefore be undefined behavior;
// presumably owners always hold the derived type -- confirm.
public:
KJ_DISALLOW_COPY(Finalizeable);
#ifdef KJ_DEBUG
Finalizeable();
~Finalizeable() noexcept(false);
// In debug mode, we assert that this object was actually finalized. A Finalizeable object that
// doesn't get finalized typically arises when a derived class multiply-inherits from
// Finalizeable and some other non-Finalizeable class T, then gets passed to
// `IoContext::addObject()` as a T. This can be a source of baffling bugs.
#else
Finalizeable() = default;
#endif
private:
virtual kj::Maybe<kj::StringPtr> finalize() = 0;
// Implemented by subclasses; invoked by IoContext (a friend) once no more code will run in the
// context. The optional return value is a warning to log if the inspector is attached.
friend class IoContext;
#ifdef KJ_DEBUG
IoContext& context;
// Debug-only. Presumably the IoContext this object is associated with, bound by the debug-mode
// constructor (defined elsewhere) -- TODO confirm.
bool finalized = false;
// Set to true by IoContext::runFinalizers(); checked by the debug-mode destructor (see above).
#endif
};
kj::Own<void> registerPendingEvent();
// Call this to indicate that the caller expects to call into JavaScript in this IoContext
// at some point in the future, in response to some *external* event that the caller is waiting
// for. Then, hold on to the returned handle until that time. This prevents finalizers from being
// called in the meantime.
//
// TODO(cleanup): awaitIo() automatically applies this. Is the public method needed anymore?
template <typename T>
kj::Promise<T> waitForDeferredProxy(kj::Promise<api::DeferredProxy<T>>&& promise) {
  // When you want to perform a task that returns Promise<DeferredProxy<T>> and the application
  // JavaScript is waiting for the result, use `context.waitForDeferredProxy(promise)` to turn it
  // into a regular `Promise<T>`.
  //
  // Once the outer promise yields its DeferredProxy, a pending-event handle (from
  // registerPendingEvent()) is attached to the inner `proxyTask`, so the handle lives exactly as
  // long as that inner task.
  auto holdPendingEventUntilDone = [this](api::DeferredProxy<T> proxy) {
    kj::Own<void> pendingEvent = registerPendingEvent();
    return proxy.proxyTask.attach(kj::mv(pendingEvent));
  };
  return promise.then(kj::mv(holdPendingEventUntilDone));
}
template <typename T>
jsg::Promise<T> awaitDeferredProxy(kj::Promise<api::DeferredProxy<T>>&& promise) {
  // Like awaitIo(), but handles the specific case of Promise<DeferredProxy>. This is special
  // because the convention is that the outer promise is NOT treated as a pending I/O event; it
  // may actually be waiting for something to happen in JavaScript land. Once the outer promise
  // resolves, the inner promise (the DeferredProxy<T>) is treated as external I/O.
  //
  // The current critical section, if any (see getCriticalSection()), is forwarded to
  // awaitIoImpl().
  return awaitIoImpl<false>(waitForDeferredProxy(kj::mv(promise)), getCriticalSection());
}
bool isFinalized() {
return ownedObjects.isFinalized();
}
void setNoRetryScheduled() { retryScheduled = false; }
// Called by ScheduledEvent
bool shouldRetryScheduled() { return retryScheduled; }
// Called by ServiceWorkerGlobalScope::runScheduled
// -----------------------------------------------------------------
// Access to I/O
TimeoutId setTimeoutImpl(
TimeoutId::Generator& generator,
bool repeat,
jsg::Function<void()> function,
double msDelay);
void clearTimeoutImpl(TimeoutId key);
// Used to implement setTimeout()/clearTimeout(). We don't expose the timer directly because the
// promises it returns need to live in this I/O context, anyway.
//
// setTimeoutImpl() presumably schedules `function` after `msDelay` milliseconds (re-arming it
// when `repeat` is true, i.e. setInterval() semantics) and returns an id obtained via
// `generator`; clearTimeoutImpl() cancels the timeout identified by `key`. TODO confirm in
// io-context.c++.
size_t getTimeoutCount();
// Presumably the number of timeouts currently registered in this context -- confirm.
kj::Date now();
// Access the event loop's current time point. This will remain constant between ticks.
kj::Promise<void> atTime(kj::Date when) {
  // Returns a promise that resolves once `now() >= when`, using the timer supplied by this
  // context's IoChannelFactory.
  auto& factory = getIoChannelFactory();
  return factory.getTimer().atTime(when);
}
kj::Promise<void> afterLimitTimeout(kj::Duration t) {
  // Returns a promise that resolves after duration `t`.
  //
  // Meant for enforcing time limits on some operation, NOT for application-driven timing: it
  // does not stay consistent with the clock observed through Date.now(), e.g. with respect to
  // Spectre mitigations.
  auto& factory = getIoChannelFactory();
  return factory.getTimer().afterLimitTimeout(t);
}
kj::EntropySource& getEntropySource() {
  // Provides access to the system CSPRNG, forwarded from the ThreadContext.
  return thread.getEntropySource();
}
capnp::HttpOverCapnpFactory& getHttpOverCapnpFactory() {
  // Forwards to the ThreadContext's HttpOverCapnpFactory.
  return thread.getHttpOverCapnpFactory();
}
const kj::HttpHeaderTable& getHeaderTable() {
  // The HTTP header table, forwarded from the ThreadContext.
  return thread.getHeaderTable();
}
const ThreadContext::HeaderIdBundle& getHeaderIds() {
  // Bundle of header IDs, forwarded from the ThreadContext (presumably ids registered in
  // getHeaderTable() -- confirm).
  return thread.getHeaderIds();
}
static constexpr uint NULL_CLIENT_CHANNEL = 0;
static constexpr uint NEXT_CLIENT_CHANNEL = 1;
// Subrequest channel numbers for the two special channels.
// NULL = The channel used by global fetch() when the Request has no fetcher attached.
// NEXT = DEPRECATED: The fetcher attached to Requests delivered by a FetchEvent, so that we can
// detect when an incoming request is passed through to `fetch()` (perhaps with rewrites)
// and treat that case differently. In practice this has proven too confusing, so we don't
// plan to treat NEXT and NULL differently going forward.
static constexpr uint SPECIAL_SUBREQUEST_CHANNEL_COUNT = 2;
// Number of subrequest channels that have special meaning (and so won't appear in any binding).
// Channel numbers at or above this value belong to named bindings (see getSubrequestChannel()).
struct SubrequestOptions final {
  // Options controlling how getSubrequest()/getSubrequestNoChecks() create and account for a
  // subrequest. The flags default to false so that a default-constructed instance never carries
  // indeterminate values. Aggregate and designated initialization keep working unchanged.

  bool inHouse = false;
  // When inHouse is true, the subrequest is to an API provided internally. For example calls
  // to KV. This primarily affects metrics and limits.

  bool wrapMetrics = false;
  // When true, the client is wrapped by metrics.wrapSubrequestClient() ensuring appropriate
  // metrics collection.

  kj::Maybe<kj::StringPtr> operationName;
  // The name to use for the request's Jaeger span if Jaeger tracing is turned on.
};
kj::Own<WorkerInterface> getSubrequestNoChecks(
kj::FunctionParam<kj::Own<WorkerInterface>(kj::Maybe<Tracer::Span&>, IoChannelFactory&)> func,
SubrequestOptions options);
// Like getSubrequest(), but presumably skips the "is creating a new subrequest permitted" check,
// analogous to getSubrequestChannelNoChecks() -- confirm. Use for trusted paths only.
kj::Own<WorkerInterface> getSubrequest(
kj::FunctionParam<kj::Own<WorkerInterface>(kj::Maybe<Tracer::Span&>, IoChannelFactory&)> func,
SubrequestOptions options);
// If creating a new subrequest is permitted, calls the given factory function to create one.
//
// `func` is given an optional trace span (presumably null when tracing is not active -- confirm)
// and this context's IoChannelFactory, and returns the WorkerInterface to use. `options` selects
// in-house accounting, metrics wrapping and the span's operation name.
kj::Own<WorkerInterface> getSubrequestChannel(
uint channel, bool isInHouse, kj::Maybe<kj::String> cfBlobJson, kj::StringPtr operationName);
// Get WorkerInterface objects to use for subrequests.
//
// `channel` specifies which outgoing channel to use. The special channel 0 refers to the "null"
// binding (used for fetches where `request.fetcher` is not set), and channel 1 refers to the
// "next" binding (used when request.fetcher is carried over from the incoming request).
// Named bindings, e.g. Worker2Worker bindings, will have indices starting from 2. Fetcher
// bindings declared via Worker::Global::Fetcher have a corresponding `channel` property to refer
// to these outgoing bindings.
//
// `isInHouse` is true if this client represents an "in house" endpoint, i.e. some API provided
// by the Workers platform. For example, KV namespaces are in-house. This primarily affects
// metrics and limits:
// - In-house requests do not count as "subrequests" for metrics and logging purposes.
// - In-house requests are not subject to the same limits on the number of subrequests per
// request.
// - In preview, in-house requests do not show up in the network tab.
//
// `cfBlobJson` is presumably the JSON-serialized `cf` object to send with the request -- confirm.
//
// `operationName` is the name to use for the request's Jaeger span, if Jaeger tracing is
// turned on.
kj::Own<WorkerInterface> getSubrequestChannelNoChecks(
uint channel, bool isInHouse, kj::Maybe<kj::String> cfBlobJson,
kj::Maybe<kj::StringPtr> operationName = nullptr);
// Like getSubrequestChannel() but doesn't enforce limits. Use for trusted paths only.
kj::Own<kj::HttpClient> getHttpClient(
uint channel, bool isInHouse, kj::Maybe<kj::String> cfBlobJson, kj::StringPtr operationName);
kj::Own<kj::HttpClient> getHttpClientNoChecks(
uint channel, bool isInHouse, kj::Maybe<kj::String> cfBlobJson,
kj::Maybe<kj::StringPtr> operationName = nullptr);
// Convenience methods that call getSubrequest*() and adapt the returned WorkerInterface objects
// to HttpClient.
//
// TODO(cleanup): Make it the caller's job to call asHttpClient() on the result of
// getSubrequest*().
capnp::Capability::Client getCapnpChannel(uint channel) {
  // Returns the Cap'n Proto capability bound to `channel`, as provided by this context's
  // IoChannelFactory.
  auto& factory = getIoChannelFactory();
  return factory.getCapability(channel);
}
kj::Own<IoChannelFactory::ActorChannel> getGlobalActorChannel(
    uint channel, const ActorIdFactory::ActorId& id) {
  // Opens a channel to the global actor identified by `id`, on channel `channel`, through this
  // context's IoChannelFactory.
  auto& factory = getIoChannelFactory();
  return factory.getGlobalActor(channel, id);
}
kj::Own<IoChannelFactory::ActorChannel> getColoLocalActorChannel(uint channel, kj::String id) {
  // Like getGlobalActorChannel(), but for a colo-local actor addressed by a string id. Ownership
  // of `id` is passed on to the IoChannelFactory.
  auto& factory = getIoChannelFactory();
  return factory.getColoLocalActor(channel, kj::mv(id));
}
kj::Own<CacheClient> getCacheClient();
// Get a CacheClient to use for Cache API subrequests.
kj::Maybe<Tracer::Span> makeTraceSpan(kj::StringPtr operationName);
// Make a new trace span, if tracing is active. Returns null if tracing is not active.
jsg::Promise<kj::Maybe<IoOwn<kj::AsyncInputStream>>> makeCachePutStream(
jsg::Lock& js, kj::Own<kj::AsyncInputStream> stream);
// Implement per-IoContext rate limiting for Cache.put(). Pass the body of a Cache API PUT
// request and get a possibly wrapped stream back.
//
// The returned promise is fulfilled with nullptr if the Cache API PUT quota is already exceeded,
// or if the passed stream would cause it to be exceeded. If the stream has an unknown length, you
// will get a wrapped stream back that tracks how many bytes are read/pumped out of the stream,
// then decrements the per-IoContext quota on destruction. (The quota is held in the private
// `cachePutQuota` member.)
//
// TODO(cleanup): Factor this into getCacheClient() somehow so it's not opt-in.
capnp::CapabilityServerSet<capnp::DynamicCapability>& getLocalCapSet() { return localCapSet; }
// Returns the CapabilityServerSet for the Cap'n Proto capabilities hosted by this request or
// actor context. It is what lets CapnpCapability::unwrap() reach the underlying server object
// when a capability points to a local object.
void writeLogfwdr(uint channel, kj::FunctionParam<void(capnp::AnyPointer::Builder)> buildMessage);
// Presumably writes a log-forwarding (logfwdr) message over channel `channel`, invoking
// `buildMessage` to populate the message content -- inferred from the name and signature;
// confirm in io-context.c++.
private:
ThreadContext& thread;
// Source of per-thread services (entropy, HTTP header table/ids, HttpOverCapnpFactory).
kj::Own<WeakRef> selfRef = kj::refcounted<WeakRef>(kj::Badge<IoContext>(), *this);
// Refcounted weak reference to this IoContext, created at construction.
kj::Own<const Worker> worker;
kj::Maybe<Worker::Actor&> actor;
// Presumably non-null only when this context belongs to an actor -- confirm.
kj::Own<LimitEnforcer> limitEnforcer;
kj::List<IncomingRequest, &IncomingRequest::link> incomingRequests;
// List of active IncomingRequests, ordered from most-recently-started to least-recently-started.
capnp::CapabilityServerSet<capnp::DynamicCapability> localCapSet;
// Returned by getLocalCapSet().
static constexpr uint32_t DEFAULT_ANALYTICS_ENGINE_WRITE_LIMIT = 25; // arbitrarily chosen limit
uint32_t analyticsEngineWriteBudget = DEFAULT_ANALYTICS_ENGINE_WRITE_LIMIT;
// Presumably the number of Analytics Engine writes still allowed in this context -- confirm.
bool failOpen = false;
void* threadId;
// For debug checks.
bool retryScheduled = true;
// For scheduled workers noRetry calls: cleared by setNoRetryScheduled(), read by
// shouldRetryScheduled().
kj::Maybe<Worker::Lock&> currentLock;
kj::Maybe<InputGate::Lock> currentInputLock;
// NOTE(review): presumably the Worker lock / input lock held while code is running in this
// context, and null otherwise -- confirm in io-context.c++.
struct OwnedObject {
// A node in OwnedObjectList. Each node owns its successor through `next`.
kj::Maybe<kj::Own<OwnedObject>> next;
kj::Maybe<kj::Own<OwnedObject>>* prev;
// Points at the owning slot for this node -- presumably either OwnedObjectList::head or the
// previous node's `next`; confirm.
kj::Maybe<Finalizeable&> finalizer;
// Presumably set when the owned object is Finalizeable, so the list can run its finalizer.
};
template <typename T>
struct SpecificOwnedObject: public OwnedObject {
  // OwnedObject list node that holds ownership of a value of type T.
  SpecificOwnedObject(kj::Own<T> value): ptr(kj::mv(value)) {}

  kj::Own<T> ptr;
};
class OwnedObjectList {
  // Owning linked list of OwnedObjects, i.e. the objects pointed to by IoOwn<T>s.
public:
  OwnedObjectList() = default;
  KJ_DISALLOW_COPY(OwnedObjectList);
  ~OwnedObjectList() noexcept(false);

  void link(kj::Own<OwnedObject> object);
  // Adds `object` to the list; the list takes ownership of it.

  static void unlink(OwnedObject& object);
  // Removes `object` from the list it belongs to (presumably destroying it -- confirm).

  kj::Vector<kj::StringPtr> finalize();
  // Runs the finalizer for each object in forward order and returns a vector of any warnings
  // returned from those finalizers.

  bool isFinalized() const {
    // Pure accessor, so it is const-qualified and usable through const references.
    // `finalizersRan` is presumably set by finalize().
    return finalizersRan;
  }

private:
  kj::Maybe<kj::Own<OwnedObject>> head;
  bool finalizersRan = false;
};
class DeleteQueue: public kj::AtomicRefcounted {
// Object which receives possibly-cross-thread deletions of owned objects.
// (Atomically refcounted, presumably so IoOwns on other threads can keep it alive -- confirm.)
public:
DeleteQueue()
: crossThreadDeleteQueue(State { kj::Vector<OwnedObject*>() }) {}
// Starts with a non-null State holding an empty queue.
void scheduleDeletion(OwnedObject* object) const;
// Presumably records `object` in crossThreadDeleteQueue, or ignores it when the queue has
// already been nulled out. It is const so that it can be called through a const reference,
// with mutation happening under the MutexGuarded lock -- confirm in io-context.c++.
struct State {
kj::Vector<OwnedObject*> queue;
};
kj::MutexGuarded<kj::Maybe<State>> crossThreadDeleteQueue;
// Pointers from IoOwns that were dropped in other threads, and therefore should be deleted
// whenever the IoContext gets around to it. The maybe is changed to nullptr when the
// IoContext goes away, at which point all OwnedObjects have already been deleted so
// cross-thread deletions can just be ignored.
template <typename T> IoOwn<T> addObject(kj::Own<T> obj, OwnedObjectList& ownedObjects);
// Implements the corresponding methods of IoContext and ActorContext.
};
class DeleteQueuePtr: public kj::Own<DeleteQueue> {
  // Owning pointer to the DeleteQueue whose destructor also shuts the queue down.
  //
  // When the IoContext is destroyed, the DeleteQueue's state must be nulled out. This can only
  // happen after all tasks have been canceled (i.e. the TaskSet destroyed), so it cannot simply
  // be done in IoContext's destructor. As a hack, the shutdown lives in this pointer type's
  // destructor instead, and relies on member declaration order to get the tear-down order right.
public:
  DeleteQueuePtr(kj::Own<DeleteQueue> value)
      : kj::Own<DeleteQueue>(kj::mv(value)) {}
  KJ_DISALLOW_COPY(DeleteQueuePtr);

  ~DeleteQueuePtr() noexcept(false) {
    if (auto* queue = get()) {
      // Null out the shared state: IoOwns dropped on other threads from now on are ignored.
      *queue->crossThreadDeleteQueue.lockExclusive() = nullptr;
    }
  }
};
DeleteQueuePtr deleteQueue;
// Declared above `tasks` and `timeoutManager`, so it is destroyed after them (members are
// destroyed in reverse declaration order); DeleteQueuePtr's destructor depends on this.
kj::Own<kj::PromiseFulfiller<void>> abortFulfiller;
kj::ForkedPromise<void> abortPromise = nullptr;
// Presumably the fulfiller / forked-promise pair used to signal that this context was aborted
// -- confirm.
class PendingEvent;
kj::Maybe<PendingEvent&> pendingEvent;
// Non-owning reference; presumably the PendingEvent currently handed out by
// registerPendingEvent(), if any -- confirm.
kj::Maybe<kj::Promise<void>> runFinalizersTask;
OwnedObjectList ownedObjects;
// Objects pointed to by IoOwn<T>s.
// NOTE: This must live below `deleteQueue`, as some of these OwnedObjects may own attachctx()'ed
// objects which reference `deleteQueue` in their destructors.
constexpr static size_t GB = 1 << 30;
constexpr static size_t MAX_TOTAL_PUT_SIZE = 5 * GB;
constexpr static size_t MAX_INDIVIDUAL_PUT_SIZE = 5 * GB;
// Cache API PUT limits used by makeCachePutStream(): the total across this IoContext and
// (presumably) the maximum size of any single PUT body.
// NOTE(review): 5 * GB exceeds SIZE_MAX when size_t is 32 bits; this assumes a 64-bit size_t.
kj::Promise<size_t> cachePutQuota = MAX_TOTAL_PUT_SIZE;
// Implementation detail of makeCachePutStream().
// Remaining PUT quota in bytes, starting at MAX_TOTAL_PUT_SIZE. It is stored as a promise,
// presumably so that concurrent puts chain on it -- confirm.
kj::TaskSet waitUntilTasks;
EventOutcome waitUntilStatusValue = EventOutcome::OK;
void setTimeoutImpl(TimeoutId timeoutId, bool repeat, jsg::V8Ref<v8::Function> function,
double msDelay, kj::Array<jsg::Value> args);
// Private overload taking a V8 function plus bound arguments and a preallocated id.
uint addTaskCounter = 0;
kj::Maybe<kj::TaskSet> tasks;
kj::Own<TimeoutManager> timeoutManager;
// The timeout manager needs to live below `deleteQueue` because the promises may refer to
// objects in the queue.
// ATTENTION: `tasks` and `timeoutManager` MUST be destructed before any other member.
// Members are destructed in reverse declaration order. If any other member is destructed
// before these two (i.e. is declared later in the class than them), then there is a
// possibility that callbacks will attempt to use a partially or fully destructed IoContext
// object. For the same reason, any promises stored outside of the IoContext (e.g. in the
// ActorContext) MUST be canceled when the IoContext is destructed.
kj::Own<WorkerInterface> getSubrequestChannelImpl(
uint channel, bool isInHouse, kj::Maybe<kj::String> cfBlobJson,
kj::Maybe<Tracer::Span&> span, IoChannelFactory& channelFactory);
// Shared implementation behind getSubrequestChannel*(). Its trailing (span, channelFactory)
// parameters match the factory-function signature taken by getSubrequest*(), presumably so it
// can be called from within those factories -- confirm.
friend class IoContext_IncomingRequest;
template <typename T> friend class IoOwn;
template <typename T> friend class IoPtr;
// IoOwn/IoPtr presumably need the private deleteQueue / checkFarGet() machinery -- confirm.
void taskFailed(kj::Exception&& exception) override;
// Error callback for the TaskSets held by this context (presumably `tasks`/`waitUntilTasks`).
void requireCurrent();
void checkFarGet(const DeleteQueue* expectedQueue);
// Presumably asserts that an IoOwn/IoPtr tied to `expectedQueue` is being accessed from its
// own IoContext -- confirm in io-context.c++.
class Runnable {
// Type-erased callback passed to runImpl(); run() is handed the Worker::Lock.
// NOTE(review): no virtual destructor. That is fine as long as implementations are only passed
// by reference (as runImpl() takes them) and never deleted through a Runnable pointer.
public:
virtual void run(Worker::Lock& lock) = 0;
};
void runImpl(Runnable& runnable, bool takePendingEvent, Worker::LockType lockType,
kj::Maybe<InputGate::Lock> inputLock, bool allowPermanentException);
// Presumably the common implementation of the public run() entry points: acquires a Worker lock
// of kind `lockType` and invokes `runnable.run()` with it -- confirm in io-context.c++.
void runFinalizers(Worker::AsyncLock& asyncLock);
// Sets Finalizeable::finalized in debug builds (per that member's comment); presumably runs
// OwnedObjectList::finalize() under `asyncLock` -- confirm.
template <bool addIoOwn, typename T> struct MaybeIoOwn_;
// Type-level switch: MaybeIoOwn<true, T> is IoOwn<T>, MaybeIoOwn<false, T> is plain T.
template <typename T> struct MaybeIoOwn_<true, T> { using Type = IoOwn<T>; };
template <typename T> struct MaybeIoOwn_<false, T> { using Type = T; };
template <bool addIoOwn, typename T>
using MaybeIoOwn = typename MaybeIoOwn_<addIoOwn, T>::Type;