Skip to content

Commit

Permalink
Merge pull request #3246 from MTES-MCT/bull-lock-duration
Browse files Browse the repository at this point in the history
Increase lock duration and paginate in `getBsdsIdentifiers`
  • Loading branch information
benoitguigal committed Apr 11, 2024
2 parents c98f056 + ca680f3 commit d57a9ff
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 19 deletions.
94 changes: 86 additions & 8 deletions back/src/bsds/indexation/__tests__/bulkIndexBsds.integration.ts
Expand Up @@ -34,13 +34,89 @@ describe("processDbIdentifiersByChunk", () => {
describe("getBsdIdentifiers", () => {
afterEach(resetDatabase);

it("should return all identifiers of a bsd type", async () => {
const user = await userFactory();

const form = await formFactory({ ownerId: user.id });
const ids = await getBsdIdentifiers("bsdd");
expect(ids).toEqual([form.id]);
});
// Whatever the page size, every BSDD identifier must come back,
// in the order in which the forms were created.
it.each([1, 2, 3, 4, 5, 100])(
  "should return all BSDD's identifiers when paginating by %p",
  async paginateBy => {
    const user = await userFactory();

    // Forms are created one after another, never in parallel
    const forms = [];
    for (let i = 0; i < 5; i++) {
      forms.push(await formFactory({ ownerId: user.id }));
    }

    const ids = await getBsdIdentifiers("bsdd", { paginateBy });
    expect(ids).toEqual(forms.map(form => form.id));
  }
);

// Whatever the page size, every BSDA identifier must come back,
// in the order in which the BSDAs were created.
it.each([1, 2, 3, 4, 5, 100])(
  "should return all BSDA's identifiers when paginating by %p",
  async paginateBy => {
    // BSDAs are created one after another, never in parallel
    const bsdas = [];
    for (let i = 0; i < 5; i++) {
      bsdas.push(await bsdaFactory({}));
    }

    const ids = await getBsdIdentifiers("bsda", { paginateBy });
    expect(ids).toEqual(bsdas.map(bsda => bsda.id));
  }
);

// Whatever the page size, every BSDASRI identifier must come back,
// in the order in which the BSDASRIs were created.
it.each([1, 2, 3, 4, 5, 100])(
  "should return all BSDASRI's identifiers when paginating by %p",
  async paginateBy => {
    // BSDASRIs are created one after another, never in parallel
    const bsdasris = [];
    for (let i = 0; i < 5; i++) {
      bsdasris.push(await bsdasriFactory({}));
    }

    const ids = await getBsdIdentifiers("bsdasri", { paginateBy });
    expect(ids).toEqual(bsdasris.map(bsdasri => bsdasri.id));
  }
);

// Whatever the page size, every BSFF identifier must come back,
// in the order in which the BSFFs were created.
it.each([1, 2, 3, 4, 5, 100])(
  "should return all BSFF's identifiers when paginating by %p",
  async paginateBy => {
    // The first BSFF is created without any argument, the four others
    // with an empty object; creations happen one after another.
    const bsffs = [await createBsff()];
    for (let i = 0; i < 4; i++) {
      bsffs.push(await createBsff({}));
    }

    const ids = await getBsdIdentifiers("bsff", { paginateBy });
    expect(ids).toEqual(bsffs.map(bsff => bsff.id));
  }
);

// Whatever the page size, every BSVHU identifier must come back,
// in the order in which the BSVHUs were created.
it.each([1, 2, 3, 4, 5, 100])(
  "should return all BSVHU's identifiers when paginating by %p",
  async paginateBy => {
    // BSVHUs are created one after another, never in parallel
    const bsvhus = [];
    for (let i = 0; i < 5; i++) {
      bsvhus.push(await bsvhuFactory({}));
    }

    const ids = await getBsdIdentifiers("bsvhu", { paginateBy });
    expect(ids).toEqual(bsvhus.map(bsvhu => bsvhu.id));
  }
);

it("should not return identifiers updated before since paramater", async () => {
const user = await userFactory();
Expand All @@ -54,7 +130,9 @@ describe("getBsdIdentifiers", () => {
ownerId: user.id,
opt: { updatedAt: new Date("2023-02-01") }
});
const ids = await getBsdIdentifiers("bsdd", new Date("2023-02-01"));
const ids = await getBsdIdentifiers("bsdd", {
since: new Date("2023-02-01")
});
expect(ids).toEqual([form2.id]);
});
});
Expand Down
61 changes: 50 additions & 11 deletions back/src/bsds/indexation/bulkIndexBsds.ts
Expand Up @@ -265,24 +265,63 @@ export async function isIndexMappingsVersionChanged(
}
}

/**
 * Options accepted by `getBsdIdentifiers`.
 */
type GetBsdIdentifiersOpt = {
// When set, only BSDs whose `updatedAt` is greater than or equal to this date are returned
since?: Date;
// Number of BSDs fetched per database query (page size); defaults to 500000 when omitted
paginateBy?: number;
};

/**
* Retrieves all BSD identifiers for a given BSD type
*/
export async function getBsdIdentifiers(
bsdName: string,
since?: Date
{ since, paginateBy }: GetBsdIdentifiersOpt = {}
): Promise<string[]> {
const prismaModelDelegate = prismaModels[bsdName];

const bsds = await prismaModelDelegate.findMany({
where: {
isDeleted: false,
...(since ? { updatedAt: { gte: since } } : {})
},
select: { id: true }
});
const defaultPaginateBy = 500000;
const take = paginateBy ?? defaultPaginateBy;

// Fetches at most <take> BSDs (id and rowNumber only), ordered by ascending
// `rowNumber`, located right after the BSD designated by the `after` cursor.
// Without a cursor, the first <take> BSDs are returned.
async function nextPage(after: string | null) {
  // Deleted BSDs are always left out; `since` optionally narrows on `updatedAt`
  const where = {
    isDeleted: false,
    ...(since ? { updatedAt: { gte: since } } : {})
  };

  // Cf https://www.prisma.io/docs/orm/prisma-client/queries/pagination#cursor-based-pagination
  // `skip: 1` leaves the cursor row itself out of the page
  const cursorArgs = after ? { skip: 1, cursor: { rowNumber: after } } : {};

  return prismaModelDelegate.findMany({
    take,
    ...cursorArgs,
    where,
    select: { id: true, rowNumber: true },
    orderBy: { rowNumber: "asc" }
  });
}

// Collects every identifier by walking the list of BSDs page after page,
// using a cursor-based pagination on `rowNumber`.
//
// - `after`: cursor to resume from; `null` starts at the first page.
// - `ids`: identifiers already collected; they are returned in front of the
//   ones fetched here and the array passed in is left untouched.
//
// Pages are accumulated in a single array instead of re-copying the whole
// accumulator on each page, and the walk stops as soon as a page comes back
// shorter than `take`, which saves the final empty query.
async function paginate(after: string | null = null, ids: string[] = []) {
  const collected = [...ids];
  let cursor = after;

  for (;;) {
    const page = await nextPage(cursor);
    for (const bsd of page) {
      collected.push(bsd.id as string);
    }

    // An empty or incomplete page means there is nothing left to fetch
    if (page.length === 0 || page.length < take) {
      return collected;
    }

    // The last row of the page becomes the cursor of the next query
    cursor = page[page.length - 1].rowNumber;
  }
}

return bsds.map(bsd => bsd.id);
return paginate();
}

export async function processDbIdentifiersByChunk(
Expand All @@ -305,7 +344,7 @@ export async function indexAllBsdTypeSync({
since,
indexConfig
}: IndexAllFnSignature): Promise<void> {
const ids = await getBsdIdentifiers(bsdName, since);
const ids = await getBsdIdentifiers(bsdName, { since });

logger.info(`Starting synchronous indexation of ${ids.length} ${bsdName}`);

Expand All @@ -331,7 +370,7 @@ export async function indexAllBsdTypeConcurrentJobs({
}: IndexAllFnSignature) {
const jobs: Job<string>[] = [];
const data: { name: string; data: string; opts?: JobOptions }[] = [];
const ids = await getBsdIdentifiers(bsdName, since);
const ids = await getBsdIdentifiers(bsdName, { since });
logger.info(`Starting indexation of ${ids.length} ${bsdName}`);

// Prepare Job data payload to call indexQueue.addBulk
Expand Down
13 changes: 13 additions & 0 deletions back/src/queue/producers/elastic.ts
Expand Up @@ -63,10 +63,23 @@ export const bulkIndexMasterQueue = new Queue<string>(
// 24h pour prendre de la marge, les jobs de cette queue attendent que toutes les chunks
// soit process lors d'un reindex global
timeout: 24 * 3600 * 1000
},
settings: {
// https://github.com/OptimalBits/bull?tab=readme-ov-file#important-notes
// https://github.com/OptimalBits/bull/issues/1591
lockDuration: 2 * 60 * 1000, // 2 minutes - default 30000 (30s)
// prevent the job from being run twice if it is stalled
maxStalledCount: 0 // default is 1
}
}
);

// Logs a warning each time a job of bulkIndexMasterQueue is reported as stalled
bulkIndexMasterQueue.on("stalled", job => {
  const message = `The job ${job.id} in queue bulkIndexMasterQueue has been stalled`;
  logger.warn(message);
});

indexQueue.on("completed", async job => {
const id = job.data;

Expand Down

0 comments on commit d57a9ff

Please sign in to comment.