Skip to content

Commit

Permalink
fix: kafka consumer parenting logic (#11653)
Browse files Browse the repository at this point in the history
Fixes bug where Kafka consumer was creating unparented spans when
consuming a message with no context present.

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Emmett Butler <[email protected]>
(cherry picked from commit 385d8e0)
  • Loading branch information
wconti27 authored and github-actions[bot] committed Dec 13, 2024
1 parent 8aa79fa commit 9b750b7
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ddtrace/contrib/internal/kafka/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def _instrument_message(messages, pin, start_ns, instance, err):
name=schematize_messaging_operation(kafkax.CONSUME, provider="kafka", direction=SpanDirection.PROCESSING),
service=trace_utils.ext_service(pin, config.kafka),
span_type=SpanTypes.WORKER,
child_of=ctx if ctx is not None else pin.tracer.context_provider.active(),
child_of=ctx if ctx is not None and ctx.trace_id is not None else pin.tracer.context_provider.active(),
activate=True,
) as span:
# reset span start time to before function call
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
kafka: Fixes an issue with Kafka consumer spans not using the active trace context when distributed
tracing was enabled and no valid distributed context found was found within a consumed message.
29 changes: 29 additions & 0 deletions tests/contrib/kafka/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,35 @@ def test_context_header_injection_works_no_client_added_headers(kafka_topic, pro
assert propagation_asserted is True


def test_consumer_uses_active_context_when_no_valid_distributed_context_exists(
kafka_topic, producer, consumer, dummy_tracer
):
# use a random int in this string to prevent reading a message produced by a previous test run
test_string = "producer does not inject context test " + str(random.randint(0, 1000))
test_key = "producer does not inject context test " + str(random.randint(0, 1000))
PAYLOAD = bytes(test_string, encoding="utf-8")

producer.produce(kafka_topic, PAYLOAD, key=test_key)
producer.flush()

Pin.override(consumer, tracer=dummy_tracer)

with dummy_tracer.trace("kafka consumer parent span") as parent_span:
with override_config("kafka", dict(distributed_tracing_enabled=True)):
message = None
while message is None or str(message.value()) != str(PAYLOAD):
message = consumer.poll()

traces = dummy_tracer.pop_traces()
consume_span = traces[len(traces) - 1][-1]

# assert consumer_span parent is our custom span
assert consume_span.name == "kafka.consume"
assert consume_span.parent_id == parent_span.span_id

Pin.override(consumer, tracer=None)


def test_span_has_dsm_payload_hash(dummy_tracer, consumer, producer, kafka_topic):
Pin.override(producer, tracer=dummy_tracer)
Pin.override(consumer, tracer=dummy_tracer)
Expand Down

0 comments on commit 9b750b7

Please sign in to comment.