-
Notifications
You must be signed in to change notification settings - Fork 248
/
0094.yml
153 lines (151 loc) · 4.55 KB
/
0094.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
version: 94
description: fetch pending and claimed tasks for task queue
methods:
get_pending_tasks_by_task_queue_id:
description: |-
Get all tasks that are currently pending in a given task queue.
Records would be returned by insert time, or when tasks were scheduled.
To iterate over all pending tasks, `after_inserted_in`, `after_task_id_in`
parameters can be used.
Full task record is being returned plus `inserted` for pagination purposes
mode: read
serviceName: queue
args: task_queue_id_in text, page_size_in integer, after_inserted_in timestamptz, after_task_id_in text
returns: |-
table (
task_id text,
task_queue_id text,
scheduler_id text,
project_id text,
task_group_id text,
dependencies jsonb,
requires task_requires,
routes jsonb,
priority task_priority,
retries integer,
retries_left int,
created timestamptz,
deadline timestamptz,
expires timestamptz,
scopes jsonb,
payload jsonb,
metadata jsonb,
tags jsonb,
extra jsonb,
runs jsonb,
taken_until timestamptz,
run_id integer,
inserted timestamptz
)
body: |-
begin
return query
select
tasks.task_id,
tasks.task_queue_id,
tasks.scheduler_id,
coalesce(tasks.project_id, 'none') as project_id,
tasks.task_group_id,
tasks.dependencies,
tasks.requires,
tasks.routes,
tasks.priority,
tasks.retries,
tasks.retries_left,
tasks.created,
tasks.deadline,
tasks.expires,
tasks.scopes,
tasks.payload,
tasks.metadata,
tasks.tags,
tasks.extra,
tasks.runs,
tasks.taken_until,
-- one for the pagination
q.run_id,
q.inserted
from queue_pending_tasks q
left join tasks on tasks.task_id=q.task_id
where q.task_queue_id = task_queue_id_in
and tasks.task_id is not null
and (after_inserted_in is null or q.inserted > after_inserted_in)
-- timestamp alone might not be enough
-- since time part is truncated to 1000th of a second
and (after_task_id_in is null or q.task_id != after_task_id_in)
order by q.inserted asc
limit get_page_limit(page_size_in);
end
get_claimed_tasks_by_task_queue_id:
description: |-
Get all tasks that are currently claimed by workers in a given task queue.
mode: read
serviceName: queue
args: task_queue_id_in text, page_size_in integer, after_claimed_in timestamptz, after_task_id_in text
returns: |-
table (
task_id text,
task_queue_id text,
scheduler_id text,
project_id text,
task_group_id text,
dependencies jsonb,
requires task_requires,
routes jsonb,
priority task_priority,
retries integer,
retries_left int,
created timestamptz,
deadline timestamptz,
expires timestamptz,
scopes jsonb,
payload jsonb,
metadata jsonb,
tags jsonb,
extra jsonb,
runs jsonb,
taken_until timestamptz,
run_id integer,
worker_group text,
worker_id text,
claimed timestamptz
)
body: |-
begin
return query
select
tasks.task_id,
tasks.task_queue_id,
tasks.scheduler_id,
coalesce(tasks.project_id, 'none') as project_id,
tasks.task_group_id,
tasks.dependencies,
tasks.requires,
tasks.routes,
tasks.priority,
tasks.retries,
tasks.retries_left,
tasks.created,
tasks.deadline,
tasks.expires,
tasks.scopes,
tasks.payload,
tasks.metadata,
tasks.tags,
tasks.extra,
tasks.runs,
tasks.taken_until,
-- for pagination and results
q.run_id,
q.worker_group,
q.worker_id,
q.claimed
from queue_claimed_tasks q
left join tasks on tasks.task_id=q.task_id
where q.task_queue_id = task_queue_id_in
and tasks.task_id is not null
and (after_claimed_in is null or q.claimed > after_claimed_in)
and (after_task_id_in is null or q.task_id != after_task_id_in)
order by q.claimed asc
limit get_page_limit(page_size_in);
end