Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sqlexp Messages for Query/QueryContext #690

Merged
merged 15 commits into from Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -4,5 +4,6 @@ go 1.11

require (
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe
github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c
)
2 changes: 2 additions & 0 deletions go.sum
@@ -1,5 +1,7 @@
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188 h1:+eHOFJl1BaXrQxKX+T06f78590z4qA2ZzBTqahsKSE4=
github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188/go.mod h1:vXjM/+wXQnTPR4KqTKDgJukSZ6amVRtWMPEjE6sQoK8=
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c h1:Vj5n4GlwjmQteupaxJ9+0FNOmBrHfq7vN4btdGoDZgI=
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
173 changes: 171 additions & 2 deletions mssql.go
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/denisenkom/go-mssqldb/internal/querytext"
"github.com/denisenkom/go-mssqldb/msdsn"
"github.com/golang-sql/sqlexp"
)

// ReturnStatus may be used to return the return value from a proc.
Expand Down Expand Up @@ -179,6 +180,7 @@ type Conn struct {
type outputs struct {
params map[string]interface{}
returnStatus *ReturnStatus
msgq *sqlexp.ReturnMessage
}

// IsValid satisfies the driver.Validator interface.
Expand Down Expand Up @@ -637,6 +639,11 @@ func (s *Stmt) processQueryResponse(ctx context.Context) (res driver.Rows, err e
ctx, cancel := context.WithCancel(ctx)
reader := startReading(s.c.sess, ctx, s.c.outs)
s.c.clearOuts()
// For apps using a message queue, return right away and let Rowsq do all the work
if reader.outs.msgq != nil {
res = &Rowsq{stmt: s, reader: reader, cols: nil, cancel: cancel}
return res, nil
}
// process metadata
var cols []columnStruct
loop:
Expand Down Expand Up @@ -708,13 +715,13 @@ func (s *Stmt) processExec(ctx context.Context) (res driver.Result, err error) {
return &Result{s.c, reader.rowCount}, nil
}

// Rows represents the non-experimental data/sql model for Query and QueryContext
type Rows struct {
stmt *Stmt
cols []columnStruct
reader *tokenProcessor
nextCols []columnStruct

cancel func()
cancel func()
}

func (rc *Rows) Close() error {
Expand Down Expand Up @@ -742,6 +749,7 @@ func (rc *Rows) Close() error {
}

func (rc *Rows) Columns() (res []string) {

res = make([]string, len(rc.cols))
for i, col := range rc.cols {
res[i] = col.ColName
Expand All @@ -763,6 +771,7 @@ func (rc *Rows) Next(dest []driver.Value) error {
return io.EOF
} else {
switch tokdata := tok.(type) {
// processQueryResponse may have delegated all the token reading to us
case []columnStruct:
rc.nextCols = tokdata
return io.EOF
Expand Down Expand Up @@ -1028,3 +1037,163 @@ func (s *Stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (drive
}
return s.exec(ctx, list)
}

// Rowsq implements the sqlexp messages model for Query and QueryContext
// Theory: We could also implement the non-experimental model this way
type Rowsq struct {
stmt *Stmt
cols []columnStruct
reader *tokenProcessor
nextCols []columnStruct
cancel func()
requestDone bool
inResultSet bool
}

func (rc *Rowsq) Close() error {
rc.cancel()

for {
tok, err := rc.reader.nextToken()
if err == nil {
if tok == nil {
return nil
} else {
// continue consuming tokens
continue
}
} else {
if err == rc.reader.ctx.Err() {
return nil
} else {
return err
}
}
}
}

// data/sql calls Columns during the app's call to Next
func (rc *Rowsq) Columns() (res []string) {
if rc.cols == nil {
scan:
for {
tok, err := rc.reader.nextToken()
if err == nil {
if rc.reader.sess.logFlags&logDebug != 0 {
rc.reader.sess.log.Printf("Columns() token type:%v", reflect.TypeOf(tok))
}
if tok == nil {
return []string{}
} else {
switch tokdata := tok.(type) {
case []columnStruct:
rc.cols = tokdata
rc.inResultSet = true
break scan
}
}
}
}
}
res = make([]string, len(rc.cols))
for i, col := range rc.cols {
res[i] = col.ColName
}
return
}

func (rc *Rowsq) Next(dest []driver.Value) error {
if !rc.stmt.c.connectionGood {
return driver.ErrBadConn
}
for {
tok, err := rc.reader.nextToken()
if rc.reader.sess.logFlags&logDebug != 0 {
shueybubbles marked this conversation as resolved.
Show resolved Hide resolved
rc.reader.sess.log.Printf("Next() token type:%v", reflect.TypeOf(tok))
}
if err == nil {
if tok == nil {
return io.EOF
} else {
switch tokdata := tok.(type) {
case []interface{}:
for i := range dest {
dest[i] = tokdata[i]
}
return nil
case doneStruct:
if tokdata.Status&doneMore == 0 {
rc.requestDone = true
}
if tokdata.isError() {
e := rc.stmt.c.checkBadConn(tokdata.getError(), false)
switch e.(type) {
case Error:
// Ignore non-fatal server errors
shueybubbles marked this conversation as resolved.
Show resolved Hide resolved
default:
return e
}
}
if rc.inResultSet {
rc.inResultSet = false
return io.EOF
}
case ReturnStatus:
if rc.reader.outs.returnStatus != nil {
*rc.reader.outs.returnStatus = tokdata
}
}
}

} else {
return rc.stmt.c.checkBadConn(err, false)
}
}
}

// In Message Queue mode, we always claim another resultset could be on the way
// to avoid Rows being closed prematurely
func (rc *Rowsq) HasNextResultSet() bool {
return !rc.requestDone
}

// Scans to the next set of columns in the stream
// Note that the caller may not have read all the rows in the prior set
func (rc *Rowsq) NextResultSet() error {
if rc.requestDone {
return io.EOF
}
scan:
for {
// we should have a columns token in the channel if we aren't at the end
tok, err := rc.reader.nextToken()
if rc.reader.sess.logFlags&logDebug != 0 {
rc.reader.sess.log.Printf("NextResultSet() token type:%v", reflect.TypeOf(tok))
}

if err != nil {
return err
}
if tok == nil {
return io.EOF
}
switch tokdata := tok.(type) {
case []columnStruct:
rc.nextCols = tokdata
rc.inResultSet = true
break scan
case doneStruct:
if tokdata.Status&doneMore == 0 {
rc.nextCols = nil
rc.requestDone = true
break scan
}
}
}
rc.cols = rc.nextCols
rc.nextCols = nil
if rc.cols == nil {
return io.EOF
}
return nil
}
6 changes: 6 additions & 0 deletions mssql_go19.go
Expand Up @@ -10,6 +10,8 @@ import (
"reflect"
"time"

"github.com/golang-sql/sqlexp"

// "github.com/cockroachdb/apd"
"github.com/golang-sql/civil"
)
Expand Down Expand Up @@ -114,6 +116,10 @@ func (c *Conn) CheckNamedValue(nv *driver.NamedValue) error {
return driver.ErrRemoveArgument
case TVP:
return nil
case *sqlexp.ReturnMessage:
sqlexp.ReturnMessageInit(v)
c.outs.msgq = v
return driver.ErrRemoveArgument
default:
var err error
nv.Value, err = convertInputParameter(nv.Value)
Expand Down
54 changes: 53 additions & 1 deletion mssql_perf_test.go
Expand Up @@ -210,6 +210,58 @@ func BenchmarkSelectParser(b *testing.B) {
if err != nil {
b.Fatal(err)
}
processSingleResponse(sess, ch, outputs{})
processSingleResponse(context.Background(), sess, ch, outputs{})
}
}

func BenchmarkMessageQueue(b *testing.B) {
conn := open(b)
defer conn.Close()
b.Run("BlockingQuery", func(b *testing.B) {
var errs, results float64
for i := 0; i < b.N; i++ {
r, err := conn.Query(mixedQuery)
if err != nil {
b.Fatal(err.Error())
}
active := true
first := true
for active {
active = r.Next()
if active && first {
results++
}
first = false
if !active {
if r.Err() != nil {
b.Logf("r.Err:%v", r.Err())
errs++
}
active = r.NextResultSet()
if active {
first = true
}
}
}
r.Close()
}
b.ReportMetric(float64(0), "msgs/op")
b.ReportMetric(errs/float64(b.N), "errors/op")
b.ReportMetric(results/float64(b.N), "results/op")
})
b.Run("NonblockingQuery", func(b *testing.B) {
var msgs, errs, results, rowcounts float64
for i := 0; i < b.N; i++ {
m, e, r, rc := testMixedQuery(conn, b)
msgs += float64(m)
errs += float64(e)
results += float64(r)
rowcounts += float64(rc)

}
b.ReportMetric(msgs/float64(b.N), "msgs/op")
b.ReportMetric(errs/float64(b.N), "errors/op")
b.ReportMetric(results/float64(b.N), "results/op")
b.ReportMetric(rowcounts/float64(b.N), "rowcounts/op")
})
}