diff --git a/back/src/bsds/indexation/__tests__/bulkIndexBsds.integration.ts b/back/src/bsds/indexation/__tests__/bulkIndexBsds.integration.ts index 7dbe4ba7ea..9ff87dd394 100644 --- a/back/src/bsds/indexation/__tests__/bulkIndexBsds.integration.ts +++ b/back/src/bsds/indexation/__tests__/bulkIndexBsds.integration.ts @@ -34,13 +34,89 @@ describe("processDbIdentifiersByChunk", () => { describe("getBsdIdentifiers", () => { afterEach(resetDatabase); - it("should return all identifiers of a bsd type", async () => { - const user = await userFactory(); - - const form = await formFactory({ ownerId: user.id }); - const ids = await getBsdIdentifiers("bsdd"); - expect(ids).toEqual([form.id]); - }); + it.each([1, 2, 3, 4, 5, 100])( + "should return all BSDD's identifiers when paginating by %p", + async paginateBy => { + const user = await userFactory(); + + const form1 = await formFactory({ ownerId: user.id }); + const form2 = await formFactory({ ownerId: user.id }); + const form3 = await formFactory({ ownerId: user.id }); + const form4 = await formFactory({ ownerId: user.id }); + const form5 = await formFactory({ ownerId: user.id }); + + const ids = await getBsdIdentifiers("bsdd", { paginateBy }); + expect(ids).toEqual([form1.id, form2.id, form3.id, form4.id, form5.id]); + } + ); + + it.each([1, 2, 3, 4, 5, 100])( + "should return all BSDA's identifiers when paginating by %p", + async paginateBy => { + const bsda1 = await bsdaFactory({}); + const bsda2 = await bsdaFactory({}); + const bsda3 = await bsdaFactory({}); + const bsda4 = await bsdaFactory({}); + const bsda5 = await bsdaFactory({}); + + const ids = await getBsdIdentifiers("bsda", { paginateBy }); + expect(ids).toEqual([bsda1.id, bsda2.id, bsda3.id, bsda4.id, bsda5.id]); + } + ); + + it.each([1, 2, 3, 4, 5, 100])( + "should return all BSDASRI's identifiers when paginating by %p", + async paginateBy => { + const bsdasri1 = await bsdasriFactory({}); + const bsdasri2 = await bsdasriFactory({}); + const bsdasri3 = await bsdasriFactory({}); + const bsdasri4 = await bsdasriFactory({}); + const bsdasri5 = await bsdasriFactory({}); + + const ids = await getBsdIdentifiers("bsdasri", { paginateBy }); + expect(ids).toEqual([ + bsdasri1.id, + bsdasri2.id, + bsdasri3.id, + bsdasri4.id, + bsdasri5.id + ]); + } + ); + + it.each([1, 2, 3, 4, 5, 100])( + "should return all BSFF's identifiers when paginating by %p", + async paginateBy => { + const bsff1 = await createBsff(); + const bsff2 = await createBsff({}); + const bsff3 = await createBsff({}); + const bsff4 = await createBsff({}); + const bsff5 = await createBsff({}); + + const ids = await getBsdIdentifiers("bsff", { paginateBy }); + expect(ids).toEqual([bsff1.id, bsff2.id, bsff3.id, bsff4.id, bsff5.id]); + } + ); + + it.each([1, 2, 3, 4, 5, 100])( + "should return all BSVHU's identifiers when paginating by %p", + async paginateBy => { + const bsvhu1 = await bsvhuFactory({}); + const bsvhu2 = await bsvhuFactory({}); + const bsvhu3 = await bsvhuFactory({}); + const bsvhu4 = await bsvhuFactory({}); + const bsvhu5 = await bsvhuFactory({}); + + const ids = await getBsdIdentifiers("bsvhu", { paginateBy }); + expect(ids).toEqual([ + bsvhu1.id, + bsvhu2.id, + bsvhu3.id, + bsvhu4.id, + bsvhu5.id + ]); + } + ); it("should not return identifiers updated before since paramater", async () => { const user = await userFactory(); @@ -54,7 +130,9 @@ describe("getBsdIdentifiers", () => { ownerId: user.id, opt: { updatedAt: new Date("2023-02-01") } }); - const ids = await getBsdIdentifiers("bsdd", new Date("2023-02-01")); + const ids = await getBsdIdentifiers("bsdd", { + since: new Date("2023-02-01") + }); expect(ids).toEqual([form2.id]); }); }); diff --git a/back/src/bsds/indexation/bulkIndexBsds.ts b/back/src/bsds/indexation/bulkIndexBsds.ts index b835d530f2..2d0cca88b5 100644 --- a/back/src/bsds/indexation/bulkIndexBsds.ts +++ b/back/src/bsds/indexation/bulkIndexBsds.ts @@ -265,24 +265,63 @@ export async function isIndexMappingsVersionChanged( } } +type GetBsdIdentifiersOpt = { + since?: Date; + paginateBy?: number; +}; + /** * Retrieves all BSD identifiers for a given BSD type */ export async function getBsdIdentifiers( bsdName: string, - since?: Date + { since, paginateBy }: GetBsdIdentifiersOpt = {} ): Promise { const prismaModelDelegate = prismaModels[bsdName]; - const bsds = await prismaModelDelegate.findMany({ - where: { - isDeleted: false, - ...(since ? { updatedAt: { gte: since } } : {}) - }, - select: { id: true } - }); + const defaultPaginateBy = 500000; + const take = paginateBy ?? defaultPaginateBy; + + // Renvoie bordereaux après le bordereau + // identifié par son curseur + async function nextPage(after: string | null) { + const bsds = await prismaModelDelegate.findMany({ + take, + ...(after + ? { + // Cf https://www.prisma.io/docs/orm/prisma-client/queries/pagination#cursor-based-pagination + skip: 1, + cursor: { + rowNumber: after + } + } + : {}), + where: { + isDeleted: false, + ...(since ? { updatedAt: { gte: since } } : {}) + }, + select: { id: true, rowNumber: true }, + orderBy: { rowNumber: "asc" } + }); + + return bsds; + } + + // Récupère tous les identifiants en paginant la liste des bordereaux + // de manière récursive grâce à une pagination par curseur qui utilise `rowNumber` + async function paginate(after: string | null = null, ids: string[] = []) { + const bsds = await nextPage(after); + const length = bsds.length; + if (length === 0) { + return ids; + } else { + const bsdIds = bsds.map(bsd => bsd.id) as string[]; + const nextCursor = bsds[length - 1].rowNumber; + return paginate(nextCursor, [...ids, ...bsdIds]); + } + } - return bsds.map(bsd => bsd.id); + return paginate(); } export async function processDbIdentifiersByChunk( @@ -305,7 +344,7 @@ export async function indexAllBsdTypeSync({ since, indexConfig }: IndexAllFnSignature): Promise { - const ids = await getBsdIdentifiers(bsdName, since); + const ids = await getBsdIdentifiers(bsdName, { since }); logger.info(`Starting synchronous indexation of ${ids.length} ${bsdName}`); @@ -331,7 +370,7 @@ export async function indexAllBsdTypeConcurrentJobs({ }: IndexAllFnSignature) { const jobs: Job[] = []; const data: { name: string; data: string; opts?: JobOptions }[] = []; - const ids = await getBsdIdentifiers(bsdName, since); + const ids = await getBsdIdentifiers(bsdName, { since }); logger.info(`Starting indexation of ${ids.length} ${bsdName}`); // Prepare Job data payload to call indexQueue.addBulk diff --git a/back/src/queue/producers/elastic.ts b/back/src/queue/producers/elastic.ts index b598c36ab2..05a2536232 100644 --- a/back/src/queue/producers/elastic.ts +++ b/back/src/queue/producers/elastic.ts @@ -63,10 +63,23 @@ export const bulkIndexMasterQueue = new Queue( // 24h pour prendre de la marge, les jobs de cette queue attendent que toutes les chunks // soit process lors d'un reindex global timeout: 24 * 3600 * 1000 + }, + settings: { + // https://github.com/OptimalBits/bull?tab=readme-ov-file#important-notes + // https://github.com/OptimalBits/bull/issues/1591 + lockDuration: 2 * 60 * 1000, // 2 minutes - default 3000 (30s) + // prevent the job from being run twice if it is stalled + maxStalledCount: 0 // default is 1 } } ); +bulkIndexMasterQueue.on("stalled", opt => { + logger.warn( + `The job ${opt.id} in queue bulkIndexMasterQueue has been stalled` + ); +}); + indexQueue.on("completed", async job => { const id = job.data;