-
Notifications
You must be signed in to change notification settings - Fork 19
/
elastic.ts
154 lines (135 loc) · 4.71 KB
/
elastic.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import Queue, { JobOptions } from "bull";
import { logger } from "@td/logger";
import { scheduleWebhook } from "./webhooks";
import {
INDEX_JOB_NAME,
INDEX_CREATED_JOB_NAME,
INDEX_UPDATED_JOB_NAME,
DELETE_JOB_NAME
} from "./jobNames";
import { updatesQueue } from "./bsdUpdate";
import { operationHooksQueue } from "./operationHook";
import { updateFavorites } from "../../companies/database";
const { REDIS_URL, NODE_ENV } = process.env;
export const INDEX_QUEUE_NAME = `queue_index_elastic_${NODE_ENV}`;
export const indexQueue = new Queue<string>(INDEX_QUEUE_NAME, REDIS_URL!, {
defaultJobOptions: {
attempts: 3,
backoff: { type: "fixed", delay: 100 },
removeOnComplete: 10_000,
timeout: 10000
}
});
// On gère l'indexation en bulk dans une queue et un worker séparée afin de
// ne pas bloquer les jobs d'indexation "courants" lors d'un réindex
// global
export const BULK_INDEX_QUEUE_NAME = `queue_bulk_index_elastic_${NODE_ENV}`;
export const bulkIndexQueue = new Queue<string>(
BULK_INDEX_QUEUE_NAME,
REDIS_URL!,
{
defaultJobOptions: {
// un seul essai pour chaque chunk histoire de ne pas bloquer les autres trop longtemps
// s'il y a un problème avec une chunk, on garde la possibilité de retry à la mano depuis
// le dashboard bull
attempts: 1,
backoff: { type: "fixed", delay: 100 },
stackTraceLimit: 100,
removeOnComplete: 10_000,
// 10 minutes
timeout: 10 * 60 * 1000
}
}
);
// Cette queue permet de process le job qui enqueue toutes les chunks et attend qu'elles soient
// toutes terminées pour lancer un rattrapage (Cf fonction indexAllBsdTypeConcurrentJobs).
// Elle est également démarré sur un worker séparé pour l'isoler du reste et éviter que le job soit
// relancé en cas de crash d'un worker de bulk indexation
export const BULK_INDEX_MASTER_QUEUE_NAME = `queue_bulk_index_master_elastic_${NODE_ENV}`;
export const bulkIndexMasterQueue = new Queue<string>(
BULK_INDEX_MASTER_QUEUE_NAME,
REDIS_URL!,
{
defaultJobOptions: {
attempts: 1,
backoff: { type: "fixed", delay: 100 },
removeOnComplete: 10_000,
// 24h pour prendre de la marge, les jobs de cette queue attendent que toutes les chunks
// soit process lors d'un reindex global
timeout: 24 * 3600 * 1000
},
settings: {
// https://github.com/OptimalBits/bull?tab=readme-ov-file#important-notes
// https://github.com/OptimalBits/bull/issues/1591
lockDuration: 2 * 60 * 1000, // 2 minutes - default 3000 (30s)
// prevent the job from being run twice if it is stalled
maxStalledCount: 0 // default is 1
}
}
);
indexQueue.on("completed", async job => {
const id = job.data;
const { sirets, siretsBeforeUpdate, status } = job.returnvalue;
if (
[
DELETE_JOB_NAME,
INDEX_JOB_NAME,
INDEX_CREATED_JOB_NAME,
INDEX_UPDATED_JOB_NAME
].includes(job.name)
) {
// aggregate and deduplicate sirets to notify relevant recipients
const orgIdsToNotify = Array.from(
new Set([...sirets, ...(siretsBeforeUpdate ?? [])])
);
updatesQueue.add({ sirets: orgIdsToNotify, id, jobName: job.name });
scheduleWebhook(id, orgIdsToNotify, job.name);
// exclude favorites indexation for other statuses
if (!["SENT", "RESENT"].includes(status)) {
return;
}
// après qu'un BSD soit mis à jour et indexé dans l'index `bsds`
// on doit mettre à jour le cache des `favorites` pour chaque orgId
// présent dansd ce BSD afin qu'en tant qu'éditeur dans le futur on lui
// propose les favoris pré-calculés.
await updateFavorites(orgIdsToNotify);
}
});
indexQueue.on("failed", (job, err) => {
const id = job.data;
logger.error(`Indexation job failed for bsd "${id}"`, { id, err });
});
type JobName = typeof INDEX_CREATED_JOB_NAME | typeof INDEX_UPDATED_JOB_NAME;
async function enqueueBsdToIndex(
bsdId: string,
jobName: JobName,
options?: JobOptions
): Promise<void> {
logger.info(`Enqueuing BSD ${bsdId} for indexation`);
await indexQueue.add(jobName, bsdId, options);
}
export async function enqueueCreatedBsdToIndex(
bsdId: string,
options?: JobOptions
): Promise<void> {
await enqueueBsdToIndex(bsdId, INDEX_CREATED_JOB_NAME, options);
}
export async function enqueueUpdatedBsdToIndex(
bsdId: string,
options?: JobOptions
): Promise<void> {
await enqueueBsdToIndex(bsdId, INDEX_UPDATED_JOB_NAME, options);
}
export async function enqueueBsdToDelete(
bsdId: string,
options?: JobOptions
): Promise<void> {
await indexQueue.add(DELETE_JOB_NAME, bsdId, options);
}
export function closeIndexAndUpdatesQueue() {
return Promise.all([
indexQueue.close(),
updatesQueue.close(),
operationHooksQueue.close()
]);
}