Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp GRPC Port forwarding tunnels to use existing proxy #2985

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

balajiv113
Copy link
Member

@balajiv113 balajiv113 commented Dec 6, 2024

Fixes #2968 and several GRPC port forwarding issues.

FYI - Running ddev testsuite to verify end-end (Done)
https://buildkite.com/test-305/ddev/builds/19#01939bef-af2a-4b7d-9ea8-fa1ca461f037

return len(p), err
}

func (g GRPCServerRW) Read(p []byte) (n int, err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a grpc module providing the server and client so we have the core of this code in one place? This will tiny dead code in the guest and host agent that may not be optimized out, but it will make it easier to understand and maintain when all the pieces are the same place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the reason behind. But i don't think we can create a common one. As server and client are 2 different one. Create a common will create cyclic dependency as well.

At max we can do is, have a Base conn with method like setDeadline, setWriteDeadline etc. But for now i will avoid it. If you need for sure i can do that as well

}

func (g GRPCServerRW) LocalAddr() net.Addr {
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like the right way to support good logging, better than the NamedReadWriter added in #2944.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pointless comment. The purpose of LocalAddr is to identify the connection (which this doesn't do) while the NamedReadWriter in #2944 is identifying the purpose of the connection.

This implementation here is a lie, it doesn't identify the connection and the address it returns cannot be used in any of the normal ways that net.Addr is used.

}

func (g GRPCServerRW) Close() error {
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this does nothing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Server doesn't have any stream closing methods. So it doesn't do anything

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is implemented only to satisfy the net.Conn interface? Should we log a debug message when about ignoring Close?

This code can be integrated in other code expecting that Close() does something. If doing nothing breaks something, the debug log will help to diagnose the issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug logs are useful only in dev side, also grpc server doesn't provide close, its purely handled on client itself. I don't think adding debug is anywhere useful.

I don't want to spam it in our guestagent logs which doesn't give much value.

pkg/portfwdserver/server.go Show resolved Hide resolved
}
remote.HandleConn(conn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delegating the code to another libary is nice, if the library we used is reliable and well maintained.

Is this same tcpproxy libary that does not log anything on errors and was replaced in vz/network_darwin.go?

We must have 2 properties:

  • when one side is closed or fail during a copy, the other side is closed
  • when the first copy fail, the error is logged or returned to the caller

Example failure flow:

  1. first io.Copy() blocked reading from host side socket
  2. second io.Copy() blocked reading from grpc socket
  3. host side socket closed, first io.Copy() return
  4. second io.Copy() continue to block reading from grpc
  5. no error is logged since error handler is waiting for both io.Copy to finish

If this library does not ensure both properties we should not use it. It will impossible to debug when we have issues.

We can mitigate the logging issues by logging errors in Read() and Write() since forwarding will stop on the first error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking in current tcpproxy used, it is the same library with the same issues:

	errc := make(chan error, 1)
	go proxyCopy(errc, src, dst)
	go proxyCopy(errc, dst, src)
	<-errc

The errors from io.Copy are dropped silently.

func proxyCopy(errc chan<- error, dst, src net.Conn) {
	// Before we unwrap src and/or dst, copy any buffered data.
	if wc, ok := src.(*Conn); ok && len(wc.Peeked) > 0 {
		if _, err := dst.Write(wc.Peeked); err != nil {
			errc <- err
			return
		}
		wc.Peeked = nil
	}

	// Unwrap the src and dst from *Conn to *net.TCPConn so Go
	// 1.11's splice optimization kicks in.
	src = UnderlyingConn(src)
	dst = UnderlyingConn(dst)

	_, err := io.Copy(dst, src)
	errc <- err
}

When one io.Copy finish (success of failure), the other io.Copy is not affected. If both are blocked on Read() or Recv(), what would cause the call to return?

We have the same issue in the current code, but at least in the current code we will log the error if both io.Copy() fail.

It can be useful to fix both issues in tcpproxy but it will be easier to fix them in lima now. Is this the same code used for user v2 network?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to keep the port forwarding stuffs consistent across portfwclient, portfwserver, user-v2, vz forwardings.

In server we need to use tcpproxy because we need to dial and pass connection. So i wanted to use the same on client side as well.

Additionally, gvisor-tap-vsock userspace network is already using these for proxy we are using the same for vz and user-v2 network. This library has been reliable from that point of view.

I intend to reuse as these are testing across different products (not just lima), so that we can avoid failures with custom implementation like before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 the error handling in tcpproxy looks sloppy to me.

Also it looks like this whole file was copied from https://github.com/inetaf/tcpproxy/blame/master/tcpproxy.go :/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which file you mean ???

pkg/portfwd/client.go Show resolved Hide resolved
return 0, err
}
if len(in.Data) == 0 {
return 0, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not possible or just not needed, since the return value will be the same anyway if this happens?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed. We will not get empty data. Even if it happens technically we should simply write empty data to proxy conn as well. So removed it

}

func (g GrpcClientRW) SetDeadline(t time.Time) error {
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we don't implement it? should we log a warning to make the issue visible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too much logging will become a noise since these are connections. It will be called multiple times and its doesn't help much. SetDeadline is not even usecul in our case as ours is a grpc stream

@@ -130,14 +94,32 @@ func (g GrpcClientRW) Write(p []byte) (n int, err error) {
func (g GrpcClientRW) Read(p []byte) (n int, err error) {
in, err := g.stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return 0, nil
}
return 0, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't have proper logging in tcpproxy, we can mitigate it here by logging real read errors. Same can be done for Write().

@nirs
Copy link
Member

nirs commented Dec 6, 2024

Since this change convert the GRPC endpoint to net.Con interface, why not use it with bcopy? #2944

It has the same terminating issue as tcpproxy, but at least it logs errors. It will be easier to fix it to return an error and terminate the copies.

return nil
}
return err
}
Copy link
Member

@nirs nirs Dec 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was wrong in current code, we ignored the last n byte read if we got io.EOF. Fixed in #2971.

Is this fixed in forwarder.NewProxy?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that it has the same issue:

func (proxy *UDPProxy) Run() {
	readBuf := make([]byte, UDPBufSize)
	for {
		read, from, err := proxy.listener.ReadFrom(readBuf)
		if err != nil {
			// NOTE: Apparently ReadFrom doesn't return
			// ECONNREFUSED like Read do (see comment in
			// UDPProxy.replyLoop)
			if !isClosedError(err) {
				log.Debugf("Stopping udp proxy (%s)", err)
			}
			break
		}

It ignores the last n byte read.

It is calling listener.ReadFrom() which seems to have the same issue:

func (pk *pseudoLoopbackPacketConn) ReadFrom(bytes []byte) (n int, addr net.Addr, err error) {
	n, remoteAddr, err := pk.PacketConn.ReadFrom(bytes)
	if err != nil {
		return 0, nil, err
	}

	remoteAddrIP, _, err := net.SplitHostPort(remoteAddr.String())
	if err != nil {
		return 0, nil, err
	}
	if !IsLoopback(remoteAddrIP) {
		return 0, nil, fmt.Errorf("pseudoloopback forwarder: rejecting non-loopback remoteAddr %q", remoteAddr)
	}
	return n, remoteAddr, nil
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can raise PR with gvisor-tap-vsock to fix the UDPProxy part

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't have a real failure case for it for now, i intend to do this as a follow up

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do it now? This has already been sitting for a few weeks.

}

defer proxy.Close()
proxy.Run()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't get any error, or we ignore it? Are error logged in the proxy? Does ensure termination of both copies on errors or normal completion?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer proxy.Close() - Takes care of proper completion. Also we have debug and error logs in UDPProxy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you log the error from Close?

defer func() {
  if err := proxy.Close(); err != nil {
    ...
  }
}()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you check the code, proxy.close always returns nil. So handling it i don't think useful

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then panic.

@balajiv113
Copy link
Member Author

Ran DDEV Tests in buildkite. All tests passed

https://buildkite.com/test-305/ddev/builds/19#01939bef-af2a-4b7d-9ea8-fa1ca461f037

@balajiv113 balajiv113 force-pushed the grpc-pw branch 2 times, most recently from 48b2d5b to 07d39e5 Compare December 9, 2024 08:48
@balajiv113 balajiv113 requested a review from a team December 9, 2024 09:06
@AkihiroSuda AkihiroSuda added this to the v1.0.3 milestone Dec 10, 2024
pkg/portfwdserver/server.go Outdated Show resolved Hide resolved
}

func (g GRPCServerRW) LocalAddr() net.Addr {
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pointless comment. The purpose of LocalAddr is to identify the connection (which this doesn't do) while the NamedReadWriter in #2944 is identifying the purpose of the connection.

This implementation here is a lie, it doesn't identify the connection and the address it returns cannot be used in any of the normal ways that net.Addr is used.

pkg/portfwd/client.go Outdated Show resolved Hide resolved
pkg/portfwd/client.go Outdated Show resolved Hide resolved
}

defer proxy.Close()
proxy.Run()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you log the error from Close?

defer func() {
  if err := proxy.Close(); err != nil {
    ...
  }
}()

pkg/portfwd/client.go Outdated Show resolved Hide resolved
return &TunnelServer{
Conns: make(map[string]net.Conn),
}
return &TunnelServer{}
}

func (s *TunnelServer) Start(stream api.GuestService_TunnelServer) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TunnelServer is an empty type now. Why does it need to exist? This can be a free function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree but just for usecases like before, it is useful to retain that empty. So that later if we want to support reverse forwarding we can use these with different methods / some caching etc.

Copy link
Contributor

@tamird tamird left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this.

Delegating to a 3rd party library is a nice idea but as @nirs points out the error handling is subtle and in the case of this library, dubious. We also have to provide a largely fake interface implementation and instead of panicking (strict) we return nonsense (lax).

}
remote.HandleConn(conn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 the error handling in tcpproxy looks sloppy to me.

Also it looks like this whole file was copied from https://github.com/inetaf/tcpproxy/blame/master/tcpproxy.go :/

return nil
}
return err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do it now? This has already been sitting for a few weeks.

}

defer proxy.Close()
proxy.Run()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then panic.

_, err := io.Copy(conn, rw)
return err
})
// Handshake message to start tunnel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where was this in the old code? I'm having trouble finding it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

GRPC port forwarding hangs when using mysql connection
4 participants