Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Expose ByteStreamWriter in CopyManager #1702

Merged
merged 1 commit into from Feb 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/copy/CopyIn.java
Expand Up @@ -5,6 +5,8 @@

package org.postgresql.copy;

import org.postgresql.util.ByteStreamWriter;

import java.sql.SQLException;

/**
Expand All @@ -22,6 +24,14 @@ public interface CopyIn extends CopyOperation {
*/
void writeToCopy(byte[] buf, int off, int siz) throws SQLException;

/**
* Writes a ByteStreamWriter to an open and writable copy operation.
*
* @param from the source of bytes, e.g. a ByteBufferByteStreamWriter
* @throws SQLException if the operation fails
*/
void writeToCopy(ByteStreamWriter from) throws SQLException;

/**
* Force any buffered output to be sent over the network to the backend. In general this is a
* useless operation as it will get pushed over in due time or when endCopy is called. Some
Expand Down
23 changes: 23 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/copy/CopyManager.java
Expand Up @@ -8,6 +8,7 @@
import org.postgresql.core.BaseConnection;
import org.postgresql.core.Encoding;
import org.postgresql.core.QueryExecutor;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -227,4 +228,26 @@ public long copyIn(final String sql, InputStream from, int bufferSize)
}
}
}

/**
* Use COPY FROM STDIN for very fast copying from an ByteStreamWriter into a database table.
*
* @param sql COPY FROM STDIN statement
* @param from the source of bytes, e.g. a ByteBufferByteStreamWriter
* @return number of rows updated for server 8.2 or newer; -1 for older
* @throws SQLException on database usage issues
* @throws IOException upon input stream or database connection failure
*/
public long copyIn(String sql, ByteStreamWriter from)
throws SQLException, IOException {
CopyIn cp = copyIn(sql);
try {
cp.writeToCopy(from);
return cp.endCopy();
} finally { // see to it that we do not leave the connection locked
if (cp.isActive()) {
cp.cancelCopy();
}
}
}
}
Expand Up @@ -6,6 +6,7 @@
package org.postgresql.copy;

import org.postgresql.PGConnection;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;

import java.io.IOException;
Expand Down Expand Up @@ -140,6 +141,10 @@ public void writeToCopy(byte[] buf, int off, int siz) throws SQLException {
}
}

public void writeToCopy(ByteStreamWriter from) throws SQLException {
op.writeToCopy(from);
}

public int getFormat() {
return op.getFormat();
}
Expand Down
5 changes: 5 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/v3/CopyDualImpl.java
Expand Up @@ -6,6 +6,7 @@
package org.postgresql.core.v3;

import org.postgresql.copy.CopyDual;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.PSQLException;

import java.sql.SQLException;
Expand All @@ -19,6 +20,10 @@ public void writeToCopy(byte[] data, int off, int siz) throws SQLException {
queryExecutor.writeToCopy(this, data, off, siz);
}

public void writeToCopy(ByteStreamWriter from) throws SQLException {
queryExecutor.writeToCopy(this, from);
}

public void flushCopy() throws SQLException {
queryExecutor.flushCopy(this);
}
Expand Down
5 changes: 5 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/v3/CopyInImpl.java
Expand Up @@ -6,6 +6,7 @@
package org.postgresql.core.v3;

import org.postgresql.copy.CopyIn;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -37,6 +38,10 @@ public void writeToCopy(byte[] data, int off, int siz) throws SQLException {
queryExecutor.writeToCopy(this, data, off, siz);
}

public void writeToCopy(ByteStreamWriter from) throws SQLException {
queryExecutor.writeToCopy(this, from);
}

public void flushCopy() throws SQLException {
queryExecutor.flushCopy(this);
}
Expand Down
28 changes: 28 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Expand Up @@ -37,6 +37,7 @@
import org.postgresql.jdbc.AutoSave;
import org.postgresql.jdbc.BatchResultHandler;
import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -1034,7 +1035,34 @@ public synchronized void writeToCopy(CopyOperationImpl op, byte[] data, int off,
pgStream.sendChar('d');
pgStream.sendInteger4(siz + 4);
pgStream.send(data, off, siz);
} catch (IOException ioe) {
throw new PSQLException(GT.tr("Database connection failed when writing to copy"),
PSQLState.CONNECTION_FAILURE, ioe);
}
}

/**
* Sends data during a live COPY IN operation. Only unlocks the connection if server suddenly
* returns CommandComplete, which should not happen
*
* @param op the CopyIn operation presumably currently holding lock on this connection
* @param from the source of bytes, e.g. a ByteBufferByteStreamWriter
* @throws SQLException on failure
*/
public synchronized void writeToCopy(CopyOperationImpl op, ByteStreamWriter from)
throws SQLException {
if (!hasLock(op)) {
throw new PSQLException(GT.tr("Tried to write to an inactive copy operation"),
PSQLState.OBJECT_NOT_IN_STATE);
}

int siz = from.getLength();
LOGGER.log(Level.FINEST, " FE=> CopyData({0})", siz);

try {
pgStream.sendChar('d');
pgStream.sendInteger4(siz + 4);
pgStream.send(from);
davecramer marked this conversation as resolved.
Show resolved Hide resolved
} catch (IOException ioe) {
throw new PSQLException(GT.tr("Database connection failed when writing to copy"),
PSQLState.CONNECTION_FAILURE, ioe);
Expand Down
10 changes: 10 additions & 0 deletions pgjdbc/src/test/java/org/postgresql/test/jdbc2/CopyTest.java
Expand Up @@ -18,6 +18,7 @@
import org.postgresql.copy.PGCopyOutputStream;
import org.postgresql.core.ServerVersion;
import org.postgresql.test.TestUtil;
import org.postgresql.util.ByteBufferByteStreamWriter;
import org.postgresql.util.PSQLState;

import org.junit.After;
Expand All @@ -31,6 +32,7 @@
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -210,6 +212,14 @@ public void testCopyInFromReader() throws SQLException, IOException {
assertEquals(dataRows, rowCount);
}

@Test
public void testCopyInFromByteStreamWriter() throws SQLException, IOException {
String sql = "COPY copytest FROM STDIN";
copyAPI.copyIn(sql, new ByteBufferByteStreamWriter(ByteBuffer.wrap(getData(origData))));
int rowCount = getCount();
assertEquals(dataRows, rowCount);
}

@Test
public void testSkipping() {
String sql = "COPY copytest FROM STDIN";
Expand Down