Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support for large update counts (JDBC 4.2) #935

Merged
merged 1 commit into from Oct 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -48,7 +48,7 @@ void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
* @param insertOID for a single-row INSERT query, the OID of the newly inserted row; 0 if not
* available.
*/
void handleCommandStatus(String status, int updateCount, long insertOID);
void handleCommandStatus(String status, long updateCount, long insertOID);

/**
* Called when a warning is emitted.
Expand Down
Expand Up @@ -30,7 +30,7 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
}

@Override
public void handleCommandStatus(String status, int updateCount, long insertOID) {
public void handleCommandStatus(String status, long updateCount, long insertOID) {
}

@Override
Expand Down
Expand Up @@ -31,7 +31,7 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
}

@Override
public void handleCommandStatus(String status, int updateCount, long insertOID) {
public void handleCommandStatus(String status, long updateCount, long insertOID) {
if (delegate != null) {
delegate.handleCommandStatus(status, updateCount, insertOID);
}
Expand Down
22 changes: 8 additions & 14 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Expand Up @@ -51,7 +51,6 @@
import java.net.SocketTimeoutException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -558,7 +557,8 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
}
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
if (!sawBegin) {
sawBegin = true;
if (!status.equals("BEGIN")) {
Expand Down Expand Up @@ -600,7 +600,8 @@ public void doSubprotocolBegin() throws SQLException {
ResultHandler handler = new ResultHandlerBase() {
private boolean sawBegin = false;

public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
if (!sawBegin) {
if (!status.equals("BEGIN")) {
handleError(
Expand All @@ -614,6 +615,7 @@ public void handleCommandStatus(String status, int updateCount, long insertOID)
}
}

@Override
public void handleWarning(SQLWarning warning) {
// we don't want to ignore warnings and it would be tricky
// to chain them back to the connection, so since we don't
Expand Down Expand Up @@ -2408,7 +2410,8 @@ public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int f
// (if the fetch returns no rows, we see just a CommandStatus..)
final ResultHandler delegateHandler = handler;
handler = new ResultHandlerDelegate(delegateHandler) {
public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
handleResultRows(portal.getQuery(), null, new ArrayList<byte[][]>(), null);
}
};
Expand Down Expand Up @@ -2539,16 +2542,7 @@ private void interpretCommandStatus(String status, ResultHandler handler) {
long oid = commandCompleteParser.getOid();
long count = commandCompleteParser.getRows();

int countAsInt = 0;
if (count > Integer.MAX_VALUE) {
// If command status indicates that we've modified more than Integer.MAX_VALUE rows
// then we set the result count to reflect that we cannot provide the actual number
// due to the JDBC field being an int rather than a long.
countAsInt = Statement.SUCCESS_NO_INFO;
} else if (count > 0) {
countAsInt = (int) count;
}
handler.handleCommandStatus(status, countAsInt, oid);
handler.handleCommandStatus(status, count, oid);
}

private void receiveRFQ() throws IOException {
Expand Down
83 changes: 60 additions & 23 deletions pgjdbc/src/main/java/org/postgresql/jdbc/BatchResultHandler.java
Expand Up @@ -28,16 +28,17 @@
* Internal class, it is not a part of public API.
*/
public class BatchResultHandler extends ResultHandlerBase {
private PgStatement pgStatement;

private final PgStatement pgStatement;
private int resultIndex = 0;

private final Query[] queries;
private final int[] updateCounts;
private final long[] longUpdateCounts;
private final ParameterList[] parameterLists;
private final boolean expectGeneratedKeys;
private PgResultSet generatedKeys;
private int committedRows; // 0 means no rows committed. 1 means row 0 was committed, and so on
private List<List<byte[][]>> allGeneratedRows;
private final List<List<byte[][]>> allGeneratedRows;
private List<byte[][]> latestGeneratedRows;
private PgResultSet latestGeneratedKeysRs;

Expand All @@ -46,11 +47,12 @@ public class BatchResultHandler extends ResultHandlerBase {
this.pgStatement = pgStatement;
this.queries = queries;
this.parameterLists = parameterLists;
this.updateCounts = new int[queries.length];
this.longUpdateCounts = new long[queries.length];
this.expectGeneratedKeys = expectGeneratedKeys;
this.allGeneratedRows = !expectGeneratedKeys ? null : new ArrayList<List<byte[][]>>();
}

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor) {
// If SELECT, then handleCommandStatus call would just be missing
Expand All @@ -63,17 +65,17 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
try {
// If SELECT, the resulting ResultSet is not valid
// Thus it is up to handleCommandStatus to decide if resultSet is good enough
latestGeneratedKeysRs =
(PgResultSet) pgStatement.createResultSet(fromQuery, fields,
new ArrayList<byte[][]>(), cursor);
latestGeneratedKeysRs = (PgResultSet) pgStatement.createResultSet(fromQuery, fields,
new ArrayList<byte[][]>(), cursor);
} catch (SQLException e) {
handleError(e);
}
}
latestGeneratedRows = tuples;
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
if (latestGeneratedRows != null) {
// We have DML. Decrease resultIndex that was just increased in handleResultRows
resultIndex--;
Expand All @@ -95,7 +97,7 @@ public void handleCommandStatus(String status, int updateCount, long insertOID)
}
latestGeneratedKeysRs = null;

updateCounts[resultIndex++] = updateCount;
longUpdateCounts[resultIndex++] = updateCount;
}

private boolean isAutoCommit() {
Expand Down Expand Up @@ -125,14 +127,15 @@ private void updateGeneratedKeys() {
allGeneratedRows.clear();
}

@Override
public void handleWarning(SQLWarning warning) {
pgStatement.addWarning(warning);
}

@Override
public void handleError(SQLException newError) {
if (getException() == null) {
Arrays.fill(updateCounts, committedRows, updateCounts.length, Statement.EXECUTE_FAILED);
Arrays.fill(longUpdateCounts, committedRows, longUpdateCounts.length, Statement.EXECUTE_FAILED);
if (allGeneratedRows != null) {
allGeneratedRows.clear();
}
Expand All @@ -142,30 +145,50 @@ public void handleError(SQLException newError) {
queryString = queries[resultIndex].toString(parameterLists[resultIndex]);
}

BatchUpdateException batchException = new BatchUpdateException(
BatchUpdateException batchException;
//#if mvn.project.property.postgresql.jdbc.spec >= "JDBC4.2"
batchException = new BatchUpdateException(
GT.tr("Batch entry {0} {1} was aborted: {2} Call getNextException to see other errors in the batch.",
resultIndex, queryString, newError.getMessage()),
newError.getSQLState(), 0, uncompressLongUpdateCount(), newError);
//#else
batchException = new BatchUpdateException(
GT.tr("Batch entry {0} {1} was aborted: {2} Call getNextException to see other errors in the batch.",
resultIndex, queryString, newError.getMessage()),
newError.getSQLState(), uncompressUpdateCount());
batchException.initCause(newError);
newError.getSQLState(), 0, uncompressUpdateCount(), newError);
//#endif

super.handleError(batchException);
}
resultIndex++;

super.handleError(newError);
}

@Override
public void handleCompletion() throws SQLException {
updateGeneratedKeys();
SQLException batchException = getException();
if (batchException != null) {
if (isAutoCommit()) {
// Re-create batch exception since rows after exception might indeed succeed.
BatchUpdateException newException = new BatchUpdateException(
BatchUpdateException newException;
//#if mvn.project.property.postgresql.jdbc.spec >= "JDBC4.2"
newException = new BatchUpdateException(
batchException.getMessage(),
batchException.getSQLState(),
uncompressUpdateCount()
batchException.getSQLState(), 0,
uncompressLongUpdateCount(),
batchException.getCause()
);
newException.initCause(batchException.getCause());
//#else
newException = new BatchUpdateException(
batchException.getMessage(),
batchException.getSQLState(), 0,
uncompressUpdateCount(),
batchException.getCause()
);
//#endif

SQLException next = batchException.getNextException();
if (next != null) {
newException.setNextException(next);
Expand All @@ -181,8 +204,21 @@ public ResultSet getGeneratedKeys() {
}

private int[] uncompressUpdateCount() {
long[] original = uncompressLongUpdateCount();
int[] copy = new int[original.length];
for (int i = 0; i < original.length; i++) {
copy[i] = original[i] > Integer.MAX_VALUE ? Statement.SUCCESS_NO_INFO : (int) original[i];
}
return copy;
}

public int[] getUpdateCount() {
return uncompressUpdateCount();
}

private long[] uncompressLongUpdateCount() {
if (!(queries[0] instanceof BatchedQuery)) {
return updateCounts;
return longUpdateCounts;
}
int totalRows = 0;
boolean hasRewrites = false;
Expand All @@ -192,20 +228,20 @@ private int[] uncompressUpdateCount() {
hasRewrites |= batchSize > 1;
}
if (!hasRewrites) {
return updateCounts;
return longUpdateCounts;
}

/* In this situation there is a batch that has been rewritten. Substitute
* the running total returned by the database with a status code to
* indicate successful completion for each row the driver client added
* to the batch.
*/
int[] newUpdateCounts = new int[totalRows];
long[] newUpdateCounts = new long[totalRows];
int offset = 0;
for (int i = 0; i < queries.length; i++) {
Query query = queries[i];
int batchSize = query.getBatchSize();
int superBatchResult = updateCounts[i];
long superBatchResult = longUpdateCounts[i];
if (batchSize == 1) {
newUpdateCounts[offset++] = superBatchResult;
continue;
Expand All @@ -221,7 +257,8 @@ private int[] uncompressUpdateCount() {
return newUpdateCounts;
}

public int[] getUpdateCount() {
return uncompressUpdateCount();
public long[] getLargeUpdateCount() {
return uncompressLongUpdateCount();
}

}
Expand Up @@ -122,9 +122,18 @@ public int executeUpdate(String sql) throws SQLException {
@Override
public int executeUpdate() throws SQLException {
executeWithFlags(QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getUpdateCount();
}

return getNoResultUpdateCount();
//#if mvn.project.property.postgresql.jdbc.spec >= "JDBC4.2"
@Override
public long executeLargeUpdate() throws SQLException {
executeWithFlags(QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getLargeUpdateCount();
}
//#endif

@Override
public boolean execute(String sql) throws SQLException {
Expand Down
5 changes: 4 additions & 1 deletion pgjdbc/src/main/java/org/postgresql/jdbc/PgResultSet.java
Expand Up @@ -1744,17 +1744,20 @@ private void updateRowBuffer() throws SQLException {

public class CursorResultHandler extends ResultHandlerBase {

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor) {
PgResultSet.this.rows = tuples;
PgResultSet.this.cursor = cursor;
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
handleError(new PSQLException(GT.tr("Unexpected command status: {0}.", status),
PSQLState.PROTOCOL_VIOLATION));
}

@Override
public void handleCompletion() throws SQLException {
SQLWarning warning = getWarning();
if (warning != null) {
Expand Down