diff --git a/packages/apollo-server-core/src/ApolloServer.ts b/packages/apollo-server-core/src/ApolloServer.ts index 22389130e61..fb1c696ea5d 100644 --- a/packages/apollo-server-core/src/ApolloServer.ts +++ b/packages/apollo-server-core/src/ApolloServer.ts @@ -31,7 +31,6 @@ import { ApolloServerPlugin, GraphQLServiceContext, GraphQLServerListener, - GraphQLSchemaContext, } from 'apollo-server-plugin-base'; import runtimeSupportsUploads from './utils/runtimeSupportsUploads'; @@ -52,8 +51,6 @@ import { SubscriptionServerOptions, FileUploadOptions, PluginDefinition, - GraphQLService, - Unsubscriber, } from './types'; import { gql } from './index'; @@ -92,6 +89,7 @@ import { ApolloServerPluginUsageReporting, } from './plugin'; import { InternalPluginId, pluginIsInternal } from './plugin/internalPlugin'; +import { SchemaManager } from './utils/schemaManager'; const NoIntrospection = (context: ValidationContext) => ({ Field(node: FieldDefinitionNode) { @@ -113,10 +111,9 @@ function approximateObjectSize(obj: T): number { return Buffer.byteLength(JSON.stringify(obj), 'utf8'); } -type SchemaDerivedData = { +export type SchemaDerivedData = { schema: GraphQLSchema; schemaHash: SchemaHash; - coreSupergraphSdl?: string; extensions: Array<() => GraphQLExtension>; // A store that, when enabled (default), will store the parsed and validated // versions of operations in-memory, allowing subsequent parses/validates @@ -125,22 +122,21 @@ type SchemaDerivedData = { }; type ServerState = - | { phase: 'initialized with schema'; schemaDerivedData: SchemaDerivedData } - | { phase: 'initialized with gateway'; gateway: GraphQLService } + | { phase: 'initialized'; schemaManager: SchemaManager } | { phase: 'starting'; barrier: Resolvable; - schemaDerivedData?: SchemaDerivedData; + schemaManager: SchemaManager; } | { phase: 'invoking serverWillStart'; barrier: Resolvable; - schemaDerivedData: SchemaDerivedData; + schemaManager: SchemaManager; } | { phase: 'failed to start'; error: Error; loadedSchema: boolean } | { phase: 'started'; - schemaDerivedData: SchemaDerivedData; + schemaManager: SchemaManager; } | { phase: 'stopping'; barrier: Resolvable } | { phase: 'stopped'; stopError: Error | null }; @@ -179,9 +175,6 @@ export class ApolloServerBase { private state: ServerState; /** @deprecated: This is undefined for servers operating as gateways, and will be removed in a future release **/ protected schema?: GraphQLSchema; - private schemaDidLoadOrUpdateListeners = new Set< - (schemaContext: GraphQLSchemaContext) => void - >(); private toDispose = new Set<() => Promise>(); private toDisposeLast = new Set<() => Promise>(); private experimental_approximateDocumentStoreMiB: Config['experimental_approximateDocumentStoreMiB']; @@ -413,7 +406,15 @@ export class ApolloServerBase { // called. (In the case of a serverless framework integration, // `ensureStarting` is automatically called at the end of the // constructor.) - this.state = { phase: 'initialized with gateway', gateway }; + this.state = { + phase: 'initialized', + schemaManager: new SchemaManager({ + gateway, + apolloConfig: this.apolloConfig, + schemaDerivedDataProvider: this.generateSchemaDerivedData.bind(this), + logger: this.logger, + }), + }; // The main thing that the Gateway does is replace execution with // its own executor. It would be awkward if you always had to pass @@ -428,16 +429,18 @@ export class ApolloServerBase { // and the deprecated `.schema` field (neither of which have ever worked // with the gateway) are available immediately after the constructor returns. this.state = { - phase: 'initialized with schema', - schemaDerivedData: this.generateSchemaDerivedData({ - schema: this.constructSchema(), + phase: 'initialized', + schemaManager: new SchemaManager({ + apiSchema: this.constructSchema(), + schemaDerivedDataProvider: this.generateSchemaDerivedData.bind(this), + logger: this.logger, }), }; // This field is deprecated; users who are interested in learning // their server's schema should instead make a plugin with serverWillStart, - // or register onSchemaChange on their gateway. It is only ever + // or register onSchemaLoadOrUpdate on their gateway. It is only ever // set for non-gateway servers. - this.schema = this.state.schemaDerivedData.schema; + this.schema = this.state.schemaManager.getSchemaDerivedData().schema; } // The main entry point (createHandler) to serverless frameworks generally @@ -513,53 +516,36 @@ export class ApolloServerBase { // This is protected so that it can be called from `apollo-server`. It is // otherwise an internal implementation detail. protected async _start(): Promise { - const initialState = this.state; - if ( - initialState.phase !== 'initialized with gateway' && - initialState.phase !== 'initialized with schema' - ) { + if (this.state.phase !== 'initialized') { throw new Error( - `called start() with surprising state ${initialState.phase}`, + `called start() with surprising state ${this.state.phase}`, ); } - const maybeGateway = - initialState.phase === 'initialized with gateway' - ? initialState.gateway - : undefined; + const schemaManager = this.state.schemaManager; const barrier = resolvable(); - this.state = { phase: 'starting', barrier }; + this.state = { + phase: 'starting', + barrier, + schemaManager, + }; let loadedSchema = false; try { - const schemaDerivedData = - initialState.phase === 'initialized with schema' - ? initialState.schemaDerivedData - : this.generateSchemaDerivedData({ - schema: await this.startGatewayAndLoadSchema( - initialState.gateway, - ), - }); + await schemaManager.start(); + this.toDispose.add(async () => { + await schemaManager.stop(); + }); loadedSchema = true; this.state = { phase: 'invoking serverWillStart', barrier, - // We can't just use the variable schemaDerivedData here, as any - // suspension point between gateway.load() and now is a place where the - // schema may update, at which point the variable schemaDerivedState - // would be stale. - // - // We also can't just use this.state.schemaDerivedData all the time, - // because for old gateways, gateway.load() will not trigger any - // onSchemaChange callbacks under certain conditions, in which case - // this.state.schemaDerivedState won't end up getting set. However, by - // coincidence, such cases also only load the schema and never update - // it. So we can defer to variable schemaDerivedData in such cases. - schemaDerivedData: this.state.schemaDerivedData ?? schemaDerivedData, + schemaManager, }; + const schemaDerivedData = schemaManager.getSchemaDerivedData(); const service: GraphQLServiceContext = { logger: this.logger, - schema: this.state.schemaDerivedData.schema, - schemaHash: this.state.schemaDerivedData.schemaHash, + schema: schemaDerivedData.schema, + schemaHash: schemaDerivedData.schemaHash, apollo: this.apolloConfig, serverlessFramework: this.serverlessFramework(), engine: { @@ -596,40 +582,19 @@ export class ApolloServerBase { serverListeners.forEach(({ schemaDidLoadOrUpdate, serverWillStop }) => { if (schemaDidLoadOrUpdate) { - if (maybeGateway) { - if (!maybeGateway.onSchemaLoadOrUpdate) { - // TODO: Once a gateway version providing the core schema to - // callbacks has been released, update this message to state - // the specific version needed. - throw new Error( - [ - `One of your plugins uses the 'onSchemaLoadOrUpdate' hook,`, - `but your gateway version is too old to support this hook.`, - `Please update your gateway version to latest.`, - ].join(' '), - ); - } - // This check shouldn't ever fail, but Typescript assumes this.state - // can be anything at this point, so we have this to help Typescript. - if (this.state.phase !== 'invoking serverWillStart') { - throw new Error(`State was unexpectedly ${this.state.phase}`); - } - // It is crucial there not be any await statements between this - // read and the addition of a listener to the listener set, or else - // schema updates could be missed by listeners. - const latestSchemaDerivedData = this.state.schemaDerivedData; - if (!latestSchemaDerivedData.coreSupergraphSdl) { - throw new Error( - `Gateway did not notify listeners when loading schema.`, - ); - } - schemaDidLoadOrUpdate({ - apiSchema: latestSchemaDerivedData.schema, - coreSupergraphSdl: latestSchemaDerivedData.coreSupergraphSdl, - }); - this.schemaDidLoadOrUpdateListeners.add(schemaDidLoadOrUpdate); - } else { - schemaDidLoadOrUpdate({ apiSchema: schemaDerivedData.schema }); + try { + schemaManager.onSchemaLoadOrUpdate(schemaDidLoadOrUpdate); + } catch (_) { + // TODO: Once a gateway version providing the core schema to + // callbacks has been released, update this message to state + // the specific version needed. + throw new Error( + [ + `One of your plugins uses the 'onSchemaLoadOrUpdate' hook,`, + `but your gateway version is too old to support this hook.`, + `Please update your gateway version to latest.`, + ].join(' '), + ); } } @@ -642,7 +607,7 @@ export class ApolloServerBase { this.state = { phase: 'started', - schemaDerivedData: this.state.schemaDerivedData, + schemaManager, }; } catch (error) { this.state = { phase: 'failed to start', error, loadedSchema }; @@ -691,8 +656,7 @@ export class ApolloServerBase { private async ensureStarted(): Promise { while (true) { switch (this.state.phase) { - case 'initialized with gateway': - case 'initialized with schema': + case 'initialized': try { await this._start(); } catch { @@ -717,7 +681,7 @@ export class ApolloServerBase { 'This data graph is missing a valid configuration. More details may be available in the server logs.', ); case 'started': - return this.state.schemaDerivedData; + return this.state.schemaManager.getSchemaDerivedData(); case 'stopping': throw new Error( 'Cannot execute GraphQL operations while the server is stopping.', @@ -745,10 +709,7 @@ export class ApolloServerBase { // call, startup wouldn't occur until `graphQLServerOptions` invokes // `ensureStarted`. protected ensureStarting() { - if ( - this.state.phase === 'initialized with gateway' || - this.state.phase === 'initialized with schema' - ) { + if (this.state.phase === 'initialized') { // Ah well. It would have been nice if the user had bothered // to call and await `start()`; that way they'd be able to learn // about any errors from it. Instead we'll kick it off here. @@ -781,75 +742,6 @@ export class ApolloServerBase { ); } - private async startGatewayAndLoadSchema( - gateway: GraphQLService, - ): Promise { - // Store the unsubscribe handles for later disposal when the server stops. - let unsubscribe: Unsubscriber; - if (gateway.onSchemaLoadOrUpdate) { - // Use onSchemaLoadOrUpdate if available, as it reports the core - // supergraph SDL and always reports the initial schema load. - unsubscribe = gateway.onSchemaLoadOrUpdate( - ({ apiSchema, coreSupergraphSdl }) => { - if ( - this.state.phase === 'starting' || - this.state.phase === 'invoking serverWillStart' || - this.state.phase === 'started' - ) { - this.state.schemaDerivedData = this.generateSchemaDerivedData({ - schema: apiSchema, - coreSupergraphSdl, - }); - this.schemaDidLoadOrUpdateListeners.forEach((listener) => { - listener({ - apiSchema, - coreSupergraphSdl, - }); - }); - } - }, - ); - } else { - unsubscribe = gateway.onSchemaChange((schema) => { - if ( - this.state.phase === 'starting' || - this.state.phase === 'invoking serverWillStart' || - this.state.phase === 'started' - ) { - // Note that we don't set the coreSupergraphSdl here, but we only need - // that if we have didSchemaLoadOrUpdate listeners, so we just check - // later in _start() whether that's true and throw there. - this.state.schemaDerivedData = this.generateSchemaDerivedData({ - schema, - }); - this.schemaDidLoadOrUpdateListeners.forEach((listener) => { - listener({ - apiSchema: schema, - }); - }); - } - }); - } - this.toDispose.add(async () => unsubscribe()); - - // For backwards compatibility with old versions of @apollo/gateway. - const engineConfig = - this.apolloConfig.keyHash && this.apolloConfig.graphId - ? { - apiKeyHash: this.apolloConfig.keyHash, - graphId: this.apolloConfig.graphId, - graphVariant: this.apolloConfig.graphVariant, - } - : undefined; - - const config = await gateway.load({ - apollo: this.apolloConfig, - engine: engineConfig, - }); - this.toDispose.add(async () => await gateway.stop?.()); - return config.schema; - } - private constructSchema(): GraphQLSchema { const { schema, @@ -927,13 +819,7 @@ export class ApolloServerBase { }); } - private generateSchemaDerivedData({ - schema, - coreSupergraphSdl, - }: { - schema: GraphQLSchema; - coreSupergraphSdl?: string; - }): SchemaDerivedData { + private generateSchemaDerivedData(schema: GraphQLSchema): SchemaDerivedData { const schemaHash = generateSchemaHash(schema); const { mocks, mockEntireSchema, extensions: _extensions } = this.config; @@ -962,7 +848,6 @@ export class ApolloServerBase { return { schema, schemaHash, - coreSupergraphSdl, extensions, documentStore, }; @@ -1019,12 +904,12 @@ export class ApolloServerBase { | Http2SecureServer | WebSocket.Server, ) { + if (this.config.gateway) { + throw Error( + 'Subscriptions are not supported when operating as a gateway', + ); + } if (!this.subscriptionServerOptions) { - if (this.config.gateway) { - throw Error( - 'Subscriptions are not supported when operating as a gateway', - ); - } if (this.supportsSubscriptions()) { throw Error( 'Subscriptions are disabled, due to subscriptions set to false in the ApolloServer constructor', @@ -1041,13 +926,11 @@ export class ApolloServerBase { let schema: GraphQLSchema; switch (this.state.phase) { - case 'initialized with schema': + case 'initialized': case 'invoking serverWillStart': case 'started': - schema = this.state.schemaDerivedData.schema; + schema = this.state.schemaManager.getSchemaDerivedData().schema; break; - case 'initialized with gateway': - // shouldn't happen: gateway doesn't support subs case 'starting': // shouldn't happen: there's no await between 'starting' and // 'invoking serverWillStart' without gateway diff --git a/packages/apollo-server-core/src/utils/schemaManager.ts b/packages/apollo-server-core/src/utils/schemaManager.ts new file mode 100644 index 00000000000..65314024e76 --- /dev/null +++ b/packages/apollo-server-core/src/utils/schemaManager.ts @@ -0,0 +1,231 @@ +import { GraphQLSchema } from 'graphql'; +import { + ApolloConfig, + GraphQLSchemaContext, + Logger, +} from 'apollo-server-types'; +import { GraphQLService, Unsubscriber } from '../types'; +import { SchemaDerivedData } from '../ApolloServer'; + +type SchemaDerivedDataProvider = ( + apiSchema: GraphQLSchema, +) => SchemaDerivedData; + +/** + * An async-safe class for tracking changes in schemas and schema-derived data. + * + * Specifically, as long as start() is called (and completes) before stop() is + * called, any set of executions of public methods is linearizable. + * + * Note that linearizability in Javascript is trivial if all public methods are + * non-async, but increasingly difficult to guarantee if public methods become + * async. Accordingly, if you believe a public method should be async, think + * carefully on whether it's worth the mental overhead. (E.g. if you wished that + * a callback was async, consider instead resolving a Promise in a non-async + * callback and having your async code wait on the Promise in setTimeout().) + */ +export class SchemaManager { + private readonly logger: Logger; + private readonly schemaDerivedDataProvider: SchemaDerivedDataProvider; + private readonly onSchemaLoadOrUpdateListeners = new Set< + (schemaContext: GraphQLSchemaContext) => void + >(); + private isStopped = false; + private schemaDerivedData?: SchemaDerivedData; + private schemaContext?: GraphQLSchemaContext; + + // For state that's specific to the mode of operation. + private readonly modeSpecificState: + | { + readonly mode: 'gateway'; + readonly gateway: GraphQLService; + readonly apolloConfig: ApolloConfig; + unsubscribeFromGateway?: Unsubscriber; + } + | { + readonly mode: 'schema'; + readonly apiSchema: GraphQLSchema; + }; + + constructor( + options: ( + | { gateway: GraphQLService; apolloConfig: ApolloConfig } + | { apiSchema: GraphQLSchema } + ) & { + logger: Logger; + schemaDerivedDataProvider: SchemaDerivedDataProvider; + }, + ) { + this.logger = options.logger; + this.schemaDerivedDataProvider = options.schemaDerivedDataProvider; + if ('gateway' in options) { + this.modeSpecificState = { + mode: 'gateway', + gateway: options.gateway, + apolloConfig: options.apolloConfig, + }; + } else { + this.modeSpecificState = { + mode: 'schema', + apiSchema: options.apiSchema, + }; + + // Note that we must trigger the load event here instead of start() since: + // - The call site of this constructor expects to fail fast if the schema + // is invalid. + // - ApolloServerBase.installSubscriptionHandlers() expects schema-derived + // data to be available before ApolloServerBase.start() is called. + // - ApolloServerBase.schema expects to be set shortly after construction. + this.processSchemaLoadOrUpdateEvent({ apiSchema: options.apiSchema }); + } + } + + /** + * Calling start() will: + * - Start gateway schema fetching (if a gateway was provided). + * - Ensures schema-derived data is initialized for all modes of operation. + * - For gateways, synchronously notify onSchemaLoadOrUpdate() listeners of + * schema load, and asynchronously notify them of schema updates. + */ + public async start(): Promise { + if (this.modeSpecificState.mode === 'gateway') { + const gateway = this.modeSpecificState.gateway; + if (gateway.onSchemaLoadOrUpdate) { + // Use onSchemaLoadOrUpdate if available, as it reports the core + // supergraph SDL and always reports the initial schema load. + this.modeSpecificState.unsubscribeFromGateway = + gateway.onSchemaLoadOrUpdate((schemaContext) => { + this.processSchemaLoadOrUpdateEvent(schemaContext); + }); + } else { + this.modeSpecificState.unsubscribeFromGateway = gateway.onSchemaChange( + (apiSchema) => { + this.processSchemaLoadOrUpdateEvent({ apiSchema }); + }, + ); + } + + // For backwards compatibility with old versions of @apollo/gateway. + const apolloConfig = this.modeSpecificState.apolloConfig; + const engineConfig = + apolloConfig.keyHash && apolloConfig.graphId + ? { + apiKeyHash: apolloConfig.keyHash, + graphId: apolloConfig.graphId, + graphVariant: apolloConfig.graphVariant, + } + : undefined; + const config = await this.modeSpecificState.gateway.load({ + apollo: apolloConfig, + engine: engineConfig, + }); + + // Note that for old gateways that have onSchemaChange() and no + // onSchemaLoadOrUpdate(), this.schemaDerivedData may not be initialized + // during gateway.load() (because old gateways don't notify listeners on + // schema load in some cases), so we must initialize it here if needed. + if (!this.schemaDerivedData) { + this.processSchemaLoadOrUpdateEvent({ apiSchema: config.schema }); + } + } + } + + /** + * Registers a listener for schema load/update events. Note that the latest + * event is buffered, i.e. + * - If registered before schema load time (at start() for gateways and at + * construction otherwise), then the callback will be first called in the + * method that loads, and then later for updates. + * - If registered after schema load time but before stop(), the callback will + * be first called in this method (for whatever the current schema is), and + * then later for updates. + * - If registered after stop(), the callback will never be called. + * + * For gateways, a core supergraph SDL will be provided to the callback. If + * your gateway is too old to provide a core supergraph SDL, this method will + * throw. + * + * @param callback The listener to execute on schema load/updates. + */ + public onSchemaLoadOrUpdate( + callback: (schemaContext: GraphQLSchemaContext) => void, + ): Unsubscriber { + if ( + this.modeSpecificState.mode === 'gateway' && + !this.modeSpecificState.gateway.onSchemaLoadOrUpdate + ) { + throw new Error( + 'Your gateway is too old to register a onSchemaLoadOrUpdate() listener', + ); + } else { + if (!this.isStopped && this.schemaContext) { + try { + callback(this.schemaContext); + } catch (e) { + this.logger.error( + [ + "An error was thrown from an 'onSchemaLoadOrUpdate' listener:", + `${e?.message ?? e}`, + ].join(' '), + ); + } + } + this.onSchemaLoadOrUpdateListeners.add(callback); + } + + return () => { + this.onSchemaLoadOrUpdateListeners.delete(callback); + }; + } + + /** + * Get the schema-derived state for the current schema. For gateways, this + * throws if called before start() is called. + */ + public getSchemaDerivedData(): SchemaDerivedData { + if (!this.schemaDerivedData) { + throw new Error( + 'For gateways, you must call start() before getSchemaDerivedData()', + ); + } + return this.schemaDerivedData; + } + + /** + * Calling stop() will: + * - Stop gateway schema fetching (if a gateway was provided). + * - Note that this specific step may not succeed if gateway is old. + * - Stop updating schema-derived data. + * - Stop notifying onSchemaLoadOrUpdate() listeners. + */ + public async stop(): Promise { + this.isStopped = true; + if (this.modeSpecificState.mode === 'gateway') { + this.modeSpecificState.unsubscribeFromGateway?.(); + await this.modeSpecificState.gateway.stop?.(); + } + } + + private processSchemaLoadOrUpdateEvent( + schemaContext: GraphQLSchemaContext, + ): void { + if (!this.isStopped) { + this.schemaDerivedData = this.schemaDerivedDataProvider( + schemaContext.apiSchema, + ); + this.schemaContext = schemaContext; + this.onSchemaLoadOrUpdateListeners.forEach((listener) => { + try { + listener(schemaContext); + } catch (e) { + this.logger.error( + [ + "An error was thrown from an 'onSchemaLoadOrUpdate' listener:", + `${e?.message ?? e}`, + ].join(' '), + ); + } + }); + } + } +}