Skip to content

Commit

Permalink
feat: support for large update counts (JDBC 4.2)
Browse files Browse the repository at this point in the history
Added support for large update counts of the JDBC 4.2 API (Java 8+)

This implementation supports:
PreparedStatement.executeLargeUpdate()
Statement.executeLargeUpdate(String sql)
Statement.getLargeUpdateCount()
Statement.executeLargeBatch()
Statement.executeLargeUpdate(String sql, int autoGeneratedKeys) *
Statement.executeLargeUpdate(String sql, int[] columnIndexes) *
Statement.executeLargeUpdate(String sql, String[] columnNames) *
  • Loading branch information
jorsol committed Sep 26, 2019
1 parent bd9485e commit cc524b9
Show file tree
Hide file tree
Showing 12 changed files with 588 additions and 74 deletions.
Expand Up @@ -48,7 +48,7 @@ void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
* @param insertOID for a single-row INSERT query, the OID of the newly inserted row; 0 if not
* available.
*/
void handleCommandStatus(String status, int updateCount, long insertOID);
void handleCommandStatus(String status, long updateCount, long insertOID);

/**
* Called when a warning is emitted.
Expand Down
Expand Up @@ -30,7 +30,7 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
}

@Override
public void handleCommandStatus(String status, int updateCount, long insertOID) {
public void handleCommandStatus(String status, long updateCount, long insertOID) {
}

@Override
Expand Down
Expand Up @@ -31,7 +31,7 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
}

@Override
public void handleCommandStatus(String status, int updateCount, long insertOID) {
public void handleCommandStatus(String status, long updateCount, long insertOID) {
if (delegate != null) {
delegate.handleCommandStatus(status, updateCount, insertOID);
}
Expand Down
22 changes: 8 additions & 14 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Expand Up @@ -51,7 +51,6 @@
import java.net.SocketTimeoutException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -558,7 +557,8 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
}
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
if (!sawBegin) {
sawBegin = true;
if (!status.equals("BEGIN")) {
Expand Down Expand Up @@ -600,7 +600,8 @@ public void doSubprotocolBegin() throws SQLException {
ResultHandler handler = new ResultHandlerBase() {
private boolean sawBegin = false;

public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
if (!sawBegin) {
if (!status.equals("BEGIN")) {
handleError(
Expand All @@ -614,6 +615,7 @@ public void handleCommandStatus(String status, int updateCount, long insertOID)
}
}

@Override
public void handleWarning(SQLWarning warning) {
// we don't want to ignore warnings and it would be tricky
// to chain them back to the connection, so since we don't
Expand Down Expand Up @@ -2408,7 +2410,8 @@ public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int f
// (if the fetch returns no rows, we see just a CommandStatus..)
final ResultHandler delegateHandler = handler;
handler = new ResultHandlerDelegate(delegateHandler) {
public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
handleResultRows(portal.getQuery(), null, new ArrayList<byte[][]>(), null);
}
};
Expand Down Expand Up @@ -2539,16 +2542,7 @@ private void interpretCommandStatus(String status, ResultHandler handler) {
long oid = commandCompleteParser.getOid();
long count = commandCompleteParser.getRows();

int countAsInt = 0;
if (count > Integer.MAX_VALUE) {
// If command status indicates that we've modified more than Integer.MAX_VALUE rows
// then we set the result count to reflect that we cannot provide the actual number
// due to the JDBC field being an int rather than a long.
countAsInt = Statement.SUCCESS_NO_INFO;
} else if (count > 0) {
countAsInt = (int) count;
}
handler.handleCommandStatus(status, countAsInt, oid);
handler.handleCommandStatus(status, count, oid);
}

private void receiveRFQ() throws IOException {
Expand Down
83 changes: 60 additions & 23 deletions pgjdbc/src/main/java/org/postgresql/jdbc/BatchResultHandler.java
Expand Up @@ -28,16 +28,17 @@
* Internal class, it is not a part of public API.
*/
public class BatchResultHandler extends ResultHandlerBase {
private PgStatement pgStatement;

private final PgStatement pgStatement;
private int resultIndex = 0;

private final Query[] queries;
private final int[] updateCounts;
private final long[] longUpdateCounts;
private final ParameterList[] parameterLists;
private final boolean expectGeneratedKeys;
private PgResultSet generatedKeys;
private int committedRows; // 0 means no rows committed. 1 means row 0 was committed, and so on
private List<List<byte[][]>> allGeneratedRows;
private final List<List<byte[][]>> allGeneratedRows;
private List<byte[][]> latestGeneratedRows;
private PgResultSet latestGeneratedKeysRs;

Expand All @@ -46,11 +47,12 @@ public class BatchResultHandler extends ResultHandlerBase {
this.pgStatement = pgStatement;
this.queries = queries;
this.parameterLists = parameterLists;
this.updateCounts = new int[queries.length];
this.longUpdateCounts = new long[queries.length];
this.expectGeneratedKeys = expectGeneratedKeys;
this.allGeneratedRows = !expectGeneratedKeys ? null : new ArrayList<List<byte[][]>>();
}

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor) {
// If SELECT, then handleCommandStatus call would just be missing
Expand All @@ -63,17 +65,17 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
try {
// If SELECT, the resulting ResultSet is not valid
// Thus it is up to handleCommandStatus to decide if resultSet is good enough
latestGeneratedKeysRs =
(PgResultSet) pgStatement.createResultSet(fromQuery, fields,
new ArrayList<byte[][]>(), cursor);
latestGeneratedKeysRs = (PgResultSet) pgStatement.createResultSet(fromQuery, fields,
new ArrayList<byte[][]>(), cursor);
} catch (SQLException e) {
handleError(e);
}
}
latestGeneratedRows = tuples;
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
if (latestGeneratedRows != null) {
// We have DML. Decrease resultIndex that was just increased in handleResultRows
resultIndex--;
Expand All @@ -95,7 +97,7 @@ public void handleCommandStatus(String status, int updateCount, long insertOID)
}
latestGeneratedKeysRs = null;

updateCounts[resultIndex++] = updateCount;
longUpdateCounts[resultIndex++] = updateCount;
}

private boolean isAutoCommit() {
Expand Down Expand Up @@ -125,14 +127,15 @@ private void updateGeneratedKeys() {
allGeneratedRows.clear();
}

@Override
public void handleWarning(SQLWarning warning) {
pgStatement.addWarning(warning);
}

@Override
public void handleError(SQLException newError) {
if (getException() == null) {
Arrays.fill(updateCounts, committedRows, updateCounts.length, Statement.EXECUTE_FAILED);
Arrays.fill(longUpdateCounts, committedRows, longUpdateCounts.length, Statement.EXECUTE_FAILED);
if (allGeneratedRows != null) {
allGeneratedRows.clear();
}
Expand All @@ -142,30 +145,50 @@ public void handleError(SQLException newError) {
queryString = queries[resultIndex].toString(parameterLists[resultIndex]);
}

BatchUpdateException batchException = new BatchUpdateException(
BatchUpdateException batchException;
//#if mvn.project.property.postgresql.jdbc.spec >= "JDBC4.2"
batchException = new BatchUpdateException(
GT.tr("Batch entry {0} {1} was aborted: {2} Call getNextException to see other errors in the batch.",
resultIndex, queryString, newError.getMessage()),
newError.getSQLState(), 0, uncompressLongUpdateCount(), newError);
//#else
batchException = new BatchUpdateException(
GT.tr("Batch entry {0} {1} was aborted: {2} Call getNextException to see other errors in the batch.",
resultIndex, queryString, newError.getMessage()),
newError.getSQLState(), uncompressUpdateCount());
batchException.initCause(newError);
newError.getSQLState(), 0, uncompressUpdateCount(), newError);
//#endif

super.handleError(batchException);
}
resultIndex++;

super.handleError(newError);
}

@Override
public void handleCompletion() throws SQLException {
updateGeneratedKeys();
SQLException batchException = getException();
if (batchException != null) {
if (isAutoCommit()) {
// Re-create batch exception since rows after exception might indeed succeed.
BatchUpdateException newException = new BatchUpdateException(
BatchUpdateException newException;
//#if mvn.project.property.postgresql.jdbc.spec >= "JDBC4.2"
newException = new BatchUpdateException(
batchException.getMessage(),
batchException.getSQLState(),
uncompressUpdateCount()
batchException.getSQLState(), 0,
uncompressLongUpdateCount(),
batchException.getCause()
);
newException.initCause(batchException.getCause());
//#else
newException = new BatchUpdateException(
batchException.getMessage(),
batchException.getSQLState(), 0,
uncompressUpdateCount(),
batchException.getCause()
);
//#endif

SQLException next = batchException.getNextException();
if (next != null) {
newException.setNextException(next);
Expand All @@ -181,8 +204,21 @@ public ResultSet getGeneratedKeys() {
}

private int[] uncompressUpdateCount() {
long[] original = uncompressLongUpdateCount();
int[] copy = new int[original.length];
for (int i = 0; i < original.length; i++) {
copy[i] = original[i] > Integer.MAX_VALUE ? Statement.SUCCESS_NO_INFO : (int) original[i];
}
return copy;
}

public int[] getUpdateCount() {
return uncompressUpdateCount();
}

private long[] uncompressLongUpdateCount() {
if (!(queries[0] instanceof BatchedQuery)) {
return updateCounts;
return longUpdateCounts;
}
int totalRows = 0;
boolean hasRewrites = false;
Expand All @@ -192,20 +228,20 @@ private int[] uncompressUpdateCount() {
hasRewrites |= batchSize > 1;
}
if (!hasRewrites) {
return updateCounts;
return longUpdateCounts;
}

/* In this situation there is a batch that has been rewritten. Substitute
* the running total returned by the database with a status code to
* indicate successful completion for each row the driver client added
* to the batch.
*/
int[] newUpdateCounts = new int[totalRows];
long[] newUpdateCounts = new long[totalRows];
int offset = 0;
for (int i = 0; i < queries.length; i++) {
Query query = queries[i];
int batchSize = query.getBatchSize();
int superBatchResult = updateCounts[i];
long superBatchResult = longUpdateCounts[i];
if (batchSize == 1) {
newUpdateCounts[offset++] = superBatchResult;
continue;
Expand All @@ -221,7 +257,8 @@ private int[] uncompressUpdateCount() {
return newUpdateCounts;
}

public int[] getUpdateCount() {
return uncompressUpdateCount();
public long[] getLargeUpdateCount() {
return uncompressLongUpdateCount();
}

}
Expand Up @@ -122,9 +122,18 @@ public int executeUpdate(String sql) throws SQLException {
@Override
public int executeUpdate() throws SQLException {
executeWithFlags(QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getUpdateCount();
}

return getNoResultUpdateCount();
//#if mvn.project.property.postgresql.jdbc.spec >= "JDBC4.2"
@Override
public long executeLargeUpdate() throws SQLException {
executeWithFlags(QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getLargeUpdateCount();
}
//#endif

@Override
public boolean execute(String sql) throws SQLException {
Expand Down
5 changes: 4 additions & 1 deletion pgjdbc/src/main/java/org/postgresql/jdbc/PgResultSet.java
Expand Up @@ -1744,17 +1744,20 @@ private void updateRowBuffer() throws SQLException {

public class CursorResultHandler extends ResultHandlerBase {

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor) {
PgResultSet.this.rows = tuples;
PgResultSet.this.cursor = cursor;
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
handleError(new PSQLException(GT.tr("Unexpected command status: {0}.", status),
PSQLState.PROTOCOL_VIOLATION));
}

@Override
public void handleCompletion() throws SQLException {
SQLWarning warning = getWarning();
if (warning != null) {
Expand Down

0 comments on commit cc524b9

Please sign in to comment.