Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't flush at each put_copy_data call, but flush at get_result #462

Merged
merged 1 commit into from Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions ext/pg_connection.c
Expand Up @@ -24,6 +24,7 @@ static VALUE pgconn_set_default_encoding( VALUE self );
static VALUE pgconn_wait_for_flush( VALUE self );
static void pgconn_set_internal_encoding_index( VALUE );
static const rb_data_type_t pg_connection_type;
static VALUE pgconn_async_flush(VALUE self);

/*
* Global functions
Expand Down Expand Up @@ -2425,6 +2426,11 @@ wait_socket_readable( VALUE self, struct timeval *ptimeout, void *(*is_readable)

/* Is the given timeout valid? */
if( !ptimeout || (waittime.tv_sec >= 0 && waittime.tv_usec >= 0) ){
/* before we wait for data, make sure everything has been sent */
pgconn_async_flush(self);
if ((retval=is_readable(conn)))
return retval;

VALUE socket_io = pgconn_socket_io(self);
/* Wait for the socket to become readable before checking again */
ret = pg_rb_io_wait(socket_io, RB_INT2NUM(PG_RUBY_IO_READABLE), wait_timeout);
Expand Down
6 changes: 3 additions & 3 deletions lib/pg/connection.rb
Expand Up @@ -407,10 +407,10 @@ def isnonblocking
# See also #copy_data.
#
def put_copy_data(buffer, encoder=nil)
until sync_put_copy_data(buffer, encoder)
flush
until res=sync_put_copy_data(buffer, encoder)
res = flush
end
flush
res
end
alias async_put_copy_data put_copy_data

Expand Down
13 changes: 13 additions & 0 deletions spec/pg/connection_spec.rb
Expand Up @@ -984,6 +984,19 @@
expect( results ).to include( "1\n", "2\n" )
end

it "#get_result should send remaining data before waiting" do
str = "abcd" * 2000 + "\n"
@conn.exec( "CREATE TEMP TABLE copytable2 (col1 TEXT)" )
@conn.exec( "COPY copytable2 FROM STDOUT" )

1000.times do
@conn.sync_put_copy_data(str)
end
@conn.sync_put_copy_end
res = @conn.get_last_result
expect( res.result_status ).to eq( PG::PGRES_COMMAND_OK )
end

describe "#copy_data" do
it "can process #copy_data output queries" do
rows = []
Expand Down