[#591] Merge upstream commit 63b6d78: "CASSGO 1 Support for Native Protocol 5"#593
[#591] Merge upstream commit 63b6d78: "CASSGO 1 Support for Native Protocol 5"#593nikagra wants to merge 4 commits intoscylladb:masterfrom
63b6d78: "CASSGO 1 Support for Native Protocol 5"#593Conversation
d894ba4 to
b5ee23f
Compare
What's the value of the v5 protocol? |
It will allow to test modern Cassandra side by side, bring proper metadata handling, which we will use in next pr. |
For proper metada handling we need this change to the wire protocol? v5 doesn't seem to be providing huge value, maybe I'm missing something. Modern cassandra isn't exciting. CQL5 - I'm not sure it's on our todo list just yet. |
We don't need it, but if we don't bring CQL5 now, later we will have to deal with rewiring metadata updates when CQL5 going to get it, which will make it way harder, out of experience with other drivers I can tell that bringing CQL5 first and then implementing metadata update feature way easier than otherway around. |
80e1137 to
18a0652
Compare
e7022a5 to
5a76087
Compare
63b6d78: "CASSGO 1 Support for Native Protocol 5"
5a76087 to
a2cb754
Compare
63b6d78: "CASSGO 1 Support for Native Protocol 5"63b6d78: "CASSGO 1 Support for Native Protocol 5"
cassandra_test.go
Outdated
| text string | ||
| ) | ||
|
|
||
| q := session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace"). |
There was a problem hiding this comment.
Please remove ks from the query.
cassandra_test.go
Outdated
| id = 0 | ||
| text = "" | ||
|
|
||
| q = session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace"). |
There was a problem hiding this comment.
Please remove ks from the query.
common_test.go
Outdated
|
|
||
| var ( | ||
| flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples") | ||
| flagCluster = flag.String("cluster", "127.0.2.1,127.0.2.2,127.0.2.3", "a comma-separated list of host:port tuples") |
conn.go
Outdated
| if c.r.GetTimeout() > 0 { | ||
| c.r.SetReadDeadline(time.Time{}) | ||
| } |
There was a problem hiding this comment.
This one does not make any sense to have it here, it will be overridden on next .Read call on the reader, please drop it
There was a problem hiding this comment.
Let's do the following instead, on first io.ReadFull(r, p[:1]) in the readHeader(r io.Reader, p []byte) (head frameHeader, err error) let's handle read timeout and wrap/or return an error that signals that it is an expected situation and handle it gracefully on serve(ctx context.Context).
The problem is the following, if there is no traffic serve(ctx context.Context) is still calling readHeader, in which situation it is ok to get timeout on reading begging of the frame.
conn.go
Outdated
|
|
||
| // ConnReader is like net.Conn but also allows to set timeout duration. | ||
| type ConnReader interface { | ||
| net.Conn |
There was a problem hiding this comment.
Please remove net.Conn and add only methods that are used.
conn.go
Outdated
| func (c *connReader) LocalAddr() net.Addr { | ||
| return c.conn.LocalAddr() | ||
| } |
There was a problem hiding this comment.
It is not used, drop it
conn.go
Outdated
| return c.conn.RemoteAddr() | ||
| } | ||
|
|
||
| func (c *connReader) SetDeadline(t time.Time) error { |
There was a problem hiding this comment.
It is not used, drop it
conn.go
Outdated
| func (c *connReader) SetReadDeadline(t time.Time) error { | ||
| return c.conn.SetReadDeadline(t) | ||
| } |
There was a problem hiding this comment.
It should not be used, please drop it.
conn.go
Outdated
| func (c *connReader) SetWriteDeadline(t time.Time) error { | ||
| return c.conn.SetWriteDeadline(t) | ||
| } |
There was a problem hiding this comment.
SetWriteDeadline on a connReader ? Please drop it.
dkropachev
left a comment
There was a problem hiding this comment.
Please go over all the comments, make sure they are fixed.
cassandra_test.go
Outdated
| text string | ||
| ) | ||
|
|
||
| q := session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace"). |
5aa5745 to
98b0063
Compare
a6eba67 to
a629414
Compare
Native Protocol 5 was introduced with the release of C* 4.0. This PR provides full support for a newer version including new format frames (segments), and new fields for QUERY, BATCH, and EXECUTE messages. Also, this PR brings changes to the Compressor interface to follow an append-like design. One more thing, it bumps Go version to the newer 1.19. Patch by Bohdan Siryk; Reviewed by João Reis, James Hartig for CASSGO-1 CASSGO-30
…y to internal package. Integration tests config changes. Reverting unwanted merge changes.
Detailed summary:
- Moving CRC functionality to internal package.
- Update test cluster hosts to match CCM-created cluster
- Update flagCluster default from 127.0.0.1 to 127.0.2.1,127.0.2.2,127.0.2.3 to align with the 3-node cluster configuration created by CCM. Fixes connection refused errors in integration tests.
- Adding validation to `ReadTimeout` value in `ClusterConfig`
- Removing `headSize` from `framer`.
- Adding `.vscode` to `.gitignore`.
…nhancements. Refactor keyspace handling in `TestQuery_SetKeyspace`
a629414 to
66b18af
Compare
There was a problem hiding this comment.
Pull request overview
Merges upstream work for Cassandra Native Protocol v5 support (including segment framing and new QUERY/BATCH/EXECUTE fields) as groundwork for Scylla’s SCYLLA_USE_METADATA_ID extension, plus a few repo-specific cleanups (CRC extraction, config validation, ignores).
Changes:
- Add Native Protocol v5 framing support (segments, CRCs), plus support for keyspace and
now_in_secondsquery/batch flags. - Update prepared-statement/result-metadata handling to support metadata IDs / metadata-changed flows.
- Refactor compression APIs to append-style methods and extract CRC utilities into
internal/crc.
Reviewed changes
Copilot reviewed 21 out of 23 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| session.go | Keyspace-aware routing-key info caching and keyspace override support for routing computations. |
| conn.go | Core v5 send/receive flow: segment encode/decode, prepared metadata IDs, keyspace/now_in_seconds plumbing. |
| frame.go | Protocol v5 framing (segments), CRC checks, new flags/fields in QUERY/BATCH/EXECUTE and result metadata parsing. |
| internal/frame/frames.go | Extends protocol constants/flags for v5 (e.g., metadata-changed, now_in_seconds). |
| compressor.go | Compressor interface redesign to append-style methods used by legacy frames and v5 segments. |
| compressor_test.go | Updates tests/benchmarks for the new compressor interface. |
| lz4/lz4.go | Updates LZ4 compressor implementation to new append-style APIs. |
| lz4/lz4_test.go | Expands LZ4 tests to cover append/grow-slice behaviors and new APIs. |
| lz4/go.mod | Nested module definition for github.com/gocql/gocql/lz4. |
| lz4/go.sum | Dependency sums for the nested lz4 module. |
| internal/crc/crc.go | New CRC helper package used for v5 segment checksums. |
| internal/crc/crc_test.go | Unit tests for CRC helpers. |
| prepared_cache.go | Adds a helper getter on the prepared LRU cache. |
| dialer/utils.go | Adjusts query-flag parsing types to match v5’s wider flag field. |
| control.go | Sets v5 as the max protocol version attempted during discovery. |
| frame_test.go | Adds tests for v5 segment framing and new EXECUTE/BATCH fields. |
| conn_test.go | Updates tests for new reader abstraction and adds segment-processing test coverage. |
| common_test.go | Adds compressor flag defaults and LZ4 option wiring for integration tests; adds a deep-equal helper. |
| cluster.go | Validates ReadTimeout is non-negative. |
| cassandra_test.go | Adds integration tests for keyspace override, now_in_seconds, large frames, and metadata-changed behavior. |
| batch_test.go | Adds integration tests for batch keyspace override and now_in_seconds. |
| scylla_test.go | Updates mocked connection construction to match new connection reader structure. |
| go.mod | Adds dependency on the nested lz4 module and a temporary replace directive. |
| go.sum | Updates dependency sums to reflect new/updated module requirements. |
| .gitignore | Ignores .vscode/. |
Comments suppressed due to low confidence (2)
session.go:799
- On prepare error, the cache entry is added under
routingKeyInfoCacheKeybut removed usingRemove(stmt), so the inflight entry will be left behind (and future callers may block or see stale errors). Remove using the same cache key you used forAdd/Get(and apply the same fix to the otherRemove(stmt)calls in this function).
info, inflight.err = conn.prepareStatement(ctx, stmt, nil, keyspace, requestTimeout)
if inflight.err != nil {
// don't cache this error
s.routingKeyInfoCache.Remove(stmt)
return nil, inflight.err
session.go:820
- If
scyllaGetTablePartitionerreturns an error, this branch returnsinflight.err(which isn't set here), likely swallowing the real error. Setinflight.err = errand return that, and also remove the correct cache key (notstmt).
partitioner, err := scyllaGetTablePartitioner(s, keyspace, table)
if err != nil {
// don't cache this error
s.routingKeyInfoCache.Remove(stmt)
return nil, inflight.err
}
| return dst[:oldDstLen+n], err | ||
| } | ||
|
|
||
| // grow grows b to guaranty space for n elements, if needed. |
There was a problem hiding this comment.
Comment typo: "guaranty" -> "guarantee".
| // grow grows b to guaranty space for n elements, if needed. | |
| // grow grows b to guarantee space for n elements, if needed. |
|
|
||
| longString := strings.Repeat("a", 500_000) | ||
|
|
||
| err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", longString).Exec() |
There was a problem hiding this comment.
id column is declared as int, but the test binds it as a string ("1"). This will typically fail marshaling/binding. Bind an int value instead.
| err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", longString).Exec() | |
| err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", 1, longString).Exec() |
| err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", str).Exec() | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| var result string | ||
| err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result) |
There was a problem hiding this comment.
This test creates gocql_test.compression_now_worth_it but then inserts/selects from gocql_test.large_size_query. That makes the test order-dependent and likely to fail when run in isolation. Use the table created in this test for the insert/select statements.
| err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", str).Exec() | |
| if err != nil { | |
| t.Fatal(err) | |
| } | |
| var result string | |
| err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result) | |
| err := session.Query("INSERT INTO gocql_test.compression_now_worth_it (id, text_col) VALUES (?, ?)", 1, str).Exec() | |
| if err != nil { | |
| t.Fatal(err) | |
| } | |
| var result string | |
| err = session.Query("SELECT text_col FROM gocql_test.compression_now_worth_it WHERE id = ?", 1).Scan(&result) |
| // Self-contained flag | ||
| selfContained := (headerBuf[4] & 0b100) != 0 | ||
|
|
||
| compressedPayload := make([]byte, compressedLen) |
There was a problem hiding this comment.
readCompressedSegment does make([]byte, compressedLen) where compressedLen is a uint32, which won’t compile (slice length must be int). Cast to int (with bounds checking if needed) before allocating/reading.
| compressedPayload := make([]byte, compressedLen) | |
| // Ensure compressedLen fits into an int before allocating the slice | |
| if compressedLen > uint32(^uint(0)>>1) { | |
| return nil, false, fmt.Errorf("gocql: compressed frame payload too large: %d", compressedLen) | |
| } | |
| compressedPayload := make([]byte, int(compressedLen)) |
| func (s SnappyCompressor) AppendCompressed(dst, src []byte) ([]byte, error) { | ||
| panic("SnappyCompressor.AppendCompressed is not supported") | ||
| } | ||
|
|
||
| func (s SnappyCompressor) Decode(data []byte) ([]byte, error) { | ||
| return s2.Decode(nil, data) | ||
| func (s SnappyCompressor) AppendDecompressed(dst, src []byte, decompressedLength uint32) ([]byte, error) { | ||
| panic("SnappyCompressor.AppendDecompressed is not supported") |
There was a problem hiding this comment.
SnappyCompressor.AppendCompressed / AppendDecompressed currently panic, but protocol v5 segment encoding/decoding (newCompressedSegment / readCompressedSegment) calls these methods when compression is negotiated. This can crash the driver at runtime when using snappy with proto v5; implement these methods for snappy or fail negotiation/validation with a returned error instead of panicking.
| queryAfterTableAltering2.conn = conn | ||
| iter = queryAfterTableAltering2.Iter() | ||
| handleRows(iter) | ||
| err = iter.Close() |
There was a problem hiding this comment.
handleRows(iter) already calls iter.Close() internally, but the test closes the same iterator again right after. Double-closing can hide real failures or introduce unexpected errors; either let handleRows return the close error, or remove the extra iter.Close() here.
| err = iter.Close() |
| // Metadata_changed because the table is not being changed. | ||
| queryAfterTableAltering3 := session.Query(selectStmt).WithContext(ctx) | ||
| queryAfterTableAltering3.conn = conn | ||
| iter = queryAfterTableAltering2.Iter() |
There was a problem hiding this comment.
This block creates queryAfterTableAltering3 but then calls queryAfterTableAltering2.Iter() instead, so it’s not actually testing the intended "no metadata_changed" follow-up execution. Use the new query variable when creating the iterator.
| iter = queryAfterTableAltering2.Iter() | |
| iter = queryAfterTableAltering3.Iter() |
| keyspace = s.cfg.Keyspace | ||
| } | ||
|
|
||
| routingKeyInfoCacheKey := keyspace + stmt |
There was a problem hiding this comment.
routingKeyInfoCacheKey := keyspace + stmt can produce collisions (e.g. keyspace="ab", stmt="c" vs keyspace="a", stmt="bc"), which can return incorrect routing info from the cache. Use an unambiguous cache key (e.g., include a delimiter that cannot occur in keyspace, or use a structured key).
| routingKeyInfoCacheKey := keyspace + stmt | |
| routingKeyInfoCacheKey := keyspace + "\x00" + stmt |
| func (f *framer) parseResultPrepared() frame { | ||
| frame := &resultPreparedFrame{ | ||
| FrameHeader: *f.header, | ||
| preparedID: f.readShortBytes(), | ||
| reqMeta: f.parsePreparedMetadata(), | ||
| } | ||
|
|
||
| if f.proto > protoVersion4 { | ||
| frame.resultMetadataID = copyBytes(f.readShortBytes()) | ||
| } | ||
|
|
||
| frame.reqMeta = f.parsePreparedMetadata() | ||
| frame.respMeta = f.parseResultMetadata() | ||
|
|
There was a problem hiding this comment.
parseResultPrepared parses reqMeta twice (reqMeta: f.parsePreparedMetadata() in the struct literal, then frame.reqMeta = f.parsePreparedMetadata() again). This will advance the framer cursor too far and corrupt parsing of the PREPARED result. Adjust the parsing order to match the protocol spec (prepared_id, optional result_metadata_id, then the correct metadata sections exactly once each).
|
|
||
| uncompressedLen := len(uncompressedPayload) | ||
| if uncompressedLen > maxSegmentPayloadSize { | ||
| return nil, fmt.Errorf("gocql: payload length (%d) exceeds maximum size of %d", uncompressedPayload, maxSegmentPayloadSize) |
There was a problem hiding this comment.
In newCompressedSegment, the error message uses uncompressedPayload with %d instead of uncompressedLen, so the formatting will be wrong (%!d([]uint8=...)). Use the length variable in the error message.
| return nil, fmt.Errorf("gocql: payload length (%d) exceeds maximum size of %d", uncompressedPayload, maxSegmentPayloadSize) | |
| return nil, fmt.Errorf("gocql: payload length (%d) exceeds maximum size of %d", uncompressedLen, maxSegmentPayloadSize) |
Merging upstream commit 63b6d78:
These changes introduce functionality necessary to implement
SCYLLA_USE_METADATA_IDprotocol extension (see #527).Extra changes:
headSizefromframer.ReadTimeoutvalue inClusterConfig.vscodeto.gitignore.