Skip to content

Commit

Permalink
Merge pull request #24 from cockroachdb/nvanbenschoten/tcp
Browse files Browse the repository at this point in the history
Expose TCP/IP URL of TestServers
  • Loading branch information
nvanbenschoten committed Jan 31, 2017
2 parents ba57380 + 8bfcdb0 commit 6fd53f6
Showing 1 changed file with 104 additions and 41 deletions.
145 changes: 104 additions & 41 deletions testserver/testserver.go
Expand Up @@ -55,10 +55,12 @@
package testserver

import (
"bytes"
"database/sql"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/url"
Expand All @@ -75,32 +77,33 @@ import (
)

var (
sqlURLRegexp = regexp.MustCompile("sql:\\s+(postgresql:.+)\n")
sqlURLRegexp = regexp.MustCompile(`sql:\s+(postgresql:.+)`)
customBinary = flag.String("cockroach-binary", "", "Use specified cockroach binary")
)

const (
stateNew = iota
stateRunning = iota
stateStopped = iota
stateFailed = iota

socketPort = 26257
socketFileBase = ".s.PGSQL"
stateNew = 1 + iota
stateRunning
stateStopped
stateFailed
)

// TestServer is a helper to run a real cockroach node.
type TestServer struct {
mu sync.RWMutex
state int
baseDir string
pgURL *url.URL
cmd *exec.Cmd
args []string
stdout string
stderr string
stdoutBuf logWriter
stderrBuf logWriter
mu sync.RWMutex
state int
baseDir string
pgURL struct {
set chan struct{}
u *url.URL
}
cmd *exec.Cmd
args []string
stdout string
stderr string
stdoutIntr *patternInterceptor
stdoutBuf logWriter
stderrBuf logWriter
}

// NewDBForTest creates a new CockroachDB TestServer instance and
Expand Down Expand Up @@ -179,35 +182,25 @@ func NewTestServer() (*TestServer, error) {
return nil, fmt.Errorf("could not create logs directory: %s: %s", logDir, err)
}

options := url.Values{
"host": []string{baseDir},
}
pgurl := &url.URL{
Scheme: "postgres",
User: url.User("root"),
Host: fmt.Sprintf(":%d", socketPort),
RawQuery: options.Encode(),
}
socketPath := filepath.Join(baseDir, fmt.Sprintf("%s.%d", socketFileBase, socketPort))

args := []string{
cockroachBinary,
"start",
"--logtostderr",
"--insecure",
"--host=localhost",
"--port=0",
"--http-port=0",
"--socket=" + socketPath,
"--store=" + baseDir,
}

ts := &TestServer{
state: stateNew,
baseDir: baseDir,
pgURL: pgurl,
args: args,
stdout: filepath.Join(logDir, "cockroach.stdout"),
stderr: filepath.Join(logDir, "cockroach.stderr"),
}
ts.pgURL.set = make(chan struct{})
return ts, nil
}

Expand All @@ -223,16 +216,17 @@ func (ts *TestServer) Stderr() string {

// PGURL returns the postgres connection URL to reach the started
// cockroach node.
// It loops until the expected unix socket file exists.
// This does not timeout, relying instead on test timeouts.
//
// It blocks until the network URL is determined and does not timeout,
// relying instead on test timeouts.
func (ts *TestServer) PGURL() *url.URL {
socketPath := filepath.Join(ts.baseDir, fmt.Sprintf("%s.%d", socketFileBase, socketPort))
for {
if _, err := os.Stat(socketPath); err == nil {
return ts.pgURL
}
time.Sleep(time.Millisecond * 10)
}
<-ts.pgURL.set
return ts.pgURL.u
}

func (ts *TestServer) setPGURL(u *url.URL) {
ts.pgURL.u = u
close(ts.pgURL.set)
}

// WaitForInit retries until a connection is successfully established.
Expand Down Expand Up @@ -270,7 +264,18 @@ func (ts *TestServer) Start() error {
}
ts.stdoutBuf = wr
}
ts.cmd.Stdout = ts.stdoutBuf

pi := newPatternInterceptor(ts.stdoutBuf, sqlURLRegexp, func(match [][]byte) error {
u, err := url.Parse(string(match[1]))
if err != nil {
return fmt.Errorf("failure to parse SQL URL: %v", err)
}
ts.setPGURL(u)
ts.stdoutIntr.Disable()
return nil
})
ts.stdoutIntr = pi
ts.cmd.Stdout = pi

if len(ts.stderr) > 0 {
wr, err := newFileLogWriter(ts.stderr)
Expand Down Expand Up @@ -350,6 +355,64 @@ func (ts *TestServer) Stop() {
_ = os.RemoveAll(ts.baseDir)
}

// patternInterceptor wraps an io.Writer and attempts to match the data stream
// to its regular expression pattern, calling the provided callback for all matches.
type patternInterceptor struct {
w io.Writer
re *regexp.Regexp
onMatch func([][]byte) error

buf []byte
disabled bool
}

func newPatternInterceptor(w io.Writer, re *regexp.Regexp, onMatch func([][]byte) error) *patternInterceptor {
return &patternInterceptor{
re: re,
onMatch: onMatch,
}
}

func (pi *patternInterceptor) Write(p []byte) (n int, err error) {
if !pi.disabled {
// Search each full line in p for matches. Buffer partial lines.
for sp := p; ; {
i := bytes.IndexByte(sp, '\n')
if i == -1 {
pi.buf = append(pi.buf, sp...)
break
}

l := sp[:i]
if len(pi.buf) > 0 {
l = append(pi.buf, l...)
}

if matches := pi.re.FindAllSubmatch(l, -1); matches != nil {
for _, match := range matches {
if err := pi.onMatch(match); err != nil {
return 0, err
}
}
}

sp = sp[i+1:]
pi.buf = pi.buf[:0]
}
}
if pi.w == nil {
return len(p), nil
}
return pi.w.Write(p)
}

// Disable disables the patternInterceptor from attempting to match the data
// stream to its pattern, allowing writes to pass through without inspection.
func (pi *patternInterceptor) Disable() {
pi.disabled = true
pi.buf = nil
}

type logWriter interface {
Write(p []byte) (n int, err error)
String() string
Expand Down

0 comments on commit 6fd53f6

Please sign in to comment.