Skip to content

Commit

Permalink
fix: close refcursors when underlying cursor==null instead of relying…
Browse files Browse the repository at this point in the history
… on defaultRowFetchSize

See #2227
See #2371
  • Loading branch information
vlsi committed May 23, 2022
1 parent 12541c4 commit 85f8581
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 102 deletions.
3 changes: 3 additions & 0 deletions docs/_posts/2022-02-01-42.3.2-release.md
Expand Up @@ -7,6 +7,9 @@ version: 42.3.2
---

**Notable changes**
### Known issues
- Regression since 42.3.2: "cursor <unnamed portal 1> does not exist" when using ResultSet.setFetchSize from CallableStatement, fixed in 42.3.5 (see [PG #2377](https://github.com/pgjdbc/pgjdbc/pull/2377))

### Security
- CVE-2022-21724 pgjdbc instantiates plugin instances based on class names provided via authenticationPluginClassName,
sslhostnameverifier, socketFactory, sslfactory, sslpasswordcallback connection properties.
Expand Down
3 changes: 3 additions & 0 deletions docs/_posts/2022-02-15-42.3.3-release.md
Expand Up @@ -7,6 +7,9 @@ version: 42.3.3
---
**Notable changes**

### Known issues
- Regression since 42.3.2: "cursor <unnamed portal 1> does not exist" when using ResultSet.setFetchSize from CallableStatement, fixed in 42.3.5 (see [PG #2377](https://github.com/pgjdbc/pgjdbc/pull/2377))

### Changed
- fix: Removed loggerFile and loggerLevel configuration. While the properties still exist.
They can no longer be used to configure the driver logging. Instead use java.util.logging
Expand Down
2 changes: 2 additions & 0 deletions docs/_posts/2022-04-15-42.3.4-release.md
Expand Up @@ -7,6 +7,8 @@ version: 42.3.4
---

**Notable changes**
### Known issues
- Regression since 42.3.2: "cursor <unnamed portal 1> does not exist" when using ResultSet.setFetchSize from CallableStatement, fixed in 42.3.5 (see [PG #2377](https://github.com/pgjdbc/pgjdbc/pull/2377))

### Changed
- fix: change name of build cache [PR 2471](https://github.com/pgjdbc/pgjdbc/pull/2471)
Expand Down
2 changes: 2 additions & 0 deletions docs/_posts/2022-05-04-42.3.5-release.md
Expand Up @@ -6,6 +6,8 @@ categories:
version: 42.3.5
---
**Notable changes**
### Known issues
- Regression since 42.3.2: "cursor <unnamed portal 1> does not exist" when using ResultSet.setFetchSize from CallableStatement, fixed in 42.3.5 (see [PG #2377](https://github.com/pgjdbc/pgjdbc/pull/2377))

### Changed
- test: polish TimestampUtilsTest
Expand Down
57 changes: 28 additions & 29 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgResultSet.java
Expand Up @@ -286,23 +286,12 @@ public java.net.URL getURL(String columnName) throws SQLException {
// we have updatable cursors it must be readonly.
ResultSet rs =
connection.execSQLQuery(sb.toString(), resultsettype, ResultSet.CONCUR_READ_ONLY);

/*
If the user has set a fetch size we can't close the cursor yet.
Issue https://github.com/pgjdbc/pgjdbc/issues/2227
*/
if (connection.getDefaultFetchSize() == 0 ) {
/*
// In long running transactions these backend cursors take up memory space
// we could close in rs.close(), but if the transaction is closed before the result set,
// then the cursor no longer exists
*/
sb.setLength(0);
sb.append("CLOSE ");
Utils.escapeIdentifier(sb, cursorName);
connection.execSQLUpdate(sb.toString());
}
((PgResultSet) rs).setRefCursor(cursorName);
// In long-running transactions these backend cursors take up memory space
// we could close in rs.close(), but if the transaction is closed before the result set,
// then
// the cursor no longer exists
((PgResultSet) rs).closeRefCursor();
return rs;
}
if ("hstore".equals(type)) {
Expand Down Expand Up @@ -2154,6 +2143,10 @@ public boolean next() throws SQLException {
connection.getQueryExecutor()
.fetch(cursor, new CursorResultHandler(), fetchRows, adaptiveFetch);

// .fetch(...) could update this.cursor, and cursor==null means
// there are no more rows to fetch
closeRefCursor();

// After fetch, update last used fetch size (could be useful for adaptive fetch).
lastUsedFetchSize = fetchRows;

Expand Down Expand Up @@ -2192,24 +2185,30 @@ protected void closeInternally() throws SQLException {
rows = null;
JdbcBlackHole.close(deleteStatement);
deleteStatement = null;
if (cursor != null) {
cursor.close();
cursor = null;
}
closeRefCursor();
}

/* this used to be closed right after reading all of the rows,
however if fetchSize is set ony fetchSize rows will be read and then
the cursor will be closed
We only need to worry about closing it if the transaction is still open
if it is in error it will get closed eventually. (maybe not ?)
*/
if ( refCursorName != null && fetchSize != 0) {
/**
* Closes {@code <unnamed portal 1>} if no more fetch calls expected ({@code cursor==null})
* @throws SQLException if portal close fails
*/
private void closeRefCursor() throws SQLException {
String refCursorName = this.refCursorName;
if (refCursorName == null || cursor != null) {
return;
}
try {
if (connection.getTransactionState() == TransactionState.OPEN) {
StringBuilder sb = new StringBuilder("CLOSE ");
Utils.escapeIdentifier(sb, castNonNull(refCursorName));
Utils.escapeIdentifier(sb, refCursorName);
connection.execSQLUpdate(sb.toString());
refCursorName = null;
}
}
if (cursor != null) {
cursor.close();
cursor = null;
} finally {
this.refCursorName = null;
}
}

Expand Down
19 changes: 19 additions & 0 deletions pgjdbc/src/test/java/org/postgresql/test/TestUtil.java
Expand Up @@ -654,6 +654,13 @@ public static void dropType(Connection con, String type) throws SQLException {
dropObject(con, "TYPE", type);
}

/*
* Drops a function with a given signature.
*/
public static void dropFunction(Connection con, String name, String arguments) throws SQLException {
dropObject(con, "FUNCTION", name + "(" + arguments + ")");
}

private static void dropObject(Connection con, String type, String name) throws SQLException {
Statement stmt = con.createStatement();
try {
Expand Down Expand Up @@ -1163,7 +1170,19 @@ private static void waitStopReplicationSlot(Connection connection, String slotNa
}
}

/**
* Executes given SQL via {@link Statement#execute(String)} on a given connection.
* @deprecated prefer {@link #execute(Connection, String)} since it yields easier for read code
*/
@Deprecated
public static void execute(String sql, Connection connection) throws SQLException {
execute(connection, sql);
}

/**
* Executes given SQL via {@link Statement#execute(String)} on a given connection.
*/
public static void execute(Connection connection, String sql) throws SQLException {
try (Statement stmt = connection.createStatement()) {
stmt.execute(sql);
}
Expand Down
182 changes: 109 additions & 73 deletions pgjdbc/src/test/java/org/postgresql/test/jdbc2/RefCursorFetchTest.java
Expand Up @@ -7,115 +7,151 @@

import static org.junit.Assert.assertEquals;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.core.ServerVersion;
import org.postgresql.test.TestUtil;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;

@RunWith(Parameterized.class)
public class RefCursorFetchTest extends BaseTest4 {
@BeforeClass
public static void checkVersion() throws SQLException {
TestUtil.assumeHaveMinimumServerVersion(ServerVersion.v9_0);
private final int numRows;
private final @Nullable Integer defaultFetchSize;
private final @Nullable Integer resultSetFetchSize;
private final AutoCommit autoCommit;
private final boolean commitAfterExecute;

public RefCursorFetchTest(BinaryMode binaryMode, int numRows,
@Nullable Integer defaultFetchSize,
@Nullable Integer resultSetFetchSize,
AutoCommit autoCommit, boolean commitAfterExecute) {
this.numRows = numRows;
this.defaultFetchSize = defaultFetchSize;
this.resultSetFetchSize = resultSetFetchSize;
this.autoCommit = autoCommit;
this.commitAfterExecute = commitAfterExecute;
setBinaryMode(binaryMode);
}

@Parameterized.Parameters(name = "binary = {0}, numRows = {1}, defaultFetchSize = {2}, resultSetFetchSize = {3}, autoCommit = {4}, commitAfterExecute = {5}")
public static Iterable<Object[]> data() {
Collection<Object[]> ids = new ArrayList<>();
for (BinaryMode binaryMode : BinaryMode.values()) {
for (int numRows : new int[]{0, 10, 101}) {
for (Integer defaultFetchSize : new Integer[]{null, 0, 9, 50}) {
for (AutoCommit autoCommit : AutoCommit.values()) {
for (boolean commitAfterExecute : new boolean[]{true, false}) {
for (Integer resultSetFetchSize : new Integer[]{null, 0, 9, 50}) {
ids.add(new Object[]{binaryMode, numRows, defaultFetchSize, resultSetFetchSize, autoCommit, commitAfterExecute});
}
}
}
}
}
}
return ids;
}

@Override
public void setUp() throws Exception {
super.setUp();
assumeCallableStatementsSupported();
protected void updateProperties(Properties props) {
super.updateProperties(props);
if (defaultFetchSize != null) {
PGProperty.DEFAULT_ROW_FETCH_SIZE.set(props, defaultFetchSize);
}
}

TestUtil.dropTable(con, "test_blob");
TestUtil.createTable(con, "test_blob", "content bytea");
@BeforeClass
public static void beforeClass() throws Exception {
TestUtil.assumeHaveMinimumServerVersion(ServerVersion.v9_0);
try (Connection con = TestUtil.openDB();) {
TestUtil.createTable(con, "test_blob", "content bytea");
TestUtil.execute(con, "");
TestUtil.execute(con, "--create function to read data\n"
+ "CREATE OR REPLACE FUNCTION test_blob(p_cur OUT REFCURSOR, p_limit int4) AS $body$\n"
+ "BEGIN\n"
+ "OPEN p_cur FOR SELECT content FROM test_blob LIMIT p_limit;\n"
+ "END;\n"
+ "$body$ LANGUAGE plpgsql STABLE");

// Create a function to read the blob data
TestUtil.execute("CREATE OR REPLACE FUNCTION test_blob (p_cur OUT REFCURSOR) AS\n"
+ "$body$\n"
+ " BEGIN\n"
+ " OPEN p_cur FOR SELECT content FROM test_blob;\n"
+ " END;\n"
+ "$body$ LANGUAGE plpgsql STABLE", con);
TestUtil.execute(con,"--generate 101 rows with 4096 bytes:\n"
+ "insert into test_blob\n"
+ "select(select decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256) - 1), 2, '0'), ''), 'hex')"
+ " FROM generate_series(1, 4096))\n"
+ "from generate_series (1, 200)");
}
}

// Generate 101 rows with 4096 bytes
TestUtil.execute("INSERT INTO test_blob (content)\n"
+ "SELECT (SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256) - 1), 2, '0'), ''), 'hex')\n"
+ " FROM generate_series(1, 4096)) AS content\n"
+ "FROM generate_series (1, 101)", con);
@AfterClass
public static void afterClass() throws Exception {
try (Connection con = TestUtil.openDB();) {
TestUtil.dropTable(con, "test_blob");
TestUtil.dropFunction(con,"test_blob", "REFCURSOR, int4");
}
}

@Override
public void tearDown() throws SQLException {
TestUtil.execute("DROP FUNCTION IF EXISTS test_blob (OUT REFCURSOR)", con);
TestUtil.dropTable(con, "test_blob");
super.tearDown();
public void setUp() throws Exception {
super.setUp();
assumeCallableStatementsSupported();
con.setAutoCommit(autoCommit == AutoCommit.YES);
}

@Test
public void testRefCursorWithFetchSize() throws SQLException {
((PGConnection)con).setDefaultFetchSize(50);
public void fetchAllRows() throws SQLException {
int cnt = 0;
try (CallableStatement call = con.prepareCall("{? = call test_blob()}")) {
try (CallableStatement call = con.prepareCall("{? = call test_blob(?)}")) {
con.setAutoCommit(false); // ref cursors only work if auto commit is off
call.registerOutParameter(1, Types.REF_CURSOR);
call.setInt(2, numRows);
call.execute();
try (ResultSet rs = (ResultSet) call.getObject(1)) {
while (rs.next()) {
cnt++;
if (commitAfterExecute) {
if (autoCommit == AutoCommit.NO) {
con.commit();
} else {
con.setAutoCommit(false);
con.setAutoCommit(true);
}
}
assertEquals(101, cnt);
}
}

@Test
public void testRefCursorWithOutFetchSize() throws SQLException {
assumeCallableStatementsSupported();
int cnt = 0;
try (CallableStatement call = con.prepareCall("{? = call test_blob()}")) {
con.setAutoCommit(false); // ref cursors only work if auto commit is off
call.registerOutParameter(1, Types.REF_CURSOR);
call.execute();
try (ResultSet rs = (ResultSet) call.getObject(1)) {
while (rs.next()) {
cnt++;
if (resultSetFetchSize != null) {
rs.setFetchSize(resultSetFetchSize);
}
}
assertEquals(101, cnt);
}
}

/*
test to make sure that close in the result set does not attempt to get rid of the non-existent
portal
*/
@Test
public void testRefCursorWithFetchSizeNoTransaction() throws SQLException {
assumeCallableStatementsSupported();
((PGConnection)con).setDefaultFetchSize(50);
int cnt = 0;
try (CallableStatement call = con.prepareCall("{? = call test_blob()}")) {
con.setAutoCommit(false); // ref cursors only work if auto commit is off
call.registerOutParameter(1, Types.REF_CURSOR);
call.execute();
// end the transaction here, which will get rid of the refcursor
con.setAutoCommit(true);
// we should be able to read the first 50 as they were read before the tx was ended
try (ResultSet rs = (ResultSet) call.getObject(1)) {
while (rs.next()) {
cnt++;
}
} catch (SQLException ex) {
// should get an exception here as we try to read more but the portal is gone
assertEquals("34000", ex.getSQLState());
} finally {
assertEquals(50, cnt);
assertEquals("number of rows from test_blob(...) call", numRows, cnt);
} catch (SQLException e) {
if (commitAfterExecute && "34000".equals(e.getSQLState())) {
// Transaction commit closes refcursor, so the fetch call is expected to fail
// File: postgres.c, Routine: exec_execute_message, Line: 2070
// Server SQLState: 34000
int expectedRows =
defaultFetchSize != null && defaultFetchSize != 0 ? Math.min(defaultFetchSize, numRows) : numRows;
assertEquals(
"The transaction was committed before processing the results,"
+ " so expecting ResultSet to buffer fetchSize=" + defaultFetchSize + " rows out of "
+ numRows,
expectedRows,
cnt
);
return;
}
throw e;
}
}
}

}

0 comments on commit 85f8581

Please sign in to comment.