Skip to content

Commit

Permalink
Fix Blockhole implemention for e2e tests
Browse files Browse the repository at this point in the history
Based on Fu Wei's idea discussed in the issue [1], we employ the
blocking on L7 but without using external tools.

[Problem]

A peer will receive traffic from its peers and initiate connections to
its peers (via stream and pipeline), thus, the current mechanism of
only blocking traffic via proxy is incomplete, since the traffic
initiated from the peers to others will be "leaking", since `blackholeTX`
and `blackholeRX` only drop traffic coming in and out of the peer's
proxy.

[Solution - main idea]

Let's first agree on the naming of the existing proxy as a
"reverse proxy".

We will introduce a "forward proxy", which will be proxying all the
connection initiated from a peer to its peers.

The modified architecture will look something like this:
```
A -- B's SSL termination proxy - B's transparent proxy - B
     ^ newly introduced          ^ in the original codebase
```

By adding this forward proxy, we can block all traffic coming in and out
of a peer, without having to resort to external tools, such as iptables,
and the blocking of traffic is complete.

[Implementation]

The main subtasks are:
- Set up forward proxy
- Enable/disable forward proxy

[Test]
make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionLeader$ go.etcd.io/etcd/tests/v3/e2e -v -count=1

make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionFollower$ go.etcd.io/etcd/tests/v3/e2e -v -count=1

[Reference]
[1] Issue etcd-io#17737
[2] Supersedes PR (V1) https://github.com/henrybear327/etcd/tree/fix/e2e_blackhole
[3] Superseded PR (V2) etcd-io#17891
  • Loading branch information
henrybear327 committed May 5, 2024
1 parent 989ad8b commit 4e5a0ed
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 29 deletions.
10 changes: 9 additions & 1 deletion client/pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
)
Expand All @@ -31,7 +33,13 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
}

t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
// Proxy: http.ProxyFromEnvironment,
// FIXME: hack
// ref: https://maelvls.dev/go-ignores-proxy-localhost/
Proxy: func(req *http.Request) (*url.URL, error) {
HTTP_PROXY := os.Getenv("HTTP_PROXY")
return url.Parse(HTTP_PROXY)
},
DialContext: (&net.Dialer{
Timeout: dialtimeoutd,
// value taken from http.DefaultTransport
Expand Down
92 changes: 76 additions & 16 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package proxy

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -130,18 +132,21 @@ type Server interface {

// ServerConfig defines proxy server configuration.
type ServerConfig struct {
Logger *zap.Logger
From url.URL
To url.URL
TLSInfo transport.TLSInfo
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration
Logger *zap.Logger
From url.URL
To url.URL
TLSInfo transport.TLSInfo
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration
IsForwardProxy bool
}

type server struct {
lg *zap.Logger

isForwardProxy bool

from url.URL
fromPort int
to url.URL
Expand Down Expand Up @@ -194,6 +199,8 @@ func NewServer(cfg ServerConfig) Server {
s := &server{
lg: cfg.Logger,

isForwardProxy: cfg.IsForwardProxy,

from: cfg.From,
to: cfg.To,

Expand All @@ -216,10 +223,12 @@ func NewServer(cfg ServerConfig) Server {
if err == nil {
s.fromPort, _ = strconv.Atoi(fromPort)
}
var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
s.toPort, _ = strconv.Atoi(toPort)
if !s.isForwardProxy {
var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
s.toPort, _ = strconv.Atoi(toPort)
}
}

if s.dialTimeout == 0 {
Expand All @@ -239,8 +248,10 @@ func NewServer(cfg ServerConfig) Server {
if strings.HasPrefix(s.from.Scheme, "http") {
s.from.Scheme = "tcp"
}
if strings.HasPrefix(s.to.Scheme, "http") {
s.to.Scheme = "tcp"
if !s.isForwardProxy {
if strings.HasPrefix(s.to.Scheme, "http") {
s.to.Scheme = "tcp"
}
}

addr := fmt.Sprintf(":%d", s.fromPort)
Expand Down Expand Up @@ -273,7 +284,10 @@ func (s *server) From() string {
}

func (s *server) To() string {
return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
if !s.isForwardProxy {
return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
}
return ""
}

// TODO: implement packet reordering from multiple TCP connections
Expand Down Expand Up @@ -370,9 +384,55 @@ func (s *server) listenAndServe() {
}
continue
}
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
if s.isForwardProxy {
// FIXME
out, err = tp.DialContext(ctx, "tcp", in.RemoteAddr().String())
} else {
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
}
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
if s.isForwardProxy {
// the request should always come with the CONNECT header field
buf := make([]byte, s.bufferSize)
nr1, err := in.Read(buf)
if err != nil {
if err == io.EOF {
return
}
}
data := buf[:nr1]

scanner := bufio.NewScanner(bufio.NewReader(bytes.NewBuffer(data)))
dest := ""
for scanner.Scan() {
str := scanner.Text()
if strings.Contains(str, "CONNECT") {
str = strings.ReplaceAll(str, "CONNECT ", "")
str = strings.ReplaceAll(str, " HTTP/1.1", "")
dest = str
}
}
if err := scanner.Err(); err != nil {
panic(fmt.Errorf(err.Error()))
}

if len(dest) == 0 {
panic(string(data))
}

// make sure a reply is sent
connect_response := &http.Response{
StatusCode: 200,
ProtoMajor: 1,
ProtoMinor: 1,
}
connect_response.Write(in)

// dial out normally
out, err = net.Dial("tcp", dest)
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
}
}
if err != nil {
select {
Expand Down
6 changes: 6 additions & 0 deletions tests/e2e/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
e2e.WithSnapshotCatchUpEntries(10),
e2e.WithIsPeerTLS(true),
e2e.WithPeerProxy(true),
e2e.WithGoFailEnabled(true),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
Expand All @@ -59,9 +60,12 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
partitionedMember := epc.Procs[mockPartitionNodeIndex]
// Mock partition
proxy := partitionedMember.PeerProxy()
forwardProxy := partitionedMember.PeerForwardProxy()
t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()

t.Logf("Wait 5s for any open connections to expire")
time.Sleep(5 * time.Second)
Expand All @@ -81,6 +85,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
forwardProxy.UnblackholeTx()
forwardProxy.UnblackholeRx()

leaderEPC = epc.Procs[epc.WaitLeader(t)]
time.Sleep(5 * time.Second)
Expand Down
23 changes: 20 additions & 3 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,13 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer
func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
var curls []string
var curl string
port := cfg.BasePort + 5*i
port := cfg.BasePort + 6*i
clientPort := port
peerPort := port + 1
peerPort := port + 1 // the port that the peer actually listens on
metricsPort := port + 2
peer2Port := port + 3
peer2Port := port + 3 // the port that the peer advertises
clientHTTPPort := port + 4
forwardProxyPort := port + 5 // the port of the forward proxy

if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
Expand All @@ -499,6 +500,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var proxyCfg *proxy.ServerConfig
var forwardProxyCfg *proxy.ServerConfig
if cfg.PeerProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
Expand All @@ -509,6 +511,20 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
To: peerListenURL,
From: peerAdvertiseURL,
}

// setup forward proxy
forwardProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", forwardProxyPort)}
forwardProxyCfg = &proxy.ServerConfig{
Logger: zap.NewNop(),
From: forwardProxyURL,
IsForwardProxy: true,
}

if cfg.EnvVars == nil {
cfg.EnvVars = make(map[string]string)
}
cfg.EnvVars["HTTP_PROXY"] = fmt.Sprintf("http://127.0.0.1:%d", forwardProxyPort)
cfg.EnvVars["HTTPS_PROXY"] = fmt.Sprintf("http://127.0.0.1:%d", forwardProxyPort)
}

name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)
Expand Down Expand Up @@ -631,6 +647,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
GoFailPort: gofailPort,
GoFailClientTimeout: cfg.GoFailClientTimeout,
Proxy: proxyCfg,
ForwardProxy: forwardProxyCfg,
LazyFSEnabled: cfg.LazyFSEnabled,
}
}
Expand Down
34 changes: 28 additions & 6 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type EtcdProcess interface {
Close() error
Config() *EtcdServerProcessConfig
PeerProxy() proxy.Server
PeerForwardProxy() proxy.Server
Failpoints() *BinaryFailpoints
LazyFS() *LazyFS
Logs() LogsExpect
Expand All @@ -69,12 +70,13 @@ type LogsExpect interface {
}

type EtcdServerProcess struct {
cfg *EtcdServerProcessConfig
proc *expect.ExpectProcess
proxy proxy.Server
lazyfs *LazyFS
failpoints *BinaryFailpoints
donec chan struct{} // closed when Interact() terminates
cfg *EtcdServerProcessConfig
proc *expect.ExpectProcess
proxy proxy.Server
forwardProxy proxy.Server
lazyfs *LazyFS
failpoints *BinaryFailpoints
donec chan struct{} // closed when Interact() terminates
}

type EtcdServerProcessConfig struct {
Expand Down Expand Up @@ -102,6 +104,7 @@ type EtcdServerProcessConfig struct {

LazyFSEnabled bool
Proxy *proxy.ServerConfig
ForwardProxy *proxy.ServerConfig
}

func NewEtcdServerProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
Expand Down Expand Up @@ -159,6 +162,14 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error {
case err := <-ep.proxy.Error():
return err
}

ep.cfg.lg.Info("starting forward proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.ForwardProxy.From.String()), zap.String("to", ep.cfg.ForwardProxy.To.String()))
ep.forwardProxy = proxy.NewServer(*ep.cfg.ForwardProxy)
select {
case <-ep.forwardProxy.Ready():
case err := <-ep.forwardProxy.Error():
return err
}
}
if ep.lazyfs != nil {
ep.cfg.lg.Info("starting lazyfs...", zap.String("name", ep.cfg.Name))
Expand Down Expand Up @@ -228,6 +239,13 @@ func (ep *EtcdServerProcess) Stop() (err error) {
if err != nil {
return err
}

ep.cfg.lg.Info("stopping forward proxy...", zap.String("name", ep.cfg.Name))
err = ep.forwardProxy.Close()
ep.forwardProxy = nil
if err != nil {
return err
}
}
if ep.lazyfs != nil {
ep.cfg.lg.Info("stopping lazyfs...", zap.String("name", ep.cfg.Name))
Expand Down Expand Up @@ -330,6 +348,10 @@ func (ep *EtcdServerProcess) PeerProxy() proxy.Server {
return ep.proxy
}

func (ep *EtcdServerProcess) PeerForwardProxy() proxy.Server {
return ep.forwardProxy
}

func (ep *EtcdServerProcess) LazyFS() *LazyFS {
return ep.lazyfs
}
Expand Down
13 changes: 10 additions & 3 deletions tests/robustness/failpoint/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces

func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
proxy := member.PeerProxy()
forwardProxy := member.PeerForwardProxy()

// Blackholing will cause peers to not be able to use streamWriters registered with member
// but peer traffic is still possible because member has 'pipeline' with peers
// TODO: find a way to stop all traffic
t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
Expand All @@ -75,6 +73,15 @@ func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *
proxy.UnblackholeTx()
proxy.UnblackholeRx()
}()

forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()
defer func() {
t.Logf("Forward traffic restored from and to member %q", member.Config().Name)
forwardProxy.UnblackholeTx()
forwardProxy.UnblackholeRx()
}()

if shouldWaitTillSnapshot {
return waitTillSnapshot(ctx, t, clus, member)
}
Expand Down

0 comments on commit 4e5a0ed

Please sign in to comment.