Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read only transactions #1252

Merged
merged 11 commits into from Nov 25, 2019
15 changes: 14 additions & 1 deletion pgjdbc/src/main/java/org/postgresql/PGProperty.java
Expand Up @@ -10,6 +10,7 @@
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;

import java.sql.Connection;
import java.sql.DriverPropertyInfo;
import java.util.Properties;

Expand Down Expand Up @@ -443,7 +444,19 @@ public enum PGProperty {
+ "to the database specified in the dbname parameter, "
+ "which will allow the connection to be used for logical replication "
+ "from that database. "
+ "(backend >= 9.4)");
+ "(backend >= 9.4)"),

/**
* Connection parameter to control behavior when
* {@link Connection#setReadOnly(boolean)} is set to {@code true}.
*/
READ_ONLY_MODE("readOnlyMode", "transaction",
"Controls the behavior when a connection is set to be read only, one of 'ignore', 'transaction', or 'always' "
+ "When 'ignore', setting readOnly has no effect. "
+ "When 'transaction' setting readOnly to 'true' will cause transactions to BEGIN READ ONLY if autocommit is 'false'. "
+ "When 'always' setting readOnly to 'true' will set the session to READ ONLY if autoCommit is 'true' "
+ "and the transaction to BEGIN READ ONLY if autocommit is 'false'.",
false, "ignore", "transaction", "always");

private final String name;
private final String defaultValue;
Expand Down
10 changes: 10 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/BaseConnection.java
Expand Up @@ -6,6 +6,7 @@
package org.postgresql.core;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.jdbc.FieldMetadata;
import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.util.LruCache;
Expand Down Expand Up @@ -202,4 +203,13 @@ CachedQuery createQuery(String sql, boolean escapeProcessing, boolean isParamete
* @param flushCacheOnDeallocate true if statement cache should be reset when "deallocate/discard" message observed
*/
void setFlushCacheOnDeallocate(boolean flushCacheOnDeallocate);

/**
* Indicates if statements to backend should be hinted as read only.
*
* @return Indication if hints to backend (such as when transaction begins)
* should be read only.
* @see PGProperty#READ_ONLY_MODE
*/
boolean hintReadOnly();
}
5 changes: 5 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/QueryExecutor.java
Expand Up @@ -121,6 +121,11 @@ public interface QueryExecutor extends TypeTransferModeRegistry {

int MAX_SAVE_POINTS = 1000;

/**
* Flag indicating that when beginning a transaction, it should be read only.
*/
int QUERY_READ_ONLY_HINT = 2048;

/**
* Execute a Query, passing results to a provided ResultHandler.
*
Expand Down
36 changes: 18 additions & 18 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Expand Up @@ -371,9 +371,7 @@ && getAutoSave() != AutoSave.NEVER
// PostgreSQL does not support bind, exec, simple, sync message flow,
// so we force autosavepoint to use simple if the main query is using simple
| QUERY_EXECUTE_AS_SIMPLE);

return true;

}
return false;
}
Expand Down Expand Up @@ -548,7 +546,9 @@ private ResultHandler sendQueryPreamble(final ResultHandler delegateHandler, int

beginFlags = updateQueryMode(beginFlags);

sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS, 0, 0, beginFlags);
final SimpleQuery beginQuery = ((flags & QueryExecutor.QUERY_READ_ONLY_HINT) == 0) ? beginTransactionQuery : beginReadOnlyTransactionQuery;

sendOneQuery(beginQuery, SimpleQuery.NO_PARAMETERS, 0, 0, beginFlags);

// Insert a handler that intercepts the BEGIN.
return new ResultHandlerDelegate(delegateHandler) {
Expand Down Expand Up @@ -1145,8 +1145,8 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
try {
if (op == null) {
throw new PSQLException(GT
.tr("Received CommandComplete ''{0}'' without an active copy operation", status),
PSQLState.OBJECT_NOT_IN_STATE);
.tr("Received CommandComplete ''{0}'' without an active copy operation", status),
PSQLState.OBJECT_NOT_IN_STATE);
}
op.handleCommandStatus(status);
} catch (SQLException se) {
Expand All @@ -1171,7 +1171,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)

if (op != null) {
error = new PSQLException(GT.tr("Got CopyInResponse from server during an active {0}",
op.getClass().getName()), PSQLState.OBJECT_NOT_IN_STATE);
op.getClass().getName()), PSQLState.OBJECT_NOT_IN_STATE);
}

op = new CopyInImpl();
Expand All @@ -1184,8 +1184,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
LOGGER.log(Level.FINEST, " <=BE CopyOutResponse");

if (op != null) {
error =
new PSQLException(GT.tr("Got CopyOutResponse from server during an active {0}",
error = new PSQLException(GT.tr("Got CopyOutResponse from server during an active {0}",
op.getClass().getName()), PSQLState.OBJECT_NOT_IN_STATE);
}

Expand All @@ -1199,8 +1198,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
LOGGER.log(Level.FINEST, " <=BE CopyBothResponse");

if (op != null) {
error =
new PSQLException(GT.tr("Got CopyBothResponse from server during an active {0}",
error = new PSQLException(GT.tr("Got CopyBothResponse from server during an active {0}",
op.getClass().getName()), PSQLState.OBJECT_NOT_IN_STATE);
}

Expand All @@ -1220,11 +1218,11 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
byte[] buf = pgStream.receive(len);
if (op == null) {
error = new PSQLException(GT.tr("Got CopyData without an active copy operation"),
PSQLState.OBJECT_NOT_IN_STATE);
PSQLState.OBJECT_NOT_IN_STATE);
} else if (!(op instanceof CopyOut)) {
error = new PSQLException(
GT.tr("Unexpected copydata from server for {0}", op.getClass().getName()),
PSQLState.COMMUNICATION_ERROR);
GT.tr("Unexpected copydata from server for {0}", op.getClass().getName()),
PSQLState.COMMUNICATION_ERROR);
} else {
op.handleCopydata(buf);
}
Expand All @@ -1242,7 +1240,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)

if (!(op instanceof CopyOut)) {
error = new PSQLException("Got CopyDone while not copying from server",
PSQLState.OBJECT_NOT_IN_STATE);
PSQLState.OBJECT_NOT_IN_STATE);
}

// keep receiving since we expect a CommandComplete
Expand Down Expand Up @@ -1283,7 +1281,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)

default:
throw new IOException(
GT.tr("Unexpected packet type during copy: {0}", Integer.toString(c)));
GT.tr("Unexpected packet type during copy: {0}", Integer.toString(c)));
}

// Collect errors into a neat chain for completeness
Expand Down Expand Up @@ -2792,6 +2790,11 @@ public boolean getIntegerDateTimes() {
new NativeQuery("BEGIN", new int[0], false, SqlCommand.BLANK),
null, false);

private final SimpleQuery beginReadOnlyTransactionQuery =
new SimpleQuery(
new NativeQuery("BEGIN READ ONLY", new int[0], false, SqlCommand.BLANK),
null, false);

private final SimpleQuery emptyQuery =
new SimpleQuery(
new NativeQuery("", new int[0], false,
Expand All @@ -2815,7 +2818,4 @@ public boolean getIntegerDateTimes() {
new SimpleQuery(
new NativeQuery("ROLLBACK TO SAVEPOINT PGJDBC_AUTOSAVE", new int[0], false, SqlCommand.BLANK),
null, false);



}
16 changes: 16 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/ds/common/BaseDataSource.java
Expand Up @@ -912,6 +912,22 @@ public void setReadOnly(boolean readOnly) {
PGProperty.READ_ONLY.set(properties, readOnly);
}

/**
* @return The behavior when set read only
* @see PGProperty#READ_ONLY_MODE
*/
public String getReadOnlyMode() {
return PGProperty.READ_ONLY_MODE.get(properties);
}

/**
* @param mode the behavior when set read only
* @see PGProperty#READ_ONLY_MODE
*/
public void setReadOnlyMode(String mode) {
PGProperty.READ_ONLY_MODE.set(properties, mode);
}

/**
* @return true if driver should log unclosed connections
* @see PGProperty#LOG_UNCLOSED_CONNECTIONS
Expand Down
73 changes: 69 additions & 4 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgConnection.java
Expand Up @@ -81,6 +81,12 @@ public class PgConnection implements BaseConnection {
private static final SQLPermission SQL_PERMISSION_ABORT = new SQLPermission("callAbort");
private static final SQLPermission SQL_PERMISSION_NETWORK_TIMEOUT = new SQLPermission("setNetworkTimeout");

private enum ReadOnlyBehavior {
ignore,
transaction,
always;
}

//
// Data initialized on construction:
//
Expand All @@ -89,6 +95,8 @@ public class PgConnection implements BaseConnection {
/* URL we were created via */
private final String creatingURL;

private final ReadOnlyBehavior readOnlyBehavior;

private Throwable openStackTrace;

/* Actual network handler */
Expand All @@ -99,6 +107,10 @@ public class PgConnection implements BaseConnection {
/* Query that runs ROLLBACK */
private final Query rollbackQuery;

private final CachedQuery setSessionReadOnly;

private final CachedQuery setSessionNotReadOnly;

private final TypeInfo typeCache;

private boolean disableColumnSanitiser = false;
Expand Down Expand Up @@ -184,6 +196,8 @@ public PgConnection(HostSpec[] hostSpecs,

this.creatingURL = url;

this.readOnlyBehavior = getReadOnlyBehavior(PGProperty.READ_ONLY_MODE.get(info));

setDefaultFetchSize(PGProperty.DEFAULT_ROW_FETCH_SIZE.getInt(info));

setPrepareThreshold(PGProperty.PREPARE_THRESHOLD.getInt(info));
Expand All @@ -199,6 +213,9 @@ public PgConnection(HostSpec[] hostSpecs,
LOGGER.log(Level.WARNING, "Unsupported Server Version: {0}", queryExecutor.getServerVersion());
}

setSessionReadOnly = createQuery("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY", false, true);
setSessionNotReadOnly = createQuery("SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE", false, true);

// Set read-only early if requested
if (PGProperty.READ_ONLY.getBoolean(info)) {
setReadOnly(true);
Expand Down Expand Up @@ -292,6 +309,18 @@ public TimeZone get() {
replicationConnection = PGProperty.REPLICATION.get(info) != null;
}

private static ReadOnlyBehavior getReadOnlyBehavior(String property) {
try {
return ReadOnlyBehavior.valueOf(property);
} catch (IllegalArgumentException e) {
try {
return ReadOnlyBehavior.valueOf(property.toLowerCase(Locale.US));
} catch (IllegalArgumentException e2) {
return ReadOnlyBehavior.transaction;
}
}
}

private static Set<Integer> getBinaryOids(Properties info) throws PSQLException {
boolean binaryTransfer = PGProperty.BINARY_TRANSFER.getBoolean(info);
// Formats that currently have binary protocol support
Expand Down Expand Up @@ -453,6 +482,24 @@ public void execSQLUpdate(String s) throws SQLException {
stmt.close();
}

void execSQLUpdate(CachedQuery query) throws SQLException {
BaseStatement stmt = (BaseStatement) createStatement();
if (stmt.executeWithFlags(query, QueryExecutor.QUERY_NO_METADATA | QueryExecutor.QUERY_NO_RESULTS
| QueryExecutor.QUERY_SUPPRESS_BEGIN)) {
throw new PSQLException(GT.tr("A result was returned when none was expected."),
PSQLState.TOO_MANY_RESULTS);
}

// Transfer warnings to the connection, since the user never
// has a chance to see the statement itself.
SQLWarning warnings = stmt.getWarnings();
if (warnings != null) {
addWarning(warnings);
}

stmt.close();
}

/**
* <p>In SQL, a result table can be retrieved through a cursor that is named. The current row of a
* result can be updated or deleted using a positioned update/delete statement that references the
Expand Down Expand Up @@ -705,10 +752,8 @@ public void setReadOnly(boolean readOnly) throws SQLException {
PSQLState.ACTIVE_SQL_TRANSACTION);
}

if (readOnly != this.readOnly) {
String readOnlySql
= "SET SESSION CHARACTERISTICS AS TRANSACTION " + (readOnly ? "READ ONLY" : "READ WRITE");
execSQLUpdate(readOnlySql); // nb: no BEGIN triggered.
if (readOnly != this.readOnly && autoCommit && this.readOnlyBehavior == ReadOnlyBehavior.always) {
execSQLUpdate(readOnly ? setSessionReadOnly : setSessionNotReadOnly);
}

this.readOnly = readOnly;
Expand All @@ -721,6 +766,11 @@ public boolean isReadOnly() throws SQLException {
return readOnly;
}

@Override
public boolean hintReadOnly() {
return readOnly && readOnlyBehavior != ReadOnlyBehavior.ignore;
}

@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
checkClosed();
Expand All @@ -733,6 +783,21 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
commit();
}

// if the connection is read only, we need to make sure session settings are
// correct when autocommit status changed
if (this.readOnly && readOnlyBehavior == ReadOnlyBehavior.always) {
// if we are turning on autocommit, we need to set session
// to read only
if (autoCommit) {
this.autoCommit = true;
execSQLUpdate(setSessionReadOnly);
} else {
// if we are turning auto commit off, we need to
// disable session
execSQLUpdate(setSessionNotReadOnly);
}
}

this.autoCommit = autoCommit;
LOGGER.log(Level.FINE, " setAutoCommit = {0}", autoCommit);
}
Expand Down
6 changes: 6 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java
Expand Up @@ -408,6 +408,9 @@ private void executeInternal(CachedQuery cachedQuery, ParameterList queryParamet
if (connection.getAutoCommit()) {
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
}
if (connection.hintReadOnly()) {
flags |= QueryExecutor.QUERY_READ_ONLY_HINT;
}

// updateable result sets do not yet support binary updates
if (concurrency != ResultSet.CONCUR_READ_ONLY) {
Expand Down Expand Up @@ -810,6 +813,9 @@ private BatchResultHandler internalExecuteBatch() throws SQLException {
if (connection.getAutoCommit()) {
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
}
if (connection.hintReadOnly()) {
flags |= QueryExecutor.QUERY_READ_ONLY_HINT;
}

BatchResultHandler handler;
handler = createBatchHandler(queries, parameterLists);
Expand Down