Skip to content

Commit

Permalink
e2e test: use one connection per stream
Browse files Browse the repository at this point in the history
Sharing the same connection for multiple streams should have worked,
but ran into unexpected timeouts:

I0227 08:07:49.754263   80029 portproxy.go:109] container "mock" in pod csi-mock-volumes-4037-2061/csi-mockplugin-0 is running
E0227 08:07:49.779359   80029 portproxy.go:178] prepare forwarding csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: dialer failed: unable to upgrade connection: pod not found ("csi-mockplugin-0_csi-mock-volumes-4037-2061")
I0227 08:07:50.782705   80029 portproxy.go:109] container "mock" in pod csi-mock-volumes-4037-2061/csi-mockplugin-0 is running
I0227 08:07:50.809326   80029 portproxy.go:125] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: starting connection polling
I0227 08:07:50.909544   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #0, 0 open
I0227 08:07:50.912436   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #0
I0227 08:07:50.912503   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #0
I0227 08:07:50.913161   80029 portproxy.go:322] forward connection #0 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
E0227 08:07:50.913324   80029 portproxy.go:242] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: an error occurred connecting to the remote port: error forwarding port 9000 to pod 66662ea1ab30b4193dac0102c49be840971d337c802cc0c8bbc074214522bd13, uid : failed to execute portforward in network namespace "/var/run/netns/cni-c15e4e36-dad9-8316-c301-33af9dad5717": failed to dial 9000: dial tcp4 127.0.0.1:9000: connect: connection refused
I0227 08:07:50.913371   80029 portproxy.go:340] forward connection #0 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
W0227 08:07:50.913487   80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF"
I0227 08:07:51.009519   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #1, 0 open
I0227 08:07:51.011912   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #1
I0227 08:07:51.011973   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #1
I0227 08:07:51.013677   80029 portproxy.go:322] forward connection #1 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:07:51.013720   80029 portproxy.go:340] forward connection #1 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
W0227 08:07:51.013794   80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF"
E0227 08:07:51.017026   80029 portproxy.go:242] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: an error occurred connecting to the remote port: error forwarding port 9000 to pod 66662ea1ab30b4193dac0102c49be840971d337c802cc0c8bbc074214522bd13, uid : failed to execute portforward in network namespace "/var/run/netns/cni-c15e4e36-dad9-8316-c301-33af9dad5717": failed to dial 9000: dial tcp4 127.0.0.1:9000: connect: connection refused
I0227 08:07:51.109515   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #2, 0 open
I0227 08:07:51.111479   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #2
I0227 08:07:51.111519   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #2
I0227 08:07:51.209519   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open
I0227 08:07:51.766305   80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/Probe","Request":{},"Response":{"ready":{"value":true}},"Error":"","FullError":null}
I0227 08:07:51.768304   80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/GetPluginInfo","Request":{},"Response":{"name":"csi-mock-csi-mock-volumes-4037","vendor_version":"0.3.0","manifest":{"url":"https://k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock"}},"Error":"","FullError":null}
I0227 08:07:51.770494   80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/GetPluginCapabilities","Request":{},"Response":{"capabilities":[{"Type":{"Service":{"type":1}}},{"Type":{"VolumeExpansion":{"type":1}}},{"Type":{"Service":{"type":2}}}]},"Error":"","FullError":null}
I0227 08:07:51.772899   80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Controller/ControllerGetCapabilities","Request":{},"Response":{"capabilities":[{"Type":{"Rpc":{"type":1}}},{"Type":{"Rpc":{"type":3}}},{"Type":{"Rpc":{"type":10}}},{"Type":{"Rpc":{"type":4}}},{"Type":{"Rpc":{"type":6}}},{"Type":{"Rpc":{"type":5}}},{"Type":{"Rpc":{"type":8}}},{"Type":{"Rpc":{"type":7}}},{"Type":{"Rpc":{"type":12}}},{"Type":{"Rpc":{"type":11}}},{"Type":{"Rpc":{"type":9}}}]},"Error":"","FullError":null}
I0227 08:08:21.209901   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:08:21.209980   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open
I0227 08:08:51.211522   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:08:51.211566   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open
I0227 08:08:51.213451   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #3
I0227 08:08:51.213498   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #3
I0227 08:08:51.309540   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 2 open
I0227 08:08:52.215358   80029 portproxy.go:322] forward connection #3 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:08:52.215475   80029 portproxy.go:340] forward connection #3 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
I0227 08:09:21.310003   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:09:21.310086   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 1 open
I0227 08:09:51.311854   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:09:51.311908   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 1 open
I0227 08:09:51.314415   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #4
I0227 08:09:51.314497   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #4
I0227 08:09:51.409527   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#5, 2 open
I0227 08:09:52.326203   80029 portproxy.go:322] forward connection #4 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:09:52.326277   80029 portproxy.go:340] forward connection #4 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
I0227 08:10:21.409892   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:10:21.409954   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#5, 1 open
I0227 08:10:51.411455   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:10:51.411557   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#5, 1 open
I0227 08:10:51.413229   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection kubernetes#5
I0227 08:10:51.413274   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection kubernetes#5
I0227 08:10:51.509508   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#6, 2 open
I0227 08:10:52.414862   80029 portproxy.go:322] forward connection kubernetes#5 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:10:52.414931   80029 portproxy.go:340] forward connection kubernetes#5 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
I0227 08:11:21.509879   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:11:21.509934   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#6, 1 open
I0227 08:11:51.511519   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:11:51.511568   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#6, 1 open
I0227 08:11:51.513519   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection kubernetes#6
I0227 08:11:51.513571   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection kubernetes#6
I0227 08:11:51.609504   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#7, 2 open
I0227 08:11:52.517799   80029 portproxy.go:322] forward connection kubernetes#6 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:11:52.517918   80029 portproxy.go:340] forward connection kubernetes#6 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
I0227 08:12:21.609856   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:12:21.609909   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#7, 1 open
I0227 08:12:51.611494   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:12:51.611555   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#7, 1 open
I0227 08:12:51.613289   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection kubernetes#7
I0227 08:12:51.613343   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection kubernetes#7
I0227 08:12:51.709535   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#8, 2 open
I0227 08:12:52.615858   80029 portproxy.go:322] forward connection kubernetes#7 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:12:52.615989   80029 portproxy.go:340] forward connection kubernetes#7 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
W0227 08:12:52.616116   80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF"
I0227 08:13:21.709934   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:13:21.709997   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#8, 1 open
Feb 27 08:13:30.916: FAIL: Failed to register CSIDriver csi-mock-csi-mock-volumes-4037
Unexpected error:
    <*errors.errorString | 0xc002666220>: {
        s: "error waiting for CSI driver csi-mock-csi-mock-volumes-4037 registration on node kind-worker2: timed out waiting for the condition",
    }
    error waiting for CSI driver csi-mock-csi-mock-volumes-4037 registration on node kind-worker2: timed out waiting for the condition
occurred
  • Loading branch information
pohly committed Mar 1, 2021
1 parent 5089af1 commit 06ffdbc
Showing 1 changed file with 46 additions and 64 deletions.
110 changes: 46 additions & 64 deletions test/e2e/storage/drivers/proxy/portproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ import (
"io/ioutil"
"net"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
Expand Down Expand Up @@ -94,35 +92,11 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
addr: addr,
}

// Port forwarding is allowed to fail and will be restarted when it does.
prepareForwarding := func() (*remotePort, error) {
pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{})
if err != nil {
return nil, err
}
for i, status := range pod.Status.ContainerStatuses {
if pod.Spec.Containers[i].Name == addr.ContainerName &&
status.State.Running == nil {
return nil, fmt.Errorf("container %q is not running", addr.ContainerName)
}
}

streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
if err != nil {
return nil, fmt.Errorf("dialer failed: %v", err)
}
rp := &remotePort{
streamConn: streamConn,
}
return rp, nil
}

var connectionsCreated, connectionsClosed int32

runForwarding := func(rp *remotePort) {
defer rp.Close()
klog.V(5).Infof("%s: starting connection polling", prefix)
defer klog.V(5).Infof("%s: connection polling ended", prefix)
runForwarding := func() {
klog.V(2).Infof("%s: starting connection polling", prefix)
defer klog.V(2).Infof("%s: connection polling ended", prefix)

// This delay determines how quickly we notice when someone has
// connected inside the cluster. With socat, we cannot make this too small
Expand All @@ -145,9 +119,9 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
}

klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections)
stream, err := rp.dial(ctx, prefix, addr.Port)
stream, err := dial(ctx, fmt.Sprintf("%s #%d", prefix, connectionsCreated), dialer, addr.Port)
if err != nil {
klog.V(5).Infof("%s: no connection: %v", prefix, err)
klog.Errorf("%s: no connection: %v", prefix, err)
break
}
// Make the connection available to Accept below.
Expand All @@ -166,18 +140,24 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
// Portforwarding and polling for connections run in the background.
go func() {
for {
fw, err := prepareForwarding()
if err == nil {
runForwarding(fw)
} else {
if apierrors.IsNotFound(err) {
// This is normal, the pod isn't running yet. Log with lower severity.
klog.V(5).Infof("prepare forwarding %s: %v", addr, err)
} else {
klog.Errorf("prepare forwarding %s: %v", addr, err)
running := false
pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{})
if err != nil {
klog.V(5).Infof("checking for container %q in pod %s/%s: %v", addr.ContainerName, addr.Namespace, addr.PodName, err)
}
for i, status := range pod.Status.ContainerStatuses {
if pod.Spec.Containers[i].Name == addr.ContainerName &&
status.State.Running != nil {
running = true
break
}
}

if running {
klog.V(2).Infof("container %q in pod %s/%s is running", addr.ContainerName, addr.Namespace, addr.PodName)
runForwarding()
}

select {
case <-ctx.Done():
return
Expand Down Expand Up @@ -209,27 +189,32 @@ func (a Addr) String() string {
return fmt.Sprintf("%s/%s:%d", a.Namespace, a.PodName, a.Port)
}

// remotePort is a stripped down version of client-go/tools/portforward minus
// the local listeners.
type remotePort struct {
type stream struct {
httpstream.Stream
streamConn httpstream.Connection

requestIDLock sync.Mutex
requestID int
}

func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpstream.Stream, error) {
requestID := rp.nextRequestID()
func dial(ctx context.Context, prefix string, dialer httpstream.Dialer, port int) (s *stream, finalErr error) {
streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
if err != nil {
return nil, fmt.Errorf("dialer failed: %v", err)
}
requestID := "1"
defer func() {
if finalErr != nil {
streamConn.Close()
}
}()

// create error stream
headers := http.Header{}
headers.Set(v1.StreamType, v1.StreamTypeError)
headers.Set(v1.PortHeader, fmt.Sprintf("%d", port))
headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
headers.Set(v1.PortForwardRequestIDHeader, requestID)

// We're not writing to this stream, just reading an error message from it.
// This happens asynchronously.
errorStream, err := rp.streamConn.CreateStream(headers)
errorStream, err := streamConn.CreateStream(headers)
if err != nil {
return nil, fmt.Errorf("error creating error stream: %v", err)
}
Expand All @@ -246,24 +231,20 @@ func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpst

// create data stream
headers.Set(v1.StreamType, v1.StreamTypeData)
dataStream, err := rp.streamConn.CreateStream(headers)
dataStream, err := streamConn.CreateStream(headers)
if err != nil {
return nil, fmt.Errorf("error creating data stream: %v", err)
}

return dataStream, nil
return &stream{
Stream: dataStream,
streamConn: streamConn,
}, nil
}

func (rp *remotePort) Close() {
rp.streamConn.Close()
}

func (rp *remotePort) nextRequestID() int {
rp.requestIDLock.Lock()
defer rp.requestIDLock.Unlock()
id := rp.requestID
rp.requestID++
return id
func (s *stream) Close() {
s.Stream.Close()
s.streamConn.Close()
}

type listener struct {
Expand Down Expand Up @@ -292,7 +273,7 @@ func (l *listener) Accept() (net.Conn, error) {
}

type connection struct {
stream httpstream.Stream
stream *stream
addr Addr
counter int32
closed *int32
Expand Down Expand Up @@ -346,7 +327,8 @@ func (c *connection) Close() error {
atomic.AddInt32(c.closed, 1)
c.closed = nil
}
return c.stream.Close()
c.stream.Close()
return nil
}

func (l *listener) Addr() net.Addr {
Expand Down

0 comments on commit 06ffdbc

Please sign in to comment.