Skip to content

Commit 06ffdbc

Browse files
committed
e2e test: use one connection per stream
Sharing the same connection for multiple streams should have worked, but ran into unexpected timeouts: I0227 08:07:49.754263 80029 portproxy.go:109] container "mock" in pod csi-mock-volumes-4037-2061/csi-mockplugin-0 is running E0227 08:07:49.779359 80029 portproxy.go:178] prepare forwarding csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: dialer failed: unable to upgrade connection: pod not found ("csi-mockplugin-0_csi-mock-volumes-4037-2061") I0227 08:07:50.782705 80029 portproxy.go:109] container "mock" in pod csi-mock-volumes-4037-2061/csi-mockplugin-0 is running I0227 08:07:50.809326 80029 portproxy.go:125] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: starting connection polling I0227 08:07:50.909544 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #0, 0 open I0227 08:07:50.912436 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #0 I0227 08:07:50.912503 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #0 I0227 08:07:50.913161 80029 portproxy.go:322] forward connection #0 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream E0227 08:07:50.913324 80029 portproxy.go:242] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: an error occurred connecting to the remote port: error forwarding port 9000 to pod 66662ea1ab30b4193dac0102c49be840971d337c802cc0c8bbc074214522bd13, uid : failed to execute portforward in network namespace "/var/run/netns/cni-c15e4e36-dad9-8316-c301-33af9dad5717": failed to dial 9000: dial tcp4 127.0.0.1:9000: connect: connection refused I0227 08:07:50.913371 80029 portproxy.go:340] forward connection #0 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side W0227 08:07:50.913487 80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF" I0227 08:07:51.009519 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #1, 0 open I0227 08:07:51.011912 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #1 I0227 08:07:51.011973 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #1 I0227 08:07:51.013677 80029 portproxy.go:322] forward connection #1 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:07:51.013720 80029 portproxy.go:340] forward connection #1 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side W0227 08:07:51.013794 80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF" E0227 08:07:51.017026 80029 portproxy.go:242] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: an error occurred connecting to the remote port: error forwarding port 9000 to pod 66662ea1ab30b4193dac0102c49be840971d337c802cc0c8bbc074214522bd13, uid : failed to execute portforward in network namespace "/var/run/netns/cni-c15e4e36-dad9-8316-c301-33af9dad5717": failed to dial 9000: dial tcp4 127.0.0.1:9000: connect: connection refused I0227 08:07:51.109515 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #2, 0 open I0227 08:07:51.111479 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #2 I0227 08:07:51.111519 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #2 I0227 08:07:51.209519 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open I0227 08:07:51.766305 80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/Probe","Request":{},"Response":{"ready":{"value":true}},"Error":"","FullError":null} I0227 08:07:51.768304 80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/GetPluginInfo","Request":{},"Response":{"name":"csi-mock-csi-mock-volumes-4037","vendor_version":"0.3.0","manifest":{"url":"https://k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock"}},"Error":"","FullError":null} I0227 08:07:51.770494 80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/GetPluginCapabilities","Request":{},"Response":{"capabilities":[{"Type":{"Service":{"type":1}}},{"Type":{"VolumeExpansion":{"type":1}}},{"Type":{"Service":{"type":2}}}]},"Error":"","FullError":null} I0227 08:07:51.772899 80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Controller/ControllerGetCapabilities","Request":{},"Response":{"capabilities":[{"Type":{"Rpc":{"type":1}}},{"Type":{"Rpc":{"type":3}}},{"Type":{"Rpc":{"type":10}}},{"Type":{"Rpc":{"type":4}}},{"Type":{"Rpc":{"type":6}}},{"Type":{"Rpc":{"type":5}}},{"Type":{"Rpc":{"type":8}}},{"Type":{"Rpc":{"type":7}}},{"Type":{"Rpc":{"type":12}}},{"Type":{"Rpc":{"type":11}}},{"Type":{"Rpc":{"type":9}}}]},"Error":"","FullError":null} I0227 08:08:21.209901 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:08:21.209980 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open I0227 08:08:51.211522 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:08:51.211566 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open I0227 08:08:51.213451 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #3 I0227 08:08:51.213498 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #3 I0227 08:08:51.309540 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 2 open I0227 08:08:52.215358 80029 portproxy.go:322] forward connection #3 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:08:52.215475 80029 portproxy.go:340] forward connection #3 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side I0227 08:09:21.310003 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:09:21.310086 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 1 open I0227 08:09:51.311854 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:09:51.311908 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 1 open I0227 08:09:51.314415 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #4 I0227 08:09:51.314497 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #4 I0227 08:09:51.409527 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #5, 2 open I0227 08:09:52.326203 80029 portproxy.go:322] forward connection #4 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:09:52.326277 80029 portproxy.go:340] forward connection #4 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side I0227 08:10:21.409892 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:10:21.409954 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #5, 1 open I0227 08:10:51.411455 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:10:51.411557 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #5, 1 open I0227 08:10:51.413229 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #5 I0227 08:10:51.413274 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #5 I0227 08:10:51.509508 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #6, 2 open I0227 08:10:52.414862 80029 portproxy.go:322] forward connection #5 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:10:52.414931 80029 portproxy.go:340] forward connection #5 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side I0227 08:11:21.509879 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:11:21.509934 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #6, 1 open I0227 08:11:51.511519 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:11:51.511568 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #6, 1 open I0227 08:11:51.513519 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #6 I0227 08:11:51.513571 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #6 I0227 08:11:51.609504 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#7, 2 open I0227 08:11:52.517799 80029 portproxy.go:322] forward connection #6 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:11:52.517918 80029 portproxy.go:340] forward connection #6 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side I0227 08:12:21.609856 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:12:21.609909 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#7, 1 open I0227 08:12:51.611494 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:12:51.611555 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#7, 1 open I0227 08:12:51.613289 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection kubernetes#7 I0227 08:12:51.613343 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection kubernetes#7 I0227 08:12:51.709535 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#8, 2 open I0227 08:12:52.615858 80029 portproxy.go:322] forward connection kubernetes#7 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:12:52.615989 80029 portproxy.go:340] forward connection kubernetes#7 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side W0227 08:12:52.616116 80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF" I0227 08:13:21.709934 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:13:21.709997 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection kubernetes#8, 1 open Feb 27 08:13:30.916: FAIL: Failed to register CSIDriver csi-mock-csi-mock-volumes-4037 Unexpected error: <*errors.errorString | 0xc002666220>: { s: "error waiting for CSI driver csi-mock-csi-mock-volumes-4037 registration on node kind-worker2: timed out waiting for the condition", } error waiting for CSI driver csi-mock-csi-mock-volumes-4037 registration on node kind-worker2: timed out waiting for the condition occurred
1 parent 5089af1 commit 06ffdbc

File tree

1 file changed

+46
-64
lines changed

1 file changed

+46
-64
lines changed

test/e2e/storage/drivers/proxy/portproxy.go

+46-64
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@ import (
2424
"io/ioutil"
2525
"net"
2626
"net/http"
27-
"strconv"
2827
"sync"
2928
"sync/atomic"
3029
"time"
3130

3231
v1 "k8s.io/api/core/v1"
33-
apierrors "k8s.io/apimachinery/pkg/api/errors"
3432
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3533
"k8s.io/apimachinery/pkg/runtime/schema"
3634
"k8s.io/apimachinery/pkg/util/httpstream"
@@ -94,35 +92,11 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
9492
addr: addr,
9593
}
9694

97-
// Port forwarding is allowed to fail and will be restarted when it does.
98-
prepareForwarding := func() (*remotePort, error) {
99-
pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{})
100-
if err != nil {
101-
return nil, err
102-
}
103-
for i, status := range pod.Status.ContainerStatuses {
104-
if pod.Spec.Containers[i].Name == addr.ContainerName &&
105-
status.State.Running == nil {
106-
return nil, fmt.Errorf("container %q is not running", addr.ContainerName)
107-
}
108-
}
109-
110-
streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
111-
if err != nil {
112-
return nil, fmt.Errorf("dialer failed: %v", err)
113-
}
114-
rp := &remotePort{
115-
streamConn: streamConn,
116-
}
117-
return rp, nil
118-
}
119-
12095
var connectionsCreated, connectionsClosed int32
12196

122-
runForwarding := func(rp *remotePort) {
123-
defer rp.Close()
124-
klog.V(5).Infof("%s: starting connection polling", prefix)
125-
defer klog.V(5).Infof("%s: connection polling ended", prefix)
97+
runForwarding := func() {
98+
klog.V(2).Infof("%s: starting connection polling", prefix)
99+
defer klog.V(2).Infof("%s: connection polling ended", prefix)
126100

127101
// This delay determines how quickly we notice when someone has
128102
// connected inside the cluster. With socat, we cannot make this too small
@@ -145,9 +119,9 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
145119
}
146120

147121
klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections)
148-
stream, err := rp.dial(ctx, prefix, addr.Port)
122+
stream, err := dial(ctx, fmt.Sprintf("%s #%d", prefix, connectionsCreated), dialer, addr.Port)
149123
if err != nil {
150-
klog.V(5).Infof("%s: no connection: %v", prefix, err)
124+
klog.Errorf("%s: no connection: %v", prefix, err)
151125
break
152126
}
153127
// Make the connection available to Accept below.
@@ -166,18 +140,24 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
166140
// Portforwarding and polling for connections run in the background.
167141
go func() {
168142
for {
169-
fw, err := prepareForwarding()
170-
if err == nil {
171-
runForwarding(fw)
172-
} else {
173-
if apierrors.IsNotFound(err) {
174-
// This is normal, the pod isn't running yet. Log with lower severity.
175-
klog.V(5).Infof("prepare forwarding %s: %v", addr, err)
176-
} else {
177-
klog.Errorf("prepare forwarding %s: %v", addr, err)
143+
running := false
144+
pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{})
145+
if err != nil {
146+
klog.V(5).Infof("checking for container %q in pod %s/%s: %v", addr.ContainerName, addr.Namespace, addr.PodName, err)
147+
}
148+
for i, status := range pod.Status.ContainerStatuses {
149+
if pod.Spec.Containers[i].Name == addr.ContainerName &&
150+
status.State.Running != nil {
151+
running = true
152+
break
178153
}
179154
}
180155

156+
if running {
157+
klog.V(2).Infof("container %q in pod %s/%s is running", addr.ContainerName, addr.Namespace, addr.PodName)
158+
runForwarding()
159+
}
160+
181161
select {
182162
case <-ctx.Done():
183163
return
@@ -209,27 +189,32 @@ func (a Addr) String() string {
209189
return fmt.Sprintf("%s/%s:%d", a.Namespace, a.PodName, a.Port)
210190
}
211191

212-
// remotePort is a stripped down version of client-go/tools/portforward minus
213-
// the local listeners.
214-
type remotePort struct {
192+
type stream struct {
193+
httpstream.Stream
215194
streamConn httpstream.Connection
216-
217-
requestIDLock sync.Mutex
218-
requestID int
219195
}
220196

221-
func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpstream.Stream, error) {
222-
requestID := rp.nextRequestID()
197+
func dial(ctx context.Context, prefix string, dialer httpstream.Dialer, port int) (s *stream, finalErr error) {
198+
streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
199+
if err != nil {
200+
return nil, fmt.Errorf("dialer failed: %v", err)
201+
}
202+
requestID := "1"
203+
defer func() {
204+
if finalErr != nil {
205+
streamConn.Close()
206+
}
207+
}()
223208

224209
// create error stream
225210
headers := http.Header{}
226211
headers.Set(v1.StreamType, v1.StreamTypeError)
227212
headers.Set(v1.PortHeader, fmt.Sprintf("%d", port))
228-
headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
213+
headers.Set(v1.PortForwardRequestIDHeader, requestID)
229214

230215
// We're not writing to this stream, just reading an error message from it.
231216
// This happens asynchronously.
232-
errorStream, err := rp.streamConn.CreateStream(headers)
217+
errorStream, err := streamConn.CreateStream(headers)
233218
if err != nil {
234219
return nil, fmt.Errorf("error creating error stream: %v", err)
235220
}
@@ -246,24 +231,20 @@ func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpst
246231

247232
// create data stream
248233
headers.Set(v1.StreamType, v1.StreamTypeData)
249-
dataStream, err := rp.streamConn.CreateStream(headers)
234+
dataStream, err := streamConn.CreateStream(headers)
250235
if err != nil {
251236
return nil, fmt.Errorf("error creating data stream: %v", err)
252237
}
253238

254-
return dataStream, nil
239+
return &stream{
240+
Stream: dataStream,
241+
streamConn: streamConn,
242+
}, nil
255243
}
256244

257-
func (rp *remotePort) Close() {
258-
rp.streamConn.Close()
259-
}
260-
261-
func (rp *remotePort) nextRequestID() int {
262-
rp.requestIDLock.Lock()
263-
defer rp.requestIDLock.Unlock()
264-
id := rp.requestID
265-
rp.requestID++
266-
return id
245+
func (s *stream) Close() {
246+
s.Stream.Close()
247+
s.streamConn.Close()
267248
}
268249

269250
type listener struct {
@@ -292,7 +273,7 @@ func (l *listener) Accept() (net.Conn, error) {
292273
}
293274

294275
type connection struct {
295-
stream httpstream.Stream
276+
stream *stream
296277
addr Addr
297278
counter int32
298279
closed *int32
@@ -346,7 +327,8 @@ func (c *connection) Close() error {
346327
atomic.AddInt32(c.closed, 1)
347328
c.closed = nil
348329
}
349-
return c.stream.Close()
330+
c.stream.Close()
331+
return nil
350332
}
351333

352334
func (l *listener) Addr() net.Addr {

0 commit comments

Comments
 (0)