Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check connection liveness in beginTx #421

Merged
merged 2 commits into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Golang SQL database driver for [Yandex ClickHouse](https://clickhouse.yandex/)
* pool_size - the maximum amount of preallocated byte chunks used in queries (default is 100). Decrease this if you experience memory problems at the expense of more GC pressure and vice versa.
* debug - enable debug output (boolean value)
* compress - enable lz4 compression (integer value, default is '0')
* check_connection_liveness - on supported platforms, non-secure connections retrieved from the connection pool are checked for liveness in beginTx() before they are used. If the check fails, the connection is marked as bad and the query is retried with another connection. (boolean value, default is 'true')

SSL/TLS parameters:

Expand Down
48 changes: 29 additions & 19 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,22 @@ func open(dsn string) (*clickhouse, error) {
return nil, err
}
var (
hosts = []string{url.Host}
query = url.Query()
secure = false
skipVerify = false
tlsConfigName = query.Get("tls_config")
noDelay = true
compress = false
database = query.Get("database")
username = query.Get("username")
password = query.Get("password")
blockSize = 1000000
connTimeout = DefaultConnTimeout
readTimeout = DefaultReadTimeout
writeTimeout = DefaultWriteTimeout
connOpenStrategy = connOpenRandom
hosts = []string{url.Host}
query = url.Query()
secure = false
skipVerify = false
tlsConfigName = query.Get("tls_config")
noDelay = true
compress = false
database = query.Get("database")
username = query.Get("username")
password = query.Get("password")
blockSize = 1000000
connTimeout = DefaultConnTimeout
readTimeout = DefaultReadTimeout
writeTimeout = DefaultWriteTimeout
connOpenStrategy = connOpenRandom
checkConnLiveness = true
)
if len(database) == 0 {
database = DefaultDatabase
Expand Down Expand Up @@ -156,12 +157,21 @@ func open(dsn string) (*clickhouse, error) {
compress = v
}

if v, err := strconv.ParseBool(query.Get("check_connection_liveness")); err == nil {
checkConnLiveness = v
}
if secure {
// There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn
checkConnLiveness = false
}

var (
ch = clickhouse{
logf: func(string, ...interface{}) {},
settings: settings,
compress: compress,
blockSize: blockSize,
logf: func(string, ...interface{}) {},
settings: settings,
compress: compress,
blockSize: blockSize,
checkConnLiveness: checkConnLiveness,
ServerInfo: data.ServerInfo{
Timezone: time.Local,
},
Expand Down
33 changes: 23 additions & 10 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,17 @@ type clickhouse struct {
sync.Mutex
data.ServerInfo
data.ClientInfo
logf logger
conn *connect
block *data.Block
buffer *bufio.Writer
decoder *binary.Decoder
encoder *binary.Encoder
settings *querySettings
compress bool
blockSize int
inTransaction bool
logf logger
conn *connect
block *data.Block
buffer *bufio.Writer
decoder *binary.Decoder
encoder *binary.Encoder
settings *querySettings
compress bool
blockSize int
inTransaction bool
checkConnLiveness bool
}

func (ch *clickhouse) Prepare(query string) (driver.Stmt, error) {
Expand Down Expand Up @@ -124,6 +125,18 @@ func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (*clickhouse,
case ch.conn.closed:
return nil, driver.ErrBadConn
}

// Perform a stale connection check. We only perform this check in beginTx,
// because database/sql retries driver.ErrBadConn only for first request,
// but beginTx doesn't perform any other network interaction.
if ch.checkConnLiveness {
if err := ch.conn.connCheck(); err != nil {
ch.logf("[begin] closing bad idle connection: %w", err)
ch.Close()
return ch, driver.ErrBadConn
}
}

if finish := ch.watchCancel(ctx); finish != nil {
defer finish()
}
Expand Down
57 changes: 57 additions & 0 deletions connect_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos

package clickhouse

import (
"errors"
"fmt"
"io"
"syscall"
"time"
)

// errUnexpectedRead is reported by connCheck when the liveness probe
// manages to read a byte from a connection that was expected to be idle.
var errUnexpectedRead = errors.New("unexpected read from socket")

// connCheck probes the underlying socket with a single one-byte read on the
// raw file descriptor and translates the outcome into an error:
//
//   - nil when the read would block (EAGAIN/EWOULDBLOCK), or when the
//     wrapped connection does not implement syscall.Conn;
//   - io.EOF when the read returns zero bytes without an error;
//   - errUnexpectedRead when a byte was actually read;
//   - any other error from the read or from obtaining the raw connection.
func (conn *connect) connCheck() error {
	sc, ok := conn.Conn.(syscall.Conn)
	if !ok {
		// No access to the raw descriptor: nothing to probe.
		return nil
	}
	raw, err := sc.SyscallConn()
	if err != nil {
		return err
	}

	// A pooled connection may still carry a read deadline from an earlier
	// read. Clear it before the non-blocking read, otherwise the probe may
	// fail with os.ErrDeadlineExceeded because of the expired timeout.
	if conn.readTimeout != 0 {
		if err = conn.SetReadDeadline(time.Time{}); err != nil {
			return fmt.Errorf("set read deadline: %w", err)
		}
		conn.lastReadDeadlineTime = time.Time{}
	}

	var probeErr error
	readErr := raw.Read(func(fd uintptr) bool {
		var scratch [1]byte
		n, rerr := syscall.Read(int(fd), scratch[:])
		if n == 0 && rerr == nil {
			probeErr = io.EOF
		} else if n > 0 {
			probeErr = errUnexpectedRead
		} else if rerr == syscall.EAGAIN || rerr == syscall.EWOULDBLOCK {
			probeErr = nil
		} else {
			probeErr = rerr
		}
		// Always report completion so the read is attempted exactly once.
		return true
	})
	if readErr != nil {
		return readErr
	}
	return probeErr
}
9 changes: 9 additions & 0 deletions connect_check_dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos

package clickhouse

import "net"

func connCheck(conn net.Conn) error {
return nil
}
85 changes: 85 additions & 0 deletions connect_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package clickhouse

import (
"context"
"database/sql"
"database/sql/driver"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// Test_ConnCheck verifies that, with the default connection liveness check,
// a transaction can be started and a statement prepared after the pooled
// connection has been idle for longer than the idle_connection_timeout
// configured on the session.
//
// Every step that later steps depend on aborts the test on failure, so a
// failed BeginTx can no longer lead to a nil *sql.Tx dereference.
func Test_ConnCheck(t *testing.T) {
	const (
		ddl = `
CREATE TABLE clickhouse_test_conncheck (
Value String
) Engine = Memory
`
		dml = `
INSERT INTO clickhouse_test_conncheck
VALUES (?)
`
	)

	connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=false")
	if !assert.NoError(t, err) {
		return
	}
	// Release the pool when the test ends; the Close error is irrelevant here.
	defer connect.Close()

	// We can only change the settings at the connection level.
	// If we have only one connection, we change the settings specifically for that connection.
	connect.SetMaxOpenConns(1)

	if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck"); !assert.NoError(t, err) {
		return
	}
	if _, err := connect.Exec(ddl); !assert.NoError(t, err) {
		return
	}

	_, err = connect.Exec("set idle_connection_timeout=1")
	assert.NoError(t, err)

	_, err = connect.Exec("set tcp_keep_alive_timeout=0")
	assert.NoError(t, err)

	// Stay idle for longer than the one-second idle timeout set above.
	time.Sleep(1100 * time.Millisecond)

	ctx := context.Background()
	tx, err := connect.BeginTx(ctx, nil)
	if !assert.NoError(t, err) {
		// tx is nil on failure; continuing would panic.
		return
	}
	// Best-effort cleanup so the transaction does not outlive the test; the
	// Rollback error is deliberately ignored.
	defer tx.Rollback()

	_, err = tx.PrepareContext(ctx, dml)
	assert.NoError(t, err)
}

// Test_ConnCheckNegative verifies the behaviour with
// check_connection_liveness=false: after the pooled connection has been idle
// for longer than the idle_connection_timeout configured on the session,
// BeginTx still succeeds but preparing a statement fails with
// driver.ErrBadConn.
//
// Every step that later steps depend on aborts the test on failure, so a
// failed BeginTx can no longer lead to a nil *sql.Tx dereference.
func Test_ConnCheckNegative(t *testing.T) {
	const (
		ddl = `
CREATE TABLE clickhouse_test_conncheck_negative (
Value String
) Engine = Memory
`
		dml = `
INSERT INTO clickhouse_test_conncheck_negative
VALUES (?)
`
	)

	connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&check_connection_liveness=false")
	if !assert.NoError(t, err) {
		return
	}
	// Release the pool when the test ends; the Close error is irrelevant here.
	defer connect.Close()

	// We can only change the settings at the connection level.
	// If we have only one connection, we change the settings specifically for that connection.
	connect.SetMaxOpenConns(1)

	if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck_negative"); !assert.NoError(t, err) {
		return
	}
	if _, err := connect.Exec(ddl); !assert.NoError(t, err) {
		return
	}

	_, err = connect.Exec("set idle_connection_timeout=1")
	assert.NoError(t, err)

	_, err = connect.Exec("set tcp_keep_alive_timeout=0")
	assert.NoError(t, err)

	// Stay idle for longer than the one-second idle timeout set above.
	time.Sleep(1100 * time.Millisecond)

	ctx := context.Background()
	tx, err := connect.BeginTx(ctx, nil)
	if !assert.NoError(t, err) {
		// tx is nil on failure; continuing would panic.
		return
	}
	// Best-effort cleanup so the transaction does not outlive the test; the
	// Rollback error is deliberately ignored (the connection is expected to
	// be bad at this point).
	defer tx.Rollback()

	_, err = tx.PrepareContext(ctx, dml)
	assert.Equal(t, driver.ErrBadConn, err)
}