Skip to content

Commit

Permalink
Fix Blackhole implemention for e2e tests
Browse files Browse the repository at this point in the history
Thanks to Fu Wei for the input regarding etcd-io#17938.

[Problem]

A peer will
(a) receive traffic from its peers
(b) initiate connections to its peers (via stream and pipeline).

Thus, the current mechanism of only blocking peer traffic via the peer's existing proxy is insufficient, since only scenario (a) is handled, and scenario (b) is not blocked at all.

[Main idea]

We introduce 1 shared HTTP proxy for all peers. All peers will be proxying all the connections through it.

The modified architecture will look something like this:
```
A -- shared HTTP proxy ----- B
     ^ newly introduced
```

By adding this HTTP proxy, we can block all in and out traffic that is initiated from a peer to others, without having to resort to external tools, such as iptables. It's verified that the blocking of traffic is complete, compared to previous solutions [2][3].

[Implementation]

The main subtasks are
- set up an environment variable `FORWARD_PROXY`, because go will not parse HTTP_PROXY and HTTPS_PROXY that is using localhost or 127.0.0.1, regardless if the port is present or not
- implement the shared HTTP proxy by extending the existing proxy server code (we need to be able to identify the source sender)
- remove existing proxy setup (the per-peer proxy)
- implement enable/disable of the HTTP proxy in the e2e test

[Testing]

- `make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionLeader$ go.etcd.io/etcd/tests/v3/e2e -v -count=1`
- `make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionFollower$ go.etcd.io/etcd/tests/v3/e2e -v -count=1`

[References]
[1] Tracking issue etcd-io#17737
[2] PR (V1) https://github.com/henrybear327/etcd/tree/fix/e2e_blackhole
[3] PR (V2) etcd-io#17891
[4] PR (V3) etcd-io#17938

Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
  • Loading branch information
henrybear327 committed May 11, 2024
1 parent a8e6d48 commit 9affb87
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 25 deletions.
99 changes: 96 additions & 3 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bufio"
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
mrand "math/rand"
Expand Down Expand Up @@ -116,6 +117,11 @@ type Server interface {
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()

// BlackholePeer drops all traffic coming in and out of the specified peer
BlackholePeer(url.URL)
// UnblackholePeer resumes all traffic coming in and out of the specified peer
UnblackholePeer(url.URL)

// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
PauseTx()
// UnpauseTx removes "forwarding" pause operation.
Expand Down Expand Up @@ -191,6 +197,11 @@ type server struct {

latencyRxMu sync.RWMutex
latencyRx time.Duration

blackholeURL map[string]struct{}
blackholeURLMu sync.RWMutex
connectionMap map[string]string
connectionMapMu sync.RWMutex
}

// NewServer returns a proxy implementation with no iptables/tc dependencies.
Expand All @@ -217,6 +228,9 @@ func NewServer(cfg ServerConfig) Server {
pauseAcceptc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),

blackholeURL: make(map[string]struct{}),
connectionMap: make(map[string]string),
}

_, fromPort, err := net.SplitHostPort(cfg.From.Host)
Expand Down Expand Up @@ -395,6 +409,28 @@ func (s *server) listenAndServe() {
}
connectResponse.Write(in)

// maintain connection mapping
// we use the Proxy-Authorization field to identify the sender
// we map the randomly selected port to the port that the peer is listening on
proxyAuthString := req.Header.Get("Proxy-Authorization")
proxyAuthString = strings.ReplaceAll(proxyAuthString, "Basic ", "")
payload, err := base64.StdEncoding.DecodeString(proxyAuthString)
if err != nil {
panic("Invalid Proxy-Authorization data")
}
payloadSplit := strings.Split(string(payload), ":")
if len(payloadSplit) != 2 {
panic("Wrong Proxy-Authorization format")
}
sourcePort := payloadSplit[0]
s.connectionMapMu.Lock()
_, extractedPort, err := net.SplitHostPort(in.RemoteAddr().String())
if err != nil {
panic("Failed to parse port")
}
s.connectionMap[extractedPort] = sourcePort
s.connectionMapMu.Unlock()

return req.URL.Host
}

Expand Down Expand Up @@ -454,6 +490,16 @@ func (s *server) listenAndServe() {
s.transmit(out, in)
out.Close()
in.Close()

s.connectionMapMu.Lock()
_, extractedPort, err := net.SplitHostPort(in.RemoteAddr().String())
if err != nil {
panic("Failed to parse port")
}
if sourcePeer, ok := s.connectionMap[extractedPort]; ok {
delete(s.connectionMap, sourcePeer)
}
s.connectionMapMu.Unlock()
}()
go func() {
defer s.closeWg.Done()
Expand All @@ -465,11 +511,11 @@ func (s *server) listenAndServe() {
}
}

func (s *server) transmit(dst io.Writer, src io.Reader) {
func (s *server) transmit(dst, src net.Conn) {
s.ioCopy(dst, src, proxyTx)
}

func (s *server) receive(dst io.Writer, src io.Reader) {
func (s *server) receive(dst, src net.Conn) {
s.ioCopy(dst, src, proxyRx)
}

Expand All @@ -480,7 +526,7 @@ const (
proxyRx
)

func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
func (s *server) ioCopy(dst, src net.Conn, ptype proxyType) {
buf := make([]byte, s.bufferSize)
for {
nr1, err := src.Read(buf)
Expand Down Expand Up @@ -530,6 +576,41 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
default:
panic("unknown proxy type")
}

// blackhole by URL
switch ptype {
case proxyTx:
s.connectionMapMu.RLock()
_, extractedPort, err := net.SplitHostPort(src.RemoteAddr().String())
if err != nil {
panic("Failed to parse port")
}
if sourcePeer, ok := s.connectionMap[extractedPort]; ok {
s.blackholeURLMu.RLock()
if _, ok := s.blackholeURL[sourcePeer]; ok {
data = nil
}
s.blackholeURLMu.RUnlock()
}
s.connectionMapMu.RUnlock()
case proxyRx:
s.connectionMapMu.RLock()
_, extractedPort, err := net.SplitHostPort(dst.RemoteAddr().String())
if err != nil {
panic("Failed to parse port")
}
if sourcePeer, ok := s.connectionMap[extractedPort]; ok {
s.blackholeURLMu.RLock()
if _, ok := s.blackholeURL[sourcePeer]; ok {
data = nil
}
s.blackholeURLMu.RUnlock()
}
s.connectionMapMu.RUnlock()
default:
panic("unknown proxy type")
}

nr2 := len(data)
switch ptype {
case proxyTx:
Expand Down Expand Up @@ -954,6 +1035,18 @@ func (s *server) UnblackholeRx() {
)
}

func (s *server) BlackholePeer(peerURL url.URL) {
s.blackholeURLMu.Lock()
s.blackholeURL[peerURL.Port()] = struct{}{}
s.blackholeURLMu.Unlock()
}

func (s *server) UnblackholePeer(peerURL url.URL) {
s.blackholeURLMu.Lock()
delete(s.blackholeURL, peerURL.Port())
s.blackholeURLMu.Unlock()
}

func (s *server) PauseTx() {
s.pauseTxMu.Lock()
s.pauseTxc = make(chan struct{})
Expand Down
14 changes: 3 additions & 11 deletions tests/e2e/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
e2e.WithSnapshotCount(10),
e2e.WithSnapshotCatchUpEntries(10),
e2e.WithIsPeerTLS(true),
e2e.WithPeerProxy(true),
e2e.WithSingleHTTPProxy(true),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
Expand All @@ -58,13 +58,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
}
partitionedMember := epc.Procs[mockPartitionNodeIndex]
// Mock partition
proxy := partitionedMember.PeerProxy()
httpProxy := partitionedMember.PeerHTTPProxy()
t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
httpProxy.BlackholeTx()
httpProxy.BlackholeRx()
epc.SingleHTTPProxyInstance.BlackholePeer(partitionedMember.Config().PeerURL)

t.Logf("Wait 5s for any open connections to expire")
time.Sleep(5 * time.Second)
Expand All @@ -82,10 +77,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
// Wait for some time to restore the network
time.Sleep(1 * time.Second)
t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
httpProxy.UnblackholeTx()
httpProxy.UnblackholeRx()
epc.SingleHTTPProxyInstance.UnblackholePeer(partitionedMember.Config().PeerURL)

leaderEPC = epc.Procs[epc.WaitLeader(t)]
time.Sleep(5 * time.Second)
Expand Down
49 changes: 49 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ type EtcdProcessCluster struct {
Cfg *EtcdProcessClusterConfig
Procs []EtcdProcess
nextSeq int // sequence number of the next etcd process (if it will be required)

SingleHTTPProxyInstance proxy.Server
}

type EtcdProcessClusterConfig struct {
Expand All @@ -144,6 +146,9 @@ type EtcdProcessClusterConfig struct {
LazyFSEnabled bool
PeerProxy bool

SingleHTTPProxy bool
SingleHTTPProxyPort int

// Process config

EnvVars map[string]string
Expand Down Expand Up @@ -184,6 +189,8 @@ func DefaultConfig() *EtcdProcessClusterConfig {
CN: true,

ServerConfig: *embed.NewConfig(),

SingleHTTPProxyPort: 55688,
}
cfg.ServerConfig.InitialClusterToken = "new"
return cfg
Expand Down Expand Up @@ -371,6 +378,10 @@ func WithPeerProxy(enabled bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.PeerProxy = enabled }
}

func WithSingleHTTPProxy(enabled bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.SingleHTTPProxy = enabled }
}

// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new EtcdProcessCluster once all nodes are ready to accept client requests.
func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterOption) (*EtcdProcessCluster, error) {
Expand Down Expand Up @@ -421,6 +432,20 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP

// StartEtcdProcessCluster launches a new cluster from etcd processes.
func StartEtcdProcessCluster(ctx context.Context, t testing.TB, epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
if cfg.SingleHTTPProxy && epc.SingleHTTPProxyInstance == nil {
cfg.Logger.Info("starting single HTTP proxy...", zap.String("name", cfg.ServerConfig.Name))
epc.SingleHTTPProxyInstance = proxy.NewServer(proxy.ServerConfig{
Logger: zap.NewNop(),
From: url.URL{Scheme: "tcp", Host: fmt.Sprintf("localhost:%d", cfg.SingleHTTPProxyPort)},
IsHTTPProxy: true,
})
select {
case <-epc.SingleHTTPProxyInstance.Ready():
case err := <-epc.SingleHTTPProxyInstance.Error():
return nil, err
}
}

if cfg.RollingStart {
if err := epc.RollingStart(ctx); err != nil {
return nil, fmt.Errorf("cannot rolling-start: %v", err)
Expand Down Expand Up @@ -501,6 +526,11 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var proxyCfg *proxy.ServerConfig
var httpProxyCfg *proxy.ServerConfig

if cfg.PeerProxy && cfg.SingleHTTPProxy {
panic("Can't only use PeerProxy and SingleHTTPProxy at the same time")
}

if cfg.PeerProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
Expand All @@ -524,6 +554,17 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
cfg.EnvVars = make(map[string]string)
}
cfg.EnvVars["FORWARD_PROXY"] = fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort)
} else if cfg.SingleHTTPProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
}

if cfg.EnvVars == nil {
cfg.EnvVars = make(map[string]string)
}
// TODO: switch to using basic
cfg.EnvVars["FORWARD_PROXY"] = fmt.Sprintf("http://%d:doesnotmatter@127.0.0.1:%d", peerPort, cfg.SingleHTTPProxyPort)
// cfg.EnvVars["FORWARD_PROXY"] = fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort)
}

name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)
Expand Down Expand Up @@ -986,6 +1027,14 @@ func (epc *EtcdProcessCluster) Close() error {
err = cerr
}
}

if epc.SingleHTTPProxyInstance != nil {
epc.lg.Info("closing single HTTP Proxy...")
if err = epc.SingleHTTPProxyInstance.Close(); err != nil {
return err
}
}

epc.lg.Info("closed test cluster.")
return err
}
Expand Down
13 changes: 2 additions & 11 deletions tests/robustness/failpoint/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,11 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces
}

func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
proxy := member.PeerProxy()
httpProxy := member.PeerHTTPProxy()

t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
httpProxy.BlackholeTx()
httpProxy.BlackholeRx()
clus.SingleHTTPProxyInstance.BlackholePeer(member.Config().PeerURL)
defer func() {
t.Logf("Traffic restored from and to member %q", member.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
httpProxy.UnblackholeTx()
httpProxy.UnblackholeRx()
clus.SingleHTTPProxyInstance.UnblackholePeer(member.Config().PeerURL)
}()

if shouldWaitTillSnapshot {
Expand Down

0 comments on commit 9affb87

Please sign in to comment.