From f4b684859448b1972aae0a6ce70e158f969a05cf Mon Sep 17 00:00:00 2001 From: Oran Epelbaum Date: Sun, 27 Oct 2019 17:14:18 +0200 Subject: [PATCH] Dynamic connection configuration resolution (#3497) --- lib/client.js | 46 +++++++++--- .../integration/connection-config-provider.js | 71 +++++++++++++++++++ test/integration/index.js | 1 + types/index.d.ts | 34 ++++++--- 4 files changed, 131 insertions(+), 21 deletions(-) create mode 100644 test/integration/connection-config-provider.js diff --git a/lib/client.js b/lib/client.js index 9f092c9e45..da12c65b47 100644 --- a/lib/client.js +++ b/lib/client.js @@ -55,7 +55,13 @@ function Client(config = {}) { this.version = config.version; } - this.connectionSettings = cloneDeep(config.connection || {}); + if (config.connection && config.connection instanceof Function) { + this.connectionConfigProvider = config.connection; + this.connectionConfigExpirationChecker = () => true; // causes the provider to be called on first use + } else { + this.connectionSettings = cloneDeep(config.connection || {}); + this.connectionConfigExpirationChecker = null; + } if (this.driverName && config.connection) { this.initializeDriver(); if (!config.pool || (config.pool && config.pool.max !== 0)) { @@ -254,16 +260,36 @@ Object.assign(Client.prototype, { // choose the smallest, positive timeout setting and set on poolConfig poolConfig.acquireTimeoutMillis = Math.min(...timeouts); - return Object.assign(poolConfig, { - create: () => { - return this.acquireRawConnection().then(async (connection) => { - connection.__knexUid = uniqueId('__knexUid'); + const updatePoolConnectionSettingsFromProvider = async () => { + if (!this.connectionConfigProvider) { + return; // static configuration, nothing to update + } + if ( + !this.connectionConfigExpirationChecker || + !this.connectionConfigExpirationChecker() + ) { + return; // not expired, reuse existing connection + } + const providerResult = await this.connectionConfigProvider(); + if (providerResult.expirationChecker) { + this.connectionConfigExpirationChecker = + providerResult.expirationChecker; + delete providerResult.expirationChecker; // MySQL2 driver warns on receiving extra properties + } else { + this.connectionConfigExpirationChecker = null; + } + this.connectionSettings = providerResult; + }; - if (poolConfig.afterCreate) { - await promisify(poolConfig.afterCreate)(connection); - } - return connection; - }); + return Object.assign(poolConfig, { + create: async () => { + await updatePoolConnectionSettingsFromProvider(); + const connection = await this.acquireRawConnection(); + connection.__knexUid = uniqueId('__knexUid'); + if (poolConfig.afterCreate) { + await promisify(poolConfig.afterCreate)(connection); + } + return connection; }, destroy: (connection) => { diff --git a/test/integration/connection-config-provider.js b/test/integration/connection-config-provider.js new file mode 100644 index 0000000000..ac8a910122 --- /dev/null +++ b/test/integration/connection-config-provider.js @@ -0,0 +1,71 @@ +/*global expect*/ + +'use strict'; + +const _ = require('lodash'); +const makeKnex = require('../../knex'); + +module.exports = function(config) { + describe('Connection configuration provider', function() { + let configWorkingCopy; + let providerInvocationCount; + let connectionConfigWorkingCopy; + + this.beforeEach(() => { + configWorkingCopy = _.cloneDeep(config); + configWorkingCopy.pool.min = 1; + configWorkingCopy.pool.max = 2; + providerInvocationCount = 0; + connectionConfigWorkingCopy = configWorkingCopy.connection; + }); + + it('is not used when configuration is static', async function() { + return runTwoConcurrentTransactions(0); + }); + + it('can return a promise for a config object, which is reused when not given given an expiry checker', async () => { + configWorkingCopy.connection = () => { + ++providerInvocationCount; + return Promise.resolve(connectionConfigWorkingCopy); + }; + return runTwoConcurrentTransactions(1); + }); + + it('can return a config object, which is reused when not given given an expiry checker', async () => { + configWorkingCopy.connection = () => { + ++providerInvocationCount; + return connectionConfigWorkingCopy; + }; + return runTwoConcurrentTransactions(1); + }); + + it('reuses the same resolved config when not yet expired', async () => { + configWorkingCopy.connection = () => { + ++providerInvocationCount; + return Object.assign(connectionConfigWorkingCopy, { + expirationChecker: () => false, + }); + }; + return runTwoConcurrentTransactions(1); + }); + + it('replaces the resolved config when expired', async () => { + configWorkingCopy.connection = () => { + ++providerInvocationCount; + return Object.assign(connectionConfigWorkingCopy, { + expirationChecker: () => true, + }); + }; + return runTwoConcurrentTransactions(2); + }); + + async function runTwoConcurrentTransactions(expectedInvocationCount) { + const knex = makeKnex(configWorkingCopy); + await knex.transaction(async (trx) => { + await knex.transaction(async (trx2) => {}); + }); + await knex.destroy(); + expect(providerInvocationCount).equals(expectedInvocationCount); + } + }); +}; diff --git a/test/integration/index.js b/test/integration/index.js index ecb1ab45ca..1af217017b 100644 --- a/test/integration/index.js +++ b/test/integration/index.js @@ -6,6 +6,7 @@ const config = require('../knexfile'); const fs = require('fs'); Object.keys(config).forEach((dialectName) => { + require('./connection-config-provider')(config[dialectName]); return require('./suite')(logger(knex(config[dialectName]))); }); diff --git a/types/index.d.ts b/types/index.d.ts index 49245ffd40..e667b5dd3c 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -1609,17 +1609,7 @@ declare namespace Knex { client?: string | typeof Client; dialect?: string; version?: string; - connection?: - | string - | ConnectionConfig - | MariaSqlConnectionConfig - | MySqlConnectionConfig - | MsSqlConnectionConfig - | OracleDbConnectionConfig - | PgConnectionConfig - | RedshiftConnectionConfig - | Sqlite3ConnectionConfig - | SocketConnectionConfig; + connection?: string | StaticConnectionConfig | ConnectionConfigProvider; pool?: PoolConfig; migrations?: MigratorConfig; postProcessResponse?: (result: any, queryContext: any) => any; @@ -1636,6 +1626,21 @@ declare namespace Knex { log?: Logger; } + type StaticConnectionConfig = + | ConnectionConfig + | MariaSqlConnectionConfig + | MySqlConnectionConfig + | MsSqlConnectionConfig + | OracleDbConnectionConfig + | PgConnectionConfig + | RedshiftConnectionConfig + | Sqlite3ConnectionConfig + | SocketConnectionConfig; + + type ConnectionConfigProvider = SyncConnectionConfigProvider | AsyncConnectionConfigProvider; + type SyncConnectionConfigProvider = () => StaticConnectionConfig; + type AsyncConnectionConfigProvider = () => Promise; + interface ConnectionConfig { host: string; user: string; @@ -1660,6 +1665,7 @@ declare namespace Knex { requestTimeout?: number; stream?: boolean; parseJSON?: boolean; + expirationChecker?(): boolean; options?: { encrypt?: boolean; instanceName?: string; @@ -1707,6 +1713,7 @@ declare namespace Knex { read_default_group?: string; charset?: string; streamHWM?: number; + expirationChecker?(): boolean; } interface MariaSslConfiguration { @@ -1716,6 +1723,7 @@ declare namespace Knex { capath?: string; cipher?: string; rejectUnauthorized?: boolean; + expirationChecker?(): boolean; } // Config object for mysql: https://github.com/mysqljs/mysql#connection-options @@ -1743,6 +1751,7 @@ declare namespace Knex { flags?: string; ssl?: string | MariaSslConfiguration; decimalNumbers?: boolean; + expirationChecker?(): boolean; } interface OracleDbConnectionConfig { @@ -1755,6 +1764,7 @@ declare namespace Knex { debug?: boolean; requestTimeout?: number; connectString?: string; + expirationChecker?(): boolean; } // Config object for pg: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts @@ -1778,6 +1788,7 @@ declare namespace Knex { interface Sqlite3ConnectionConfig { filename: string; debug?: boolean; + expirationChecker?(): boolean; } interface SocketConnectionConfig { @@ -1786,6 +1797,7 @@ declare namespace Knex { password: string; database: string; debug?: boolean; + expirationChecker?(): boolean; } interface PoolConfig {