Skip to content

Commit

Permalink
[RFC] Attempt to make Blackhole blocking on L7
Browse files Browse the repository at this point in the history
Based on Fu Wei's idea, we employ blocking on L7 but without using
external tools.

[Main ideas]
The main ideas are:
1) utilize the `X-PeerURLs` field from the header, as we know all
traffic to peers will contain this header field.
2) since all nodes create direct connections with their peers, the
traffic blocking has to happen at all nodes' proxies, contrary to the
current design, where only the proxy of the peer is being blackholed.

Based on the main ideas, we introduce an SSL termination proxy so we can
obtain the `X-PeerURLs` field from the header.

[Issues]

There are 2 known issues with this approach:
1) some traffic is still leaked. But the leaked traffic (as discussed
later) won't affect the blackhole behavior that we would like to achieve (as
stream and pipeline traffic between raft nodes are now properly terminated).
2) we would need to employ an SSL termination proxy, which might lead to a
small performance hit.

For 1), this way of blocking (by utilizing `X-PeerURLs` from the
header) will miss certain types of traffic to certain endpoints, as
the traffic to some endpoints doesn't have the `X-PeerURLs` field.
Currently, here are the known ones: /members, /version, and /raft/probing.
As you can see from the log, their headers don't contain the `X-PeerURLs`
field, but only the following fields:
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /members
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /version
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /raft/probing

For 2) in order to read out `X-PeerURLs` from the header, we need to
terminate the SSL connection, as we can't drop cleartext traffic
(ref [1]). Thus, a new option `e2e.WithSSLTerminationProxy(true)`
is introduced, which will change the network flow into
```
A -- B's SSL termination proxy - B's transparent proxy - B
     ^ newly introduced          ^ in the original codebase
```

[Known improvements required before turning RFC into PR]

The prototype needs to be further improved for code review after
fixing the following issues:
- blocking only RX or TX traffic (currently a call to `blackholeTX`
or `blackholeRX` will blackhole both TX and RX traffic instead of just
the specified one).
- slowness when performing test cleanup (I think this is related to the
SSL timeout setting, but I haven't looked into it yet)
- coding style improvements

References:
[1] #15595

Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
  • Loading branch information
henrybear327 committed Apr 28, 2024
1 parent ea8a0e2 commit 60c5240
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 58 deletions.
239 changes: 218 additions & 21 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package proxy

import (
"context"
"bufio"
"bytes"
"crypto/tls"
"fmt"
"io"
mrand "math/rand"
Expand All @@ -31,12 +33,14 @@ import (
"go.uber.org/zap"

"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

var (
// Package-level default values for the proxy server.
// NOTE(review): presumably used when the matching ServerConfig field
// (DialTimeout, BufferSize, RetryInterval) is left unset — confirm in NewServer.
defaultDialTimeout = 3 * time.Second
defaultBufferSize = 48 * 1024
defaultRetryInterval = 10 * time.Millisecond
// FixturesDir is the directory from which listenAndServe loads
// "server.crt" and "server.key.insecure" for its outgoing TLS connection.
// NOTE(review): "../fixtures" is a relative path — presumably resolved
// against the process working directory by testutils.MustAbsPath; confirm.
// It also makes this package depend on the tests framework module.
FixturesDir = testutils.MustAbsPath("../fixtures")
)

// Server defines proxy server layer that simulates common network faults:
Expand Down Expand Up @@ -137,6 +141,15 @@ type ServerConfig struct {
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration

IsSSLTerminatingProxy bool
TerminatingTLSInfo transport.TLSInfo
// for SSL termination proxy: put outgoing connection local addr - connection initiated by peerURLs
// for transparent proxy: read incoming connection remote addr - check BlackholeMap to see if peerURLs is blocked
ConnectionMap map[string]string
ConnectionMapMu *sync.RWMutex
BlackholeMap map[string]bool // peerURLs to be blackholed - true
BlackholeMapMu *sync.RWMutex
}

type server struct {
Expand All @@ -150,6 +163,13 @@ type server struct {
tlsInfo transport.TLSInfo
dialTimeout time.Duration

terminatingTLSInfo transport.TLSInfo
isSSLTerminatingProxy bool
connectionMap map[string]string
connectionMapMu *sync.RWMutex
blackholeMap map[string]bool
blackholeMapMu *sync.RWMutex

bufferSize int
retryInterval time.Duration

Expand Down Expand Up @@ -200,6 +220,13 @@ func NewServer(cfg ServerConfig) Server {
tlsInfo: cfg.TLSInfo,
dialTimeout: cfg.DialTimeout,

isSSLTerminatingProxy: cfg.IsSSLTerminatingProxy,
terminatingTLSInfo: cfg.TerminatingTLSInfo,
connectionMap: cfg.ConnectionMap,
connectionMapMu: cfg.ConnectionMapMu,
blackholeMap: cfg.BlackholeMap,
blackholeMapMu: cfg.BlackholeMapMu,

bufferSize: cfg.BufferSize,
retryInterval: cfg.RetryInterval,

Expand All @@ -216,6 +243,7 @@ func NewServer(cfg ServerConfig) Server {
if err == nil {
s.fromPort, _ = strconv.Atoi(fromPort)
}

var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
Expand Down Expand Up @@ -250,7 +278,11 @@ func NewServer(cfg ServerConfig) Server {

var ln net.Listener
if !s.tlsInfo.Empty() {
ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo)
if s.isSSLTerminatingProxy {
ln, err = transport.NewListener(addr, "https", &s.tlsInfo)
} else {
ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo)
}
} else {
ln, err = net.Listen(s.from.Scheme, addr)
}
Expand All @@ -265,6 +297,7 @@ func NewServer(cfg ServerConfig) Server {
go s.listenAndServe()

s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To()))

return s
}

Expand All @@ -284,7 +317,7 @@ func (s *server) To() string {
func (s *server) listenAndServe() {
defer s.closeWg.Done()

ctx := context.Background()
// ctx := context.Background()
s.lg.Info("proxy is listening on", zap.String("from", s.From()))
close(s.readyc)

Expand Down Expand Up @@ -355,22 +388,37 @@ func (s *server) listenAndServe() {

var out net.Conn
if !s.tlsInfo.Empty() {
var tp *http.Transport
tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout)
// Not sure why this is not working...
// var tp *http.Transport
// tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout)
// if err != nil {
// select {
// case s.errc <- err:
// select {
// case <-s.donec:
// return
// default:
// }
// case <-s.donec:
// return
// }
// continue
// }
// out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)

// using simple Cert loading for now
var cert tls.Certificate
CertPath := FixturesDir + "/server.crt"
PrivateKeyPath := FixturesDir + "/server.key.insecure"
cert, err = tls.LoadX509KeyPair(CertPath, PrivateKeyPath)
if err != nil {
select {
case s.errc <- err:
select {
case <-s.donec:
return
default:
}
case <-s.donec:
return
}
continue
panic(err)
}
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
conf := &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{cert},
}
out, err = tls.Dial(s.to.Scheme, s.to.Host, conf)
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
}
Expand All @@ -393,36 +441,72 @@ func (s *server) listenAndServe() {
go func() {
defer s.closeWg.Done()
// read incoming bytes from listener, dispatch to outgoing connection

s.transmit(out, in)
out.Close()
in.Close()

if s.isSSLTerminatingProxy {
// when the connection is closed, we delete the entry from the map
s.connectionMapMu.Lock()
port, err := getPort(out.LocalAddr().String())
if err != nil {
panic("")
}
delete(s.connectionMap, port)
s.connectionMapMu.Unlock()
}
}()
go func() {
defer s.closeWg.Done()
// read response from outgoing connection, write back to listener
s.receive(in, out)
in.Close()
out.Close()

if s.isSSLTerminatingProxy {
// when the connection is closed, we delete the entry from the map
s.connectionMapMu.Lock()
port, err := getPort(out.LocalAddr().String())
if err != nil {
panic("")
}
delete(s.connectionMap, port)
s.connectionMapMu.Unlock()
}
}()
}
}

func (s *server) transmit(dst io.Writer, src io.Reader) {
// transmit runs ioCopy from src to dst tagged with the proxyTx direction.
// listenAndServe invokes it as transmit(out, in), i.e. bytes read from the
// accepted listener connection are dispatched to the outgoing connection.
func (s *server) transmit(dst, src net.Conn) {
s.ioCopy(dst, src, proxyTx)
}

func (s *server) receive(dst io.Writer, src io.Reader) {
// receive runs ioCopy from src to dst tagged with the proxyRx direction.
// listenAndServe invokes it as receive(in, out), i.e. the response read from
// the outgoing connection is written back to the listener-side connection.
func (s *server) receive(dst, src net.Conn) {
s.ioCopy(dst, src, proxyRx)
}

// getPort extracts the port component from an address string.
//
// Two input shapes are handled:
//   - a string beginning with "https": the first occurrence of the literal
//     "https://localhost:" is removed and the remainder is returned as-is,
//     without further validation;
//   - anything else is treated as a "host:port" network address and split
//     with net.SplitHostPort.
//
// It returns the port as a string. A non-nil error is returned when the
// address cannot be split into host and port; in that case the returned
// port is empty and must not be used.
func getPort(host string) (string, error) {
	if strings.HasPrefix(host, "https") {
		return strings.Replace(host, "https://localhost:", "", 1), nil
	}

	_, port, err := net.SplitHostPort(host)
	if err != nil {
		// Propagate the failure instead of reporting success with an empty
		// port: every caller checks err and would otherwise use "" as a key
		// into the connection/blackhole maps.
		return "", fmt.Errorf("extracting port from %q: %w", host, err)
	}
	return port, nil
}

// proxyType tags the direction of a proxied stream so that ioCopy can tell
// the transmit path from the receive path.
type proxyType uint8

// Direction tags: transmit passes proxyTx to ioCopy, receive passes proxyRx.
const (
	proxyTx = proxyType(0)
	proxyRx = proxyType(1)
)

func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
func (s *server) ioCopy(dst, src net.Conn, ptype proxyType) {
buf := make([]byte, s.bufferSize)
for {
nr1, err := src.Read(buf)
Expand Down Expand Up @@ -455,6 +539,53 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
}
data := buf[:nr1]

// attempt to obtain the header information
// this approach won't work since we are looking at encrypted bytestream, and headers are also encrypted
// unless we are able to decrypt this
if s.isSSLTerminatingProxy && ptype == proxyTx {
/*
if we are isolating A from B and C
case 1 - Proxy of A
The incoming traffic and outgoing traffic are already handled by the original design
case 2 - Proxy of B and C
The outgoing traffic to A is already handled by the original design
The incoming traffic from A will have to be inspected at all SSL termination proxy, by extracting
X-PeerURLs field from the Header. Since the SSL termination proxy will initiate a new connection
to forward the traffic to the transparent proxy (the original proxy), we will record which port
that the new forwarding connection is using in a map shared by SSL termination proxy and the
transparent proxy, containing the key-value pair of port-peer mapping. So now at the transparent
proxy, we can identify which traffic is coming from which node!
Note A: failed to decode headers and no X-PeerURLs
After looking into the logs, we can see the following traffic that doesn't have the X-PeerURLs present
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /members
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /version
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /raft/probing
*/
if req, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(data))); err != nil {
// check note A
} else {
peerURLs := req.Header.Get("X-PeerURLs")

if len(peerURLs) == 0 {
// check note A
} else {
s.connectionMapMu.Lock()
port, err := getPort(dst.LocalAddr().String())
if err != nil {
panic("")
}
blockingPort, err := getPort(peerURLs) // TODO: make this able to parse multiple peerURLs?
if err != nil {
panic("")
}
s.connectionMap[port] = blockingPort
s.connectionMapMu.Unlock()
}
}
}

// alters/corrupts/drops data
switch ptype {
case proxyTx:
Expand All @@ -472,6 +603,40 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
default:
panic("unknown proxy type")
}

// we also block off the traffic to targetted node
if !s.isSSLTerminatingProxy && data != nil {
var port string
switch ptype {
case proxyTx:
port, err = getPort(src.RemoteAddr().String())
if err != nil {
panic("")
}

case proxyRx:
port, err = getPort(dst.RemoteAddr().String())
if err != nil {
panic("")
}
default:
panic("unknown proxy type")
}

s.connectionMapMu.RLock()
if peerPort, ok := s.connectionMap[port]; ok {

s.blackholeMapMu.RLock()
if _, ok := s.blackholeMap[peerPort]; ok {
data = nil
}
s.blackholeMapMu.RUnlock()
} else {
// we might have non-peer messages on the network
}
s.connectionMapMu.RUnlock()
}

nr2 := len(data)
switch ptype {
case proxyTx:
Expand Down Expand Up @@ -867,6 +1032,21 @@ func (s *server) BlackholeTx() {
zap.String("from", s.From()),
zap.String("to", s.To()),
)

s.blackholeMapMu.Lock()

port, err := getPort(s.listener.Addr().String())
if err != nil {
panic("")
}
portInt, err := strconv.Atoi(port)
if err != nil {
panic("")
}
port = strconv.Itoa((portInt - 2))
s.blackholeMap[port] = true

s.blackholeMapMu.Unlock()
}

func (s *server) UnblackholeTx() {
Expand All @@ -876,6 +1056,19 @@ func (s *server) UnblackholeTx() {
zap.String("from", s.From()),
zap.String("to", s.To()),
)

s.blackholeMapMu.Lock()
port, err := getPort(s.listener.Addr().String())
if err != nil {
panic("")
}
portInt, err := strconv.Atoi(port)
if err != nil {
panic("")
}
port = strconv.Itoa((portInt - 2))
delete(s.blackholeMap, port)
s.blackholeMapMu.Unlock()
}

func (s *server) BlackholeRx() {
Expand Down Expand Up @@ -972,7 +1165,11 @@ func (s *server) ResetListener() error {
var ln net.Listener
var err error
if !s.tlsInfo.Empty() {
ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo)
if s.isSSLTerminatingProxy {
ln, err = transport.NewListener(s.from.Host, "https", &s.tlsInfo)
} else {
ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo)
}
} else {
ln, err = net.Listen(s.from.Scheme, s.from.Host)
}
Expand Down

0 comments on commit 60c5240

Please sign in to comment.