From 342cafe2103ad80d82b5620ad4e0203919463770 Mon Sep 17 00:00:00 2001 From: Damir Sayfutdinov Date: Thu, 7 Oct 2021 19:13:20 +0300 Subject: [PATCH] Check connection liveness in beginTx This commit fixes for ClickHouse/clickhouse-go#213. If the connection closes from the server side for some reason, the database driver returns driver.ErrBadConn to database/sql. Usually, database/sql retries a request, assuming that the error occurs in a function that could be called first after retrieving a connection from the pool. But beginTx in clickhouse-go doesn't perform any network interaction and driver.ErrBadConn is returned later in the transaction. database/sql doesn't retry it because assumes that connection is alive - beginTx didn't return the error. This commit adds a method to check the connection liveness and performs that check in beginTx function. The check is taken from go-sql-driver/mysql#934. There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn in clickhouse-go. --- README.md | 1 + bootstrap.go | 50 +++++++++++++++---------- clickhouse.go | 33 +++++++++++----- connect_check.go | 57 ++++++++++++++++++++++++++++ connect_check_dummy.go | 9 +++++ connect_check_test.go | 85 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 205 insertions(+), 30 deletions(-) create mode 100644 connect_check.go create mode 100644 connect_check_dummy.go create mode 100644 connect_check_test.go diff --git a/README.md b/README.md index 397acea60f..208ab1fbc1 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Golang SQL database driver for [Yandex ClickHouse](https://clickhouse.yandex/) * pool_size - maximum amount of preallocated byte chunks used in queries (default is 100). Decrease this if you experience memory problems at the expense of more GC pressure and vice versa. * debug - enable debug output (boolean value) * compress - enable lz4 compression (integer value, default is '0') +* check_connection_liveness - on supported platforms non-secure connections retrieved from the connection pool are checked in beginTx() for liveness before using them. If the check fails, the respective connection is marked as bad and the query retried with another connection. (boolean value, default is 'true') SSL/TLS parameters: diff --git a/bootstrap.go b/bootstrap.go index 8e44bd3387..57a7e549cc 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -85,22 +85,23 @@ func open(dsn string) (*clickhouse, error) { return nil, err } var ( - hosts = []string{url.Host} - query = url.Query() - secure = false - skipVerify = false - tlsConfigName = query.Get("tls_config") - noDelay = true - compress = false - database = query.Get("database") - username = query.Get("username") - password = query.Get("password") - blockSize = 1000000 - connTimeout = DefaultConnTimeout - readTimeout = DefaultReadTimeout - writeTimeout = DefaultWriteTimeout - connOpenStrategy = connOpenRandom - poolSize = 100 + hosts = []string{url.Host} + query = url.Query() + secure = false + skipVerify = false + tlsConfigName = query.Get("tls_config") + noDelay = true + compress = false + database = query.Get("database") + username = query.Get("username") + password = query.Get("password") + blockSize = 1000000 + connTimeout = DefaultConnTimeout + readTimeout = DefaultReadTimeout + writeTimeout = DefaultWriteTimeout + connOpenStrategy = connOpenRandom + poolSize = 100 + checkConnLiveness = true ) if len(database) == 0 { database = DefaultDatabase @@ -165,12 +166,21 @@ func open(dsn string) (*clickhouse, error) { compress = v } + if v, err := strconv.ParseBool(query.Get("check_connection_liveness")); err == nil { + checkConnLiveness = v + } + if secure { + // There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn + checkConnLiveness = false + } + var ( ch = clickhouse{ - logf: func(string, ...interface{}) {}, - settings: settings, - compress: compress, - blockSize: blockSize, + logf: func(string, ...interface{}) {}, + settings: settings, + compress: compress, + blockSize: blockSize, + checkConnLiveness: checkConnLiveness, ServerInfo: data.ServerInfo{ Timezone: time.Local, }, diff --git a/clickhouse.go b/clickhouse.go index 7a8ba8fe91..9ad12797a1 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -47,16 +47,17 @@ type clickhouse struct { sync.Mutex data.ServerInfo data.ClientInfo - logf logger - conn *connect - block *data.Block - buffer *bufio.Writer - decoder *binary.Decoder - encoder *binary.Encoder - settings *querySettings - compress bool - blockSize int - inTransaction bool + logf logger + conn *connect + block *data.Block + buffer *bufio.Writer + decoder *binary.Decoder + encoder *binary.Encoder + settings *querySettings + compress bool + blockSize int + inTransaction bool + checkConnLiveness bool } func (ch *clickhouse) Prepare(query string) (driver.Stmt, error) { @@ -124,6 +125,18 @@ func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (*clickhouse, case ch.conn.closed: return nil, driver.ErrBadConn } + + // Perform a stale connection check. We only perform this check in beginTx, + // because database/sql retries driver.ErrBadConn only for first request, + // but beginTx doesn't perform any other network interaction. + if ch.checkConnLiveness { + if err := ch.conn.connCheck(); err != nil { + ch.logf("[begin] closing bad idle connection: %w", err) + ch.Close() + return ch, driver.ErrBadConn + } + } + if finish := ch.watchCancel(ctx); finish != nil { defer finish() } diff --git a/connect_check.go b/connect_check.go new file mode 100644 index 0000000000..4b01a13e9f --- /dev/null +++ b/connect_check.go @@ -0,0 +1,57 @@ +// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos + +package clickhouse + +import ( + "errors" + "fmt" + "io" + "syscall" + "time" +) + +var errUnexpectedRead = errors.New("unexpected read from socket") + +func (conn *connect) connCheck() error { + var sysErr error + + sysConn, ok := conn.Conn.(syscall.Conn) + if !ok { + return nil + } + rawConn, err := sysConn.SyscallConn() + if err != nil { + return err + } + // If this connection has a ReadTimeout which we've been setting on + // reads, reset it to zero value before we attempt a non-blocking + // read, otherwise we may get os.ErrDeadlineExceeded for the cached + // connection from the pool with an expired timeout. + if conn.readTimeout != 0 { + err = conn.SetReadDeadline(time.Time{}) + if err != nil { + return fmt.Errorf("set read deadline: %w", err) + } + conn.lastReadDeadlineTime = time.Time{} + } + err = rawConn.Read(func(fd uintptr) bool { + var buf [1]byte + n, err := syscall.Read(int(fd), buf[:]) + switch { + case n == 0 && err == nil: + sysErr = io.EOF + case n > 0: + sysErr = errUnexpectedRead + case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK: + sysErr = nil + default: + sysErr = err + } + return true + }) + if err != nil { + return err + } + + return sysErr +} diff --git a/connect_check_dummy.go b/connect_check_dummy.go new file mode 100644 index 0000000000..2bdb4dcc7a --- /dev/null +++ b/connect_check_dummy.go @@ -0,0 +1,9 @@ +// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos + +package clickhouse + +import "net" + +func connCheck(conn net.Conn) error { + return nil +} diff --git a/connect_check_test.go b/connect_check_test.go new file mode 100644 index 0000000000..02f48dd0b6 --- /dev/null +++ b/connect_check_test.go @@ -0,0 +1,85 @@ +package clickhouse + +import ( + "context" + "database/sql" + "database/sql/driver" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_ConnCheck(t *testing.T) { + const ( + ddl = ` + CREATE TABLE clickhouse_test_conncheck ( + Value String + ) Engine = Memory + ` + dml = ` + INSERT INTO clickhouse_test_conncheck + VALUES (?) + ` + ) + + if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=false"); assert.NoError(t, err) { + // We could change settings only at session level. + // If we have only 1 connection, we guarantee that we change settings for them. + connect.SetMaxOpenConns(1) + if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck"); assert.NoError(t, err) { + if _, err := connect.Exec(ddl); assert.NoError(t, err) { + _, err = connect.Exec("set idle_connection_timeout=1") + assert.NoError(t, err) + + _, err = connect.Exec("set tcp_keep_alive_timeout=0") + assert.NoError(t, err) + + time.Sleep(1100 * time.Millisecond) + ctx := context.Background() + tx, err := connect.BeginTx(ctx, nil) + assert.NoError(t, err) + + _, err = tx.PrepareContext(ctx, dml) + assert.NoError(t, err) + } + } + } +} + +func Test_ConnCheckNegative(t *testing.T) { + const ( + ddl = ` + CREATE TABLE clickhouse_test_conncheck_negative ( + Value String + ) Engine = Memory + ` + dml = ` + INSERT INTO clickhouse_test_conncheck_negative + VALUES (?) + ` + ) + + if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&check_connection_liveness=false"); assert.NoError(t, err) { + // We can only change the settings at the connection level. + // If we have only one connection, we change the settings specifically for that connection. + connect.SetMaxOpenConns(1) + if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck_negative"); assert.NoError(t, err) { + if _, err := connect.Exec(ddl); assert.NoError(t, err) { + _, err = connect.Exec("set idle_connection_timeout=1") + assert.NoError(t, err) + + _, err = connect.Exec("set tcp_keep_alive_timeout=0") + assert.NoError(t, err) + + time.Sleep(1100 * time.Millisecond) + ctx := context.Background() + tx, err := connect.BeginTx(ctx, nil) + assert.NoError(t, err) + + _, err = tx.PrepareContext(ctx, dml) + assert.Equal(t, driver.ErrBadConn, err) + } + } + } +}