Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): Add a DirectPath fallback integration test #3487

Merged
merged 2 commits into from Jan 7, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
110 changes: 110 additions & 0 deletions spanner/integration_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"math"
"math/big"
"os"
"os/exec"
"reflect"
"regexp"
"strings"
Expand Down Expand Up @@ -168,12 +169,22 @@ var (
}

validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$")

blackholeDpv6Cmd string
blackholeDpv4Cmd string
allowDpv6Cmd string
allowDpv4Cmd string
)

func init() {
flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag")
flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM")
peerInfo = &peer.Peer{}
// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
}

type directPathTestConfig struct {
Expand Down Expand Up @@ -2957,6 +2968,52 @@ func TestIntegration_StartBackupOperation(t *testing.T) {
}
}

// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
defer cleanup()

if !dpConfig.attemptDirectPath {
return
}
if len(blackholeDpv6Cmd) == 0 {
t.Fatal("-it.blackhole-dpv6-cmd unset")
}
if len(blackholeDpv4Cmd) == 0 {
t.Fatal("-it.blackhole-dpv4-cmd unset")
}
if len(allowDpv6Cmd) == 0 {
t.Fatal("-it.allowdpv6-cmd unset")
}
if len(allowDpv4Cmd) == 0 {
t.Fatal("-it.allowdpv4-cmd unset")
}

// Precondition: wait for DirectPath to connect.
countEnough := examineTraffic(ctx, client /*blackholeDP = */, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: /*blackholeDP = */ should be added after the ,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that gofmt will automatically move /*blackholeDP = */ before the ,, do you want me to remove the comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, nevermind, maybe that is the standard.

if !countEnough {
t.Fatalf("Failed to observe RPCs over DirectPath")
}

// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeOrAllowDirectPath(t /*blackholeDP = */, true)
countEnough = examineTraffic(ctx, client /*blackholeDP = */, true)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}

// Disable the blackhole, and client should use DirectPath again.
blackholeOrAllowDirectPath(t /*blackholeDP = */, false)
countEnough = examineTraffic(ctx, client /*blackholeDP = */, false)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}

// Prepare initializes Cloud Spanner testing DB and clients.
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
if databaseAdmin == nil {
Expand Down Expand Up @@ -3303,3 +3360,56 @@ func isDirectPathRemoteAddress() (_ string, _ bool) {
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

// examineTraffic counts RPCs use DirectPath or CFE traffic.
func examineTraffic(ctx context.Context, client *Client, expectDP bool) bool {
var numCount uint64
const (
numRPCsToSend = 20
minCompleteRPC = 40
)
countEnough := false
start := time.Now()
for !countEnough && time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = readAllTestTable(client.Single().Read(ctx, testTable, Key{"k1"}, testTableColumns))
if _, useDP := isDirectPathRemoteAddress(); useDP != expectDP {
numCount++
}
time.Sleep(100 * time.Millisecond)
countEnough = numCount >= minCompleteRPC
}
}
return countEnough
}

func blackholeOrAllowDirectPath(t *testing.T, blackholeDP bool) {
if dpConfig.directPathIPv4Only {
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
}
return
}
// DirectPath supports both ipv4 and ipv6
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}
}