diff --git a/lib/dialects/sqlite3/index.js b/lib/dialects/sqlite3/index.js index e02159ef92..711f11870d 100644 --- a/lib/dialects/sqlite3/index.js +++ b/lib/dialects/sqlite3/index.js @@ -122,9 +122,7 @@ Object.assign(Client_SQLite3.prototype, { return client ._query(connection, sql) .then((obj) => obj.response) - .map(function(row) { - stream.write(row); - }) + .then((rows) => rows.forEach((row) => stream.write(row))) .catch(function(err) { stream.emit('error', err); }) diff --git a/lib/migrate/Migrator.js b/lib/migrate/Migrator.js index 18f632e8b0..300d84ac56 100644 --- a/lib/migrate/Migrator.js +++ b/lib/migrate/Migrator.js @@ -436,14 +436,18 @@ class Migrator { qb.max('batch').from(getTableName(tableName, schemaName)); }) .orderBy('id', 'desc') - .map((migration) => { - return allMigrations.find((entry) => { - return ( - this.config.migrationSource.getMigrationName(entry) === - migration.name - ); - }); - }); + .then((migrations) => + Promise.all( + migrations.map((migration) => { + return allMigrations.find((entry) => { + return ( + this.config.migrationSource.getMigrationName(entry) === + migration.name + ); + }); + }) + ) + ); } // Returns the latest batch number. diff --git a/lib/util/batchInsert.js b/lib/util/batchInsert.js index cc1e3fb1d7..03765105ad 100644 --- a/lib/util/batchInsert.js +++ b/lib/util/batchInsert.js @@ -1,5 +1,5 @@ const { isNumber, chunk, flatten } = require('lodash'); -const Bluebird = require('bluebird'); +const delay = require('./delay'); module.exports = function batchInsert( client, @@ -8,63 +8,38 @@ module.exports = function batchInsert( chunkSize = 1000 ) { let returning = void 0; - let autoTransaction = true; let transaction = null; - const getTransaction = () => - new Bluebird((resolve, reject) => { - if (transaction) { - autoTransaction = false; - return resolve(transaction); - } - - autoTransaction = true; - client.transaction(resolve).catch(reject); - }); - - const wrapper = Object.assign( - new Bluebird((resolve, reject) => { - const chunks = chunk(batch, chunkSize); + const runInTransaction = (cb) => { + if (transaction) { + return cb(transaction); + } + return client.transaction(cb); + }; + return Object.assign( + Promise.resolve().then(async () => { if (!isNumber(chunkSize) || chunkSize < 1) { - return reject(new TypeError(`Invalid chunkSize: ${chunkSize}`)); + throw new TypeError(`Invalid chunkSize: ${chunkSize}`); } if (!Array.isArray(batch)) { - return reject( - new TypeError(`Invalid batch: Expected array, got ${typeof batch}`) + throw new TypeError( + `Invalid batch: Expected array, got ${typeof batch}` ); } - //Next tick to ensure wrapper functions are called if needed - return Bluebird.delay(1) - .then(getTransaction) - .then((tr) => { - return Bluebird.mapSeries(chunks, (items) => - tr(tableName).insert(items, returning) - ) - .then((result) => { - result = flatten(result || []); - - if (autoTransaction) { - //TODO: -- Oracle tr.commit() does not return a 'thenable' !? Ugly hack for now. - return (tr.commit(result) || Bluebird.resolve()).then( - () => result - ); - } - - return result; - }) - .catch((error) => { - if (autoTransaction) { - return tr.rollback(error).then(() => Bluebird.reject(error)); - } + const chunks = chunk(batch, chunkSize); - return Bluebird.reject(error); - }); - }) - .then(resolve) - .catch(reject); + //Next tick to ensure wrapper functions are called if needed + await delay(1); + return runInTransaction(async (tr) => { + const chunksResults = []; + for (const items of chunks) { + chunksResults.push(await tr(tableName).insert(items, returning)); + } + return flatten(chunksResults); + }); }), { returning(columns) { @@ -79,6 +54,4 @@ module.exports = function batchInsert( }, } ); - - return wrapper; }; diff --git a/lib/util/delay.js b/lib/util/delay.js new file mode 100644 index 0000000000..3a0376b875 --- /dev/null +++ b/lib/util/delay.js @@ -0,0 +1,3 @@ +const { promisify } = require('util'); + +module.exports = promisify(setTimeout); diff --git a/test/integration/builder/inserts.js b/test/integration/builder/inserts.js index 896eb43dcc..21667966f4 100644 --- a/test/integration/builder/inserts.js +++ b/test/integration/builder/inserts.js @@ -5,6 +5,7 @@ const uuid = require('uuid'); const _ = require('lodash'); const bluebird = require('bluebird'); +const sinon = require('sinon'); module.exports = function(knex) { describe('Inserts', function() { @@ -1147,11 +1148,13 @@ module.exports = function(knex) { }); }); - it('#1880 - Duplicate keys in batchInsert should not throw unhandled exception', function() { + it('#1880 - Duplicate keys in batchInsert should not throw unhandled exception', async function() { if (/redshift/i.test(knex.client.driverName)) { return; } - return new bluebird(function(resolve, reject) { + const fn = sinon.stub(); + process.on('unhandledRejection', fn); + await new bluebird(function(resolve, reject) { return knex.schema .dropTableIfExists('batchInsertDuplicateKey') .then(function() { @@ -1182,6 +1185,8 @@ module.exports = function(knex) { resolve(error); }); }).timeout(10000); + expect(fn).have.not.been.called; + process.removeListener('unhandledRejection', fn); }); it('knex.batchInsert with specified transaction', function() { diff --git a/test/integration/datatype/bigint.js b/test/integration/datatype/bigint.js index 8ed5e5e3e0..7a3c410853 100644 --- a/test/integration/datatype/bigint.js +++ b/test/integration/datatype/bigint.js @@ -28,7 +28,7 @@ module.exports = function(knex) { .where('expiry', unsafeBigint) .select('*'); }) - .map(function(row) { + .then(function(row) { // triggers request execution }) .then(function() { @@ -36,7 +36,7 @@ module.exports = function(knex) { .where('expiry', negativeUnsafeBigint) .select('*'); }) - .map(function(row) { + .then(function(row) { // triggers request execution }) .catch(function(err) { @@ -79,16 +79,16 @@ module.exports = function(knex) { .where('expiry', bigintTimestamp) .select('*'); }) - .map(function(row) { - expect(row.id).to.equal('positive'); + .then(function(rows) { + rows.forEach((row) => expect(row.id).to.equal('positive')); }) .then(function() { return knex(tableName) .where('expiry', negativeBigintTimestamp) .select('*'); }) - .map(function(row) { - expect(row.id).to.equal('negative'); + .then(function(rows) { + rows.forEach((row) => expect(row.id).to.equal('negative')); }) .catch(function(err) { expect(err).to.be.undefined;