Skip to content

Conversation

ryan-gang
Copy link
Contributor

@ryan-gang ryan-gang commented Jul 6, 2025

Summary

  • Renamed kafka_broker package to kafka_client to better reflect its actual functionality
  • Updated Broker struct to Client as it represents a client connection to a Kafka broker, not the
    broker itself
  • Enhanced request handling interface with additional methods for API type, version, and correlation
    ID
  • Consolidated request handling by unifying SendAndReceive method

Changes

  • Package rename: protocol/kafka_brokerprotocol/kafka_client
  • Struct rename: Broker → Client with updated constructor NewClient()
  • Interface enhancement: Added GetApiType(), GetApiVersion(), GetCorrelationId() methods to RequestI
  • Code consistency: Updated all stage files to use the new client-based naming
  • Improved logging: Enhanced request logging with API type, version, and correlation ID information

Summary by CodeRabbit

  • New Features

    • Added flexible exclusion options for response assertions, allowing selective skipping of fields at multiple levels for API versions, topic partitions, and fetch responses.
    • Introduced new interfaces for request encoding and assertion execution.
  • Refactor

    • Migrated Kafka connection logic from a broker abstraction to a client abstraction, simplifying request sending and response handling.
    • Centralized and enhanced hex dump and logging utilities in a new shared utility package.
    • Streamlined assertion APIs to support method chaining and deferred error handling.
  • Bug Fixes

    • Improved test output verification for API versions and topic partitions to provide more granular and accurate validation.
  • Documentation

    • Updated comments and log output formatting for improved clarity in test and debug output.
  • Chores

    • Removed obsolete encoding functions and updated internal APIs to use new request encoding methods.

ryan-gang added 18 commits July 6, 2025 16:03
- Introduced GetFormattedHexdump function to format byte data into a structured hexdump for logging purposes, including ASCII representation.
- Updated ApiVersionsRequest to include an Encode method for message serialization, improving the encoding process.
- Added SendAndReceiveNew method to the Broker struct for processing requests using the new RequestI interface.
- Integrated ApiVersionsRequest encoding and logging for improved request management.
- Introduced builder.RequestI interface to standardize request encoding across different request types.
…mp in logging

- Updated logging statements across multiple stages to use protocol.GetFormattedHexdump instead of the now-removed GetFormattedHexdump function, ensuring consistent hexdump formatting for sent and received messages.
- Removed the GetFormattedHexdump function from utils.go as it is no longer needed.
- Introduced a new Broker struct for managing Kafka broker connections, including methods for connecting, sending, and receiving messages.
- Added retry logic for connection attempts to enhance reliability.
- Implemented methods for sending requests and receiving responses, including handling timeouts and raw data reading.
- Created a Response struct to encapsulate response data for better management.
- Replaced instances of protocol.NewBroker with kafka_broker.NewBroker in various stage files to standardize broker connection handling.
- Updated the corresponding defer function signatures to reflect the new broker type, ensuring consistency in resource management.
…function

- Removed the unused import of the protocol package in api_versions_request.go for better clarity.
- Updated the Encode method in ApiVersionsRequestBody to use the correct encoder import.
- Deleted the DescribeTopicPartitions function from describe_topic_partitions.go to streamline the codebase and eliminate redundancy.
- Changed encoder import to a more descriptive alias for clarity.
- Updated Encode methods in Partition, Topic, and ForgottenTopic to use the new encoder import.
- Reintroduced the FetchRequest struct and added an Encode method for message serialization, enhancing request handling.
- Updated SendAndReceiveNew method to handle FetchRequest in addition to ApiVersionsRequest, improving request processing capabilities.
- Integrated logging for FetchRequest encoding and response handling, ensuring consistent logging for both request types.
- Utilized protocol.GetFormattedHexdump for logging hexdumps of sent and received messages, maintaining uniformity in output.
- Removed unused protocol import and logging for FetchRequest encoding in multiple stage files.
- Updated to use the new SendAndReceiveNew method for sending requests, enhancing consistency across stages.
- Simplified response handling by eliminating redundant logging statements, focusing on essential operations.
- Changed encoder import to a more descriptive alias for clarity in fetch_request.go and fetch.go.
- Updated Encode methods in Partition, Topic, ForgottenTopic, and FetchRequestBody to use the new encoder import.
- Removed unused EncodeFetchRequest function to streamline the codebase.
…quest handling

- Eliminated the unused protocol import in stage files to enhance clarity.
- Updated to use the new SendAndReceiveNew method for sending ApiVersions requests, improving consistency across stages.
- Removed redundant logging statements related to request encoding and response hexdumps, focusing on essential operations.
- Changed encoder import to a more descriptive alias for clarity.
- Streamlined the Encode method in DescribeTopicPartitionsRequest to directly return encoded message bytes.
- Removed the unused EncodeDescribeTopicPartitionsRequest function to enhance code clarity and reduce redundancy.
…stages

- Removed unused protocol import in stage files to enhance clarity.
- Updated to use the new SendAndReceiveNew method for sending DescribeTopicPartitions requests, ensuring consistency across multiple stages.
- Eliminated redundant logging statements related to request encoding and response hexdumps, focusing on essential operations.
- Eliminated the SendAndReceive method from the Broker struct to streamline the codebase.
- Encouraged the use of the SendAndReceiveNew method for improved request handling and consistency.
- Introduced logging to indicate the current request number being sent in the testSequentialRequests function, enhancing traceability during request processing.
…SendAndReceive

- Updated multiple stage files to replace the SendAndReceiveNew method with SendAndReceive for consistency in request handling.
- This change enhances code clarity and maintains uniformity across various test cases.
- Enhanced the SendAndReceive method to include logging for DescribeTopicPartitions requests, improving traceability during request processing.
- Added hexdump logging for both sent and received DescribeTopicPartitions messages, ensuring consistent logging practices across request types.
- Updated logging statements to clarify the sending of ApiVersions requests, enhancing readability and traceability during request processing.
- Ensured consistent logging format across multiple requests by explicitly stating the request type being sent.
Copy link

coderabbitai bot commented Jul 6, 2025

Warning

Rate limit exceeded

@ryan-gang has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 20 minutes and 14 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 658a8c8 and a9aa498.

📒 Files selected for processing (4)
  • internal/assertions/apiversions_response_assertion.go (1 hunks)
  • internal/stage_5.go (3 hunks)
  • internal/stage_dtp1.go (5 hunks)
  • internal/stage_f1.go (5 hunks)

Walkthrough

This set of changes refactors the Kafka protocol test suite to use a new kafka_client.Client abstraction instead of the previous protocol.Broker, updates all request/response assertion logic to a stateful, exclusion-based API, and centralizes hex dump and logging utilities in a new utils package. Several protocol API request types now provide self-contained Encode methods. Test fixtures and scripts are updated to align with these refactorings, and builder interfaces are introduced for extensibility.

Changes

Files/Paths Change Summary
internal/assertions/apiversions_response_assertion.go, .../describe_topic_partitions_response_assertion.go, .../fetch_response_assertion.go, .../response_header_assertion.go Assertion structs refactored to support exclusion-based field skipping, stateful error handling, logger embedding, and new Run methods.
internal/assertions/utils.go Contains helper refactored to use slices.Contains.
internal/stage_2.go, internal/stage_3.go, internal/stage_4.go, internal/stage_5.go, ... All usages of protocol.Broker replaced with kafka_client.Client; request sending, logging, and assertion logic updated accordingly.
internal/stage_c1.go, internal/stage_c2.go Client handling updated to use new client abstraction; minor logging and loop changes in concurrency test.
internal/stage_dtp1.go, internal/stage_dtp2.go, internal/stage_dtp3.go, internal/stage_dtp4.go, internal/stage_dtp5.go Kafka client and assertion usage updated to new APIs; manual encoding and hexdump logging removed.
internal/stage_f1.go, internal/stage_f2.go, internal/stage_f3.go, internal/stage_f4.go, internal/stage_f5.go, internal/stage_f6.go Fetch request/response handling updated to use new client and assertion APIs, logging simplified.
internal/utils.go Removed GetFormattedHexdump function.
protocol/api/api_versions.go, .../api_versions_request.go, .../describe_topic_partitions_request.go, .../fetch_request.go Protocol API request types now provide self-contained Encode and GetHeader methods; internal encoding logic updated.
protocol/api/describe_topic_partitions.go, .../fetch.go Removed legacy encoding functions and direct broker usage; moved to new client/request interfaces.
protocol/api/api_versions_response.go, .../describe_topic_partitions_response.go, .../fetch_response.go, .../record_batch.go Logging calls updated to use new utils package; protocol import replaced.
protocol/api/header.go Request header encoding refactored; new public Encode method added, logging updated.
protocol/builder/interface.go Introduced RequestI and Assertion interfaces for extensibility.
protocol/decoder/inspectable_hex_dump.go Hex dump formatting utility import updated to new utils package.
protocol/kafka_client/client.go New client abstraction replaces broker; methods updated for structured requests, robust IO, and detailed logging.
protocol/utils.go File deleted; hex/logging utilities moved to new utils package.
protocol/utils/utils.go New package: hex dump formatting, logging helpers, and API key name mapping provided.
internal/test_helpers/fixtures/base/pass, .../concurrent_stages/pass, .../describe_topic_partitions/pass Test output logs updated for new assertion and logging behaviors; more granular checks and output formatting.
local_testing/test.sh Docker image name in comments updated for consistency.

Sequence Diagram(s)

sequenceDiagram
    participant Tester
    participant Client
    participant KafkaBroker
    participant Assertion
    participant Logger

    Tester->>Client: Connect()
    Client->>KafkaBroker: Establish TCP connection
    Tester->>Client: SendAndReceive(request, Logger)
    Client->>KafkaBroker: Send encoded request
    Client->>KafkaBroker: Receive response
    Client->>Tester: Return decoded response
    Tester->>Assertion: NewAssertion(actual, expected, Logger)
    Tester->>Assertion: Run()
    Assertion->>Logger: Log assertion results
    Assertion-->>Tester: Return error or success
    Tester->>Client: Close()
    Client->>KafkaBroker: Close connection
Loading

Estimated code review effort

5 (~2 hours 30 minutes)

Poem

Oh, what a hop through the code today,
Clients replaced brokers, in a modernized way.
Assertions now skip fields with a twitch of a nose,
While hex dumps and logs in new utils repose.
With interfaces fresh and requests self-contained,
This rabbit approves—though much has been changed!
🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@ryan-gang ryan-gang self-assigned this Jul 6, 2025
@ryan-gang ryan-gang changed the title Simplify Broker.SendAndReceive to include encoding + logging Simplify Broker.SendAndReceive to include encoding + logging Jul 6, 2025
@ryan-gang ryan-gang requested a review from andy1li July 6, 2025 11:27
@ryan-gang
Copy link
Contributor Author

@andy1li can you base your changes on top of this ?

@ryan-gang
Copy link
Contributor Author

I feel this simplifies the stage files quite nicely.

@ryan-gang
Copy link
Contributor Author

Had to move broker into a new package to alleviate cyclic import errors.

- Improved logging to include request type, version, and correlation ID for ApiVersions, DescribeTopicPartitions, and Fetch requests, enhancing traceability during request processing.
- Consolidated hexdump logging for received responses to maintain consistency across different request types.
@ryan-gang ryan-gang added enhancement New feature or request and removed enhancement New feature or request labels Jul 6, 2025
@andy1li
Copy link
Member

andy1li commented Jul 6, 2025

Nice! I'll create a new branch from add-interface-for-requests, otherwise the rebase conflicts are killing me. 💀

ryan-gang added 22 commits July 16, 2025 22:55
- Replaced manual validation of response headers and body with assertion methods for improved clarity and maintainability.
- Introduced expected response structures to streamline the validation process, ensuring accurate checks for correlation ID and API keys.
- Enhanced error handling by utilizing assertion libraries, improving the overall reliability of the test logic.
- Removed redundant assertion calls in API version response validation methods across multiple test files.
- Updated the assertion methods to improve clarity and maintainability by consolidating body and API key assertions into a single run method.
- Renamed internal assertion methods to follow a consistent naming convention, enhancing code readability.
- Modified assertions in the API version response validation to provide more detailed output, including the length of the API keys array and version constraints.
- Enhanced the clarity of test results by specifying minimum and maximum version requirements for API_VERSIONS, improving overall test reliability.
- Removed redundant calls to AssertHeader in various test functions, streamlining the assertion process.
- Updated the internal assertion method names for consistency and clarity, enhancing code readability.
- Ensured that the Run method now directly invokes the assertion logic, improving maintainability across the test suite.
- Updated the handling of excluded fields in the DescribeTopicPartitionsResponseAssertion to clarify the exclusion of "partitions" and "topics".
- Enhanced comments to provide better understanding of the exclusion behavior, improving code readability and maintainability.
- Removed outdated comments regarding exclusion logic in ApiVersionsResponseAssertion and ResponseHeaderAssertion to enhance clarity.
…sertion

- Introduced SkipTopicFields method to allow exclusion of "topics" from assertions, enhancing flexibility in response validation.
- Updated internal assertion methods for clarity, ensuring that the exclusion logic is more intuitive and maintainable.
- Removed redundant calls to AssertTopicsAndPartitions in multiple test functions, simplifying the assertion process.
- Introduced SkipTopicFields method to enhance flexibility in response validation, allowing exclusion of "topics" from assertions.
- Updated internal assertion methods for improved clarity and maintainability.
- Eliminated calls to AssertTopicsAndPartitions in multiple test functions, simplifying the assertion process.
- Continued to leverage the SkipTopicFields method for enhanced flexibility in response validation, focusing on relevant fields.
- Improved clarity and maintainability of the assertion methods across the test suite.
- Removed redundant calls to AssertBody in multiple test functions, streamlining the assertion process.
- Updated the Run method in DescribeTopicPartitionsResponseAssertion to directly invoke necessary assertions, enhancing clarity and maintainability.
- Improved overall readability of the test suite by consolidating assertion logic.
…n method

- Updated the Run method to explicitly call assertBody and assertAPIKeysArray separately, improving clarity and maintainability of the assertion flow.
- Enhanced readability by clearly delineating the sequence of assertions performed in the API versions response validation.
- Renamed AssertBody and AssertEmptyPartitions methods to assertBody and assertEmptyPartitions, respectively, to follow a consistent naming convention.
- Updated the Run method in DescribeTopicPartitionsResponseAssertion to reflect these changes, enhancing code readability and maintainability.
- Added detailed comments to clarify the behavior of exclusion fields in FetchResponseAssertion, improving code readability.
- Introduced SkipTopicFields method to allow exclusion of "topics" from assertions, enhancing flexibility in response validation.
- Updated SkipPartitionFields and SkipRecordBatchFields methods to append exclusions, ensuring consistent handling of assertion logic.
- Consolidated calls to AssertBody in multiple test functions, enhancing the clarity and maintainability of the assertion process.
- Updated internal assertion methods to ensure consistent handling of excluded fields, particularly for topics and record batches.
- Improved overall readability of the test suite by streamlining the assertion flow and reducing redundancy.
- Removed calls to AssertBody and AssertRecordBatchBytes in multiple test functions, simplifying the assertion process.
- Updated internal FetchResponseAssertion methods to improve the flow of exclusion logic, ensuring consistent handling of fields.
- Enhanced overall readability and maintainability of the test suite by consolidating assertion logic.
- Streamlined the exclusion flow by chaining Skip methods for topics, partitions, and record batches, improving code readability.
- Updated the Run method to simplify the condition for asserting record batch bytes, ensuring consistent handling of excluded fields.
- Improved overall maintainability of the FetchResponseAssertion by consolidating exclusion logic.
- Enhanced comments for ExcludeRecordBatchFields in testFetchWithSingleMessage and testFetchMultipleMessages to clarify the need for a builder in future updates.
- Improved code readability by providing context for the exclusion logic, ensuring maintainability in the assertion process.
- Updated multiple files to replace calls to protocol functions with their corresponding utils functions for better organization and maintainability.
- Ensured consistent logging and formatting across the codebase by utilizing the new utils package.
- Improved readability and reduced dependencies on the protocol package in favor of a more modular approach.
@ryan-gang ryan-gang requested a review from rohitpaulk July 21, 2025 20:18
Copy link
Member

@rohitpaulk rohitpaulk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added one round of comments for the first assertion. Also added an example PR here of how I'd simplify it: https://github.com/codecrafters-io/kafka-tester/pull/62/files

a.assertBody()
a.assertAPIKeysArray()

return a.err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren't chaining anymore, so there's no reason to store err as state on the struct. We should just do what we'd usually do when running multiple methods in serial and returning errors:

if err := a.assertBody(); err != nil { 
  return err 
}

if err := a.assertAPIKeysArray... {
  return err 
}


// assertBody asserts the contents of the response body
// Fields asserted by default: ErrorCode
func (a *ApiVersionsResponseAssertion) assertBody() *ApiVersionsResponseAssertion {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (a *ApiVersionsResponseAssertion) assertBody() *ApiVersionsResponseAssertion {
func (a *ApiVersionsResponseAssertion) assertBody() error {

(We don't need chaining for these assert methods anymore)

ActualValue: actualValue,
ExpectedValue: expectedValue,
logger: logger,
excludedBodyFields: []string{},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ever used? I don't see a method that edits this field.

…sion check

- Updated the error message in the assertAPIKeysArray method to reference MinVersion instead of MaxVersion, ensuring accurate reporting of API version support.
- Improved clarity in error handling for API version assertions, enhancing the debugging process.
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
internal/assertions/fetch_response_assertion.go (1)

348-360: Consider dynamic buffer sizing or overflow protection.

The fixed buffer size of 4096 bytes might be insufficient for encoding large record batches, potentially causing a panic or data corruption.

 func encodeRecordBatches(recordBatches []kafkaapi.RecordBatch) []byte {
 	// Given an array of RecordBatch, encodes them using the encoder.Encoder
 	// and returns the resulting bytes.
+	
+	// Calculate approximate size needed
+	estimatedSize := 0
+	for _, batch := range recordBatches {
+		// Rough estimate: header + records
+		estimatedSize += 61 + len(batch.Records)*100 // Adjust based on actual sizes
+	}
+	if estimatedSize < 4096 {
+		estimatedSize = 4096
+	}

 	encoder := encoder.Encoder{}
-	encoder.Init(make([]byte, 4096))
+	encoder.Init(make([]byte, estimatedSize))
 	for _, recordBatch := range recordBatches {
 		recordBatch.Encode(&encoder)
 	}
 	return encoder.Bytes()[:encoder.Offset()]
 }

Alternatively, check if the encoder supports dynamic growth or has overflow protection built-in.

♻️ Duplicate comments (2)
internal/stage_dtp5.go (1)

55-58: Good! Request is passed by value as suggested in previous reviews.

The SendAndReceive method correctly accepts the request by value, addressing the previous concern about unnecessary pointer usage.

protocol/kafka_client/client.go (1)

112-115: Simplify variable declarations.

Variables can be declared and initialized in a single line.

-	header := request.GetHeader()
-	apiType := utils.APIKeyToName(header.ApiKey)
-	apiVersion := header.ApiVersion
-	correlationId := header.CorrelationId
+	header := request.GetHeader()
+	apiType := utils.APIKeyToName(header.ApiKey)
+	apiVersion := header.ApiVersion
+	correlationId := header.CorrelationId

Wait, the suggested change is the same as the current code. Let me reconsider...

Actually, this can be simplified to:

 	header := request.GetHeader()
-	apiType := utils.APIKeyToName(header.ApiKey)
-	apiVersion := header.ApiVersion
-	correlationId := header.CorrelationId
-	message := request.Encode()
+	apiType := utils.APIKeyToName(header.ApiKey)
+	message := request.Encode()

-	stageLogger.Infof("Sending \"%s\" (version: %v) request (Correlation id: %v)", apiType, apiVersion, correlationId)
+	stageLogger.Infof("Sending \"%s\" (version: %v) request (Correlation id: %v)", apiType, header.ApiVersion, header.CorrelationId)
🧹 Nitpick comments (8)
local_testing/test.sh (1)

11-14: Consider dropping or parameterizing the commented-out invocations

The two docker run … make test lines are now only documentation but continue to drift every time the image tag changes. Either delete them or extract the image tag to a single variable (e.g. IMG=local-kafka-tester) so future renames touch one place.

protocol/builder/interface.go (1)

10-16: Consider implementing or removing the TODO section.

The commented-out struct suggests planned future work but has been sitting as a TODO. Consider either implementing this functionality or removing the comment to keep the code clean.

-// TODO
-//type Assertion struct {
-// actualResponse kafkaResponseI
-// expectedResponse kafkaResponseI
-//	logger *logger.Logger
-//	err    error
-//}
internal/stage_2.go (1)

53-57: Consider using unified SendAndReceive method.

This file still uses separate Send() and ReceiveRaw() calls while other stage files have been updated to use the unified SendAndReceive() method. For consistency with the refactor objectives, consider updating to:

-	err = client.Send(message)
-	if err != nil {
-		return err
-	}
-	response, err := client.ReceiveRaw()
+	response, err := client.SendAndReceive(request, stageLogger)
protocol/api/describe_topic_partitions_request.go (1)

55-74: Consider implementing the suggested interface design from past reviews.

The new DescribeTopicPartitionsRequest struct encapsulates encoding well, but the fixed 4096-byte buffer size could be insufficient for requests with many topics or large topic names.

Based on the past review comment, consider implementing an interface that only contains Header to make ApiVersion, CorrelationId, and ApiType() directly computable rather than requiring separate getter methods.

+// Consider replacing the current approach with:
+type RequestInterface interface {
+    Header() RequestHeader
+}
+
+func (r DescribeTopicPartitionsRequest) Header() RequestHeader {
+    return r.Header
+}
+
+// Then ApiType(), ApiVersion, CorrelationId become computed properties

Also consider using a dynamic buffer or larger initial capacity:

-encoder.Init(make([]byte, 4096))
+encoder.Init(make([]byte, 8192)) // or use dynamic sizing
protocol/utils/utils.go (2)

10-19: Consider returning a string or accepting an io.Writer for flexibility.

The function prints directly to stdout, which limits its usability in testing scenarios or when output needs to be captured. Consider refactoring to either return a formatted string (like GetFormattedHexdump) or accept an io.Writer parameter.

-func PrintHexdump(data []byte) {
-	fmt.Printf("Hexdump of data:\n")
+func PrintHexdump(w io.Writer, data []byte) {
+	fmt.Fprintf(w, "Hexdump of data:\n")
 	for i, b := range data {
 		if i%16 == 0 {
-			fmt.Printf("\n%04x  ", i)
+			fmt.Fprintf(w, "\n%04x  ", i)
 		}
-		fmt.Printf("%02x ", b)
+		fmt.Fprintf(w, "%02x ", b)
 	}
-	fmt.Println()
+	fmt.Fprintln(w)
}

63-94: Consider refactoring to reduce code duplication with GetFormattedHexdump.

This function shares significant logic with GetFormattedHexdump (ASCII representation, padding, hex formatting). Consider extracting the common logic into a helper function.

+func formatHexLine(data []byte, start, end int, includeOffset bool, formattedHexdump *strings.Builder) string {
+	var asciiChars strings.Builder
+	
+	if includeOffset {
+		formattedHexdump.WriteString(fmt.Sprintf("%04x | ", start))
+	}
+	
+	for i := start; i < end && i < len(data); i++ {
+		formattedHexdump.WriteString(fmt.Sprintf("%02x ", data[i]))
+		if data[i] >= 32 && data[i] <= 126 {
+			asciiChars.WriteByte(data[i])
+		} else {
+			asciiChars.WriteByte('.')
+		}
+	}
+	
+	// Pad if necessary
+	if end-start < 16 {
+		padding := 16 - (end - start)
+		for i := 0; i < padding; i++ {
+			formattedHexdump.WriteString("   ")
+		}
+	}
+	
+	return asciiChars.String()
+}

 func GetFormattedHexdumpForErrors(data []byte) string {
 	var formattedHexdump strings.Builder
-	var asciiChars strings.Builder
 
 	for i, b := range data {
 		if i%16 == 0 && i != 0 {
-			formattedHexdump.WriteString("| " + asciiChars.String() + "\n")
-			asciiChars.Reset()
+			ascii := formatHexLine(data, i-16, i, false, &formattedHexdump)
+			formattedHexdump.WriteString("| " + ascii + "\n")
+			formattedHexdump.Reset() // Reset for next line
 		}
-		formattedHexdump.WriteString(fmt.Sprintf("%02x ", b))
-
-		// Add ASCII representation
-		if b >= 32 && b <= 126 {
-			asciiChars.WriteByte(b)
-		} else {
-			asciiChars.WriteByte('.')
-		}
 	}
 
-	// Pad the last line if necessary
-	if len(data)%16 != 0 {
-		padding := 16 - (len(data) % 16)
-		for i := 0; i < padding; i++ {
-			formattedHexdump.WriteString("   ")
-		}
-	}
-
-	// Add the final ASCII representation
-	formattedHexdump.WriteString("| " + asciiChars.String())
+	// Handle the last line
+	lastLineStart := (len(data) / 16) * 16
+	ascii := formatHexLine(data, lastLineStart, len(data), false, &formattedHexdump)
+	formattedHexdump.WriteString("| " + ascii)
 
 	return formattedHexdump.String()
 }
protocol/kafka_client/client.go (1)

178-184: Remove redundant slice operation.

io.ReadFull guarantees that it reads exactly len(bodyResponse) bytes or returns an error. The slice operation on line 183 is unnecessary.

 	numBytesRead, err := io.ReadFull(c.conn, bodyResponse)

 	// Reset the read deadline
 	c.conn.SetReadDeadline(time.Time{})
-
-	bodyResponse = bodyResponse[:numBytesRead]
internal/assertions/apiversions_response_assertion.go (1)

84-86: Clarify the error message for version validation.

The error message could be clearer about what's being validated. It's checking if MinVersion > MaxVersion but the message format might be confusing.

 				if actualApiVersionKey.MinVersion > expectedApiVersionKey.MaxVersion {
-					a.err = fmt.Errorf("Expected min version %v to be < max version %v for %s", actualApiVersionKey.MinVersion, expectedApiVersionKey.MaxVersion, apiKeyNames[expectedApiVersionKey.ApiKey])
+					a.err = fmt.Errorf("Invalid version range for %s: MinVersion (%v) should be <= MaxVersion (%v)", apiKeyNames[expectedApiVersionKey.ApiKey], actualApiVersionKey.MinVersion, expectedApiVersionKey.MaxVersion)
 					return a
 				}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9429937 and 658a8c8.

📒 Files selected for processing (43)
  • internal/assertions/apiversions_response_assertion.go (1 hunks)
  • internal/assertions/describe_topic_partitions_response_assertion.go (3 hunks)
  • internal/assertions/fetch_response_assertion.go (6 hunks)
  • internal/assertions/response_header_assertion.go (1 hunks)
  • internal/assertions/utils.go (1 hunks)
  • internal/stage_2.go (5 hunks)
  • internal/stage_3.go (5 hunks)
  • internal/stage_4.go (6 hunks)
  • internal/stage_5.go (3 hunks)
  • internal/stage_c1.go (3 hunks)
  • internal/stage_c2.go (4 hunks)
  • internal/stage_dtp1.go (5 hunks)
  • internal/stage_dtp2.go (5 hunks)
  • internal/stage_dtp3.go (5 hunks)
  • internal/stage_dtp4.go (5 hunks)
  • internal/stage_dtp5.go (5 hunks)
  • internal/stage_f1.go (5 hunks)
  • internal/stage_f2.go (5 hunks)
  • internal/stage_f3.go (5 hunks)
  • internal/stage_f4.go (5 hunks)
  • internal/stage_f5.go (5 hunks)
  • internal/stage_f6.go (6 hunks)
  • internal/test_helpers/fixtures/base/pass (1 hunks)
  • internal/test_helpers/fixtures/concurrent_stages/pass (2 hunks)
  • internal/test_helpers/fixtures/describe_topic_partitions/pass (2 hunks)
  • internal/utils.go (0 hunks)
  • local_testing/test.sh (1 hunks)
  • protocol/api/api_versions.go (1 hunks)
  • protocol/api/api_versions_request.go (2 hunks)
  • protocol/api/api_versions_response.go (8 hunks)
  • protocol/api/describe_topic_partitions.go (0 hunks)
  • protocol/api/describe_topic_partitions_request.go (1 hunks)
  • protocol/api/describe_topic_partitions_response.go (16 hunks)
  • protocol/api/fetch.go (0 hunks)
  • protocol/api/fetch_request.go (1 hunks)
  • protocol/api/fetch_response.go (15 hunks)
  • protocol/api/header.go (5 hunks)
  • protocol/api/record_batch.go (15 hunks)
  • protocol/builder/interface.go (1 hunks)
  • protocol/decoder/inspectable_hex_dump.go (2 hunks)
  • protocol/kafka_client/client.go (8 hunks)
  • protocol/utils.go (0 hunks)
  • protocol/utils/utils.go (1 hunks)
🧬 Code Graph Analysis (23)
protocol/decoder/inspectable_hex_dump.go (1)
protocol/utils/utils.go (1)
  • GetFormattedHexdumpForErrors (63-94)
internal/stage_c2.go (2)
protocol/kafka_client/client.go (2)
  • Client (31-35)
  • NewClient (39-41)
protocol/utils/utils.go (1)
  • GetFormattedHexdump (21-61)
protocol/api/record_batch.go (2)
protocol/utils/utils.go (1)
  • LogWithIndentation (96-98)
protocol/errors/errors.go (2)
  • PacketDecodingError (7-10)
  • NewPacketDecodingError (16-21)
protocol/api/api_versions.go (2)
protocol/api/api_versions_request.go (1)
  • ApiVersionsRequest (24-27)
protocol/encoder/encoder.go (1)
  • Encoder (13-16)
protocol/api/api_versions_response.go (2)
protocol/utils/utils.go (1)
  • LogWithIndentation (96-98)
protocol/errors/errors.go (1)
  • PacketDecodingError (7-10)
protocol/api/api_versions_request.go (2)
protocol/encoder/encoder.go (1)
  • Encoder (13-16)
protocol/api/header.go (1)
  • RequestHeader (12-21)
internal/stage_f1.go (3)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
internal/assertions/response_header_assertion.go (1)
  • NewResponseHeaderAssertion (21-28)
internal/assertions/apiversions_response_assertion.go (1)
  • NewApiVersionsResponseAssertion (34-41)
protocol/builder/interface.go (1)
protocol/api/header.go (1)
  • RequestHeader (12-21)
internal/stage_dtp4.go (3)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
internal/assertions/response_header_assertion.go (1)
  • NewResponseHeaderAssertion (21-28)
internal/assertions/describe_topic_partitions_response_assertion.go (1)
  • NewDescribeTopicPartitionsResponseAssertion (26-35)
internal/stage_4.go (2)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
protocol/utils/utils.go (2)
  • GetFormattedHexdump (21-61)
  • LogWithIndentation (96-98)
protocol/api/fetch_request.go (2)
protocol/api/header.go (1)
  • RequestHeader (12-21)
protocol/encoder/encoder.go (1)
  • Encoder (13-16)
internal/stage_dtp1.go (3)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
internal/assertions/response_header_assertion.go (1)
  • NewResponseHeaderAssertion (21-28)
internal/assertions/apiversions_response_assertion.go (1)
  • NewApiVersionsResponseAssertion (34-41)
internal/stage_2.go (2)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
protocol/utils/utils.go (2)
  • GetFormattedHexdump (21-61)
  • LogWithIndentation (96-98)
internal/stage_c1.go (1)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
protocol/api/describe_topic_partitions_request.go (2)
protocol/api/header.go (1)
  • RequestHeader (12-21)
protocol/encoder/encoder.go (1)
  • Encoder (13-16)
internal/stage_3.go (2)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
protocol/utils/utils.go (2)
  • GetFormattedHexdump (21-61)
  • LogWithIndentation (96-98)
internal/assertions/response_header_assertion.go (2)
protocol/api/header.go (1)
  • ResponseHeader (36-38)
internal/assertions/utils.go (1)
  • Contains (6-8)
internal/stage_f4.go (2)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
internal/assertions/response_header_assertion.go (1)
  • NewResponseHeaderAssertion (21-28)
protocol/kafka_client/client.go (3)
internal/kafka_executable/kafka_executable.go (1)
  • KafkaExecutable (14-18)
protocol/builder/interface.go (1)
  • RequestI (5-8)
protocol/utils/utils.go (2)
  • APIKeyToName (104-119)
  • GetFormattedHexdump (21-61)
internal/assertions/apiversions_response_assertion.go (2)
protocol/api/api_versions_response.go (1)
  • ApiVersionsResponse (63-72)
internal/assertions/utils.go (1)
  • Contains (6-8)
internal/assertions/describe_topic_partitions_response_assertion.go (3)
protocol/api/describe_topic_partitions_response.go (2)
  • DescribeTopicPartitionsResponse (15-19)
  • DescribeTopicPartitionsResponsePartition (172-182)
internal/assertions/utils.go (1)
  • Contains (6-8)
protocol/utils/utils.go (1)
  • SuccessLogWithIndentation (100-102)
protocol/api/describe_topic_partitions_response.go (2)
protocol/utils/utils.go (1)
  • LogWithIndentation (96-98)
protocol/errors/errors.go (1)
  • PacketDecodingError (7-10)
internal/assertions/fetch_response_assertion.go (4)
protocol/api/fetch_response.go (2)
  • FetchResponse (13-19)
  • PartitionResponse (154-163)
internal/assertions/utils.go (1)
  • Contains (6-8)
protocol/utils/utils.go (1)
  • SuccessLogWithIndentation (100-102)
protocol/api/record_batch.go (2)
  • RecordBatch (14-28)
  • Record (203-211)
💤 Files with no reviewable changes (4)
  • internal/utils.go
  • protocol/api/fetch.go
  • protocol/api/describe_topic_partitions.go
  • protocol/utils.go
🧰 Additional context used
🧬 Code Graph Analysis (23)
protocol/decoder/inspectable_hex_dump.go (1)
protocol/utils/utils.go (1)
  • GetFormattedHexdumpForErrors (63-94)
internal/stage_c2.go (2)
protocol/kafka_client/client.go (2)
  • Client (31-35)
  • NewClient (39-41)
protocol/utils/utils.go (1)
  • GetFormattedHexdump (21-61)
protocol/api/record_batch.go (2)
protocol/utils/utils.go (1)
  • LogWithIndentation (96-98)
protocol/errors/errors.go (2)
  • PacketDecodingError (7-10)
  • NewPacketDecodingError (16-21)
protocol/api/api_versions.go (2)
protocol/api/api_versions_request.go (1)
  • ApiVersionsRequest (24-27)
protocol/encoder/encoder.go (1)
  • Encoder (13-16)
protocol/api/api_versions_response.go (2)
protocol/utils/utils.go (1)
  • LogWithIndentation (96-98)
protocol/errors/errors.go (1)
  • PacketDecodingError (7-10)
protocol/api/api_versions_request.go (2)
protocol/encoder/encoder.go (1)
  • Encoder (13-16)
protocol/api/header.go (1)
  • RequestHeader (12-21)
internal/stage_f1.go (3)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
internal/assertions/response_header_assertion.go (1)
  • NewResponseHeaderAssertion (21-28)
internal/assertions/apiversions_response_assertion.go (1)
  • NewApiVersionsResponseAssertion (34-41)
protocol/builder/interface.go (1)
protocol/api/header.go (1)
  • RequestHeader (12-21)
internal/stage_dtp4.go (3)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
internal/assertions/response_header_assertion.go (1)
  • NewResponseHeaderAssertion (21-28)
internal/assertions/describe_topic_partitions_response_assertion.go (1)
  • NewDescribeTopicPartitionsResponseAssertion (26-35)
internal/stage_4.go (2)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
protocol/utils/utils.go (2)
  • GetFormattedHexdump (21-61)
  • LogWithIndentation (96-98)
protocol/api/fetch_request.go (2)
protocol/api/header.go (1)
  • RequestHeader (12-21)
protocol/encoder/encoder.go (1)
  • Encoder (13-16)
internal/stage_dtp1.go (3)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
internal/assertions/response_header_assertion.go (1)
  • NewResponseHeaderAssertion (21-28)
internal/assertions/apiversions_response_assertion.go (1)
  • NewApiVersionsResponseAssertion (34-41)
internal/stage_2.go (2)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
protocol/utils/utils.go (2)
  • GetFormattedHexdump (21-61)
  • LogWithIndentation (96-98)
internal/stage_c1.go (1)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
protocol/api/describe_topic_partitions_request.go (2)
protocol/api/header.go (1)
  • RequestHeader (12-21)
protocol/encoder/encoder.go (1)
  • Encoder (13-16)
internal/stage_3.go (2)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
protocol/utils/utils.go (2)
  • GetFormattedHexdump (21-61)
  • LogWithIndentation (96-98)
internal/assertions/response_header_assertion.go (2)
protocol/api/header.go (1)
  • ResponseHeader (36-38)
internal/assertions/utils.go (1)
  • Contains (6-8)
internal/stage_f4.go (2)
protocol/kafka_client/client.go (2)
  • NewClient (39-41)
  • Client (31-35)
internal/assertions/response_header_assertion.go (1)
  • NewResponseHeaderAssertion (21-28)
protocol/kafka_client/client.go (3)
internal/kafka_executable/kafka_executable.go (1)
  • KafkaExecutable (14-18)
protocol/builder/interface.go (1)
  • RequestI (5-8)
protocol/utils/utils.go (2)
  • APIKeyToName (104-119)
  • GetFormattedHexdump (21-61)
internal/assertions/apiversions_response_assertion.go (2)
protocol/api/api_versions_response.go (1)
  • ApiVersionsResponse (63-72)
internal/assertions/utils.go (1)
  • Contains (6-8)
internal/assertions/describe_topic_partitions_response_assertion.go (3)
protocol/api/describe_topic_partitions_response.go (2)
  • DescribeTopicPartitionsResponse (15-19)
  • DescribeTopicPartitionsResponsePartition (172-182)
internal/assertions/utils.go (1)
  • Contains (6-8)
protocol/utils/utils.go (1)
  • SuccessLogWithIndentation (100-102)
protocol/api/describe_topic_partitions_response.go (2)
protocol/utils/utils.go (1)
  • LogWithIndentation (96-98)
protocol/errors/errors.go (1)
  • PacketDecodingError (7-10)
internal/assertions/fetch_response_assertion.go (4)
protocol/api/fetch_response.go (2)
  • FetchResponse (13-19)
  • PartitionResponse (154-163)
internal/assertions/utils.go (1)
  • Contains (6-8)
protocol/utils/utils.go (1)
  • SuccessLogWithIndentation (100-102)
protocol/api/record_batch.go (2)
  • RecordBatch (14-28)
  • Record (203-211)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Cursor BugBot
🔇 Additional comments (91)
internal/assertions/utils.go (1)

3-7: LGTM: Good use of standard library.

Replacing the manual loop with slices.Contains improves code readability and leverages the optimized standard library implementation.

protocol/decoder/inspectable_hex_dump.go (1)

7-7: LGTM: Import path updated for package restructuring.

The import path change from protocol to protocol/utils is consistent with the broader refactoring to centralize utility functions in the utils package.

protocol/api/api_versions_response.go (2)

8-8: LGTM: Import updated for package restructuring.

The import change to protocol/utils aligns with the broader refactoring to centralize utility functions.


32-139: LGTM: Logging calls systematically updated.

All protocol.LogWithIndentation calls have been properly updated to utils.LogWithIndentation while maintaining the same parameters and functionality. The changes are consistent throughout both decode methods.

internal/test_helpers/fixtures/base/pass (1)

367-369: LGTM: Enhanced test assertions for better validation.

The updated assertions provide more precise validation:

  • Exact API keys array length verification (61)
  • Specific version range validation for API_VERSIONS (0-4 inclusive minimum, ≥4 maximum)

These changes improve test coverage and ensure more rigorous validation of the API versions response.

protocol/api/record_batch.go (2)

10-10: LGTM: Import updated for package restructuring.

The import change to protocol/utils is consistent with the broader refactoring to centralize utility functions.


70-410: LGTM: Logging calls systematically updated across all decode methods.

All protocol.LogWithIndentation calls have been properly updated to utils.LogWithIndentation while maintaining identical parameters and functionality. The changes are consistent across RecordBatch, Record, and RecordHeader decode methods.

internal/stage_c2.go (4)

10-10: LGTM: Import changes align with refactoring objectives.

The addition of kafka_client and utils package imports correctly supports the transition from protocol.Broker to kafka_client.Client and centralizes logging utilities.

Also applies to: 12-12


31-31: LGTM: Client array type updated correctly.

The change from []*protocol.Broker to []*kafka_client.Client properly reflects the new client abstraction.


35-35: LGTM: Client instantiation updated correctly.

The change from protocol.NewBroker to kafka_client.NewClient properly uses the new client constructor while maintaining the same address parameter.


62-62: LGTM: Hex dump utility calls updated correctly.

The changes from protocol.GetFormattedHexdump to utils.GetFormattedHexdump properly use the centralized logging utilities while maintaining the same functionality for request and response logging.

Also applies to: 78-78

protocol/api/fetch_response.go (2)

9-9: LGTM: Import updated for centralized logging utilities.

The change from protocol to utils package import correctly supports the migration of logging utilities to a centralized location.


33-33: LGTM: All logging utility calls migrated consistently.

The systematic replacement of protocol.LogWithIndentation with utils.LogWithIndentation across all decode methods correctly centralizes logging utilities while maintaining identical functionality and parameter signatures.

Also applies to: 41-41, 49-49, 58-58, 67-67, 84-84, 114-114, 124-124, 132-132, 149-149, 172-172, 180-180, 188-188, 196-196, 204-204, 213-213, 223-223, 241-241, 251-251, 256-256, 278-278, 295-295, 303-303

protocol/api/api_versions.go (2)

11-12: LGTM: Deprecation notice provides clear migration path.

The TODO comment appropriately indicates the planned removal of EncodeApiVersionsRequest and suggests the cleaner Request.Encode approach, aligning with the broader refactoring toward encapsulated encoding methods.


17-17: Header encoding modernization verified

All references to EncodeV2 have been replaced with Encode, and no remaining calls to EncodeV2 were found. The new Header.Encode(&encoder) method is consistently applied in:

  • protocol/api/api_versions.go
  • protocol/api/api_versions_request.go
  • protocol/api/fetch_request.go
  • protocol/api/describe_topic_partitions_request.go

This aligns with the updated RequestI interface. LGTM.

internal/test_helpers/fixtures/describe_topic_partitions/pass (2)

418-418: LGTM: Enhanced verification for empty partitions case.

The addition of "✓ TopicResponse[0] has empty partitions" appropriately validates that unknown topics (error code 3) correctly return zero partitions, improving test granularity.


483-484: LGTM: Enhanced partition-level validation.

The additions of "✓ PartitionResponse[0] Error code: 0" and "✓ PartitionResponse[0] Partition Index: 0" provide valuable granular validation of partition details, improving test coverage for successful topic responses.

internal/test_helpers/fixtures/concurrent_stages/pass (1)

7-8: LGTM - Log format improvement for better readability

The split from single-line to two-line request logging improves readability by separating the request sequence information from the API-specific details. This formatting change aligns well with the broader logging infrastructure refactoring.

Also applies to: 371-372

protocol/api/header.go (3)

7-7: Import refactoring aligns with logging utility centralization

The replacement of protocol import with utils is consistent with the broader refactoring to centralize logging utilities in a dedicated package.


23-33: Good encapsulation with public interface method

The introduction of a public Encode method that delegates to the private encodeV2 provides a clean interface while preserving the existing implementation. This pattern supports the new client abstraction that expects request objects to have self-contained encoding capabilities.


49-49: Logging calls updated consistently

All protocol.LogWithIndentation calls have been correctly replaced with utils.LogWithIndentation, maintaining the same functionality while using the new centralized logging utilities.

Also applies to: 63-63, 72-72

protocol/api/api_versions_request.go (2)

16-16: Good change from pointer to value receiver

Changing from a pointer receiver to a value receiver is appropriate here since the Encode method doesn't modify the ApiVersionsRequestBody struct, and the struct is lightweight enough that copying is not a performance concern.


29-42: Excellent implementation of self-contained encoding interface

The addition of Encode() and GetHeader() methods transforms the request into a self-contained object that encapsulates its own encoding logic. This design supports the new client abstraction that operates on typed request objects rather than raw byte slices, improving type safety and code maintainability.

The encoder initialization and message packing pattern is consistent with the broader refactoring approach.

protocol/api/describe_topic_partitions_response.go (1)

9-9: Consistent logging utility refactoring

All protocol.LogWithIndentation calls have been systematically replaced with utils.LogWithIndentation throughout the decoding methods. This change is part of the broader effort to centralize logging utilities and maintains the same logging functionality while using the new infrastructure.

Also applies to: 29-29, 38-38, 44-44, 69-69, 91-91, 101-101, 118-118, 127-127, 136-136, 141-141, 159-159, 167-167, 192-192, 201-201, 210-210, 219-219, 227-227, 235-235, 243-243, 251-251, 259-259, 267-267, 299-299, 303-303, 312-312, 320-320, 328-328

internal/stage_4.go (4)

13-15: LGTM! Import updates align with the refactor.

The import changes correctly reflect the migration from protocol.Broker to kafka_client.Client and the consolidation of utility functions into the utils package.


34-40: LGTM! Client initialization follows the new pattern.

The client initialization and connection setup correctly uses the new kafka_client.Client abstraction with proper deferred cleanup.


55-63: LGTM! Client method usage is consistent.

The migration from broker to client methods (Send, ReceiveRaw) maintains the same functionality while using the new client abstraction.


53-53: LGTM! Utility function migration is consistent.

The migration of GetFormattedHexdump and LogWithIndentation calls from the protocol package to the utils package is consistent across all usage points and maintains the same functionality.

Also applies to: 63-63, 78-78, 89-89, 99-99

protocol/builder/interface.go (2)

5-8: LGTM! Well-designed interface for request abstraction.

The RequestI interface provides a clean abstraction for Kafka requests with Encode() for serialization and GetHeader() for metadata access. This design enables the new kafka_client.Client to work with structured request objects instead of raw byte slices.


18-20: LGTM! Simple and effective assertion interface.

The Assertion interface with a single Run() method provides a clean contract for executing assertions, allowing for stateful assertion objects with embedded loggers and error handling.

internal/stage_3.go (4)

8-11: LGTM! Import reorganization is clean and consistent.

The import changes correctly add kafka_client and utils packages while maintaining proper grouping. The reorganization aligns with the refactor to use the new client abstraction.


32-38: LGTM! Client initialization follows established pattern.

The client setup matches the pattern used in other stage files with proper initialization, connection with retries, and deferred cleanup.


55-63: LGTM! Client method usage is correctly implemented.

The migration to use client.Send() and client.ReceiveRaw() methods maintains the same functionality while leveraging the new client abstraction.


53-53: LGTM! Utility function migration is consistent across the file.

All calls to utils.GetFormattedHexdump() and utils.LogWithIndentation() correctly reference the new utils package location.

Also applies to: 63-63, 78-78, 89-89

protocol/api/fetch_request.go (3)

105-108: LGTM! Struct definition follows established pattern.

The FetchRequest struct with Header and Body fields provides a clean structure that aligns with other request types in the codebase.


110-119: LGTM! Encoding implementation is consistent.

The Encode() method follows the established pattern across request types: initialize encoder with fixed buffer, encode header and body, pack message, and return bytes. While this does create some duplication as noted in past reviews, the current approach was agreed upon and a future refactor is planned to address this.


121-123: LGTM! Header accessor is straightforward.

The GetHeader() method provides simple access to the request header, fulfilling the RequestI interface contract.

internal/stage_f1.go (4)

8-8: LGTM! Import addition is consistent with refactor.

Adding the kafka_client import aligns with the migration from protocol.Broker to the new client abstraction used throughout the codebase.


28-34: LGTM! Client initialization follows established pattern.

The client setup is consistent with other stage files, using proper initialization, connection with retries, and deferred cleanup.


45-45: LGTM! Enhanced client method usage.

Using client.SendAndReceive() with the structured request object and logger parameter streamlines the request/response flow compared to separate send and receive calls.


58-58: LGTM! Assertion pattern upgrade is well-implemented.

The migration to use assertion objects with the Run() method (instead of Evaluate()) provides better encapsulation of assertion logic and error handling. The assertions now embed loggers internally, which improves the testing framework's consistency.

Also applies to: 79-79

internal/stage_dtp4.go (5)

9-9: Import update aligns with the refactor.

The import change from protocol to kafka_client is consistent with the broader refactor objectives.


27-33: Client setup follows the new abstraction correctly.

The migration from protocol.Broker to kafka_client.Client is implemented properly with appropriate connection handling and deferred cleanup.


47-47: Request sending simplified effectively.

The refactor to use client.SendAndReceive(request, stageLogger) directly with the structured request object is cleaner than the previous manual encoding approach and aligns with the PR objectives.


60-60: Assertion API update implemented correctly.

The change from .Evaluate() to .Run() with logger passed during assertion construction follows the new stateful assertion pattern.


99-99: Response assertion simplified appropriately.

The streamlined assertion call using .Run() instead of chained method calls aligns with the new exclusion-based assertion approach.

internal/stage_2.go (3)

11-13: Import updates align with the refactor.

The replacement of protocol with kafka_client and addition of utils package are consistent with the broader refactor objectives.


30-36: Client migration implemented correctly.

The transition from protocol.Broker to kafka_client.Client follows the established pattern with proper connection handling and cleanup.


51-61: Utility function migration completed correctly.

The replacement of protocol utility functions with utils package functions for hex dump formatting and indented logging is appropriate and maintains the same functionality.

Also applies to: 76-76, 87-87

internal/stage_dtp1.go (4)

8-8: Import update correctly implemented.

The addition of kafka_client package import aligns with the refactor from protocol.Broker to kafka_client.Client.


27-33: Client abstraction migration handled properly.

The replacement of protocol.Broker with kafka_client.Client is implemented correctly with appropriate connection management and resource cleanup.


44-44: Unified request handling implemented correctly.

The use of client.SendAndReceive(request, stageLogger) with the structured request object simplifies the code and aligns with the PR objective of consolidating request handling.


57-57: Assertion API updates properly implemented.

The migration from .Evaluate() to .Run() with logger embedded in assertion construction follows the new stateful assertion pattern consistently.

Also applies to: 78-78

internal/stage_f3.go (5)

9-9: Import change supports the client refactor.

The addition of kafka_client package import correctly supports the migration from protocol.Broker to kafka_client.Client.


30-36: Client abstraction properly adopted.

The migration from protocol.Broker to kafka_client.Client is correctly implemented with proper connection establishment and resource cleanup.


67-67: Unified request handling correctly implemented.

The use of client.SendAndReceive(request, stageLogger) simplifies the request processing and aligns with the PR objective of consolidating request handling.


80-80: Response header assertion updated appropriately.

The migration to .Run() method with logger passed during assertion construction follows the new stateful assertion pattern.


102-102: Field exclusion pattern implemented correctly.

The use of SkipRecordBatchFields() before .Run() demonstrates the new flexible exclusion-based assertion approach mentioned in the AI summary.

internal/stage_f6.go (6)

9-9: Import update supports client refactor.

The addition of kafka_client package import correctly supports the migration from the previous broker abstraction.


27-33: Client migration implemented correctly.

The replacement of protocol.Broker with kafka_client.Client follows the established pattern with proper connection and cleanup handling.


64-64: Unified request handling properly adopted.

The use of client.SendAndReceive(request, logger) with the structured request object aligns with the PR objective of consolidating request handling.


77-77: Assertion API update correctly implemented.

The migration from .Evaluate() to .Run() with embedded logger follows the new stateful assertion pattern.


122-125: TODO comments document known limitations appropriately.

The TODO comments clearly indicate that the BatchLength: 0 is a temporary workaround until the builder interface is implemented. This is good documentation of planned future improvements.


156-156: Field exclusion used appropriately for temporary workaround.

The ExcludeRecordBatchFields("BatchLength") call properly handles the known BatchLength issue mentioned in the TODO comments while the builder interface is being developed.

internal/stage_dtp5.go (1)

27-34: LGTM! Clean refactoring to the new client abstraction.

The migration from protocol.Broker to kafka_client.Client is implemented correctly with proper connection handling and deferred cleanup.

internal/stage_dtp3.go (1)

27-88: Clean and consistent refactoring.

The migration to kafka_client.Client and the updated assertion pattern with Run() method are implemented correctly throughout the function.

internal/stage_dtp2.go (1)

77-79: Good use of method chaining for assertions.

The fluent interface pattern with SkipPartitionFields().Run() improves readability and maintainability.

internal/stage_5.go (1)

28-77: Excellent refactoring with improved assertion handling.

The migration successfully consolidates request handling through the unified SendAndReceive method and improves code clarity with the new assertion pattern. The structured request objects and embedded logging make the code more maintainable.

internal/stage_f5.go (5)

9-9: Import addition aligns with client abstraction refactor.

The addition of the kafka_client import supports the migration from the lower-level protocol.Broker to the higher-level client abstraction.


27-33: Client instantiation and connection handling improved.

The new kafka_client.Client provides a cleaner abstraction compared to the previous protocol.Broker. The connection setup and deferred cleanup are properly implemented.


64-64: Simplified request sending with structured objects.

The direct sending of structured FetchRequest objects via client.SendAndReceive is much cleaner than the previous manual encoding approach. The logger integration provides better traceability.


77-77: Updated assertion pattern with embedded logger.

The new assertion pattern using Run() with logger embedded in the constructor is more consistent and encapsulated than the previous Evaluate approach.


129-131: Simplified assertion with field exclusion pattern.

The exclusion-based assertion approach (ExcludeRecordBatchFields("BatchLength")) provides flexibility while maintaining test correctness. The TODO comment appropriately notes the temporary nature pending a response builder.

internal/stage_f2.go (4)

8-8: Consistent import update for client abstraction.

The import change aligns with the overall refactoring from protocol.Broker to kafka_client.Client.


26-32: Proper client lifecycle management.

The client instantiation, connection with retries, and deferred cleanup follow the established pattern consistently across stage files.


49-49: Streamlined request handling.

Sending structured request objects directly simplifies the code and improves maintainability compared to manual encoding.


62-62: Modernized assertion pattern.

The updated assertion calls using Run() with embedded loggers and the SkipTopicFields() exclusion method provide better encapsulation and flexibility.

Also applies to: 72-74

internal/stage_f4.go (4)

9-9: Import aligns with client refactoring.

Consistent with the migration to the new kafka_client abstraction.


27-33: Standard client lifecycle implementation.

The client setup and cleanup pattern is consistently implemented across stage files.


64-64: Enhanced request handling with logging.

Direct request object sending with integrated logging improves traceability and simplifies the codebase.


77-77: Appropriate assertion updates.

The use of SkipRecordBatchFields() instead of SkipTopicFields() is contextually appropriate for this test case, which focuses on fetch responses without record batches.

Also applies to: 104-106

internal/assertions/response_header_assertion.go (4)

10-19: Enhanced assertion struct design.

The addition of logger, error state, and exclusion fields transforms this from a stateless utility to a proper assertion object with encapsulated behavior and flexible validation options.


21-28: Improved constructor with logger integration.

The constructor now properly initializes all fields and accepts a logger for integrated logging, improving traceability and debugging capabilities.


30-46: Well-designed assertion logic with exclusion support.

The private assertHeader method properly:

  • Respects the error state (early return pattern)
  • Uses the exclusion list to conditionally validate fields
  • Provides clear success logging with specific values
  • Sets descriptive error messages

48-53: Clean public interface.

The Run() method provides a simple public interface while encapsulating the assertion logic, following the pattern established across other assertion types.

protocol/utils/utils.go (2)

21-61: Well-implemented hex dump formatter.

The function efficiently builds a formatted hex dump with proper headers, separators, and ASCII representation. Good use of strings.Builder and proper handling of edge cases.


96-98: LGTM!

Simple and effective logging helper with configurable indentation.

protocol/kafka_client/client.go (2)

29-41: Good refactoring to clarify the abstraction.

The rename from Broker to Client better represents the actual purpose of this type - a client connection to a Kafka broker rather than the broker itself.


197-221: Good implementation with proper safeguards.

The use of io.LimitReader to prevent excessive memory consumption is a good practice. The 1MB limit seems reasonable for a testing framework.

internal/assertions/describe_topic_partitions_response_assertion.go (1)

37-60: Well-designed exclusion mechanism.

The multiple levels of exclusion (body, topic, partition) with convenient skip methods provide excellent flexibility for test scenarios.

internal/assertions/fetch_response_assertion.go (1)

71-89: Excellent cascading design for skip methods.

The cascading skip methods (SkipTopicFields → SkipPartitionFields → SkipRecordBatchFields → SkipRecordFields) provide an intuitive API that matches the hierarchical nature of the data structure.

Comment on lines 37 to +38
requestCount := random.RandomInt(2, 5)
for i := 0; i < requestCount; i++ {
for i := range requestCount {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect loop syntax - this will cause a compilation error.

The range keyword cannot be used directly on an integer. This is a syntax error in Go.

Apply this diff to fix the loop:

 requestCount := random.RandomInt(2, 5)
-for i := range requestCount {
+for i := 0; i < requestCount; i++ {

Alternatively, if you're using Go 1.22+, you can use:

-for i := range requestCount {
+for i := range int(requestCount) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
requestCount := random.RandomInt(2, 5)
for i := 0; i < requestCount; i++ {
for i := range requestCount {
requestCount := random.RandomInt(2, 5)
for i := 0; i < requestCount; i++ {
🤖 Prompt for AI Agents
In internal/stage_c1.go around lines 37 to 38, the for loop incorrectly uses
'range' on an integer variable 'requestCount', which causes a compilation error.
Replace the loop with a standard for loop iterating from 0 to requestCount-1,
like 'for i := 0; i < requestCount; i++', to fix the syntax error.

Comment on lines +104 to +119
func APIKeyToName(apiKey int16) string {
switch apiKey {
case 0:
return "Produce"
case 1:
return "Fetch"
case 18:
return "ApiVersions"
case 19:
return "CreateTopics"
case 75:
return "DescribeTopicPartitions"
default:
panic(fmt.Sprintf("CodeCrafters Internal Error: Unknown API key: %v", apiKey))
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider returning a default string instead of panicking.

Panicking on an unknown API key could crash the entire test suite. Consider returning a formatted string like "Unknown API Key (%d)" instead, which would be more graceful and still informative.

 func APIKeyToName(apiKey int16) string {
 	switch apiKey {
 	case 0:
 		return "Produce"
 	case 1:
 		return "Fetch"
 	case 18:
 		return "ApiVersions"
 	case 19:
 		return "CreateTopics"
 	case 75:
 		return "DescribeTopicPartitions"
 	default:
-		panic(fmt.Sprintf("CodeCrafters Internal Error: Unknown API key: %v", apiKey))
+		return fmt.Sprintf("Unknown API Key (%d)", apiKey)
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func APIKeyToName(apiKey int16) string {
switch apiKey {
case 0:
return "Produce"
case 1:
return "Fetch"
case 18:
return "ApiVersions"
case 19:
return "CreateTopics"
case 75:
return "DescribeTopicPartitions"
default:
panic(fmt.Sprintf("CodeCrafters Internal Error: Unknown API key: %v", apiKey))
}
}
func APIKeyToName(apiKey int16) string {
switch apiKey {
case 0:
return "Produce"
case 1:
return "Fetch"
case 18:
return "ApiVersions"
case 19:
return "CreateTopics"
case 75:
return "DescribeTopicPartitions"
default:
return fmt.Sprintf("Unknown API Key (%d)", apiKey)
}
}
🤖 Prompt for AI Agents
In protocol/utils/utils.go around lines 104 to 119, the function APIKeyToName
panics when given an unknown API key, which can crash the test suite. Modify the
function to return a formatted string like "Unknown API Key (%d)" instead of
panicking, so it handles unexpected keys gracefully while still providing
informative output.

@ryan-gang ryan-gang closed this Jul 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants