From db506d11c50247c0058993742697403260d0ad64 Mon Sep 17 00:00:00 2001 From: Lars Kanis Date: Tue, 14 Jun 2022 13:58:14 +0200 Subject: [PATCH] Don't flush at each put_copy_data call, but flush at get_result This better mimics what libpq does internally. put_copy_data is significantly faster when it doesn't flush at every call. This is by a factor of 4 on Linux and 10 on Windows when sending typical per-row blocks of 60 bytes. put_copy_end unconditionally calls flush in libpq, so it is not changed here. Also adjust conn.block to send all enqueued data to mimic the behavior of get_result in libpq. With the change to put_copy_data, unsent data can remain when large blocks are sent. In this case get_result should catch up on flushing. This is what the newly added spec verifies. PQgetResult does flushing based on its internal states that we don't have access to. Since conn.block is performance critical in case of single_row_mode, we don't flush at every call to conn.block, but only when it's about to wait for IO. --- ext/pg_connection.c | 6 ++++++ lib/pg/connection.rb | 6 +++--- spec/pg/connection_spec.rb | 13 +++++++++++++ 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/ext/pg_connection.c b/ext/pg_connection.c index 2d8ac37df..214ebba81 100644 --- a/ext/pg_connection.c +++ b/ext/pg_connection.c @@ -24,6 +24,7 @@ static VALUE pgconn_set_default_encoding( VALUE self ); static VALUE pgconn_wait_for_flush( VALUE self ); static void pgconn_set_internal_encoding_index( VALUE ); static const rb_data_type_t pg_connection_type; +static VALUE pgconn_async_flush(VALUE self); /* * Global functions */ @@ -2425,6 +2426,11 @@ wait_socket_readable( VALUE self, struct timeval *ptimeout, void *(*is_readable) /* Is the given timeout valid? 
*/ if( !ptimeout || (waittime.tv_sec >= 0 && waittime.tv_usec >= 0) ){ + /* before we wait for data, make sure everything has been sent */ + pgconn_async_flush(self); + if ((retval=is_readable(conn))) + return retval; + VALUE socket_io = pgconn_socket_io(self); /* Wait for the socket to become readable before checking again */ ret = pg_rb_io_wait(socket_io, RB_INT2NUM(PG_RUBY_IO_READABLE), wait_timeout); diff --git a/lib/pg/connection.rb b/lib/pg/connection.rb index cc72286ee..7ca5970f4 100644 --- a/lib/pg/connection.rb +++ b/lib/pg/connection.rb @@ -407,10 +407,10 @@ def isnonblocking # See also #copy_data. # def put_copy_data(buffer, encoder=nil) - until sync_put_copy_data(buffer, encoder) - flush + until res=sync_put_copy_data(buffer, encoder) + res = flush end - flush + res end alias async_put_copy_data put_copy_data diff --git a/spec/pg/connection_spec.rb b/spec/pg/connection_spec.rb index 75a3e26db..9a935240f 100644 --- a/spec/pg/connection_spec.rb +++ b/spec/pg/connection_spec.rb @@ -984,6 +984,19 @@ expect( results ).to include( "1\n", "2\n" ) end + it "#get_result should send remaining data before waiting" do + str = "abcd" * 2000 + "\n" + @conn.exec( "CREATE TEMP TABLE copytable2 (col1 TEXT)" ) + @conn.exec( "COPY copytable2 FROM STDOUT" ) + + 1000.times do + @conn.sync_put_copy_data(str) + end + @conn.sync_put_copy_end + res = @conn.get_last_result + expect( res.result_status ).to eq( PG::PGRES_COMMAND_OK ) + end + describe "#copy_data" do it "can process #copy_data output queries" do rows = []