Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport/grpchttp2: add http2.Framer bridge #7453

Merged
merged 22 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions internal/transport/grpchttp2/framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import "golang.org/x/net/http2/hpack"

const initHeaderTableSize = 4096 // Default HTTP/2 header table size.
easwars marked this conversation as resolved.
Show resolved Hide resolved

// FrameType represents the type of an HTTP/2 Frame.
// See [Frame Type].
//
Expand Down Expand Up @@ -55,6 +57,12 @@
FlagContinuationEndHeaders Flag = 0x4
)

// IsSet returns a boolean indicating whether the passed flag is set on this
// flag instance.
func (f Flag) IsSet(flag Flag) bool {
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
return f&flag != 0

Check warning on line 63 in internal/transport/grpchttp2/framer.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/framer.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}

// Setting represents the id and value pair of an HTTP/2 setting.
// See [Setting Format].
//
Expand Down Expand Up @@ -105,6 +113,7 @@
//
// Each concrete Frame type defined below implements the Frame interface.
type Frame interface {
// Header returns the HTTP/2 9 byte header from the current Frame.
Header() *FrameHeader
// Free frees the underlying buffer if present so it can be reused by the
// framer.
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -258,6 +267,9 @@
return f.hdr
}

// Free is a no-op for WindowUpdateFrame.
func (f *WindowUpdateFrame) Free() {}

Check warning on line 271 in internal/transport/grpchttp2/framer.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/framer.go#L271

Added line #L271 was not covered by tests
easwars marked this conversation as resolved.
Show resolved Hide resolved

// ContinuationFrame is the representation of a [CONTINUATION Frame]. The
// CONTINUATION frame is used to continue a sequence of header block fragments.
//
Expand Down Expand Up @@ -302,6 +314,26 @@
// Free is a no-op for MetaHeadersFrame.
func (f *MetaHeadersFrame) Free() {}

// UnknownFrame is a struct that is returned when the framer encounters an
// unsupported frame.
type UnknownFrame struct {
hdr *FrameHeader
Payload []byte
free func()
}

// Header returns the 9 byte HTTP/2 header for this frame.
func (f *UnknownFrame) Header() *FrameHeader {
return f.hdr

Check warning on line 327 in internal/transport/grpchttp2/framer.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/framer.go#L326-L327

Added lines #L326 - L327 were not covered by tests
}

// Free frees the underlying data in the frame.
func (f *UnknownFrame) Free() {
if f.free != nil {
f.free()

Check warning on line 333 in internal/transport/grpchttp2/framer.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/framer.go#L331-L333

Added lines #L331 - L333 were not covered by tests
}
}

// Framer encapsulates the functionality to read and write HTTP/2 frames.
type Framer interface {
// ReadFrame returns grpchttp2.Frame. It is the caller's responsibility to
Expand Down
218 changes: 218 additions & 0 deletions internal/transport/grpchttp2/http2bridge.go
easwars marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpchttp2

import (
"io"

"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/mem"
)

// FramerBridge is a struct that works as an adapter for the net/x/http2
// Framer implementation to be able to work with the grpchttp2.Framer interface.
// This type exists to give an opt-out feature for the new framer and it is
// eventually going to be removed.
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
type FramerBridge struct {
framer *http2.Framer // the underlying http2.Framer implementation to perform reads and writes.
pool mem.BufferPool // a pool to reuse buffers when reading.
}

// NewFramerBridge creates a new framer by taking a writer and a reader,
// alongside the maxHeaderListSize for the maximum size of the headers the
// receiver is willing to accept from its peer. The underlying framer uses the
// SetReuseFrames feature to avoid extra allocations.
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
func NewFramerBridge(w io.Writer, r io.Reader, maxHeaderListSize uint32, pool mem.BufferPool) *FramerBridge {
fr := http2.NewFramer(w, r)
fr.SetReuseFrames()
fr.MaxHeaderListSize = maxHeaderListSize
fr.ReadMetaHeaders = hpack.NewDecoder(initHeaderTableSize, nil)

if pool == nil {
pool = mem.DefaultBufferPool()
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
}

return &FramerBridge{
framer: fr,
pool: pool,
}
}

// ReadFrame reads a frame from the underlying http2.Framer and returns a
// Frame defined in the grpchttp2 package.
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
func (fr *FramerBridge) ReadFrame() (Frame, error) {
f, err := fr.framer.ReadFrame()
if err != nil {
return nil, err

Check warning on line 63 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L63

Added line #L63 was not covered by tests
}

h := f.Header()
easwars marked this conversation as resolved.
Show resolved Hide resolved
hdr := &FrameHeader{
Size: h.Length,
Type: FrameType(h.Type),
Flags: Flag(h.Flags),
StreamID: h.StreamID,
}

switch f := f.(type) {
case *http2.DataFrame:
buf := fr.pool.Get(int(hdr.Size))
copy(buf, f.Data())
df := &DataFrame{
hdr: hdr,
Data: buf,
}
df.free = func() { fr.pool.Put(buf) }
return df, nil
case *http2.RSTStreamFrame:
return &RSTStreamFrame{
hdr: hdr,
Code: ErrCode(f.ErrCode),
}, nil
case *http2.SettingsFrame:
buf := make([]Setting, 0, f.NumSettings())
f.ForeachSetting(func(s http2.Setting) error {
buf = append(buf, Setting{
ID: SettingID(s.ID),
Value: s.Val,
})
return nil
})
return &SettingsFrame{
hdr: hdr,
Settings: buf,
}, nil
case *http2.PingFrame:
buf := fr.pool.Get(int(hdr.Size))
copy(buf, f.Data[:])
pf := &PingFrame{
hdr: hdr,
Data: buf,
}
pf.free = func() { fr.pool.Put(buf) }
return pf, nil
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
case *http2.GoAwayFrame:
// Size of the frame minus the code and lastStreamID
buf := fr.pool.Get(int(hdr.Size) - 8)
copy(buf, f.DebugData())
gf := &GoAwayFrame{
hdr: hdr,
LastStreamID: f.LastStreamID,
Code: ErrCode(f.ErrCode),
DebugData: buf,
}
gf.free = func() { fr.pool.Put(buf) }
return gf, nil
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
case *http2.WindowUpdateFrame:
return &WindowUpdateFrame{
hdr: hdr,
Inc: f.Increment,
}, nil
case *http2.MetaHeadersFrame:
return &MetaHeadersFrame{
hdr: hdr,
Fields: f.Fields,
}, nil
default:
buf := fr.pool.Get(int(hdr.Size))
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
huf := f.(*http2.UnknownFrame)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, what's a huf? lol

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to type uf lol.

copy(buf, huf.Payload())
uf := &UnknownFrame{
hdr: hdr,
Payload: buf,

Check warning on line 139 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L133-L139

Added lines #L133 - L139 were not covered by tests
}
uf.free = func() { fr.pool.Put(buf) }
return uf, nil

Check warning on line 142 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L141-L142

Added lines #L141 - L142 were not covered by tests
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// WriteData writes a DATA Frame into the underlying writer.
func (fr *FramerBridge) WriteData(streamID uint32, endStream bool, data ...[]byte) error {
var buf []byte
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
if len(data) != 1 {
tl := 0
for _, s := range data {
tl += len(s)
}

easwars marked this conversation as resolved.
Show resolved Hide resolved
buf = fr.pool.Get(tl)[:0]
defer fr.pool.Put(buf)
for _, s := range data {
buf = append(buf, s...)
}
} else {
buf = data[0]

Check warning on line 161 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L160-L161

Added lines #L160 - L161 were not covered by tests
}

return fr.framer.WriteData(streamID, endStream, buf)
}

// WriteHeaders writes a Headers Frame into the underlying writer.
func (fr *FramerBridge) WriteHeaders(streamID uint32, endStream, endHeaders bool, headerBlock []byte) error {
return fr.framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: streamID,
EndStream: endStream,
EndHeaders: endHeaders,
BlockFragment: headerBlock,
})
}

// WriteRSTStream writes a RSTStream Frame into the underlying writer.
func (fr *FramerBridge) WriteRSTStream(streamID uint32, code ErrCode) error {
return fr.framer.WriteRSTStream(streamID, http2.ErrCode(code))
}

// WriteSettings writes a Settings Frame into the underlying writer.
func (fr *FramerBridge) WriteSettings(settings ...Setting) error {
ss := make([]http2.Setting, 0, len(settings))
for _, s := range settings {
ss = append(ss, http2.Setting{
ID: http2.SettingID(s.ID),
Val: s.Value,
})
}

return fr.framer.WriteSettings(ss...)
}

// WriteSettingsAck writes a Settings Frame with the Ack flag set.
func (fr *FramerBridge) WriteSettingsAck() error {
return fr.framer.WriteSettingsAck()
}

// WritePing writes a Ping frame to the underlying writer.
func (fr *FramerBridge) WritePing(ack bool, data [8]byte) error {
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
return fr.framer.WritePing(ack, data)
}

// WriteGoAway writes a GoAway Frame to the unerlying writer.
func (fr *FramerBridge) WriteGoAway(maxStreamID uint32, code ErrCode, debugData []byte) error {
return fr.framer.WriteGoAway(maxStreamID, http2.ErrCode(code), debugData)
}

// WriteWindowUpdate writes a WindowUpdate Frame into the underlying writer.
func (fr *FramerBridge) WriteWindowUpdate(streamID, inc uint32) error {
return fr.framer.WriteWindowUpdate(streamID, inc)
}

// WriteContinuation writes a Continuation Frame into the underlying writer.
func (fr *FramerBridge) WriteContinuation(streamID uint32, endHeaders bool, headerBlock []byte) error {
return fr.framer.WriteContinuation(streamID, endHeaders, headerBlock)
}
Loading
Loading