Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bluebird remove map mapSeries #3474

Merged
merged 2 commits into from Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions lib/dialects/sqlite3/index.js
Expand Up @@ -122,9 +122,7 @@ Object.assign(Client_SQLite3.prototype, {
return client
._query(connection, sql)
.then((obj) => obj.response)
.map(function(row) {
stream.write(row);
})
.then((rows) => rows.forEach((row) => stream.write(row)))
.catch(function(err) {
stream.emit('error', err);
})
Expand Down
20 changes: 12 additions & 8 deletions lib/migrate/Migrator.js
Expand Up @@ -436,14 +436,18 @@ class Migrator {
qb.max('batch').from(getTableName(tableName, schemaName));
})
.orderBy('id', 'desc')
.map((migration) => {
return allMigrations.find((entry) => {
return (
this.config.migrationSource.getMigrationName(entry) ===
migration.name
);
});
});
.then((migrations) =>
Promise.all(
migrations.map((migration) => {
return allMigrations.find((entry) => {
return (
this.config.migrationSource.getMigrationName(entry) ===
migration.name
);
});
})
)
);
}

// Returns the latest batch number.
Expand Down
71 changes: 22 additions & 49 deletions lib/util/batchInsert.js
@@ -1,5 +1,5 @@
const { isNumber, chunk, flatten } = require('lodash');
const Bluebird = require('bluebird');
const delay = require('./delay');

module.exports = function batchInsert(
client,
Expand All @@ -8,63 +8,38 @@ module.exports = function batchInsert(
chunkSize = 1000
) {
let returning = void 0;
let autoTransaction = true;
let transaction = null;

const getTransaction = () =>
new Bluebird((resolve, reject) => {
if (transaction) {
autoTransaction = false;
return resolve(transaction);
}

autoTransaction = true;
client.transaction(resolve).catch(reject);
});

const wrapper = Object.assign(
new Bluebird((resolve, reject) => {
const chunks = chunk(batch, chunkSize);
const runInTransaction = (cb) => {
if (transaction) {
return cb(transaction);
}
return client.transaction(cb);
};

return Object.assign(
Promise.resolve().then(async () => {
if (!isNumber(chunkSize) || chunkSize < 1) {
return reject(new TypeError(`Invalid chunkSize: ${chunkSize}`));
throw new TypeError(`Invalid chunkSize: ${chunkSize}`);
}

if (!Array.isArray(batch)) {
return reject(
new TypeError(`Invalid batch: Expected array, got ${typeof batch}`)
throw new TypeError(
`Invalid batch: Expected array, got ${typeof batch}`
);
}

//Next tick to ensure wrapper functions are called if needed
return Bluebird.delay(1)
.then(getTransaction)
.then((tr) => {
return Bluebird.mapSeries(chunks, (items) =>
tr(tableName).insert(items, returning)
)
.then((result) => {
result = flatten(result || []);

if (autoTransaction) {
//TODO: -- Oracle tr.commit() does not return a 'thenable' !? Ugly hack for now.
return (tr.commit(result) || Bluebird.resolve()).then(
() => result
);
}

return result;
})
.catch((error) => {
if (autoTransaction) {
return tr.rollback(error).then(() => Bluebird.reject(error));
}
const chunks = chunk(batch, chunkSize);

return Bluebird.reject(error);
});
})
.then(resolve)
.catch(reject);
//Next tick to ensure wrapper functions are called if needed
await delay(1);
return runInTransaction(async (tr) => {
const chunksResults = [];
for (const items of chunks) {
chunksResults.push(await tr(tableName).insert(items, returning));
}
return flatten(chunksResults);
});
}),
{
returning(columns) {
Expand All @@ -79,6 +54,4 @@ module.exports = function batchInsert(
},
}
);

return wrapper;
};
3 changes: 3 additions & 0 deletions lib/util/delay.js
@@ -0,0 +1,3 @@
const { promisify } = require('util');

module.exports = promisify(setTimeout);
9 changes: 7 additions & 2 deletions test/integration/builder/inserts.js
Expand Up @@ -5,6 +5,7 @@
const uuid = require('uuid');
const _ = require('lodash');
const bluebird = require('bluebird');
const sinon = require('sinon');

module.exports = function(knex) {
describe('Inserts', function() {
Expand Down Expand Up @@ -1147,11 +1148,13 @@ module.exports = function(knex) {
});
});

it('#1880 - Duplicate keys in batchInsert should not throw unhandled exception', function() {
it('#1880 - Duplicate keys in batchInsert should not throw unhandled exception', async function() {
if (/redshift/i.test(knex.client.driverName)) {
return;
}
return new bluebird(function(resolve, reject) {
const fn = sinon.stub();
process.on('unhandledRejection', fn);
await new bluebird(function(resolve, reject) {
return knex.schema
.dropTableIfExists('batchInsertDuplicateKey')
.then(function() {
Expand Down Expand Up @@ -1182,6 +1185,8 @@ module.exports = function(knex) {
resolve(error);
});
}).timeout(10000);
expect(fn).have.not.been.called;
process.removeListener('unhandledRejection', fn);
});

it('knex.batchInsert with specified transaction', function() {
Expand Down
12 changes: 6 additions & 6 deletions test/integration/datatype/bigint.js
Expand Up @@ -28,15 +28,15 @@ module.exports = function(knex) {
.where('expiry', unsafeBigint)
.select('*');
})
.map(function(row) {
.then(function(row) {
// triggers request execution
})
.then(function() {
return knex(tableName)
.where('expiry', negativeUnsafeBigint)
.select('*');
})
.map(function(row) {
.then(function(row) {
// triggers request execution
})
.catch(function(err) {
Expand Down Expand Up @@ -79,16 +79,16 @@ module.exports = function(knex) {
.where('expiry', bigintTimestamp)
.select('*');
})
.map(function(row) {
expect(row.id).to.equal('positive');
.then(function(rows) {
rows.forEach((row) => expect(row.id).to.equal('positive'));
})
.then(function() {
return knex(tableName)
.where('expiry', negativeBigintTimestamp)
.select('*');
})
.map(function(row) {
expect(row.id).to.equal('negative');
.then(function(rows) {
rows.forEach((row) => expect(row.id).to.equal('negative'));
})
.catch(function(err) {
expect(err).to.be.undefined;
Expand Down