Skip to content

Commit

Permalink
remove bluebird.each (#3471)
Browse files Browse the repository at this point in the history
  • Loading branch information
maximelkin authored and kibertoad committed Oct 12, 2019
1 parent 20bd04b commit 4a71315
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 150 deletions.
267 changes: 124 additions & 143 deletions lib/dialects/oracledb/index.js
Expand Up @@ -7,6 +7,7 @@ const ColumnCompiler = require('./schema/columncompiler');
const { BlobHelper, ReturningHelper, isConnectionError } = require('./utils');
const Bluebird = require('bluebird');
const stream = require('stream');
const { promisify } = require('util');
const Transaction = require('./transaction');
const Client_Oracle = require('../oracle');
const Oracle_Formatter = require('../oracle/formatter');
Expand Down Expand Up @@ -128,7 +129,7 @@ Client_Oracledb.prototype.acquireRawConnection = function() {
});
});
};
const fetchAsync = function(sql, bindParams, options, cb) {
const fetchAsync = promisify(function(sql, bindParams, options, cb) {
options = options || {};
options.outFormat = client.driver.OBJECT;
if (options.resultSet) {
Expand Down Expand Up @@ -173,61 +174,45 @@ Client_Oracledb.prototype.acquireRawConnection = function() {
} else {
connection.execute(sql, bindParams || [], options, cb);
}
};
});
connection.executeAsync = function(sql, bindParams, options) {
// Read all lob
return new Bluebird(function(resultResolve, resultReject) {
fetchAsync(sql, bindParams, options, function(err, results) {
if (err) {
return resultReject(err);
}
// Collect LOBs to read
const lobs = [];
if (results.rows) {
if (Array.isArray(results.rows)) {
for (let i = 0; i < results.rows.length; i++) {
// Iterate through the rows
const row = results.rows[i];
for (const column in row) {
if (row[column] instanceof stream.Readable) {
lobs.push({ index: i, key: column, stream: row[column] });
}
return fetchAsync(sql, bindParams, options).then(async (results) => {
const closeResultSet = () => {
return results.resultSet
? promisify(results.resultSet.close).call(results.resultSet)
: Promise.resolve();
};

// Collect LOBs to read
const lobs = [];
if (results.rows) {
if (Array.isArray(results.rows)) {
for (let i = 0; i < results.rows.length; i++) {
// Iterate through the rows
const row = results.rows[i];
for (const column in row) {
if (row[column] instanceof stream.Readable) {
lobs.push({ index: i, key: column, stream: row[column] });
}
}
}
}
Bluebird.each(lobs, function(lob) {
return new Bluebird(function(lobResolve, lobReject) {
readStream(lob.stream, function(err, d) {
if (err) {
if (results.resultSet) {
results.resultSet.close(function() {
return lobReject(err);
});
}
return lobReject(err);
}
results.rows[lob.index][lob.key] = d;
lobResolve();
});
});
}).then(
function() {
if (results.resultSet) {
results.resultSet.close(function(err) {
if (err) {
return resultReject(err);
}
return resultResolve(results);
});
}
resultResolve(results);
},
function(err) {
resultReject(err);
}
);
});
}

try {
for (const lob of lobs) {
results.rows[lob.index][lob.key] = await readStream(lob.stream);
}
} catch (e) {
await closeResultSet().catch(() => {});

throw e;
}

await closeResultSet();

return results;
});
};
resolver(connection);
Expand All @@ -245,110 +230,106 @@ Client_Oracledb.prototype.destroyRawConnection = function(connection) {
// Runs the query on the specified connection, providing the bindings
// and any other necessary prep work.
Client_Oracledb.prototype._query = function(connection, obj) {
return new Bluebird(function(resolver, rejecter) {
if (!obj.sql) {
return rejecter(new Error('The query is empty'));
if (!obj.sql) throw new Error('The query is empty');

const options = { autoCommit: false };
if (obj.method === 'select') {
options.resultSet = true;
}
return Bluebird.resolve(
connection.executeAsync(obj.sql, obj.bindings, options)
).then(async function(response) {
// Flatten outBinds
let outBinds = _.flatten(response.outBinds);
obj.response = response.rows || [];
obj.rowsAffected = response.rows
? response.rows.rowsAffected
: response.rowsAffected;

//added for outBind parameter
if (obj.method === 'raw' && outBinds.length > 0) {
return {
response: outBinds,
};
}

if (obj.method === 'update') {
const modifiedRowsCount = obj.rowsAffected.length || obj.rowsAffected;
const updatedObjOutBinding = [];
const updatedOutBinds = [];
const updateOutBinds = (i) =>
function(value, index) {
const OutBindsOffset = index * modifiedRowsCount;
updatedOutBinds.push(outBinds[i + OutBindsOffset]);
};

for (let i = 0; i < modifiedRowsCount; i++) {
updatedObjOutBinding.push(obj.outBinding[0]);
_.each(obj.outBinding[0], updateOutBinds(i));
}
outBinds = updatedOutBinds;
obj.outBinding = updatedObjOutBinding;
}
const options = { autoCommit: false };
if (obj.method === 'select') {
options.resultSet = true;

if (!obj.returning && outBinds.length === 0) {
await connection.commitAsync();
return obj;
}
connection
.executeAsync(obj.sql, obj.bindings, options)
.then(function(response) {
// Flatten outBinds
let outBinds = _.flatten(response.outBinds);
obj.response = response.rows || [];
obj.rowsAffected = response.rows
? response.rows.rowsAffected
: response.rowsAffected;

//added for outBind parameter
if (obj.method === 'raw' && outBinds.length > 0) {
return resolver({
response: outBinds,
});
}
const rowIds = [];
let offset = 0;

if (obj.method === 'update') {
const modifiedRowsCount = obj.rowsAffected.length || obj.rowsAffected;
const updatedObjOutBinding = [];
const updatedOutBinds = [];
const updateOutBinds = (i) =>
function(value, index) {
const OutBindsOffset = index * modifiedRowsCount;
updatedOutBinds.push(outBinds[i + OutBindsOffset]);
};
for (let line = 0; line < obj.outBinding.length; line++) {
const ret = obj.outBinding[line];

for (let i = 0; i < modifiedRowsCount; i++) {
updatedObjOutBinding.push(obj.outBinding[0]);
_.each(obj.outBinding[0], updateOutBinds(i));
}
outBinds = updatedOutBinds;
obj.outBinding = updatedObjOutBinding;
}
offset =
offset +
(obj.outBinding[line - 1] ? obj.outBinding[line - 1].length : 0);

if (!obj.returning && outBinds.length === 0) {
return connection.commitAsync().then(function() {
resolver(obj);
});
}
const rowIds = [];
let offset = 0;
Bluebird.each(obj.outBinding, function(ret, line) {
offset =
offset +
(obj.outBinding[line - 1] ? obj.outBinding[line - 1].length : 0);
return Bluebird.each(ret, function(out, index) {
return new Bluebird(function(bindResolver, bindRejecter) {
if (out instanceof BlobHelper) {
const blob = outBinds[index + offset];
if (out.returning) {
obj.response[line] = obj.response[line] || {};
obj.response[line][out.columnName] = out.value;
}
blob.on('error', function(err) {
bindRejecter(err);
});
blob.on('finish', function() {
bindResolver();
});
blob.write(out.value);
blob.end();
} else if (obj.outBinding[line][index] === 'ROWID') {
rowIds.push(outBinds[index + offset]);
bindResolver();
} else {
obj.response[line] = obj.response[line] || {};
obj.response[line][out] = outBinds[index + offset];
bindResolver();
}
});
});
})
.then(function() {
return connection.commitAsync();
})
.then(function() {
if (obj.returningSql) {
return connection
.executeAsync(obj.returningSql(), rowIds, { resultSet: true })
.then(function(response) {
obj.response = response.rows;
return obj;
}, rejecter);
for (let index = 0; index < ret.length; index++) {
const out = ret[index];

await new Promise(function(bindResolver, bindRejecter) {
if (out instanceof BlobHelper) {
const blob = outBinds[index + offset];
if (out.returning) {
obj.response[line] = obj.response[line] || {};
obj.response[line][out.columnName] = out.value;
}
blob.on('error', function(err) {
bindRejecter(err);
});
blob.on('finish', function() {
bindResolver();
});
blob.write(out.value);
blob.end();
} else if (obj.outBinding[line][index] === 'ROWID') {
rowIds.push(outBinds[index + offset]);
bindResolver();
} else {
obj.response[line] = obj.response[line] || {};
obj.response[line][out] = outBinds[index + offset];
bindResolver();
}
});
}
}
return connection.commitAsync().then(function() {
if (obj.returningSql) {
return connection
.executeAsync(obj.returningSql(), rowIds, { resultSet: true })
.then(function(response) {
obj.response = response.rows;
return obj;
}, rejecter)
.then(function(obj) {
resolver(obj);
});
}, rejecter);
}
return obj;
});
});
};

// Handle clob
function readStream(stream, cb) {
const readStream = promisify((stream, cb) => {
const oracledb = require('oracledb');
let data = '';

Expand All @@ -370,7 +351,7 @@ function readStream(stream, cb) {
stream.on('end', function() {
cb(null, data);
});
}
});

// Process the response as returned from the query.
Client_Oracledb.prototype.processResponse = function(obj, runner) {
Expand Down
16 changes: 9 additions & 7 deletions test/tape/transactions.js
@@ -1,5 +1,4 @@
'use strict';
const Bluebird = require('bluebird');
const harness = require('./harness');
const tape = require('tape');
const JSONStream = require('JSONStream');
Expand Down Expand Up @@ -483,14 +482,17 @@ module.exports = function(knex) {
let cid,
queryCount = 0;

const runCommands = async (tx, commands) => {
for (const command of commands) {
await tx.raw(command);
}
};
return knex
.transaction(function(tx) {
Bluebird.each(
['SET join_collapse_limit to 1', 'SET enable_nestloop = off'],
function(request) {
return tx.raw(request);
}
)
runCommands(tx, [
'SET join_collapse_limit to 1',
'SET enable_nestloop = off',
])
.then(function() {
const stream = tx.table('test_table').stream();
stream.on('end', function() {
Expand Down

0 comments on commit 4a71315

Please sign in to comment.