Skip to content

Commit

Permalink
Fuse the calls to searchForIssuesUsingJql when possible + memoizati…
Browse files Browse the repository at this point in the history
…on + cleanups (#1456)
  • Loading branch information
ypc-faros committed May 14, 2024
1 parent 45433b9 commit e2664af
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 103 deletions.
4 changes: 1 addition & 3 deletions faros-airbyte-cdk/src/sources/source-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,7 @@ export abstract class AirbyteSourceBase<
}

// Requested streams in the order they should be processed
const sortedStreams = toposort
.array(configuredStreamNames, streamDeps)
.reverse();
const sortedStreams = toposort.array(configuredStreamNames, streamDeps);

const failedStreams = [];
for (const streamName of sortedStreams) {
Expand Down
5 changes: 4 additions & 1 deletion sources/jira-source/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ export class JiraSource extends AirbyteSourceBase<JiraConfig> {
WebhookSupplementStreamNames.includes(stream.stream.name)
);
}
return {config, catalog: {streams}, state};
const requestedStreams = new Set(
streams.map((stream) => stream.stream.name)
);
return {config: {...config, requestedStreams}, catalog: {streams}, state};
}
}
176 changes: 122 additions & 54 deletions sources/jira-source/src/jira.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {AirbyteConfig, AirbyteLogger} from 'faros-airbyte-cdk';
import {bucket} from 'faros-airbyte-common/common';
import {
Issue,
IssueCompact,
IssueField,
PullRequest,
Repo,
Expand All @@ -19,6 +20,7 @@ import https from 'https';
import jira, {AgileModels, Version2Models} from 'jira.js';
import {chunk, concat, isNil, pick, toInteger, toLower, toString} from 'lodash';
import {isEmpty} from 'lodash';
import moment from 'moment';
import pLimit from 'p-limit';
import path from 'path';
import {Memoize} from 'typescript-memoize';
Expand Down Expand Up @@ -52,6 +54,9 @@ export interface JiraConfig extends AirbyteConfig {
readonly api_url?: string;
readonly api_key?: string;
readonly graph?: string;
readonly requestedStreams?: Set<string>;
start_date?: Date;
end_date?: Date;
}

export const toFloat = (value: any): number | undefined =>
Expand Down Expand Up @@ -122,6 +127,10 @@ export class Jira {
private readonly fieldIdsByName: Map<string, string[]>;
// Counts the number of failed calls to fetch sprint history
private sprintHistoryFetchFailures = 0;
private readonly seenIssues: Map<string, IssueCompact[]> = new Map<
string,
IssueCompact[]
>();

constructor(
// Pass base url to enable creating issue url that can navigated in browser
Expand All @@ -138,7 +147,8 @@ export class Jira {
private readonly bucketId: number,
private readonly bucketTotal: number,
private readonly logger: AirbyteLogger,
private readonly useUsersPrefixSearch?: boolean
private readonly useUsersPrefixSearch?: boolean,
private readonly requestedStreams?: Set<string>
) {
// Create inverse mapping from field name -> ids
// Field can have multiple ids with the same name
Expand Down Expand Up @@ -230,6 +240,12 @@ export class Jira {
}
}

cfg.end_date = moment().utc().toDate();
cfg.start_date = moment()
.utc()
.subtract(cfg.cutoff_days || DEFAULT_CUTOFF_DAYS, 'days')
.toDate();

Jira.jira = new Jira(
cfg.url,
api,
Expand All @@ -244,7 +260,8 @@ export class Jira {
cfg.bucket_id ?? DEFAULT_BUCKET_ID,
cfg.bucket_total ?? DEFAULT_BUCKET_TOTAL,
logger,
cfg.use_users_prefix_search ?? DEFAULT_USE_USERS_PREFIX_SEARCH
cfg.use_users_prefix_search ?? DEFAULT_USE_USERS_PREFIX_SEARCH,
cfg.requestedStreams
);
return Jira.jira;
}
Expand Down Expand Up @@ -633,18 +650,24 @@ export class Jira {
return statusByName;
}

@Memoize()
getIssues(
jql: string,
fetchKeysOnly = false,
includeAdditionalFields = true,
additionalFields?: string[]
): AsyncIterableIterator<Issue> {
const {fieldIds, additionalFieldIds} = this.getIssueFields(
fetchKeysOnly,
includeAdditionalFields,
additionalFields
/**
 * Streams the keys of the issues matching the given JQL query.
 *
 * Only the `key` field is requested from the search endpoint, in pages of
 * at most `maxPageSize` results, and each result is reduced to its key.
 */
getIssuesKeys(jql: string): AsyncIterableIterator<string> {
  // Restrict the response to the only field this method reads
  const keyOnlyFields = ['key'];
  // Maps a raw search result to its issue key
  const extractKey = async (rawIssue: any) => rawIssue.key;

  return this.iterate(
    (startAt) =>
      this.api.v2.issueSearch.searchForIssuesUsingJql({
        jql,
        startAt,
        fields: keyOnlyFields,
        maxResults: this.maxPageSize,
      }),
    extractKey,
    'issues'
  );
}

getIssues(jql: string): AsyncIterableIterator<Issue> {
const {fieldIds, additionalFieldIds} = this.getIssueFields();
const issueTransformer = new IssueTransformer(
this.baseURL,
this.fieldNameById,
Expand All @@ -653,24 +676,71 @@ export class Jira {
additionalFieldIds,
this.additionalFieldsArrayLimit
);

return this.iterate(
(startAt) =>
this.api.v2.issueSearch.searchForIssuesUsingJql({
jql,
startAt,
fields: [...fieldIds, ...additionalFieldIds],
expand: fetchKeysOnly ? undefined : 'changelog',
expand: 'changelog',
maxResults: this.maxPageSize,
}),
async (item: any) => {
this.memoizeIssue(item, jql);

return issueTransformer.toIssue(item);
},
'issues'
);
}

/**
 * Records a compact view of a raw issue search result in `seenIssues`,
 * grouped under the JQL query string that produced it.
 */
private memoizeIssue(item: any, jql: string) {
  const {id, key, fields} = item;
  // Build the compact record first so that a failure while converting the
  // dates does not leave an empty entry behind for this JQL
  const compact: IssueCompact = {
    id,
    key,
    created: Utils.toDate(fields.created),
    updated: Utils.toDate(fields.updated),
    fields,
  };

  let recorded = this.seenIssues.get(jql);
  if (recorded === undefined) {
    recorded = [];
    this.seenIssues.set(jql, recorded);
  }
  recorded.push(compact);
}

/**
 * Streams compact issues (id, key, created/updated dates and raw fields)
 * matching the given JQL query.
 *
 * When issues have already been recorded in `seenIssues` for this exact JQL
 * string, they are yielded from that cache and no search request is issued.
 * Otherwise the search endpoint is paged through, without expanding the
 * changelog.
 *
 * NOTE(review): `seenIssues` is filled one issue at a time while `getIssues`
 * is being iterated, so a cache entry presumably only holds the complete
 * result set if that iteration ran to the end — confirm with callers.
 */
async *getIssuesCompact(jql: string): AsyncIterableIterator<IssueCompact> {
  const cached = this.seenIssues.get(jql);
  if (cached !== undefined) {
    this.logger?.debug(`Using cached issues for JQL: ${jql}`);
    for (const issue of cached) {
      yield issue;
    }
    return;
  }

  // The field list is only needed on a cache miss; compute it once here
  // instead of on every call and for every page
  const {fieldIds, additionalFieldIds} = this.getIssueFields();
  const fields = [...fieldIds, ...additionalFieldIds];

  yield* this.iterate(
    (startAt) =>
      this.api.v2.issueSearch.searchForIssuesUsingJql({
        jql,
        startAt,
        fields,
        maxResults: this.maxPageSize,
      }),
    async (item: any) => ({
      id: item.id,
      key: item.key,
      created: Utils.toDate(item.fields.created),
      updated: Utils.toDate(item.fields.updated),
      fields: item.fields,
    }),
    'issues'
  );
}

async getIssuePullRequests(
issue: Issue
issue: IssueCompact
): Promise<ReadonlyArray<PullRequest>> {
let pullRequests: ReadonlyArray<PullRequest> = [];
const devFieldIds = this.fieldIdsByName.get(DEV_FIELD_NAME) ?? [];
Expand All @@ -694,47 +764,45 @@ export class Jira {
return pullRequests;
}

private getIssueFields(
fetchKeysOnly: boolean,
includeAdditionalFields: boolean,
additionalFields?: string[]
): {fieldIds: string[]; additionalFieldIds: string[]} {
const fieldIds = fetchKeysOnly
? ['id', 'key', 'created', 'updated']
: [
'assignee',
'created',
'creator',
'description',
'issuelinks',
'issuetype',
'labels',
'parent',
'priority',
'project',
'resolution',
'resolutiondate',
'status',
'subtasks',
'summary',
'updated',
];
if (includeAdditionalFields) {
const additionalFieldIds: string[] = [];
for (const fieldId of this.fieldNameById.keys()) {
// Skip fields that are already included in the fields above,
// or that are not in the additional fields list if provided
if (
!fieldIds.includes(fieldId) &&
(!additionalFields ||
additionalFields.includes(this.fieldNameById.get(fieldId)))
) {
additionalFieldIds.push(fieldId);
}
private getIssueFields(): {fieldIds: string[]; additionalFieldIds: string[]} {
const fieldIds = new Set<string>(['id', 'key', 'created', 'updated']);

if (this.requestedStreams?.has('faros_issue_pull_requests')) {
fieldIds.add(DEV_FIELD_NAME);
}

if (this.requestedFarosIssuesStream()) {
[
'assignee',
'created',
'creator',
'description',
'issuelinks',
'issuetype',
'labels',
'parent',
'priority',
'project',
'resolution',
'resolutiondate',
'status',
'subtasks',
'summary',
'updated',
].forEach((field) => fieldIds.add(field));
}
const additionalFieldIds: string[] = [];
for (const fieldId of this.fieldNameById.keys()) {
// Skip fields that are already included in the fields above
if (!fieldIds.has(fieldId)) {
additionalFieldIds.push(fieldId);
}
return {fieldIds, additionalFieldIds};
}
return {fieldIds, additionalFieldIds: []};
return {fieldIds: Array.from(fieldIds), additionalFieldIds};
}

/**
 * Tells whether the `faros_issues` stream is among the requested streams.
 * Yields `undefined` when no set of requested streams was provided.
 */
private requestedFarosIssuesStream() {
  const streams = this.requestedStreams;
  return streams?.has('faros_issues');
}

async *getBoards(
Expand Down
19 changes: 5 additions & 14 deletions sources/jira-source/src/streams/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ import {AirbyteLogger, AirbyteStreamBase} from 'faros-airbyte-cdk';
import {FarosClient, Utils} from 'faros-js-client';
import moment from 'moment';

import {
DEFAULT_CUTOFF_DAYS,
DEFAULT_CUTOFF_LAG_DAYS,
JiraConfig,
} from '../jira';
import {DEFAULT_CUTOFF_LAG_DAYS, JiraConfig} from '../jira';
import {ProjectBoardFilter} from '../project-board-filter';

export type ProjectStreamSlice = {
Expand Down Expand Up @@ -48,15 +44,10 @@ export abstract class StreamBase extends AirbyteStreamBase {
}

protected getUpdateRange(cutoff?: number): [Date, Date] {
const newCutoff = moment().utc().toDate();
// If no state with cutoff, use the default one applying cutoffDays
const fromCutoff = cutoff
? Utils.toDate(cutoff)
: moment()
.utc()
.subtract(this.config.cutoff_days || DEFAULT_CUTOFF_DAYS, 'days')
.toDate();
return [fromCutoff, newCutoff];
return [
cutoff ? Utils.toDate(cutoff) : this.config.start_date,
this.config.end_date,
];
}

protected getUpdatedStreamState(
Expand Down
8 changes: 3 additions & 5 deletions sources/jira-source/src/streams/faros_board_issues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ export class FarosBoardIssues extends StreamWithBoardSlices {
// but, in practice, it is actually present
const projectKey = boardConfig.location['key'];
try {
for await (const issue of jira.getIssues(
new JqlBuilder(boardJql).withProject(projectKey).build(),
true,
false
for await (const issue of jira.getIssuesKeys(
new JqlBuilder(boardJql).withProject(projectKey).build()
)) {
yield {
key: issue.key,
key: issue,
boardId,
};
}
Expand Down
14 changes: 7 additions & 7 deletions sources/jira-source/src/streams/faros_issue_pull_requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {PullRequest} from 'faros-airbyte-common/jira';
import {Utils} from 'faros-js-client';
import {Dictionary} from 'ts-essentials';

import {DEV_FIELD_NAME, Jira} from '../jira';
import {Jira} from '../jira';
import {JqlBuilder} from '../jql-builder';
import {
ProjectStreamSlice,
Expand All @@ -12,6 +12,10 @@ import {
} from './common';

export class FarosIssuePullRequests extends StreamWithProjectSlices {
/** Names of the streams this stream declares as its dependencies. */
get dependencies(): ReadonlyArray<string> {
  const upstreamStreams: ReadonlyArray<string> = ['faros_issues'];
  return upstreamStreams;
}

/** Loads the JSON schema bundled for the issue pull requests stream. */
getJsonSchema(): Dictionary<any, string> {
  const schema = require('../../resources/schemas/farosIssuePullRequests.json');
  return schema;
}
Expand All @@ -36,15 +40,11 @@ export class FarosIssuePullRequests extends StreamWithProjectSlices {
syncMode === SyncMode.INCREMENTAL
? this.getUpdateRange(streamState?.[projectKey]?.cutoff)
: this.getUpdateRange();

for await (const issue of jira.getIssues(
for await (const issue of jira.getIssuesCompact(
new JqlBuilder()
.withProject(projectKey)
.withDateRange(updateRange)
.build(),
true,
true,
[DEV_FIELD_NAME]
.build()
)) {
for (const pullRequest of (await jira.getIssuePullRequests(issue)) ||
[]) {
Expand Down

0 comments on commit e2664af

Please sign in to comment.