Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove bluebird.each #3471

Merged
merged 3 commits into from Oct 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
267 changes: 124 additions & 143 deletions lib/dialects/oracledb/index.js
Expand Up @@ -7,6 +7,7 @@ const ColumnCompiler = require('./schema/columncompiler');
const { BlobHelper, ReturningHelper, isConnectionError } = require('./utils');
const Bluebird = require('bluebird');
const stream = require('stream');
const { promisify } = require('util');
const Transaction = require('./transaction');
const Client_Oracle = require('../oracle');
const Oracle_Formatter = require('../oracle/formatter');
Expand Down Expand Up @@ -128,7 +129,7 @@ Client_Oracledb.prototype.acquireRawConnection = function() {
});
});
};
const fetchAsync = function(sql, bindParams, options, cb) {
const fetchAsync = promisify(function(sql, bindParams, options, cb) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this function use any closure variables, or it can be moved outside to only be promisified once?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function use connection from outside

options = options || {};
options.outFormat = client.driver.OBJECT;
if (options.resultSet) {
Expand Down Expand Up @@ -173,61 +174,45 @@ Client_Oracledb.prototype.acquireRawConnection = function() {
} else {
connection.execute(sql, bindParams || [], options, cb);
}
};
});
connection.executeAsync = function(sql, bindParams, options) {
// Read all lob
return new Bluebird(function(resultResolve, resultReject) {
fetchAsync(sql, bindParams, options, function(err, results) {
if (err) {
return resultReject(err);
}
// Collect LOBs to read
const lobs = [];
if (results.rows) {
if (Array.isArray(results.rows)) {
for (let i = 0; i < results.rows.length; i++) {
// Iterate through the rows
const row = results.rows[i];
for (const column in row) {
if (row[column] instanceof stream.Readable) {
lobs.push({ index: i, key: column, stream: row[column] });
}
return fetchAsync(sql, bindParams, options).then(async (results) => {
const closeResultSet = () => {
return results.resultSet
? promisify(results.resultSet.close).call(results.resultSet)
: Promise.resolve();
};

// Collect LOBs to read
const lobs = [];
if (results.rows) {
if (Array.isArray(results.rows)) {
for (let i = 0; i < results.rows.length; i++) {
// Iterate through the rows
const row = results.rows[i];
for (const column in row) {
if (row[column] instanceof stream.Readable) {
lobs.push({ index: i, key: column, stream: row[column] });
}
}
}
}
Bluebird.each(lobs, function(lob) {
return new Bluebird(function(lobResolve, lobReject) {
readStream(lob.stream, function(err, d) {
if (err) {
if (results.resultSet) {
results.resultSet.close(function() {
return lobReject(err);
});
}
return lobReject(err);
}
results.rows[lob.index][lob.key] = d;
lobResolve();
});
});
}).then(
function() {
if (results.resultSet) {
results.resultSet.close(function(err) {
if (err) {
return resultReject(err);
}
return resultResolve(results);
});
}
resultResolve(results);
},
function(err) {
resultReject(err);
}
);
});
}

try {
for (const lob of lobs) {
results.rows[lob.index][lob.key] = await readStream(lob.stream);
}
} catch (e) {
await closeResultSet().catch(() => {});

throw e;
}

await closeResultSet();

return results;
});
};
resolver(connection);
Expand All @@ -245,110 +230,106 @@ Client_Oracledb.prototype.destroyRawConnection = function(connection) {
// Runs the query on the specified connection, providing the bindings
// and any other necessary prep work.
Client_Oracledb.prototype._query = function(connection, obj) {
return new Bluebird(function(resolver, rejecter) {
if (!obj.sql) {
return rejecter(new Error('The query is empty'));
if (!obj.sql) throw new Error('The query is empty');

const options = { autoCommit: false };
if (obj.method === 'select') {
options.resultSet = true;
}
return Bluebird.resolve(
connection.executeAsync(obj.sql, obj.bindings, options)
).then(async function(response) {
// Flatten outBinds
let outBinds = _.flatten(response.outBinds);
obj.response = response.rows || [];
obj.rowsAffected = response.rows
? response.rows.rowsAffected
: response.rowsAffected;

//added for outBind parameter
if (obj.method === 'raw' && outBinds.length > 0) {
return {
response: outBinds,
};
}

if (obj.method === 'update') {
const modifiedRowsCount = obj.rowsAffected.length || obj.rowsAffected;
const updatedObjOutBinding = [];
const updatedOutBinds = [];
const updateOutBinds = (i) =>
function(value, index) {
const OutBindsOffset = index * modifiedRowsCount;
updatedOutBinds.push(outBinds[i + OutBindsOffset]);
};

for (let i = 0; i < modifiedRowsCount; i++) {
updatedObjOutBinding.push(obj.outBinding[0]);
_.each(obj.outBinding[0], updateOutBinds(i));
}
outBinds = updatedOutBinds;
obj.outBinding = updatedObjOutBinding;
}
const options = { autoCommit: false };
if (obj.method === 'select') {
options.resultSet = true;

if (!obj.returning && outBinds.length === 0) {
await connection.commitAsync();
return obj;
}
connection
.executeAsync(obj.sql, obj.bindings, options)
.then(function(response) {
// Flatten outBinds
let outBinds = _.flatten(response.outBinds);
obj.response = response.rows || [];
obj.rowsAffected = response.rows
? response.rows.rowsAffected
: response.rowsAffected;

//added for outBind parameter
if (obj.method === 'raw' && outBinds.length > 0) {
return resolver({
response: outBinds,
});
}
const rowIds = [];
let offset = 0;

if (obj.method === 'update') {
const modifiedRowsCount = obj.rowsAffected.length || obj.rowsAffected;
const updatedObjOutBinding = [];
const updatedOutBinds = [];
const updateOutBinds = (i) =>
function(value, index) {
const OutBindsOffset = index * modifiedRowsCount;
updatedOutBinds.push(outBinds[i + OutBindsOffset]);
};
for (let line = 0; line < obj.outBinding.length; line++) {
const ret = obj.outBinding[line];

for (let i = 0; i < modifiedRowsCount; i++) {
updatedObjOutBinding.push(obj.outBinding[0]);
_.each(obj.outBinding[0], updateOutBinds(i));
}
outBinds = updatedOutBinds;
obj.outBinding = updatedObjOutBinding;
}
offset =
offset +
(obj.outBinding[line - 1] ? obj.outBinding[line - 1].length : 0);

if (!obj.returning && outBinds.length === 0) {
return connection.commitAsync().then(function() {
resolver(obj);
});
}
const rowIds = [];
let offset = 0;
Bluebird.each(obj.outBinding, function(ret, line) {
offset =
offset +
(obj.outBinding[line - 1] ? obj.outBinding[line - 1].length : 0);
return Bluebird.each(ret, function(out, index) {
return new Bluebird(function(bindResolver, bindRejecter) {
if (out instanceof BlobHelper) {
const blob = outBinds[index + offset];
if (out.returning) {
obj.response[line] = obj.response[line] || {};
obj.response[line][out.columnName] = out.value;
}
blob.on('error', function(err) {
bindRejecter(err);
});
blob.on('finish', function() {
bindResolver();
});
blob.write(out.value);
blob.end();
} else if (obj.outBinding[line][index] === 'ROWID') {
rowIds.push(outBinds[index + offset]);
bindResolver();
} else {
obj.response[line] = obj.response[line] || {};
obj.response[line][out] = outBinds[index + offset];
bindResolver();
}
});
});
})
.then(function() {
return connection.commitAsync();
})
.then(function() {
if (obj.returningSql) {
return connection
.executeAsync(obj.returningSql(), rowIds, { resultSet: true })
.then(function(response) {
obj.response = response.rows;
return obj;
}, rejecter);
for (let index = 0; index < ret.length; index++) {
const out = ret[index];

await new Promise(function(bindResolver, bindRejecter) {
if (out instanceof BlobHelper) {
const blob = outBinds[index + offset];
if (out.returning) {
obj.response[line] = obj.response[line] || {};
obj.response[line][out.columnName] = out.value;
}
blob.on('error', function(err) {
bindRejecter(err);
});
blob.on('finish', function() {
bindResolver();
});
blob.write(out.value);
blob.end();
} else if (obj.outBinding[line][index] === 'ROWID') {
rowIds.push(outBinds[index + offset]);
bindResolver();
} else {
obj.response[line] = obj.response[line] || {};
obj.response[line][out] = outBinds[index + offset];
bindResolver();
}
});
}
}
return connection.commitAsync().then(function() {
if (obj.returningSql) {
return connection
.executeAsync(obj.returningSql(), rowIds, { resultSet: true })
.then(function(response) {
obj.response = response.rows;
return obj;
}, rejecter)
.then(function(obj) {
resolver(obj);
});
}, rejecter);
}
return obj;
});
});
};

// Handle clob
function readStream(stream, cb) {
const readStream = promisify((stream, cb) => {
const oracledb = require('oracledb');
let data = '';

Expand All @@ -370,7 +351,7 @@ function readStream(stream, cb) {
stream.on('end', function() {
cb(null, data);
});
}
});

// Process the response as returned from the query.
Client_Oracledb.prototype.processResponse = function(obj, runner) {
Expand Down
16 changes: 9 additions & 7 deletions test/tape/transactions.js
@@ -1,5 +1,4 @@
'use strict';
const Bluebird = require('bluebird');
const harness = require('./harness');
const tape = require('tape');
const JSONStream = require('JSONStream');
Expand Down Expand Up @@ -483,14 +482,17 @@ module.exports = function(knex) {
let cid,
queryCount = 0;

const runCommands = async (tx, commands) => {
for (const command of commands) {
await tx.raw(command);
}
};
return knex
.transaction(function(tx) {
Bluebird.each(
['SET join_collapse_limit to 1', 'SET enable_nestloop = off'],
function(request) {
return tx.raw(request);
}
)
runCommands(tx, [
'SET join_collapse_limit to 1',
'SET enable_nestloop = off',
])
.then(function() {
const stream = tx.table('test_table').stream();
stream.on('end', function() {
Expand Down