Skip to content

Commit

Permalink
Expose ByteStreamWriter in CopyManager (#1702)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lőrinc Pap committed Feb 11, 2020
1 parent ed09fd1 commit 1e4f475
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/copy/CopyIn.java
Expand Up @@ -5,6 +5,8 @@

package org.postgresql.copy;

import org.postgresql.util.ByteStreamWriter;

import java.sql.SQLException;

/**
Expand All @@ -22,6 +24,14 @@ public interface CopyIn extends CopyOperation {
*/
void writeToCopy(byte[] buf, int off, int siz) throws SQLException;

/**
* Writes a ByteStreamWriter to an open and writable copy operation.
*
* @param from the source of bytes, e.g. a ByteBufferByteStreamWriter
* @throws SQLException if the operation fails
*/
void writeToCopy(ByteStreamWriter from) throws SQLException;

/**
* Force any buffered output to be sent over the network to the backend. In general this is a
* useless operation as it will get pushed over in due time or when endCopy is called. Some
Expand Down
23 changes: 23 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/copy/CopyManager.java
Expand Up @@ -8,6 +8,7 @@
import org.postgresql.core.BaseConnection;
import org.postgresql.core.Encoding;
import org.postgresql.core.QueryExecutor;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -227,4 +228,26 @@ public long copyIn(final String sql, InputStream from, int bufferSize)
}
}
}

/**
* Use COPY FROM STDIN for very fast copying from an ByteStreamWriter into a database table.
*
* @param sql COPY FROM STDIN statement
* @param from the source of bytes, e.g. a ByteBufferByteStreamWriter
* @return number of rows updated for server 8.2 or newer; -1 for older
* @throws SQLException on database usage issues
* @throws IOException upon input stream or database connection failure
*/
public long copyIn(String sql, ByteStreamWriter from)
throws SQLException, IOException {
CopyIn cp = copyIn(sql);
try {
cp.writeToCopy(from);
return cp.endCopy();
} finally { // see to it that we do not leave the connection locked
if (cp.isActive()) {
cp.cancelCopy();
}
}
}
}
Expand Up @@ -6,6 +6,7 @@
package org.postgresql.copy;

import org.postgresql.PGConnection;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;

import java.io.IOException;
Expand Down Expand Up @@ -140,6 +141,10 @@ public void writeToCopy(byte[] buf, int off, int siz) throws SQLException {
}
}

public void writeToCopy(ByteStreamWriter from) throws SQLException {
op.writeToCopy(from);
}

public int getFormat() {
return op.getFormat();
}
Expand Down
5 changes: 5 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/v3/CopyDualImpl.java
Expand Up @@ -6,6 +6,7 @@
package org.postgresql.core.v3;

import org.postgresql.copy.CopyDual;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.PSQLException;

import java.sql.SQLException;
Expand All @@ -19,6 +20,10 @@ public void writeToCopy(byte[] data, int off, int siz) throws SQLException {
queryExecutor.writeToCopy(this, data, off, siz);
}

public void writeToCopy(ByteStreamWriter from) throws SQLException {
queryExecutor.writeToCopy(this, from);
}

public void flushCopy() throws SQLException {
queryExecutor.flushCopy(this);
}
Expand Down
5 changes: 5 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/v3/CopyInImpl.java
Expand Up @@ -6,6 +6,7 @@
package org.postgresql.core.v3;

import org.postgresql.copy.CopyIn;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -37,6 +38,10 @@ public void writeToCopy(byte[] data, int off, int siz) throws SQLException {
queryExecutor.writeToCopy(this, data, off, siz);
}

public void writeToCopy(ByteStreamWriter from) throws SQLException {
queryExecutor.writeToCopy(this, from);
}

public void flushCopy() throws SQLException {
queryExecutor.flushCopy(this);
}
Expand Down
28 changes: 28 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Expand Up @@ -37,6 +37,7 @@
import org.postgresql.jdbc.AutoSave;
import org.postgresql.jdbc.BatchResultHandler;
import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -1034,7 +1035,34 @@ public synchronized void writeToCopy(CopyOperationImpl op, byte[] data, int off,
pgStream.sendChar('d');
pgStream.sendInteger4(siz + 4);
pgStream.send(data, off, siz);
} catch (IOException ioe) {
throw new PSQLException(GT.tr("Database connection failed when writing to copy"),
PSQLState.CONNECTION_FAILURE, ioe);
}
}

/**
* Sends data during a live COPY IN operation. Only unlocks the connection if server suddenly
* returns CommandComplete, which should not happen
*
* @param op the CopyIn operation presumably currently holding lock on this connection
* @param from the source of bytes, e.g. a ByteBufferByteStreamWriter
* @throws SQLException on failure
*/
public synchronized void writeToCopy(CopyOperationImpl op, ByteStreamWriter from)
throws SQLException {
if (!hasLock(op)) {
throw new PSQLException(GT.tr("Tried to write to an inactive copy operation"),
PSQLState.OBJECT_NOT_IN_STATE);
}

int siz = from.getLength();
LOGGER.log(Level.FINEST, " FE=> CopyData({0})", siz);

try {
pgStream.sendChar('d');
pgStream.sendInteger4(siz + 4);
pgStream.send(from);
} catch (IOException ioe) {
throw new PSQLException(GT.tr("Database connection failed when writing to copy"),
PSQLState.CONNECTION_FAILURE, ioe);
Expand Down
10 changes: 10 additions & 0 deletions pgjdbc/src/test/java/org/postgresql/test/jdbc2/CopyTest.java
Expand Up @@ -18,6 +18,7 @@
import org.postgresql.copy.PGCopyOutputStream;
import org.postgresql.core.ServerVersion;
import org.postgresql.test.TestUtil;
import org.postgresql.util.ByteBufferByteStreamWriter;
import org.postgresql.util.PSQLState;

import org.junit.After;
Expand All @@ -31,6 +32,7 @@
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -210,6 +212,14 @@ public void testCopyInFromReader() throws SQLException, IOException {
assertEquals(dataRows, rowCount);
}

@Test
public void testCopyInFromByteStreamWriter() throws SQLException, IOException {
String sql = "COPY copytest FROM STDIN";
copyAPI.copyIn(sql, new ByteBufferByteStreamWriter(ByteBuffer.wrap(getData(origData))));
int rowCount = getCount();
assertEquals(dataRows, rowCount);
}

@Test
public void testSkipping() {
String sql = "COPY copytest FROM STDIN";
Expand Down

0 comments on commit 1e4f475

Please sign in to comment.