Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sqlexp Messages for Query/QueryContext #690

Merged
merged 15 commits into from Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions appveyor.yml
Expand Up @@ -49,6 +49,7 @@ install:
- go version
- go env
- go get -u github.com/golang-sql/civil
- go get -u github.com/golang-sql/sqlexp
shueybubbles marked this conversation as resolved.
Show resolved Hide resolved

build_script:
- go build
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -4,5 +4,6 @@ go 1.11

require (
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe
github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c
)
2 changes: 2 additions & 0 deletions go.sum
@@ -1,5 +1,7 @@
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188 h1:+eHOFJl1BaXrQxKX+T06f78590z4qA2ZzBTqahsKSE4=
github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188/go.mod h1:vXjM/+wXQnTPR4KqTKDgJukSZ6amVRtWMPEjE6sQoK8=
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c h1:Vj5n4GlwjmQteupaxJ9+0FNOmBrHfq7vN4btdGoDZgI=
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
61 changes: 61 additions & 0 deletions messages_benchmark_test.go
@@ -0,0 +1,61 @@
// +build go1.14

package mssql

import (
"testing"
)

func BenchmarkMessageQueue(b *testing.B) {
conn := open(b)
defer conn.Close()
b.Run("BlockingQuery", func(b *testing.B) {
var errs, results float64
for i := 0; i < b.N; i++ {
r, err := conn.Query(mixedQuery)
if err != nil {
b.Fatal(err.Error())
}
defer r.Close()
active := true
first := true
for active {
active = r.Next()
if active && first {
results++
}
first = false
if !active {
if r.Err() != nil {
b.Logf("r.Err:%v", r.Err())
errs++
}
active = r.NextResultSet()
if active {
first = true
}
}
}
}
b.ReportMetric(float64(0), "msgs/op")
b.ReportMetric(errs/float64(b.N), "errors/op")
b.ReportMetric(results/float64(b.N), "results/op")
})
b.Run("NonblockingQuery", func(b *testing.B) {
var msgs, errs, results, rowcounts float64
for i := 0; i < b.N; i++ {
m, e, r, rc := testMixedQuery(conn, b)
msgs += float64(m)
errs += float64(e)
results += float64(r)
rowcounts += float64(rc)
if r != 4 {
b.Fatalf("Got wrong results count: %d, expected 4", r)
}
}
b.ReportMetric(msgs/float64(b.N), "msgs/op")
b.ReportMetric(errs/float64(b.N), "errors/op")
b.ReportMetric(results/float64(b.N), "results/op")
b.ReportMetric(rowcounts/float64(b.N), "rowcounts/op")
})
}
86 changes: 86 additions & 0 deletions messages_example_test.go
@@ -0,0 +1,86 @@
package mssql_test

import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"os"

mssql "github.com/denisenkom/go-mssqldb"
"github.com/golang-sql/sqlexp"
)

const (
msgQuery = `select name from sys.tables
PRINT N'This is a message'
select 199
RAISERROR (N'Testing!' , 11, 1)
select 300
`
)

// This example shows the usage of sqlexp/Messages
func ExampleRows_usingmessages() {
flag.Parse()

if *debug {
fmt.Printf(" password:%s\n", *password)
fmt.Printf(" port:%d\n", *port)
fmt.Printf(" server:%s\n", *server)
fmt.Printf(" user:%s\n", *user)
}

connString := makeConnURL().String()
if *debug {
fmt.Printf(" connString:%s\n", connString)
}

// Create a new connector object by calling NewConnector
connector, err := mssql.NewConnector(connString)
if err != nil {
log.Println(err)
return
}

// Pass connector to sql.OpenDB to get a sql.DB object
db := sql.OpenDB(connector)
defer db.Close()
retmsg := &sqlexp.ReturnMessage{}
ctx := context.Background()
rows, err := db.QueryContext(ctx, msgQuery, retmsg)
if err != nil {
log.Fatalf("QueryContext failed: %v", err)
}
active := true
for active {
msg := retmsg.Message(ctx)
switch m := msg.(type) {
case sqlexp.MsgNotice:
fmt.Println(m.Message)
case sqlexp.MsgNext:
inresult := true
for inresult {
inresult = rows.Next()
cols, err := rows.Columns()
if err != nil {
log.Fatalf("Columns failed: %v", err)
}
fmt.Println(cols)
if inresult {
var d interface{}
rows.Scan(&d)
fmt.Println(d)
}
}
case sqlexp.MsgNextResultSet:
active = rows.NextResultSet()
case sqlexp.MsgError:
fmt.Fprintln(os.Stderr, m.Error)
case sqlexp.MsgRowsAffected:
fmt.Println("Rows affected:", m.Count)
}
}

}
173 changes: 171 additions & 2 deletions mssql.go
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/denisenkom/go-mssqldb/internal/querytext"
"github.com/denisenkom/go-mssqldb/msdsn"
"github.com/golang-sql/sqlexp"
)

// ReturnStatus may be used to return the return value from a proc.
Expand Down Expand Up @@ -179,6 +180,7 @@ type Conn struct {
type outputs struct {
params map[string]interface{}
returnStatus *ReturnStatus
msgq *sqlexp.ReturnMessage
}

// IsValid satisfies the driver.Validator interface.
Expand Down Expand Up @@ -637,6 +639,11 @@ func (s *Stmt) processQueryResponse(ctx context.Context) (res driver.Rows, err e
ctx, cancel := context.WithCancel(ctx)
reader := startReading(s.c.sess, ctx, s.c.outs)
s.c.clearOuts()
// For apps using a message queue, return right away and let Rowsq do all the work
if reader.outs.msgq != nil {
res = &Rowsq{stmt: s, reader: reader, cols: nil, cancel: cancel}
return res, nil
}
// process metadata
var cols []columnStruct
loop:
Expand Down Expand Up @@ -708,13 +715,13 @@ func (s *Stmt) processExec(ctx context.Context) (res driver.Result, err error) {
return &Result{s.c, reader.rowCount}, nil
}

// Rows represents the non-experimental data/sql model for Query and QueryContext
type Rows struct {
stmt *Stmt
cols []columnStruct
reader *tokenProcessor
nextCols []columnStruct

cancel func()
cancel func()
}

func (rc *Rows) Close() error {
Expand Down Expand Up @@ -742,6 +749,7 @@ func (rc *Rows) Close() error {
}

func (rc *Rows) Columns() (res []string) {

res = make([]string, len(rc.cols))
for i, col := range rc.cols {
res[i] = col.ColName
Expand All @@ -763,6 +771,7 @@ func (rc *Rows) Next(dest []driver.Value) error {
return io.EOF
} else {
switch tokdata := tok.(type) {
// processQueryResponse may have delegated all the token reading to us
case []columnStruct:
rc.nextCols = tokdata
return io.EOF
Expand Down Expand Up @@ -1028,3 +1037,163 @@ func (s *Stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (drive
}
return s.exec(ctx, list)
}

// Rowsq implements the sqlexp messages model for Query and QueryContext
// Theory: We could also implement the non-experimental model this way
type Rowsq struct {
stmt *Stmt
cols []columnStruct
reader *tokenProcessor
nextCols []columnStruct
cancel func()
requestDone bool
inResultSet bool
}

func (rc *Rowsq) Close() error {
rc.cancel()

for {
tok, err := rc.reader.nextToken()
if err == nil {
if tok == nil {
return nil
} else {
// continue consuming tokens
continue
}
} else {
if err == rc.reader.ctx.Err() {
return nil
} else {
return err
}
}
}
}

// data/sql calls Columns during the app's call to Next
func (rc *Rowsq) Columns() (res []string) {
if rc.cols == nil {
scan:
for {
tok, err := rc.reader.nextToken()
if err == nil {
if rc.reader.sess.logFlags&logDebug != 0 {
rc.reader.sess.log.Printf("Columns() token type:%v", reflect.TypeOf(tok))
}
if tok == nil {
return []string{}
} else {
switch tokdata := tok.(type) {
case []columnStruct:
rc.cols = tokdata
rc.inResultSet = true
break scan
}
}
}
}
}
res = make([]string, len(rc.cols))
for i, col := range rc.cols {
res[i] = col.ColName
}
return
}

func (rc *Rowsq) Next(dest []driver.Value) error {
if !rc.stmt.c.connectionGood {
return driver.ErrBadConn
}
for {
tok, err := rc.reader.nextToken()
if rc.reader.sess.logFlags&logDebug != 0 {
shueybubbles marked this conversation as resolved.
Show resolved Hide resolved
rc.reader.sess.log.Printf("Next() token type:%v", reflect.TypeOf(tok))
}
if err == nil {
if tok == nil {
return io.EOF
} else {
switch tokdata := tok.(type) {
case []interface{}:
for i := range dest {
dest[i] = tokdata[i]
}
return nil
case doneStruct:
if tokdata.Status&doneMore == 0 {
rc.requestDone = true
}
if tokdata.isError() {
e := rc.stmt.c.checkBadConn(tokdata.getError(), false)
switch e.(type) {
case Error:
// Ignore non-fatal server errors
shueybubbles marked this conversation as resolved.
Show resolved Hide resolved
default:
return e
}
}
if rc.inResultSet {
rc.inResultSet = false
return io.EOF
}
case ReturnStatus:
if rc.reader.outs.returnStatus != nil {
*rc.reader.outs.returnStatus = tokdata
}
}
}

} else {
return rc.stmt.c.checkBadConn(err, false)
}
}
}

// In Message Queue mode, we always claim another resultset could be on the way
// to avoid Rows being closed prematurely
func (rc *Rowsq) HasNextResultSet() bool {
return !rc.requestDone
}

// Scans to the next set of columns in the stream
// Note that the caller may not have read all the rows in the prior set
func (rc *Rowsq) NextResultSet() error {
if rc.requestDone {
return io.EOF
}
scan:
for {
// we should have a columns token in the channel if we aren't at the end
tok, err := rc.reader.nextToken()
if rc.reader.sess.logFlags&logDebug != 0 {
rc.reader.sess.log.Printf("NextResultSet() token type:%v", reflect.TypeOf(tok))
}

if err != nil {
return err
}
if tok == nil {
return io.EOF
}
switch tokdata := tok.(type) {
case []columnStruct:
rc.nextCols = tokdata
rc.inResultSet = true
break scan
case doneStruct:
if tokdata.Status&doneMore == 0 {
rc.nextCols = nil
rc.requestDone = true
break scan
}
}
}
rc.cols = rc.nextCols
rc.nextCols = nil
if rc.cols == nil {
return io.EOF
}
return nil
}