Skip to content

Commit

Permalink
refactor: introduce tuple abstraction (rebased) (#1701)
Browse files Browse the repository at this point in the history
* refactor: introduce tuple abstraction

This replaces the use of byte[][] in a number of places with a Tuple
class that wraps it. This is a necessary change to support doing
anything other than blindly reading incoming data into a heap-
allocated byte array.

* docs: fix up copyright and add some javadoc

* Extract remaining tuples in new code

Co-authored-by: Tom Dunstan <tomdcc@users.noreply.github.com>
  • Loading branch information
Lőrinc Pap and tomdcc committed Feb 10, 2020
1 parent 8a8e8ff commit ed09fd1
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 139 deletions.
4 changes: 2 additions & 2 deletions pgjdbc/src/main/java/org/postgresql/core/BaseStatement.java
Expand Up @@ -24,7 +24,7 @@ public interface BaseStatement extends PGStatement, Statement {
* @return the new ResultSet
* @throws SQLException if something goes wrong
*/
ResultSet createDriverResultSet(Field[] fields, List<byte[][]> tuples) throws SQLException;
ResultSet createDriverResultSet(Field[] fields, List<Tuple> tuples) throws SQLException;

/**
* Create a resultset from data retrieved from the server.
Expand All @@ -38,7 +38,7 @@ public interface BaseStatement extends PGStatement, Statement {
* @return the new ResultSet
* @throws SQLException if something goes wrong
*/
ResultSet createResultSet(Query originalQuery, Field[] fields, List<byte[][]> tuples,
ResultSet createResultSet(Query originalQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) throws SQLException;

/**
Expand Down
4 changes: 2 additions & 2 deletions pgjdbc/src/main/java/org/postgresql/core/PGStream.java
Expand Up @@ -472,7 +472,7 @@ public String receiveString() throws IOException {
* @throws IOException if a data I/O error occurs
* @throws SQLException if read more bytes than set maxResultBuffer
*/
public byte[][] receiveTupleV3() throws IOException, OutOfMemoryError, SQLException {
public Tuple receiveTupleV3() throws IOException, OutOfMemoryError, SQLException {
int messageSize = receiveInteger4(); // MESSAGE SIZE
int nf = receiveInteger2();
//size = messageSize - 4 bytes of message size - 2 bytes of field count - 4 bytes for each column length
Expand All @@ -498,7 +498,7 @@ public byte[][] receiveTupleV3() throws IOException, OutOfMemoryError, SQLExcept
throw oom;
}

return answer;
return new Tuple(answer);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions pgjdbc/src/main/java/org/postgresql/core/ResultHandler.java
Expand Up @@ -36,8 +36,7 @@ public interface ResultHandler {
* @param cursor a cursor to use to fetch additional data; <code>null</code> if no further results
* are present.
*/
void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor);
void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples, ResultCursor cursor);

/**
* Called when a query that did not return a resultset completes.
Expand Down
Expand Up @@ -25,7 +25,7 @@ public class ResultHandlerBase implements ResultHandler {
private SQLWarning lastWarning;

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
}

Expand Down
Expand Up @@ -23,7 +23,7 @@ public ResultHandlerDelegate(ResultHandler delegate) {
}

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
if (delegate != null) {
delegate.handleResultRows(fromQuery, fields, tuples, cursor);
Expand Down
10 changes: 5 additions & 5 deletions pgjdbc/src/main/java/org/postgresql/core/SetupQueryRunner.java
Expand Up @@ -21,13 +21,13 @@
public class SetupQueryRunner {

private static class SimpleResultHandler extends ResultHandlerBase {
private List<byte[][]> tuples;
private List<Tuple> tuples;

List<byte[][]> getResults() {
List<Tuple> getResults() {
return tuples;
}

public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
this.tuples = tuples;
}
Expand All @@ -38,7 +38,7 @@ public void handleWarning(SQLWarning warning) {
}
}

public static byte[][] run(QueryExecutor executor, String queryString,
public static Tuple run(QueryExecutor executor, String queryString,
boolean wantResults) throws SQLException {
Query query = executor.createSimpleQuery(queryString);
SimpleResultHandler handler = new SimpleResultHandler();
Expand All @@ -59,7 +59,7 @@ public static byte[][] run(QueryExecutor executor, String queryString,
return null;
}

List<byte[][]> tuples = handler.getResults();
List<Tuple> tuples = handler.getResults();
if (tuples == null || tuples.size() != 1) {
throw new PSQLException(GT.tr("An unexpected result was returned by a query."),
PSQLState.CONNECTION_UNABLE_TO_CONNECT);
Expand Down
100 changes: 100 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/Tuple.java
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2020, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.core;

/**
* Class representing a row in a {@link java.sql.ResultSet}.
*/
public class Tuple {
private final boolean forUpdate;
final byte[][] data;

/**
* Construct an empty tuple. Used in updatable result sets.
* @param length the number of fields in the tuple.
*/
public Tuple(int length) {
this(new byte[length][], true);
}

/**
* Construct a populated tuple. Used when returning results.
* @param data the tuple data
*/
public Tuple(byte[][] data) {
this(data, false);
}

private Tuple(byte[][] data, boolean forUpdate) {
this.data = data;
this.forUpdate = forUpdate;
}

/**
* Number of fields in the tuple
* @return number of fields
*/
public int fieldCount() {
return data.length;
}

/**
* Total length in bytes of the tuple data.
* @return the number of bytes in this tuple
*/
public int length() {
int length = 0;
for (byte[] field : data) {
if (field != null) {
length += field.length;
}
}
return length;
}

/**
* Get the data for the given field
* @param index 0-based field position in the tuple
* @return byte array of the data
*/
public byte[] get(int index) {
return data[index];
}

/**
* Create a copy of the tuple for updating.
* @return a copy of the tuple that allows updates
*/
public Tuple updateableCopy() {
return copy(true);
}

/**
* Create a read-only copy of the tuple
* @return a copy of the tuple that does not allow updates
*/
public Tuple readOnlyCopy() {
return copy(false);
}

private Tuple copy(boolean forUpdate) {
byte[][] dataCopy = new byte[data.length][];
System.arraycopy(data, 0, dataCopy, 0, data.length);
return new Tuple(dataCopy, forUpdate);
}

/**
* Set the given field to the given data.
* @param index 0-based field position
* @param fieldData the data to set
*/
public void set(int index, byte[] fieldData) {
if (!forUpdate) {
throw new IllegalArgumentException("Attempted to write to readonly tuple");
}
data[index] = fieldData;
}
}
Expand Up @@ -13,6 +13,7 @@
import org.postgresql.core.ServerVersion;
import org.postgresql.core.SetupQueryRunner;
import org.postgresql.core.SocketFactoryFactory;
import org.postgresql.core.Tuple;
import org.postgresql.core.Utils;
import org.postgresql.core.Version;
import org.postgresql.hostchooser.CandidateHost;
Expand Down Expand Up @@ -753,8 +754,8 @@ private void runInitialQueries(QueryExecutor queryExecutor, Properties info)
}

private boolean isMaster(QueryExecutor queryExecutor) throws SQLException, IOException {
byte[][] results = SetupQueryRunner.run(queryExecutor, "show transaction_read_only", true);
String value = queryExecutor.getEncoding().decode(results[0]);
Tuple results = SetupQueryRunner.run(queryExecutor, "show transaction_read_only", true);
String value = queryExecutor.getEncoding().decode(results.get(0));
return value.equalsIgnoreCase("off");
}
}
27 changes: 11 additions & 16 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Expand Up @@ -31,6 +31,7 @@
import org.postgresql.core.SqlCommand;
import org.postgresql.core.SqlCommandType;
import org.postgresql.core.TransactionState;
import org.postgresql.core.Tuple;
import org.postgresql.core.Utils;
import org.postgresql.core.v3.replication.V3ReplicationProtocol;
import org.postgresql.jdbc.AutoSave;
Expand Down Expand Up @@ -555,7 +556,7 @@ private ResultHandler sendQueryPreamble(final ResultHandler delegateHandler, int
return new ResultHandlerDelegate(delegateHandler) {
private boolean sawBegin = false;

public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
if (sawBegin) {
super.handleResultRows(fromQuery, fields, tuples, cursor);
Expand Down Expand Up @@ -1999,7 +2000,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;

List<byte[][]> tuples = null;
List<Tuple> tuples = null;

int c;
boolean endQuery = false;
Expand Down Expand Up @@ -2093,7 +2094,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
Field[] fields = currentQuery.getFields();

if (fields != null) { // There was a resultset.
tuples = new ArrayList<byte[][]>();
tuples = new ArrayList<Tuple>();
handler.handleResultRows(currentQuery, fields, tuples, null);
tuples = null;
}
Expand All @@ -2115,7 +2116,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
if (fields != null && tuples == null) {
// When no results expected, pretend an empty resultset was returned
// Not sure if new ArrayList can be always replaced with emptyList
tuples = noResults ? Collections.<byte[][]>emptyList() : new ArrayList<byte[][]>();
tuples = noResults ? Collections.<Tuple>emptyList() : new ArrayList<Tuple>();
}

handler.handleResultRows(currentQuery, fields, tuples, currentPortal);
Expand Down Expand Up @@ -2166,7 +2167,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
if (fields != null && tuples == null) {
// When no results expected, pretend an empty resultset was returned
// Not sure if new ArrayList can be always replaced with emptyList
tuples = noResults ? Collections.<byte[][]>emptyList() : new ArrayList<byte[][]>();
tuples = noResults ? Collections.<Tuple>emptyList() : new ArrayList<Tuple>();
}

// If we received tuples we must know the structure of the
Expand Down Expand Up @@ -2203,7 +2204,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
}

case 'D': // Data Transfer (ongoing Execute response)
byte[][] tuple = null;
Tuple tuple = null;
try {
tuple = pgStream.receiveTupleV3();
} catch (OutOfMemoryError oome) {
Expand All @@ -2217,7 +2218,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
}
if (!noResults) {
if (tuples == null) {
tuples = new ArrayList<byte[][]>();
tuples = new ArrayList<Tuple>();
}
tuples.add(tuple);
}
Expand All @@ -2227,13 +2228,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
if (tuple == null) {
length = -1;
} else {
length = 0;
for (byte[] aTuple : tuple) {
if (aTuple == null) {
continue;
}
length += aTuple.length;
}
length = tuple.length();
}
LOGGER.log(Level.FINEST, " <=BE DataRow(len={0})", length);
}
Expand Down Expand Up @@ -2287,7 +2282,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti

case 'T': // Row Description (response to Describe)
Field[] fields = receiveFields();
tuples = new ArrayList<byte[][]>();
tuples = new ArrayList<Tuple>();

SimpleQuery query = pendingDescribePortalQueue.peekFirst();
if (!pendingExecuteQueue.isEmpty() && !pendingExecuteQueue.peekFirst().asSimple) {
Expand Down Expand Up @@ -2422,7 +2417,7 @@ public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int f
handler = new ResultHandlerDelegate(delegateHandler) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
handleResultRows(portal.getQuery(), null, new ArrayList<byte[][]>(), null);
handleResultRows(portal.getQuery(), null, new ArrayList<Tuple>(), null);
}
};

Expand Down
13 changes: 7 additions & 6 deletions pgjdbc/src/main/java/org/postgresql/jdbc/BatchResultHandler.java
Expand Up @@ -10,6 +10,7 @@
import org.postgresql.core.Query;
import org.postgresql.core.ResultCursor;
import org.postgresql.core.ResultHandlerBase;
import org.postgresql.core.Tuple;
import org.postgresql.core.v3.BatchedQuery;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
Expand Down Expand Up @@ -38,8 +39,8 @@ public class BatchResultHandler extends ResultHandlerBase {
private final boolean expectGeneratedKeys;
private PgResultSet generatedKeys;
private int committedRows; // 0 means no rows committed. 1 means row 0 was committed, and so on
private final List<List<byte[][]>> allGeneratedRows;
private List<byte[][]> latestGeneratedRows;
private final List<List<Tuple>> allGeneratedRows;
private List<Tuple> latestGeneratedRows;
private PgResultSet latestGeneratedKeysRs;

BatchResultHandler(PgStatement pgStatement, Query[] queries, ParameterList[] parameterLists,
Expand All @@ -49,11 +50,11 @@ public class BatchResultHandler extends ResultHandlerBase {
this.parameterLists = parameterLists;
this.longUpdateCounts = new long[queries.length];
this.expectGeneratedKeys = expectGeneratedKeys;
this.allGeneratedRows = !expectGeneratedKeys ? null : new ArrayList<List<byte[][]>>();
this.allGeneratedRows = !expectGeneratedKeys ? null : new ArrayList<List<Tuple>>();
}

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
// If SELECT, then handleCommandStatus call would just be missing
resultIndex++;
Expand All @@ -66,7 +67,7 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
// If SELECT, the resulting ResultSet is not valid
// Thus it is up to handleCommandStatus to decide if resultSet is good enough
latestGeneratedKeysRs = (PgResultSet) pgStatement.createResultSet(fromQuery, fields,
new ArrayList<byte[][]>(), cursor);
new ArrayList<Tuple>(), cursor);
} catch (SQLException e) {
handleError(e);
}
Expand Down Expand Up @@ -121,7 +122,7 @@ private void updateGeneratedKeys() {
if (allGeneratedRows == null || allGeneratedRows.isEmpty()) {
return;
}
for (List<byte[][]> rows : allGeneratedRows) {
for (List<Tuple> rows : allGeneratedRows) {
generatedKeys.addRows(rows);
}
allGeneratedRows.clear();
Expand Down
Expand Up @@ -9,6 +9,7 @@
import org.postgresql.core.ParameterList;
import org.postgresql.core.Query;
import org.postgresql.core.ResultCursor;
import org.postgresql.core.Tuple;

import java.util.List;

Expand All @@ -17,7 +18,7 @@ class CallableBatchResultHandler extends BatchResultHandler {
super(statement, queries, parameterLists, false);
}

public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples, ResultCursor cursor) {
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples, ResultCursor cursor) {
/* ignore */
}
}

0 comments on commit ed09fd1

Please sign in to comment.