diff --git a/lib/dialects/oracledb/index.js b/lib/dialects/oracledb/index.js index 3a1fefc46c..aed6ded57d 100644 --- a/lib/dialects/oracledb/index.js +++ b/lib/dialects/oracledb/index.js @@ -7,6 +7,7 @@ const ColumnCompiler = require('./schema/columncompiler'); const { BlobHelper, ReturningHelper, isConnectionError } = require('./utils'); const Bluebird = require('bluebird'); const stream = require('stream'); +const { promisify } = require('util'); const Transaction = require('./transaction'); const Client_Oracle = require('../oracle'); const Oracle_Formatter = require('../oracle/formatter'); @@ -128,7 +129,7 @@ Client_Oracledb.prototype.acquireRawConnection = function() { }); }); }; - const fetchAsync = function(sql, bindParams, options, cb) { + const fetchAsync = promisify(function(sql, bindParams, options, cb) { options = options || {}; options.outFormat = client.driver.OBJECT; if (options.resultSet) { @@ -173,61 +174,45 @@ Client_Oracledb.prototype.acquireRawConnection = function() { } else { connection.execute(sql, bindParams || [], options, cb); } - }; + }); connection.executeAsync = function(sql, bindParams, options) { // Read all lob - return new Bluebird(function(resultResolve, resultReject) { - fetchAsync(sql, bindParams, options, function(err, results) { - if (err) { - return resultReject(err); - } - // Collect LOBs to read - const lobs = []; - if (results.rows) { - if (Array.isArray(results.rows)) { - for (let i = 0; i < results.rows.length; i++) { - // Iterate through the rows - const row = results.rows[i]; - for (const column in row) { - if (row[column] instanceof stream.Readable) { - lobs.push({ index: i, key: column, stream: row[column] }); - } + return fetchAsync(sql, bindParams, options).then(async (results) => { + const closeResultSet = () => { + return results.resultSet + ? promisify(results.resultSet.close).call(results.resultSet) + : Promise.resolve(); + }; + + // Collect LOBs to read + const lobs = []; + if (results.rows) { + if (Array.isArray(results.rows)) { + for (let i = 0; i < results.rows.length; i++) { + // Iterate through the rows + const row = results.rows[i]; + for (const column in row) { + if (row[column] instanceof stream.Readable) { + lobs.push({ index: i, key: column, stream: row[column] }); } } } } - Bluebird.each(lobs, function(lob) { - return new Bluebird(function(lobResolve, lobReject) { - readStream(lob.stream, function(err, d) { - if (err) { - if (results.resultSet) { - results.resultSet.close(function() { - return lobReject(err); - }); - } - return lobReject(err); - } - results.rows[lob.index][lob.key] = d; - lobResolve(); - }); - }); - }).then( - function() { - if (results.resultSet) { - results.resultSet.close(function(err) { - if (err) { - return resultReject(err); - } - return resultResolve(results); - }); - } - resultResolve(results); - }, - function(err) { - resultReject(err); - } - ); - }); + } + + try { + for (const lob of lobs) { + results.rows[lob.index][lob.key] = await readStream(lob.stream); + } + } catch (e) { + await closeResultSet().catch(() => {}); + + throw e; + } + + await closeResultSet(); + + return results; }); }; resolver(connection); @@ -245,110 +230,106 @@ Client_Oracledb.prototype.destroyRawConnection = function(connection) { // Runs the query on the specified connection, providing the bindings // and any other necessary prep work. Client_Oracledb.prototype._query = function(connection, obj) { - return new Bluebird(function(resolver, rejecter) { - if (!obj.sql) { - return rejecter(new Error('The query is empty')); + if (!obj.sql) throw new Error('The query is empty'); + + const options = { autoCommit: false }; + if (obj.method === 'select') { + options.resultSet = true; + } + return Bluebird.resolve( + connection.executeAsync(obj.sql, obj.bindings, options) + ).then(async function(response) { + // Flatten outBinds + let outBinds = _.flatten(response.outBinds); + obj.response = response.rows || []; + obj.rowsAffected = response.rows + ? response.rows.rowsAffected + : response.rowsAffected; + + //added for outBind parameter + if (obj.method === 'raw' && outBinds.length > 0) { + return { + response: outBinds, + }; + } + + if (obj.method === 'update') { + const modifiedRowsCount = obj.rowsAffected.length || obj.rowsAffected; + const updatedObjOutBinding = []; + const updatedOutBinds = []; + const updateOutBinds = (i) => + function(value, index) { + const OutBindsOffset = index * modifiedRowsCount; + updatedOutBinds.push(outBinds[i + OutBindsOffset]); + }; + + for (let i = 0; i < modifiedRowsCount; i++) { + updatedObjOutBinding.push(obj.outBinding[0]); + _.each(obj.outBinding[0], updateOutBinds(i)); + } + outBinds = updatedOutBinds; + obj.outBinding = updatedObjOutBinding; } - const options = { autoCommit: false }; - if (obj.method === 'select') { - options.resultSet = true; + + if (!obj.returning && outBinds.length === 0) { + await connection.commitAsync(); + return obj; } - connection - .executeAsync(obj.sql, obj.bindings, options) - .then(function(response) { - // Flatten outBinds - let outBinds = _.flatten(response.outBinds); - obj.response = response.rows || []; - obj.rowsAffected = response.rows - ? response.rows.rowsAffected - : response.rowsAffected; - - //added for outBind parameter - if (obj.method === 'raw' && outBinds.length > 0) { - return resolver({ - response: outBinds, - }); - } + const rowIds = []; + let offset = 0; - if (obj.method === 'update') { - const modifiedRowsCount = obj.rowsAffected.length || obj.rowsAffected; - const updatedObjOutBinding = []; - const updatedOutBinds = []; - const updateOutBinds = (i) => - function(value, index) { - const OutBindsOffset = index * modifiedRowsCount; - updatedOutBinds.push(outBinds[i + OutBindsOffset]); - }; + for (let line = 0; line < obj.outBinding.length; line++) { + const ret = obj.outBinding[line]; - for (let i = 0; i < modifiedRowsCount; i++) { - updatedObjOutBinding.push(obj.outBinding[0]); - _.each(obj.outBinding[0], updateOutBinds(i)); - } - outBinds = updatedOutBinds; - obj.outBinding = updatedObjOutBinding; - } + offset = + offset + + (obj.outBinding[line - 1] ? obj.outBinding[line - 1].length : 0); - if (!obj.returning && outBinds.length === 0) { - return connection.commitAsync().then(function() { - resolver(obj); - }); - } - const rowIds = []; - let offset = 0; - Bluebird.each(obj.outBinding, function(ret, line) { - offset = - offset + - (obj.outBinding[line - 1] ? obj.outBinding[line - 1].length : 0); - return Bluebird.each(ret, function(out, index) { - return new Bluebird(function(bindResolver, bindRejecter) { - if (out instanceof BlobHelper) { - const blob = outBinds[index + offset]; - if (out.returning) { - obj.response[line] = obj.response[line] || {}; - obj.response[line][out.columnName] = out.value; - } - blob.on('error', function(err) { - bindRejecter(err); - }); - blob.on('finish', function() { - bindResolver(); - }); - blob.write(out.value); - blob.end(); - } else if (obj.outBinding[line][index] === 'ROWID') { - rowIds.push(outBinds[index + offset]); - bindResolver(); - } else { - obj.response[line] = obj.response[line] || {}; - obj.response[line][out] = outBinds[index + offset]; - bindResolver(); - } - }); - }); - }) - .then(function() { - return connection.commitAsync(); - }) - .then(function() { - if (obj.returningSql) { - return connection - .executeAsync(obj.returningSql(), rowIds, { resultSet: true }) - .then(function(response) { - obj.response = response.rows; - return obj; - }, rejecter); + for (let index = 0; index < ret.length; index++) { + const out = ret[index]; + + await new Promise(function(bindResolver, bindRejecter) { + if (out instanceof BlobHelper) { + const blob = outBinds[index + offset]; + if (out.returning) { + obj.response[line] = obj.response[line] || {}; + obj.response[line][out.columnName] = out.value; } + blob.on('error', function(err) { + bindRejecter(err); + }); + blob.on('finish', function() { + bindResolver(); + }); + blob.write(out.value); + blob.end(); + } else if (obj.outBinding[line][index] === 'ROWID') { + rowIds.push(outBinds[index + offset]); + bindResolver(); + } else { + obj.response[line] = obj.response[line] || {}; + obj.response[line][out] = outBinds[index + offset]; + bindResolver(); + } + }); + } + } + return connection.commitAsync().then(function() { + if (obj.returningSql) { + return connection + .executeAsync(obj.returningSql(), rowIds, { resultSet: true }) + .then(function(response) { + obj.response = response.rows; return obj; - }, rejecter) - .then(function(obj) { - resolver(obj); }); - }, rejecter); + } + return obj; + }); }); }; // Handle clob -function readStream(stream, cb) { +const readStream = promisify((stream, cb) => { const oracledb = require('oracledb'); let data = ''; @@ -370,7 +351,7 @@ function readStream(stream, cb) { stream.on('end', function() { cb(null, data); }); -} +}); // Process the response as returned from the query. Client_Oracledb.prototype.processResponse = function(obj, runner) { diff --git a/test/tape/transactions.js b/test/tape/transactions.js index 2fad17b5dc..78df3ff88f 100644 --- a/test/tape/transactions.js +++ b/test/tape/transactions.js @@ -1,5 +1,4 @@ 'use strict'; -const Bluebird = require('bluebird'); const harness = require('./harness'); const tape = require('tape'); const JSONStream = require('JSONStream'); @@ -483,14 +482,17 @@ module.exports = function(knex) { let cid, queryCount = 0; + const runCommands = async (tx, commands) => { + for (const command of commands) { + await tx.raw(command); + } + }; return knex .transaction(function(tx) { - Bluebird.each( - ['SET join_collapse_limit to 1', 'SET enable_nestloop = off'], - function(request) { - return tx.raw(request); - } - ) + runCommands(tx, [ + 'SET join_collapse_limit to 1', + 'SET enable_nestloop = off', + ]) .then(function() { const stream = tx.table('test_table').stream(); stream.on('end', function() {