-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
dataloader.rb
264 lines (240 loc) · 9.5 KB
/
dataloader.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# frozen_string_literal: true
require "graphql/dataloader/null_dataloader"
require "graphql/dataloader/request"
require "graphql/dataloader/request_all"
require "graphql/dataloader/source"
module GraphQL
# This plugin supports Fiber-based concurrency, along with {GraphQL::Dataloader::Source}.
#
# @example Installing Dataloader
#
# class MySchema < GraphQL::Schema
# use GraphQL::Dataloader
# end
#
# @example Waiting for batch-loaded data in a GraphQL field
#
# field :team, Types::Team, null: true
#
# def team
# dataloader.with(Sources::Record, Team).load(object.team_id)
# end
#
class Dataloader
  # Install this class as the schema's dataloader implementation
  # (invoked by `use GraphQL::Dataloader` in a schema definition).
  def self.use(schema)
    schema.dataloader_class = self
  end

  # Call the block with a Dataloader instance,
  # then run all enqueued jobs and return the result of the block.
  def self.with_dataloading(&block)
    dataloader = self.new
    result = nil
    # The block itself is run as a job so that it executes inside a Fiber
    # and may pause for batch-loaded data like any other job.
    dataloader.append_job {
      result = block.call(dataloader)
    }
    dataloader.run
    result
  end

  def initialize
    # Two-level cache: source class -> batch parameters -> Source instance.
    # The nested `Hash.new` default blocks lazily build, `setup`, and memoize
    # a Source the first time a given (class, batch parameters) pair is requested.
    @source_cache = Hash.new { |h, source_class| h[source_class] = Hash.new { |h2, batch_parameters|
      source = if RUBY_VERSION < "3"
        source_class.new(*batch_parameters)
      else
        # On Ruby 3, positional and keyword args are stored as a two-element
        # pair (see `#with` below) so they can be splatted separately here.
        batch_args, batch_kwargs = batch_parameters
        source_class.new(*batch_args, **batch_kwargs)
      end
      source.setup(self)
      # Memoize and return the new instance (assignment evaluates to `source`).
      h2[batch_parameters] = source
    }
    }
    # Blocks enqueued via `#append_job`, consumed by `#run`.
    @pending_jobs = []
  end

  # Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on.
  #
  # @param source_class [Class<GraphQL::Dataloader::Source>]
  # @param batch_parameters [Array<Object>]
  # @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `self, *batch_parameters`,
  #   and cached for the lifetime of this {Multiplex}.
  if RUBY_VERSION < "3"
    def with(source_class, *batch_parameters)
      @source_cache[source_class][batch_parameters]
    end
  else
    # Ruby 3 separates positional and keyword arguments, so two definitions
    # are needed; kwargs are captured alongside positional args as the cache key.
    def with(source_class, *batch_args, **batch_kwargs)
      batch_parameters = [batch_args, batch_kwargs]
      @source_cache[source_class][batch_parameters]
    end
  end

  # Tell the dataloader that this fiber is waiting for data.
  #
  # Dataloader will resume the fiber after the requested data has been loaded (by another Fiber).
  #
  # @return [void]
  def yield
    Fiber.yield
    nil
  end

  # @api private Nothing to see here
  def append_job(&job)
    # Given a block, queue it up to be worked through when `#run` is called.
    # (If the dataloader is already running, then a Fiber will pick this up later.)
    @pending_jobs.push(job)
    nil
  end

  # Use a self-contained queue for the work in the block.
  # The current job queue is swapped out, the block is run to completion,
  # and the previous queue is restored afterwards (even if the block raises).
  def run_isolated
    prev_queue = @pending_jobs
    @pending_jobs = []
    res = nil
    # Make sure the block is inside a Fiber, so it can `Fiber.yield`
    append_job {
      res = yield
    }
    run
    res
  ensure
    @pending_jobs = prev_queue
  end

  # @api private Move along, move along
  def run
    # At a high level, the algorithm is:
    #
    #  A) Inside Fibers, run jobs from the queue one-by-one
    #    - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
    #    - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
    #    - Continue until all jobs have been _started_ by a Fiber. (Any number of those Fibers may be waiting to be resumed, after their data is loaded)
    #  B) Once all known jobs have been run until they are complete or paused for data, run all pending data sources.
    #    - Similarly, create a Fiber to consume pending sources and tell them to load their data.
    #    - If one of those Fibers pauses, then create a new Fiber to continue working through remaining pending sources.
    #    - When a source causes another source to become pending, run the newly-pending source _first_, since it's a dependency of the previous one.
    #  C) After all pending sources have been completely loaded (there are no more pending sources), resume any Fibers that were waiting for data.
    #    - Those Fibers assume that source caches will have been populated with the data they were waiting for.
    #    - Those Fibers may request data from a source again, in which case they will yield and be added to a new pending fiber list.
    #  D) Once all pending fibers have been resumed once, return to `A` above.
    #
    # For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
    # on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read.
    #
    pending_fibers = []
    next_fibers = []
    first_pass = true
    # Loop until no fibers remain paused waiting on data (step D drives the loop).
    while first_pass || (f = pending_fibers.shift)
      if first_pass
        first_pass = false
      else
        # These fibers were previously waiting for sources to load data,
        # resume them. (They might wait again, in which case, re-enqueue them.)
        resume(f)
        if f.alive?
          next_fibers << f
        end
      end
      # Step A: drain the job queue, spawning a fresh Fiber whenever the
      # current one pauses mid-job.
      while @pending_jobs.any?
        # Create a Fiber to consume jobs until one of the jobs yields
        # or jobs run out
        f = spawn_fiber {
          while (job = @pending_jobs.shift)
            job.call
          end
        }
        resume(f)
        # In this case, the job yielded. Queue it up to run again after
        # we load whatever it's waiting for.
        if f.alive?
          next_fibers << f
        end
      end
      if pending_fibers.empty?
        # Step B: now, run all Sources which have become pending _before_ resuming GraphQL execution.
        # Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
        #
        # This is where an evented approach would be even better -- can we tell which
        # fibers are ready to continue, and continue execution there?
        #
        source_fiber_queue = if (first_source_fiber = create_source_fiber)
          [first_source_fiber]
        else
          nil
        end
        if source_fiber_queue
          while (outer_source_fiber = source_fiber_queue.shift)
            resume(outer_source_fiber)
            # If this source caused more sources to become pending, run those before running this one again:
            next_source_fiber = create_source_fiber
            if next_source_fiber
              source_fiber_queue << next_source_fiber
            end
            # A still-alive source fiber paused; requeue it behind its dependencies.
            if outer_source_fiber.alive?
              source_fiber_queue << outer_source_fiber
            end
          end
        end
        # Move newly-enqueued Fibers on to the list to be resumed.
        # Clear out the list of next-round Fibers, so that
        # any Fibers that pause can be put on it.
        pending_fibers.concat(next_fibers)
        next_fibers.clear
      end
    end
    # Sanity checks: by this point every job and fiber must have completed.
    # Anything left over indicates a scheduling bug in the loop above.
    if @pending_jobs.any?
      raise "Invariant: #{@pending_jobs.size} pending jobs"
    elsif pending_fibers.any?
      raise "Invariant: #{pending_fibers.size} pending fibers"
    elsif next_fibers.any?
      raise "Invariant: #{next_fibers.size} next fibers"
    end
    nil
  end

  private

  # If there are pending sources, return a fiber for running them.
  # Otherwise, return `nil`.
  #
  # @return [Fiber, nil]
  def create_source_fiber
    pending_sources = nil
    # Scan every cached source (across all source classes / batch params)
    # for ones that have keys waiting to be loaded.
    @source_cache.each_value do |source_by_batch_params|
      source_by_batch_params.each_value do |source|
        if source.pending?
          pending_sources ||= []
          pending_sources << source
        end
      end
    end
    if pending_sources
      # By passing the whole array into this Fiber, it's possible that we set ourselves up for a bunch of no-ops.
      # For example, if you have sources `[a, b, c]`, and `a` is loaded, then `b` yields to wait for `d`, then
      # the next fiber would be dispatched with `[c, d]`. It would fulfill `c`, then `d`, then eventually
      # the previous fiber would start up again. `c` would no longer be pending, but it would still receive `.run_pending_keys`.
      # That method is short-circuited since it isn't pending any more, but it's still a waste.
      #
      # This design could probably be improved by maintaining a `@pending_sources` queue which is shared by the fibers,
      # similar to `@pending_jobs`. That way, when a fiber is resumed, it would never pick up work that was finished by a different fiber.
      source_fiber = spawn_fiber do
        pending_sources.each(&:run_pending_keys)
      end
    end
    # `source_fiber` is nil when the `if` above didn't run (no pending sources);
    # in Ruby the local is still defined because the assignment was parsed.
    source_fiber
  end

  # Run `fiber` until it pauses or finishes. `throw`s that escape the fiber
  # surface as `UncaughtThrowError`; re-throw them in the caller's context so
  # `catch` blocks outside the dataloader still work.
  def resume(fiber)
    fiber.resume
  rescue UncaughtThrowError => e
    throw e.tag, e.value
  end

  # Copies the thread local vars into the fiber thread local vars. Many
  # gems (such as RequestStore, MiniRacer, etc.) rely on thread local vars
  # to keep track of execution context, and without this they do not
  # behave as expected.
  #
  # @see https://github.com/rmosolgo/graphql-ruby/issues/3449
  def spawn_fiber
    # Snapshot the current fiber-locals (`Thread.current[...]` is fiber-local
    # in Ruby) before creating the new Fiber...
    fiber_locals = {}
    Thread.current.keys.each do |fiber_var_key|
      fiber_locals[fiber_var_key] = Thread.current[fiber_var_key]
    end
    Fiber.new do
      # ...then replay them inside the new Fiber before running the given block.
      fiber_locals.each { |k, v| Thread.current[k] = v }
      yield
    end
  end
end
end