Skip to content

Commit

Permalink
usage reporting: fix memory leak
Browse files Browse the repository at this point in the history
A data structure in the usage reporting plugin would get a new entry
each time the server's schema changed. Old entries would never be
deleted, though their values would be shrunk to a minimum size.

It seems that this was not good enough and is producing notable memory
leaks for some folks. So let's fix it!

The reason for the old behavior was to be really careful to never write
to a report that may have already been taken out of the map and sent.
Previously this was accomplished by having the main entry in the map
never change. Now this is accomplished by making sure that we never
write to the report any later than immediately (and synchronously) after
we pull it out of the map.

Fixes #6983.
  • Loading branch information
glasser committed Oct 7, 2022
1 parent f6c2991 commit af3c9bb
Showing 1 changed file with 56 additions and 45 deletions.
101 changes: 56 additions & 45 deletions packages/server/src/plugin/usageReporting/plugin.ts
Expand Up @@ -46,22 +46,6 @@ const reportHeaderDefaults = {
uname: `${os.platform()}, ${os.type()}, ${os.release()}, ${os.arch()})`,
};

class ReportData {
report!: OurReport;
readonly header: ReportHeader;
constructor(executableSchemaId: string, graphRef: string) {
this.header = new ReportHeader({
...reportHeaderDefaults,
executableSchemaId,
graphRef,
});
this.reset();
}
reset() {
this.report = new OurReport(this.header);
}
}

export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
options: ApolloServerPluginUsageReportingOptions<TContext> = Object.create(
null,
Expand Down Expand Up @@ -143,9 +127,45 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
cache: LRUCache<string, OperationDerivedData>;
} | null = null;

const reportDataByExecutableSchemaId: {
[executableSchemaId: string]: ReportData | undefined;
} = Object.create(null);
// This map maps from executable schema ID (schema hash, basically) to the
// report we'll send about it. That's because when we're using a gateway,
// the schema can change over time, but each report needs to be about a
// single schema. We avoid having this function be a memory leak by
// removing values from it when we're in the process of sending reports.
// That means we have to be very careful never to pull a Report out of it
// and hang on to it for a while before writing to it, because the report
// might have gotten sent and discarded in the meantime. So you should
// only access the values of this Map via
// getReportWhichMustBeUsedImmediately and getAndDeleteReport, and never
// hang on to the value returned by getReportWhichMustBeUsedImmediately.
const reportByExecutableSchemaId = new Map<string, OurReport>();
const getReportWhichMustBeUsedImmediately = (
executableSchemaId: string,
): OurReport => {
const existing = reportByExecutableSchemaId.get(executableSchemaId);
if (existing) {
return existing;
}
const report = new OurReport(
new ReportHeader({
...reportHeaderDefaults,
executableSchemaId,
graphRef,
}),
);
reportByExecutableSchemaId.set(executableSchemaId, report);
return report;
};
const getAndDeleteReport = (
executableSchemaId: string,
): OurReport | null => {
const report = reportByExecutableSchemaId.get(executableSchemaId);
if (report) {
reportByExecutableSchemaId.delete(executableSchemaId);
return report;
}
return null;
};

const overriddenExecutableSchemaId = options.overrideReportedSchema
? computeCoreSchemaHash(options.overrideReportedSchema)
Expand Down Expand Up @@ -192,21 +212,10 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
return id;
}

const getReportData = (executableSchemaId: string): ReportData => {
const existing = reportDataByExecutableSchemaId[executableSchemaId];
if (existing) {
return existing;
}
const reportData = new ReportData(executableSchemaId, graphRef);
reportDataByExecutableSchemaId[executableSchemaId] = reportData;
return reportData;
};

async function sendAllReportsAndReportErrors(): Promise<void> {
await Promise.all(
Object.keys(reportDataByExecutableSchemaId).map(
(executableSchemaId) =>
sendReportAndReportErrors(executableSchemaId),
[...reportByExecutableSchemaId.keys()].map((executableSchemaId) =>
sendReportAndReportErrors(executableSchemaId),
),
);
}
Expand All @@ -228,13 +237,11 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(

// Needs to be an arrow function to be confident that key is defined.
const sendReport = async (executableSchemaId: string): Promise<void> => {
const reportData = getReportData(executableSchemaId);
const { report } = reportData;
reportData.reset();

const report = getAndDeleteReport(executableSchemaId);
if (
Object.keys(report.tracesPerQuery).length === 0 &&
report.operationCount === 0
!report ||
(Object.keys(report.tracesPerQuery).length === 0 &&
report.operationCount === 0)
) {
return;
}
Expand Down Expand Up @@ -577,10 +584,12 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
treeBuilder.stopTiming();
const executableSchemaId =
overriddenExecutableSchemaId ?? executableSchemaIdForSchema(schema);
const reportData = getReportData(executableSchemaId);

if (includeOperationInUsageReporting === false) {
if (resolvedOperation) reportData.report.operationCount++;
if (resolvedOperation) {
getReportWhichMustBeUsedImmediately(executableSchemaId)
.operationCount++;
}
return;
}

Expand Down Expand Up @@ -634,8 +643,6 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
overriddenExecutableSchemaId ??
executableSchemaIdForSchema(schema);

const reportData = getReportData(executableSchemaId);
const { report } = reportData;
const { trace } = treeBuilder;

let statsReportKey: string | undefined = undefined;
Expand Down Expand Up @@ -673,9 +680,12 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
throw new Error(`Error encoding trace: ${protobufError}`);
}

if (resolvedOperation) report.operationCount++;
if (resolvedOperation) {
getReportWhichMustBeUsedImmediately(executableSchemaId)
.operationCount++;
}

report.addTrace({
getReportWhichMustBeUsedImmediately(executableSchemaId).addTrace({
statsReportKey,
trace,
// We include the operation as a trace (rather than aggregated
Expand All @@ -700,7 +710,8 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
// If the buffer gets big (according to our estimate), send.
if (
sendReportsImmediately ||
report.sizeEstimator.bytes >=
getReportWhichMustBeUsedImmediately(executableSchemaId)
.sizeEstimator.bytes >=
(options.maxUncompressedReportSize || 4 * 1024 * 1024)
) {
await sendReportAndReportErrors(executableSchemaId);
Expand Down

0 comments on commit af3c9bb

Please sign in to comment.