Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

Commit

Permalink
Check connection liveness in beginTx
Browse files Browse the repository at this point in the history
This commit fixes for ClickHouse#213.

If the connection closes from the server side for some reason, the database driver returns driver.ErrBadConn to database/sql.

Usually, database/sql retries a request, assuming that the error occurs in a function that could be called first after retrieving a connection from the pool.
But beginTx in clickhouse-go doesn't perform any network interaction and driver.ErrBadConn is returned later in the transaction. database/sql doesn't retry it because assumes that connection is alive - beginTx didn't return the error.

 This commit adds a method to check the connection liveness and performs that check in beginTx function.

 The check is taken from go-sql-driver/mysql#934.

There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn in clickhouse-go.
  • Loading branch information
saydamir authored and Alan Braithwaite committed Oct 23, 2021
1 parent 860c275 commit 342cafe
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 30 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -26,6 +26,7 @@ Golang SQL database driver for [Yandex ClickHouse](https://clickhouse.yandex/)
* pool_size - maximum amount of preallocated byte chunks used in queries (default is 100). Decrease this if you experience memory problems at the expense of more GC pressure and vice versa.
* debug - enable debug output (boolean value)
* compress - enable lz4 compression (integer value, default is '0')
* check_connection_liveness - on supported platforms non-secure connections retrieved from the connection pool are checked in beginTx() for liveness before using them. If the check fails, the respective connection is marked as bad and the query retried with another connection. (boolean value, default is 'true')

SSL/TLS parameters:

Expand Down
50 changes: 30 additions & 20 deletions bootstrap.go
Expand Up @@ -85,22 +85,23 @@ func open(dsn string) (*clickhouse, error) {
return nil, err
}
var (
hosts = []string{url.Host}
query = url.Query()
secure = false
skipVerify = false
tlsConfigName = query.Get("tls_config")
noDelay = true
compress = false
database = query.Get("database")
username = query.Get("username")
password = query.Get("password")
blockSize = 1000000
connTimeout = DefaultConnTimeout
readTimeout = DefaultReadTimeout
writeTimeout = DefaultWriteTimeout
connOpenStrategy = connOpenRandom
poolSize = 100
hosts = []string{url.Host}
query = url.Query()
secure = false
skipVerify = false
tlsConfigName = query.Get("tls_config")
noDelay = true
compress = false
database = query.Get("database")
username = query.Get("username")
password = query.Get("password")
blockSize = 1000000
connTimeout = DefaultConnTimeout
readTimeout = DefaultReadTimeout
writeTimeout = DefaultWriteTimeout
connOpenStrategy = connOpenRandom
poolSize = 100
checkConnLiveness = true
)
if len(database) == 0 {
database = DefaultDatabase
Expand Down Expand Up @@ -165,12 +166,21 @@ func open(dsn string) (*clickhouse, error) {
compress = v
}

if v, err := strconv.ParseBool(query.Get("check_connection_liveness")); err == nil {
checkConnLiveness = v
}
if secure {
// There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn
checkConnLiveness = false
}

var (
ch = clickhouse{
logf: func(string, ...interface{}) {},
settings: settings,
compress: compress,
blockSize: blockSize,
logf: func(string, ...interface{}) {},
settings: settings,
compress: compress,
blockSize: blockSize,
checkConnLiveness: checkConnLiveness,
ServerInfo: data.ServerInfo{
Timezone: time.Local,
},
Expand Down
33 changes: 23 additions & 10 deletions clickhouse.go
Expand Up @@ -47,16 +47,17 @@ type clickhouse struct {
sync.Mutex
data.ServerInfo
data.ClientInfo
logf logger
conn *connect
block *data.Block
buffer *bufio.Writer
decoder *binary.Decoder
encoder *binary.Encoder
settings *querySettings
compress bool
blockSize int
inTransaction bool
logf logger
conn *connect
block *data.Block
buffer *bufio.Writer
decoder *binary.Decoder
encoder *binary.Encoder
settings *querySettings
compress bool
blockSize int
inTransaction bool
checkConnLiveness bool
}

func (ch *clickhouse) Prepare(query string) (driver.Stmt, error) {
Expand Down Expand Up @@ -124,6 +125,18 @@ func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (*clickhouse,
case ch.conn.closed:
return nil, driver.ErrBadConn
}

// Perform a stale connection check. We only perform this check in beginTx,
// because database/sql retries driver.ErrBadConn only for first request,
// but beginTx doesn't perform any other network interaction.
if ch.checkConnLiveness {
if err := ch.conn.connCheck(); err != nil {
ch.logf("[begin] closing bad idle connection: %w", err)
ch.Close()
return ch, driver.ErrBadConn
}
}

if finish := ch.watchCancel(ctx); finish != nil {
defer finish()
}
Expand Down
57 changes: 57 additions & 0 deletions connect_check.go
@@ -0,0 +1,57 @@
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos

package clickhouse

import (
"errors"
"fmt"
"io"
"syscall"
"time"
)

var errUnexpectedRead = errors.New("unexpected read from socket")

func (conn *connect) connCheck() error {
var sysErr error

sysConn, ok := conn.Conn.(syscall.Conn)
if !ok {
return nil
}
rawConn, err := sysConn.SyscallConn()
if err != nil {
return err
}
// If this connection has a ReadTimeout which we've been setting on
// reads, reset it to zero value before we attempt a non-blocking
// read, otherwise we may get os.ErrDeadlineExceeded for the cached
// connection from the pool with an expired timeout.
if conn.readTimeout != 0 {
err = conn.SetReadDeadline(time.Time{})
if err != nil {
return fmt.Errorf("set read deadline: %w", err)
}
conn.lastReadDeadlineTime = time.Time{}
}
err = rawConn.Read(func(fd uintptr) bool {
var buf [1]byte
n, err := syscall.Read(int(fd), buf[:])
switch {
case n == 0 && err == nil:
sysErr = io.EOF
case n > 0:
sysErr = errUnexpectedRead
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
sysErr = nil
default:
sysErr = err
}
return true
})
if err != nil {
return err
}

return sysErr
}
9 changes: 9 additions & 0 deletions connect_check_dummy.go
@@ -0,0 +1,9 @@
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos

package clickhouse

import "net"

func connCheck(conn net.Conn) error {
return nil
}
85 changes: 85 additions & 0 deletions connect_check_test.go
@@ -0,0 +1,85 @@
package clickhouse

import (
"context"
"database/sql"
"database/sql/driver"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test_ConnCheck(t *testing.T) {
const (
ddl = `
CREATE TABLE clickhouse_test_conncheck (
Value String
) Engine = Memory
`
dml = `
INSERT INTO clickhouse_test_conncheck
VALUES (?)
`
)

if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=false"); assert.NoError(t, err) {
// We could change settings only at session level.
// If we have only 1 connection, we guarantee that we change settings for them.
connect.SetMaxOpenConns(1)
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck"); assert.NoError(t, err) {
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
_, err = connect.Exec("set idle_connection_timeout=1")
assert.NoError(t, err)

_, err = connect.Exec("set tcp_keep_alive_timeout=0")
assert.NoError(t, err)

time.Sleep(1100 * time.Millisecond)
ctx := context.Background()
tx, err := connect.BeginTx(ctx, nil)
assert.NoError(t, err)

_, err = tx.PrepareContext(ctx, dml)
assert.NoError(t, err)
}
}
}
}

func Test_ConnCheckNegative(t *testing.T) {
const (
ddl = `
CREATE TABLE clickhouse_test_conncheck_negative (
Value String
) Engine = Memory
`
dml = `
INSERT INTO clickhouse_test_conncheck_negative
VALUES (?)
`
)

if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&check_connection_liveness=false"); assert.NoError(t, err) {
// We can only change the settings at the connection level.
// If we have only one connection, we change the settings specifically for that connection.
connect.SetMaxOpenConns(1)
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck_negative"); assert.NoError(t, err) {
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
_, err = connect.Exec("set idle_connection_timeout=1")
assert.NoError(t, err)

_, err = connect.Exec("set tcp_keep_alive_timeout=0")
assert.NoError(t, err)

time.Sleep(1100 * time.Millisecond)
ctx := context.Background()
tx, err := connect.BeginTx(ctx, nil)
assert.NoError(t, err)

_, err = tx.PrepareContext(ctx, dml)
assert.Equal(t, driver.ErrBadConn, err)
}
}
}
}

0 comments on commit 342cafe

Please sign in to comment.