Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0977102
Rename decoder -> decoder_legacy
UdeshyaDhungana Aug 29, 2025
2e38b0d
Suffix modules with *_legacy
UdeshyaDhungana Aug 29, 2025
a492573
Rename all usages
UdeshyaDhungana Aug 29, 2025
9d07a5e
Rename all remaining usages
UdeshyaDhungana Aug 29, 2025
be0cb21
Rename errors -> errors_legacy
UdeshyaDhungana Aug 29, 2025
a7ba1d5
assertions package is now legacy
UdeshyaDhungana Aug 29, 2025
a18ba77
Add new decoder and errors package
UdeshyaDhungana Aug 29, 2025
c3f9b6d
Rename builder to builder_legacy
UdeshyaDhungana Aug 29, 2025
9a570ff
Rename builder_legacy usages
UdeshyaDhungana Aug 29, 2025
27f017f
Merge branch 'rename-modules' into temp
UdeshyaDhungana Aug 29, 2025
c8d1f04
migrate upto stage 4
UdeshyaDhungana Aug 29, 2025
f292d88
Add remaining files
UdeshyaDhungana Aug 29, 2025
3969d77
client and interface moved to legacy
UdeshyaDhungana Aug 29, 2025
6d88217
Resolve merge conflict
UdeshyaDhungana Aug 29, 2025
10411d7
move encoder to legacy
UdeshyaDhungana Aug 29, 2025
830cd46
Merge branch 'rename-modules' into temp
UdeshyaDhungana Aug 29, 2025
2bc71fa
serializer -> serializer_legacy
UdeshyaDhungana Aug 29, 2025
8c8175b
Resolve merge conflict
UdeshyaDhungana Aug 29, 2025
9171cb6
First round of refactor
UdeshyaDhungana Aug 29, 2025
bafa2da
Apply suggested fixes
UdeshyaDhungana Sep 1, 2025
88869fd
Remove the usage of serializer_legacy in base stages
UdeshyaDhungana Sep 2, 2025
3c4cecf
Minor refactor
UdeshyaDhungana Sep 2, 2025
70533ee
Refactor files manager
UdeshyaDhungana Sep 2, 2025
cf33239
Encoder interface changed
UdeshyaDhungana Sep 2, 2025
6a52ba2
Remove logger and indentation in ApiVersionsResponseBody.Decode()
UdeshyaDhungana Sep 2, 2025
7662386
Change decoder printing format and refactor api_versions_response.go
UdeshyaDhungana Sep 2, 2025
8bc159e
Refactor and regenerate fixtures
UdeshyaDhungana Sep 2, 2025
8b662c7
Refactor base stages and dependencies
UdeshyaDhungana Sep 2, 2025
97cf7fa
Refactor decoder
UdeshyaDhungana Sep 2, 2025
17468a4
Refactor decoder
UdeshyaDhungana Sep 2, 2025
ac4ba7e
Remove files manager for refactor-base-stages
UdeshyaDhungana Sep 3, 2025
d9bf369
Reove printing log files details to CLI in base stages
UdeshyaDhungana Sep 3, 2025
0d6c84c
Merge with main
UdeshyaDhungana Sep 3, 2025
b488f96
Add missing files after git merge
UdeshyaDhungana Sep 3, 2025
76723a5
Minor decoder refactor
UdeshyaDhungana Sep 3, 2025
391be3d
CI: Re-record Fixtures
UdeshyaDhungana Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions internal/assertions/apiversions_response_assertion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package assertions

import (
"fmt"

"github.com/codecrafters-io/kafka-tester/protocol/kafkaapi"
"github.com/codecrafters-io/tester-utils/logger"
)

var apiKeyNames = map[int16]string{
1: "FETCH",
18: "API_VERSIONS",
75: "DESCRIBE_TOPIC_PARTITIONS",
}

var errorCodes = map[int]string{
0: "NO_ERROR",
3: "UNKNOWN_TOPIC_OR_PARTITION",
35: "UNSUPPORTED_VERSION",
100: "UNKNOWN_TOPIC_ID",
}

type ApiVersionsResponseAssertion struct {
ActualValue kafkaapi.ApiVersionsResponse
ExpectedValue kafkaapi.ApiVersionsResponse
}

func NewApiVersionsResponseAssertion(actualValue kafkaapi.ApiVersionsResponse, expectedValue kafkaapi.ApiVersionsResponse) *ApiVersionsResponseAssertion {
return &ApiVersionsResponseAssertion{
ActualValue: actualValue,
ExpectedValue: expectedValue,
}
}

func (a *ApiVersionsResponseAssertion) assertBody(logger *logger.Logger) error {
expectedErrorCodeName, ok := errorCodes[int(a.ExpectedValue.Body.ErrorCode)]

if !ok {
panic(fmt.Sprintf("CodeCrafters Internal Error: Expected %d to be in errorCodes map", a.ExpectedValue.Body.ErrorCode))
}

if a.ActualValue.Body.ErrorCode != a.ExpectedValue.Body.ErrorCode {
return fmt.Errorf("Expected ErrorCode to be %d (%s), got %d", a.ExpectedValue.Body.ErrorCode, expectedErrorCodeName, a.ActualValue.Body.ErrorCode)
}

logger.Successf("✓ ErrorCode: %d (%s)", a.ActualValue.Body.ErrorCode, expectedErrorCodeName)

if err := a.assertAPIKeysArray(logger); err != nil {
return err
}

return nil
}

func (a *ApiVersionsResponseAssertion) assertAPIKeysArray(logger *logger.Logger) error {
if len(a.ActualValue.Body.ApiKeys) < len(a.ExpectedValue.Body.ApiKeys) {
return fmt.Errorf("Expected API keys array to include atleast %d keys, got %d", len(a.ExpectedValue.Body.ApiKeys), len(a.ActualValue.Body.ApiKeys))
}

logger.Successf("✓ API keys array length: %d", len(a.ActualValue.Body.ApiKeys))

for _, expectedApiVersionKey := range a.ExpectedValue.Body.ApiKeys {
found := false

for _, actualApiVersionKey := range a.ActualValue.Body.ApiKeys {
if actualApiVersionKey.ApiKey == expectedApiVersionKey.ApiKey {
found = true

if actualApiVersionKey.MinVersion > expectedApiVersionKey.MaxVersion {
return fmt.Errorf("Expected min version %v to be < max version %v for %s", actualApiVersionKey.MinVersion, expectedApiVersionKey.MaxVersion, apiKeyNames[expectedApiVersionKey.ApiKey])
}

// anything above or equal to expected minVersion is fine
if actualApiVersionKey.MinVersion < expectedApiVersionKey.MinVersion {
return fmt.Errorf("Expected API version %v to be supported for %s, got %v", expectedApiVersionKey.MinVersion, apiKeyNames[expectedApiVersionKey.ApiKey], actualApiVersionKey.MinVersion)
}

logger.Successf("✓ MinVersion for %s is <= %v & >= %v", apiKeyNames[expectedApiVersionKey.ApiKey], expectedApiVersionKey.MaxVersion, expectedApiVersionKey.MinVersion)

if actualApiVersionKey.MaxVersion < expectedApiVersionKey.MaxVersion {
return fmt.Errorf("Expected API version %v to be supported for %s, got %v", expectedApiVersionKey.MaxVersion, apiKeyNames[expectedApiVersionKey.ApiKey], actualApiVersionKey.MaxVersion)
}

logger.Successf("✓ MaxVersion for %s is >= %v", apiKeyNames[expectedApiVersionKey.ApiKey], expectedApiVersionKey.MaxVersion)
}
}

if !found {
return fmt.Errorf("Expected APIVersionsResponseKey array to include API key %d (%s)", expectedApiVersionKey.ApiKey, apiKeyNames[expectedApiVersionKey.ApiKey])
}
}

return nil
}

func (a *ApiVersionsResponseAssertion) Run(logger *logger.Logger) error {
if err := NewResponseHeaderAssertion(a.ActualValue.Header, a.ExpectedValue.Header).Run(logger); err != nil {
return err
}

if err := a.assertBody(logger); err != nil {
return err
}

return nil
}
34 changes: 34 additions & 0 deletions internal/assertions/response_header_assertion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package assertions

import (
"fmt"

"github.com/codecrafters-io/kafka-tester/protocol/kafkaapi/headers"
"github.com/codecrafters-io/tester-utils/logger"
)

type ResponseHeaderAssertion struct {
ActualValue headers.ResponseHeader
ExpectedValue headers.ResponseHeader
}

func NewResponseHeaderAssertion(actualValue headers.ResponseHeader, expectedValue headers.ResponseHeader) *ResponseHeaderAssertion {
return &ResponseHeaderAssertion{
ActualValue: actualValue,
ExpectedValue: expectedValue,
}
}

func (a *ResponseHeaderAssertion) assertCorrelationId(logger *logger.Logger) error {
if a.ActualValue.CorrelationId != a.ExpectedValue.CorrelationId {
return fmt.Errorf("Expected correlation_id to be %d, got %d", a.ExpectedValue.CorrelationId, a.ActualValue.CorrelationId)
}

logger.Successf("✓ correlation_id: %v", a.ActualValue.CorrelationId)

return nil
}

func (a *ResponseHeaderAssertion) Run(logger *logger.Logger) error {
return a.assertCorrelationId(logger)
}
65 changes: 28 additions & 37 deletions internal/stage_2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,41 @@ import (
"fmt"

"github.com/codecrafters-io/kafka-tester/internal/kafka_executable"
"github.com/codecrafters-io/kafka-tester/protocol"
"github.com/codecrafters-io/kafka-tester/protocol/builder_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/decoder_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/errors_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/kafka_client_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/kafkaapi_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/builder"
"github.com/codecrafters-io/kafka-tester/protocol/decoder"
"github.com/codecrafters-io/kafka-tester/protocol/kafka_client"
"github.com/codecrafters-io/kafka-tester/protocol/kafkaapi"
"github.com/codecrafters-io/kafka-tester/protocol/serializer_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/utils"
"github.com/codecrafters-io/tester-utils/logger"
"github.com/codecrafters-io/tester-utils/test_case_harness"
)

func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error {
b := kafka_executable.NewKafkaExecutable(stageHarness)
err := serializer_legacy.GenerateLogDirs(logger.GetQuietLogger(""), true)
if err != nil {
stageLogger := stageHarness.Logger

if err := serializer_legacy.GenerateLogDirs(logger.GetQuietLogger(""), true); err != nil {
return err
}

stageLogger := stageHarness.Logger
b := kafka_executable.NewKafkaExecutable(stageHarness)

if err := b.Run(); err != nil {
return err
}

client := kafka_client_legacy.NewClient("localhost:9092")
client := kafka_client.NewClient("localhost:9092")

if err := client.ConnectWithRetries(b, stageLogger); err != nil {
return err
}
defer func(client *kafka_client_legacy.Client) {
_ = client.Close()
}(client)

defer client.Close()
correlationId := int32(7)

request := kafkaapi_legacy.ApiVersionsRequest{
Header: builder_legacy.NewRequestHeaderBuilder().BuildApiVersionsRequestHeader(correlationId),
Body: kafkaapi_legacy.ApiVersionsRequestBody{
request := kafkaapi.ApiVersionsRequest{
Header: builder.NewRequestHeaderBuilder().BuildApiVersionsRequestHeader(correlationId),
Body: kafkaapi.ApiVersionsRequestBody{
Version: 4,
ClientSoftwareName: "kafka-cli",
ClientSoftwareVersion: "0.1",
Expand All @@ -49,45 +48,37 @@ func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness)
message := request.Encode()
stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId)
stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", utils.GetFormattedHexdump(message))
err := client.Send(message)

err = client.Send(message)
if err != nil {
return err
}

response, err := client.ReceiveRaw()

if err != nil {
return err
}
stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", utils.GetFormattedHexdump(response))

decoder := decoder_legacy.Decoder{}
decoder.Init(response)
stageLogger.UpdateLastSecondaryPrefix("Decoder")
stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", utils.GetFormattedHexdump(response))
decoder := decoder.NewDecoder(response, stageLogger)
decoder.BeginSubSection("response")
_, err = decoder.ReadInt32("message_length")

stageLogger.Debugf("- .Response")
messageLength, err := decoder.GetInt32()
if err != nil {
if decodingErr, ok := err.(*errors_legacy.PacketDecodingError); ok {
err = decodingErr.WithAddedContext("message length").WithAddedContext("response")
return decoder.FormatDetailedError(err.Error())
}
return err
}
protocol.LogWithIndentation(stageLogger, 1, "- .message_length (%d)", messageLength)

stageLogger.Debugf("- .response_header")
responseCorrelationId, err := decoder.GetInt32()
decoder.BeginSubSection("response_header")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Decoder Indentation Leak

The decoder.BeginSubSection("response_header") call in stage_2.go, stage_3.go, and stage_4.go lacks a matching EndCurrentSubSection(). This leaves the decoder's log indentation permanently increased for any subsequent logging operations.

Additional Locations (2)

Fix in Cursor Fix in Web

responseCorrelationId, err := decoder.ReadInt32("correlation_id")

if err != nil {
if decodingErr, ok := err.(*errors_legacy.PacketDecodingError); ok {
err = decodingErr.WithAddedContext("correlation_id").WithAddedContext("response")
return decoder.FormatDetailedError(err.Error())
}
return err
}
protocol.LogWithIndentation(stageLogger, 1, "- .correlation_id (%d)", responseCorrelationId)

stageLogger.ResetSecondaryPrefixes()

if responseCorrelationId != int32(correlationId) {
if responseCorrelationId != correlationId {
return fmt.Errorf("Expected Correlation ID to be %v, got %v", int32(correlationId), responseCorrelationId)
}

Expand Down
66 changes: 28 additions & 38 deletions internal/stage_3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,42 @@ package internal
import (
"fmt"

"github.com/codecrafters-io/tester-utils/logger"

"github.com/codecrafters-io/kafka-tester/internal/kafka_executable"
"github.com/codecrafters-io/kafka-tester/protocol"
"github.com/codecrafters-io/kafka-tester/protocol/builder_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/decoder_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/errors_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/kafka_client_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/kafkaapi_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/builder"
"github.com/codecrafters-io/kafka-tester/protocol/decoder"
"github.com/codecrafters-io/kafka-tester/protocol/kafka_client"
"github.com/codecrafters-io/kafka-tester/protocol/kafkaapi"
"github.com/codecrafters-io/kafka-tester/protocol/serializer_legacy"
"github.com/codecrafters-io/kafka-tester/protocol/utils"
"github.com/codecrafters-io/tester-utils/logger"
"github.com/codecrafters-io/tester-utils/test_case_harness"
)

func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error {
b := kafka_executable.NewKafkaExecutable(stageHarness)
err := serializer_legacy.GenerateLogDirs(logger.GetQuietLogger(""), true)
if err != nil {
stageLogger := stageHarness.Logger

if err := serializer_legacy.GenerateLogDirs(logger.GetQuietLogger(""), true); err != nil {
return err
}

stageLogger := stageHarness.Logger
b := kafka_executable.NewKafkaExecutable(stageHarness)

if err := b.Run(); err != nil {
return err
}

client := kafka_client_legacy.NewClient("localhost:9092")
client := kafka_client.NewClient("localhost:9092")

if err := client.ConnectWithRetries(b, stageLogger); err != nil {
return err
}
defer func(client *kafka_client_legacy.Client) {
_ = client.Close()
}(client)

defer client.Close()
correlationId := getRandomCorrelationId()

request := kafkaapi_legacy.ApiVersionsRequest{
Header: builder_legacy.NewRequestHeaderBuilder().BuildApiVersionsRequestHeader(correlationId),
Body: kafkaapi_legacy.ApiVersionsRequestBody{
request := kafkaapi.ApiVersionsRequest{
Header: builder.NewRequestHeaderBuilder().BuildApiVersionsRequestHeader(correlationId),
Body: kafkaapi.ApiVersionsRequestBody{
Version: 4,
ClientSoftwareName: "kafka-cli",
ClientSoftwareVersion: "0.1",
Expand All @@ -51,42 +48,35 @@ func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error {
message := request.Encode()
stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId)
stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", utils.GetFormattedHexdump(message))
err := client.Send(message)

err = client.Send(message)
if err != nil {
return err
}

response, err := client.ReceiveRaw()

if err != nil {
return err
}
stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", utils.GetFormattedHexdump(response))

decoder := decoder_legacy.Decoder{}
decoder.Init(response)
stageLogger.UpdateLastSecondaryPrefix("Decoder")
stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", utils.GetFormattedHexdump(response))
decoder := decoder.NewDecoder(response, stageLogger)
decoder.BeginSubSection("response")
_, err = decoder.ReadInt32("message_length")

stageLogger.Debugf("- .Response")
messageLength, err := decoder.GetInt32()
if err != nil {
if decodingErr, ok := err.(*errors_legacy.PacketDecodingError); ok {
err = decodingErr.WithAddedContext("message length").WithAddedContext("response")
return decoder.FormatDetailedError(err.Error())
}
return err
}
protocol.LogWithIndentation(stageLogger, 1, "- .message_length (%d)", messageLength)

stageLogger.Debugf("- .response_header")
responseCorrelationId, err := decoder.GetInt32()
decoder.BeginSubSection("response_header")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Decoder Missing End Calls Causes Log Formatting Issues

The new decoder's BeginSubSection calls in internal/stage_2.go, internal/stage_3.go, and internal/stage_4.go lack corresponding EndCurrentSubSection calls. This results in continuously increasing logging indentation and incorrect log formatting.

Additional Locations (2)

Fix in Cursor Fix in Web


responseCorrelationId, err := decoder.ReadInt32("correlation_id")

if err != nil {
if decodingErr, ok := err.(*errors_legacy.PacketDecodingError); ok {
err = decodingErr.WithAddedContext("correlation_id").WithAddedContext("response")
return decoder.FormatDetailedError(err.Error())
}
return err
}
protocol.LogWithIndentation(stageLogger, 1, "- .correlation_id (%d)", responseCorrelationId)

stageLogger.ResetSecondaryPrefixes()

if responseCorrelationId != correlationId {
Expand Down
Loading