-
Notifications
You must be signed in to change notification settings - Fork 248
/
0095.yml
177 lines (167 loc) · 5.59 KB
/
0095.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
version: 95
description: Removing queue migration compat columns
migrationScript: |-
begin
ALTER TABLE queue_pending_tasks
DROP COLUMN queue_name_compat,
DROP COLUMN message_id_compat;
ALTER TABLE queue_task_deadlines DROP COLUMN message_id_compat;
ALTER TABLE queue_resolved_tasks DROP COLUMN message_id_compat;
ALTER TABLE queue_claimed_tasks DROP COLUMN message_id_compat;
end
downgradeScript: |-
begin
ALTER TABLE queue_pending_tasks ADD COLUMN queue_name_compat text, ADD COLUMN message_id_compat uuid;
UPDATE queue_pending_tasks SET queue_name_compat = task_queue_id, message_id_compat = public.gen_random_uuid();
ALTER TABLE queue_task_deadlines ADD COLUMN message_id_compat uuid;
UPDATE queue_task_deadlines SET message_id_compat = public.gen_random_uuid();
ALTER TABLE queue_resolved_tasks ADD COLUMN message_id_compat uuid;
UPDATE queue_resolved_tasks SET message_id_compat = public.gen_random_uuid();
ALTER TABLE queue_claimed_tasks ADD COLUMN message_id_compat uuid;
UPDATE queue_claimed_tasks SET message_id_compat = public.gen_random_uuid();
end
methods:
# azure methods can now be finally deprecated
azure_queue_get:
deprecated: true
azure_queue_count:
deprecated: true
azure_queue_delete:
deprecated: true
azure_queue_update:
deprecated: true
azure_queue_put_extra:
deprecated: true
azure_queue_delete_expired:
deprecated: true
# patch methods using compat columns
queue_pending_tasks_put:
deprecated: true
# number of arguments changed, so the name changes too
queue_pending_tasks_add:
description: |
Put the task into the pending queue.
When record already exists, we update the priority, run_id, hint_id and expiration.
This also sends a notification to the `task_pending` channel with the `task_queue_id` as its payload.
mode: write
serviceName: queue
args: task_queue_id_in text, priority_in integer, task_id_in text, run_id_in integer, hint_id_in text, expires_in timestamp
returns: void
body: |-
begin
INSERT INTO queue_pending_tasks
(task_queue_id, priority, task_id, run_id, hint_id, inserted, expires, visible)
VALUES (
task_queue_id_in,
priority_in,
task_id_in,
run_id_in,
hint_id_in,
now(),
expires_in,
now()
)
ON CONFLICT (task_id, run_id) DO UPDATE
SET
expires = greatest(coalesce(expires_in, queue_pending_tasks.expires), queue_pending_tasks.expires),
priority = priority_in,
hint_id = hint_id_in
WHERE
queue_pending_tasks.task_queue_id = task_queue_id_in
AND queue_pending_tasks.task_id = task_id_in
AND queue_pending_tasks.run_id = run_id_in
AND queue_pending_tasks.pop_receipt is null;
-- notify listeners that there is a new task in the queue
EXECUTE 'NOTIFY task_pending, ' || quote_literal(task_queue_id_in) || ';';
end
# rewriting existing function from 91 version
queue_task_deadline_put:
description: |
Track task deadline upon task creation. This would stay until task
deadline to see if it was ever scheduled or resolved.
mode: write
serviceName: queue
args: task_group_id_in text, task_id_in text, scheduler_id_in text, deadline_in timestamptz, visible timestamptz
returns: void
body: |-
begin
insert into queue_task_deadlines (
task_group_id,
task_id,
scheduler_id,
created,
deadline,
visible
)
values (
task_group_id_in,
task_id_in,
scheduler_id_in,
now(),
deadline_in,
visible
);
end
# same as 91 but without _compat field
queue_claimed_task_put:
description: |
Track when task was claimed and when it should be reclaimed.
It is possible to have multiple records for a given taskId+runId combination.
mode: write
serviceName: queue
args: task_id_in text, run_id_in integer, taken_until_in timestamptz, task_queue_id_in text, worker_group_in text, worker_id_in text
returns: void
body: |-
begin
INSERT INTO queue_claimed_tasks (
task_id,
run_id,
task_queue_id,
worker_group,
worker_id,
claimed,
taken_until,
visible
)
VALUES (
task_id_in,
run_id_in,
task_queue_id_in,
worker_group_in,
worker_id_in,
now(),
taken_until_in,
taken_until_in -- visible initially same as taken_until
);
end
# same as 91 but without _compat field
queue_resolved_task_put:
description: |
Track when task was resolved.
This is a short-lived record that is used by dependency resolver to update dependencies.
Notification is sent to `task_resolved` channel with the `task_id` as its payload.
mode: write
serviceName: queue
args: task_group_id_in text, task_id_in text, scheduler_id_in text, resolution_in text
returns: void
body: |-
begin
insert into queue_resolved_tasks (
task_group_id,
task_id,
scheduler_id,
resolution,
resolved,
visible
)
values (
task_group_id_in,
task_id_in,
scheduler_id_in,
resolution_in,
now(),
now()
);
-- notify listeners that task was resolved
EXECUTE 'NOTIFY task_resolved';
end