From 76a4945b0159c2ff2a637f68ead2f2cfaffb4604 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveirinha?= Date: Sat, 13 Jan 2024 15:51:28 +0000 Subject: [PATCH] http2: add support to configure transport flow control values Adds to the transport configuration the capability to configure the maximum flow control values instead of using default ones. For applications that use a lot of client connections, being able to configure these values allows for better control on memory usage, particulary when connections are long-lived. related: golang/go#20448 --- http2/transport.go | 102 ++++++++++++++++++++++++++++++---------- http2/transport_test.go | 2 +- 2 files changed, 78 insertions(+), 26 deletions(-) diff --git a/http2/transport.go b/http2/transport.go index df578b86c..b9be1e0f8 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -39,14 +39,17 @@ import ( ) const ( - // transportDefaultConnFlow is how many connection-level flow control + // maxWriteFrameSize is the maximum possible frame size used to write to server. + maxWriteFrameSize = 512 << 10 + + // defaultTransportDefaultConnFlow is how many connection-level flow control // tokens we give the server at start-up, past the default 64k. - transportDefaultConnFlow = 1 << 30 + defaultTransportDefaultConnFlow = 1 << 30 - // transportDefaultStreamFlow is how many stream-level flow + // defaultTransportDefaultStreamFlow is how many stream-level flow // control tokens we announce to the peer, and how many bytes // we buffer per stream. - transportDefaultStreamFlow = 4 << 20 + defaultTransportDefaultStreamFlow = 4 << 20 defaultUserAgent = "Go-http-client/2.0" @@ -124,6 +127,21 @@ type Transport struct { // Values are bounded in the range 16k to 16M. MaxReadFrameSize uint32 + // MaxWriteFrameSize is the maximum frame size that we can a client + // connection can send to a server, even if server advertises a higher value. + // If 0, then a default value is used. + MaxWriteFrameSize uint32 + + // MaxDownloadBufferPerConnection is the maximum per connection buffer size, + // used for receiving data from the server. + // If 0, then a default value is used. + MaxDownloadBufferPerConnection uint32 + + // MaxDownloadBufferPerStream is the maximum buffer to use for inflow data sent + // by the server. + // If 0, then a default value is used. + MaxDownloadBufferPerStream uint32 + // MaxDecoderHeaderTableSize optionally specifies the http2 // SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It // informs the remote endpoint of the maximum size of the header compression @@ -304,6 +322,9 @@ type ClientConn struct { idleTimeout time.Duration // or 0 for never idleTimer *time.Timer + maxWriteFrameSize uint32 + maxDownloadBufferPerStream uint32 + mu sync.Mutex // guards following cond *sync.Cond // hold mu; broadcast on flow/closed changes flow outflow // our conn-level flow control quota (cs.outflow is per stream) @@ -731,25 +752,55 @@ func (t *Transport) maxEncoderHeaderTableSize() uint32 { return initialHeaderTableSize } +func (t *Transport) maxDownloadBufferPerConnection() uint32 { + maxWindow := uint32((2 << 31) - 1 - initialWindowSize) + + if v := t.MaxDownloadBufferPerConnection; v >= initialWindowSize { + if v > maxWindow { + return maxWindow + } else { + return v + } + } + + return defaultTransportDefaultConnFlow +} + +func (t *Transport) maxDownloadBufferPerStream() uint32 { + if v := t.MaxDownloadBufferPerStream; v > 0 { + return v + } + return defaultTransportDefaultStreamFlow +} + +func (t *Transport) maxWriteFrameSize() uint32 { + if v := t.MaxWriteFrameSize; v > 0 && v <= maxWriteFrameSize { + return v + } + return maxWriteFrameSize +} + func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) { return t.newClientConn(c, t.disableKeepAlives()) } func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) { cc := &ClientConn{ - t: t, - tconn: c, - readerDone: make(chan struct{}), - nextStreamID: 1, - maxFrameSize: 16 << 10, // spec default - initialWindowSize: 65535, // spec default - maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings. - peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. - streams: make(map[uint32]*clientStream), - singleUse: singleUse, - wantSettingsAck: true, - pings: make(map[[8]byte]chan struct{}), - reqHeaderMu: make(chan struct{}, 1), + t: t, + tconn: c, + readerDone: make(chan struct{}), + nextStreamID: 1, + maxFrameSize: 16 << 10, // spec default + initialWindowSize: 65535, // spec default + maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings. + peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. + maxWriteFrameSize: t.maxWriteFrameSize(), + maxDownloadBufferPerStream: t.maxDownloadBufferPerStream(), + streams: make(map[uint32]*clientStream), + singleUse: singleUse, + wantSettingsAck: true, + pings: make(map[[8]byte]chan struct{}), + reqHeaderMu: make(chan struct{}, 1), } if d := t.idleConnTimeout(); d != 0 { cc.idleTimeout = d @@ -796,7 +847,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro initialSettings := []Setting{ {ID: SettingEnablePush, Val: 0}, - {ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow}, + {ID: SettingInitialWindowSize, Val: t.maxDownloadBufferPerStream()}, } if max := t.maxFrameReadSize(); max != 0 { initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: max}) @@ -810,8 +861,8 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro cc.bw.Write(clientPreface) cc.fr.WriteSettings(initialSettings...) - cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow) - cc.inflow.init(transportDefaultConnFlow + initialWindowSize) + cc.fr.WriteWindowUpdate(0, t.maxDownloadBufferPerConnection()) + cc.inflow.init(int32(t.maxDownloadBufferPerConnection()) + initialWindowSize) cc.bw.Flush() if cc.werr != nil { cc.Close() @@ -1660,12 +1711,12 @@ var ( // outgoing request bodies to read/write to/from. // // It returns max(1, min(peer's advertised max frame size, -// Request.ContentLength+1, 512KB)). +// Request.ContentLength+1, Transport.MaxWriteFrameSize)). func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int { - const max = 512 << 10 + var maxSize = int64(cs.cc.maxWriteFrameSize) n := int64(maxFrameSize) - if n > max { - n = max + if n > maxSize { + n = maxSize } if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n { // Add an extra byte past the declared content-length to @@ -2120,7 +2171,8 @@ type resAndError struct { func (cc *ClientConn) addStreamLocked(cs *clientStream) { cs.flow.add(int32(cc.initialWindowSize)) cs.flow.setConnFlow(&cc.flow) - cs.inflow.init(transportDefaultStreamFlow) + // no need to truncate since max is maxWriteFrameSize + cs.inflow.init(int32(cc.maxDownloadBufferPerStream)) cs.ID = cc.nextStreamID cc.nextStreamID += 2 cc.streams[cs.ID] = cs diff --git a/http2/transport_test.go b/http2/transport_test.go index a81131f29..6f5eab748 100644 --- a/http2/transport_test.go +++ b/http2/transport_test.go @@ -2867,7 +2867,7 @@ func TestTransportFlowControl(t *testing.T) { } read += int64(n) - const max = transportDefaultStreamFlow + const max = defaultTransportDefaultStreamFlow if w := atomic.LoadInt64(&wrote); -max > read-w || read-w > max { t.Fatalf("Too much data inflight: server wrote %v bytes but client only received %v", w, read) }