Skip to content

Commit

Permalink
Bluebird remove map mapSeries (#3474)
Browse files Browse the repository at this point in the history
  • Loading branch information
maximelkin authored and kibertoad committed Oct 11, 2019
1 parent f782348 commit f56eaf5
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 68 deletions.
4 changes: 1 addition & 3 deletions lib/dialects/sqlite3/index.js
Expand Up @@ -122,9 +122,7 @@ Object.assign(Client_SQLite3.prototype, {
return client
._query(connection, sql)
.then((obj) => obj.response)
.map(function(row) {
stream.write(row);
})
.then((rows) => rows.forEach((row) => stream.write(row)))
.catch(function(err) {
stream.emit('error', err);
})
Expand Down
20 changes: 12 additions & 8 deletions lib/migrate/Migrator.js
Expand Up @@ -436,14 +436,18 @@ class Migrator {
qb.max('batch').from(getTableName(tableName, schemaName));
})
.orderBy('id', 'desc')
.map((migration) => {
return allMigrations.find((entry) => {
return (
this.config.migrationSource.getMigrationName(entry) ===
migration.name
);
});
});
.then((migrations) =>
Promise.all(
migrations.map((migration) => {
return allMigrations.find((entry) => {
return (
this.config.migrationSource.getMigrationName(entry) ===
migration.name
);
});
})
)
);
}

// Returns the latest batch number.
Expand Down
71 changes: 22 additions & 49 deletions lib/util/batchInsert.js
@@ -1,5 +1,5 @@
const { isNumber, chunk, flatten } = require('lodash');
const Bluebird = require('bluebird');
const delay = require('./delay');

module.exports = function batchInsert(
client,
Expand All @@ -8,63 +8,38 @@ module.exports = function batchInsert(
chunkSize = 1000
) {
let returning = void 0;
let autoTransaction = true;
let transaction = null;

const getTransaction = () =>
new Bluebird((resolve, reject) => {
if (transaction) {
autoTransaction = false;
return resolve(transaction);
}

autoTransaction = true;
client.transaction(resolve).catch(reject);
});

const wrapper = Object.assign(
new Bluebird((resolve, reject) => {
const chunks = chunk(batch, chunkSize);
const runInTransaction = (cb) => {
if (transaction) {
return cb(transaction);
}
return client.transaction(cb);
};

return Object.assign(
Promise.resolve().then(async () => {
if (!isNumber(chunkSize) || chunkSize < 1) {
return reject(new TypeError(`Invalid chunkSize: ${chunkSize}`));
throw new TypeError(`Invalid chunkSize: ${chunkSize}`);
}

if (!Array.isArray(batch)) {
return reject(
new TypeError(`Invalid batch: Expected array, got ${typeof batch}`)
throw new TypeError(
`Invalid batch: Expected array, got ${typeof batch}`
);
}

//Next tick to ensure wrapper functions are called if needed
return Bluebird.delay(1)
.then(getTransaction)
.then((tr) => {
return Bluebird.mapSeries(chunks, (items) =>
tr(tableName).insert(items, returning)
)
.then((result) => {
result = flatten(result || []);

if (autoTransaction) {
//TODO: -- Oracle tr.commit() does not return a 'thenable' !? Ugly hack for now.
return (tr.commit(result) || Bluebird.resolve()).then(
() => result
);
}

return result;
})
.catch((error) => {
if (autoTransaction) {
return tr.rollback(error).then(() => Bluebird.reject(error));
}
const chunks = chunk(batch, chunkSize);

return Bluebird.reject(error);
});
})
.then(resolve)
.catch(reject);
//Next tick to ensure wrapper functions are called if needed
await delay(1);
return runInTransaction(async (tr) => {
const chunksResults = [];
for (const items of chunks) {
chunksResults.push(await tr(tableName).insert(items, returning));
}
return flatten(chunksResults);
});
}),
{
returning(columns) {
Expand All @@ -79,6 +54,4 @@ module.exports = function batchInsert(
},
}
);

return wrapper;
};
3 changes: 3 additions & 0 deletions lib/util/delay.js
@@ -0,0 +1,3 @@
const { promisify } = require('util');

module.exports = promisify(setTimeout);
9 changes: 7 additions & 2 deletions test/integration/builder/inserts.js
Expand Up @@ -5,6 +5,7 @@
const uuid = require('uuid');
const _ = require('lodash');
const bluebird = require('bluebird');
const sinon = require('sinon');

module.exports = function(knex) {
describe('Inserts', function() {
Expand Down Expand Up @@ -1147,11 +1148,13 @@ module.exports = function(knex) {
});
});

it('#1880 - Duplicate keys in batchInsert should not throw unhandled exception', function() {
it('#1880 - Duplicate keys in batchInsert should not throw unhandled exception', async function() {
if (/redshift/i.test(knex.client.driverName)) {
return;
}
return new bluebird(function(resolve, reject) {
const fn = sinon.stub();
process.on('unhandledRejection', fn);
await new bluebird(function(resolve, reject) {
return knex.schema
.dropTableIfExists('batchInsertDuplicateKey')
.then(function() {
Expand Down Expand Up @@ -1182,6 +1185,8 @@ module.exports = function(knex) {
resolve(error);
});
}).timeout(10000);
expect(fn).have.not.been.called;
process.removeListener('unhandledRejection', fn);
});

it('knex.batchInsert with specified transaction', function() {
Expand Down
12 changes: 6 additions & 6 deletions test/integration/datatype/bigint.js
Expand Up @@ -28,15 +28,15 @@ module.exports = function(knex) {
.where('expiry', unsafeBigint)
.select('*');
})
.map(function(row) {
.then(function(row) {
// triggers request execution
})
.then(function() {
return knex(tableName)
.where('expiry', negativeUnsafeBigint)
.select('*');
})
.map(function(row) {
.then(function(row) {
// triggers request execution
})
.catch(function(err) {
Expand Down Expand Up @@ -79,16 +79,16 @@ module.exports = function(knex) {
.where('expiry', bigintTimestamp)
.select('*');
})
.map(function(row) {
expect(row.id).to.equal('positive');
.then(function(rows) {
rows.forEach((row) => expect(row.id).to.equal('positive'));
})
.then(function() {
return knex(tableName)
.where('expiry', negativeBigintTimestamp)
.select('*');
})
.map(function(row) {
expect(row.id).to.equal('negative');
.then(function(rows) {
rows.forEach((row) => expect(row.id).to.equal('negative'));
})
.catch(function(err) {
expect(err).to.be.undefined;
Expand Down

0 comments on commit f56eaf5

Please sign in to comment.