Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retries, ping timeout, sql mock handler #75

Merged
merged 5 commits into from Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -3,3 +3,5 @@
.vscode
.idea/
*.iml

mock-data
57 changes: 53 additions & 4 deletions datasource.go
Expand Up @@ -233,6 +233,20 @@ func (ds *SQLDatasource) handleQuery(ctx context.Context, req backend.DataQuery,
return QueryDB(ctx, db, ds.c.Converters(), fillMode, q)
}

// allow retries on timeouts
if errors.Is(err, context.DeadlineExceeded) {
for i := 0; i < ds.driverSettings.Retries; i++ {
backend.Logger.Warn(fmt.Sprintf("connection timed out. retrying %d times", i))
db, err := ds.c.Connect(dbConn.settings, q.ConnectionArgs)
if err != nil {
continue
}
ds.storeDBConnection(cacheKey, dbConnection{db, dbConn.settings})

return QueryDB(ctx, db, ds.c.Converters(), fillMode, q)
}
}

return nil, err
}

Expand All @@ -243,11 +257,39 @@ func (ds *SQLDatasource) CheckHealth(ctx context.Context, req *backend.CheckHeal
if !ok {
return nil, MissingDBConnection
}
if err := dbConn.db.Ping(); err != nil {

if ds.driverSettings.Retries == 0 {
return ds.check(dbConn)
}

return ds.checkWithRetries(dbConn)
}

func (ds *SQLDatasource) DriverSettings() DriverSettings {
return ds.driverSettings
}

func (ds *SQLDatasource) checkWithRetries(conn dbConnection) (*backend.CheckHealthResult, error) {
var result *backend.CheckHealthResult
var err error

for i := 0; i < ds.driverSettings.Retries; i++ {
result, err = ds.check(conn)
if err == nil {
return result, err
}
}

// TODO: failed health checks don't return an error
return result, nil
}

func (ds *SQLDatasource) check(conn dbConnection) (*backend.CheckHealthResult, error) {
if err := ds.ping(conn); err != nil {
return &backend.CheckHealthResult{
Status: backend.HealthStatusError,
Message: err.Error(),
}, nil
}, err
}

return &backend.CheckHealthResult{
Expand All @@ -256,6 +298,13 @@ func (ds *SQLDatasource) CheckHealth(ctx context.Context, req *backend.CheckHeal
}, nil
}

func (ds *SQLDatasource) DriverSettings() DriverSettings {
return ds.driverSettings
func (ds *SQLDatasource) ping(conn dbConnection) error {
if ds.driverSettings.Timeout == 0 {
return conn.db.Ping()
}

ctx, cancel := context.WithTimeout(context.Background(), ds.driverSettings.Timeout)
defer cancel()

return conn.db.PingContext(ctx)
}
62 changes: 61 additions & 1 deletion datasource_test.go
@@ -1,12 +1,17 @@
package sqlds

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"testing"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/sqlds/v2/mock"
"github.com/stretchr/testify/assert"
)

type fakeDriver struct {
Expand All @@ -15,10 +20,12 @@ type fakeDriver struct {
Driver
}

func (d *fakeDriver) Connect(backend.DataSourceInstanceSettings, json.RawMessage) (db *sql.DB, err error) {
func (d fakeDriver) Connect(backend.DataSourceInstanceSettings, json.RawMessage) (db *sql.DB, err error) {
return d.db, nil
}

// func (d fakeDriver) Settings(backend.DataSourceInstanceSettings) DriverSettings

func Test_getDBConnectionFromQuery(t *testing.T) {
db := &sql.DB{}
db2 := &sql.DB{}
Expand Down Expand Up @@ -113,3 +120,56 @@ func Test_Dispose(t *testing.T) {
}
})
}

func Test_retries(t *testing.T) {
dsUID := "timeout"
settings := backend.DataSourceInstanceSettings{UID: dsUID}

handler := testSqlHandler{}
mockDriver := "sqlmock"
mock.RegisterDriver(mockDriver, handler)
db, err := sql.Open(mockDriver, "")
if err != nil {
t.Errorf("failed to connect to mock driver: %v", err)
}
timeoutDriver := fakeDriver{
db: db,
}
retries := 5
max := time.Duration(testTimeout) * time.Second
driverSettings := DriverSettings{Retries: retries, Timeout: max}
ds := &SQLDatasource{c: timeoutDriver, driverSettings: driverSettings}

key := defaultKey(dsUID)
// Add the mandatory default db
ds.storeDBConnection(key, dbConnection{db, settings})
ctx := context.Background()
req := &backend.CheckHealthRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &settings,
},
}
result, err := ds.CheckHealth(ctx, req)

assert.Nil(t, err)
assert.Equal(t, retries, testCounter)
expected := context.DeadlineExceeded.Error()
assert.Equal(t, expected, result.Message)
}

var testCounter = 0
var testTimeout = 1

type testSqlHandler struct {
mock.DBHandler
}

func (s testSqlHandler) Ping(ctx context.Context) error {
testCounter++ // track the retries for the test assertion
time.Sleep(time.Duration(testTimeout + 1)) // simulate a connection delay
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return ctx.Err()
}
}
1 change: 1 addition & 0 deletions driver.go
Expand Up @@ -14,6 +14,7 @@ import (
type DriverSettings struct {
Timeout time.Duration
FillMode *data.FillMissing
Retries int
}

// Driver is a simple interface that defines how to connect to a backend SQL datasource
Expand Down
2 changes: 1 addition & 1 deletion mock/csv_data.go → mock/csv/csv_data.go
@@ -1,4 +1,4 @@
package mock
package csv

import (
"errors"
Expand Down
12 changes: 6 additions & 6 deletions mock/csv_mock.go → mock/csv/csv_mock.go
@@ -1,4 +1,4 @@
package mock
package csv

import (
"context"
Expand All @@ -13,7 +13,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
ds "github.com/grafana/sqlds/v2"
"github.com/grafana/sqlds/v2"
_ "github.com/mithrandie/csvq-driver"
)

Expand All @@ -22,8 +22,8 @@ type SQLCSVMock struct {
folder string
}

func (h *SQLCSVMock) Settings(config backend.DataSourceInstanceSettings) ds.DriverSettings {
return ds.DriverSettings{
func (h *SQLCSVMock) Settings(config backend.DataSourceInstanceSettings) sqlds.DriverSettings {
return sqlds.DriverSettings{
FillMode: &data.FillMissing{
Mode: data.FillModeNull,
},
Expand Down Expand Up @@ -91,6 +91,6 @@ func (h *SQLCSVMock) Converters() []sqlutil.Converter {
}

// Macros returns list of macro functions convert the macros of raw query
func (h *SQLCSVMock) Macros() ds.Macros {
return ds.Macros{}
func (h *SQLCSVMock) Macros() sqlds.Macros {
return sqlds.Macros{}
}
29 changes: 23 additions & 6 deletions mock/mock_driver.go
@@ -1,35 +1,52 @@
package mock

import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"errors"
"sync"

"github.com/grafana/grafana-plugin-sdk-go/backend"
)

var pool *mockDriver

func init() {
func RegisterDriver(name string, handler DBHandler) *mockDriver {
pool = &mockDriver{
conns: make(map[string]*sqlmock),
conns: make(map[string]*sqlmock),
handler: handler,
}
sql.Register("sqlmock", pool)
sql.Register(name, pool)
return pool
}

type DBHandler interface {
Ping(ctx context.Context) error
Query(args []driver.Value) (driver.Rows, error)
Columns() []string
Next(dest []driver.Value) error
}

type mockDriver struct {
sync.Mutex
counter int
conns map[string]*sqlmock
handler DBHandler
}

func (d *mockDriver) Open(dsn string) (driver.Conn, error) {
if len(d.conns) == 0 {
mock := &sqlmock{
drv: d,
sleep: 10,
drv: d,
}
d.conns = map[string]*sqlmock{
dsn: mock,
}
}
return d.conns[dsn], nil
}

func (d *mockDriver) Connect(backend.DataSourceInstanceSettings, json.RawMessage) (db *sql.DB, err error) {
return nil, errors.New("context deadline exceeded")
}
38 changes: 27 additions & 11 deletions mock/sqlmock.go
Expand Up @@ -3,12 +3,10 @@ package mock
import (
"context"
"database/sql/driver"
"time"
)

type sqlmock struct {
drv *mockDriver
sleep int
drv *mockDriver
}

// Begin meets http://golang.org/pkg/database/sql/driver/#Conn interface
Expand Down Expand Up @@ -37,14 +35,7 @@ func (c *sqlmock) Close() error {
}

func (c *sqlmock) Ping(ctx context.Context) error {
// so we can test timeout retries
if c.sleep > 0 {
v := c.sleep
next := float64(v) * .5
c.sleep = int(next)
time.Sleep(time.Duration(v) * time.Second)
}
return nil
return c.drv.handler.Ping(ctx)
}

// statement
Expand All @@ -58,6 +49,9 @@ func (stmt *statement) Exec(args []driver.Value) (driver.Result, error) {
}

func (stmt *statement) Query(args []driver.Value) (driver.Rows, error) {
if stmt.conn.drv.handler != nil {
stmt.conn.drv.handler.Query(args)
}
return nil, nil
}

Expand All @@ -68,3 +62,25 @@ func (stmt *statement) Close() error {
func (stmt *statement) NumInput() int {
return -1
}

type rows struct {
conn *sqlmock
}

func (r rows) Columns() []string {
if r.conn.drv.handler != nil {
return r.conn.drv.handler.Columns()
}
return []string{}
}

func (r rows) Close() error {
return nil
}

func (r rows) Next(dest []driver.Value) error {
if r.conn.drv.handler != nil {
return r.conn.drv.handler.Next(dest)
}
return nil
}