diff --git a/ddtrace/contrib/internal/kafka/patch.py b/ddtrace/contrib/internal/kafka/patch.py index 225c0f82877..b8e8fce007d 100644 --- a/ddtrace/contrib/internal/kafka/patch.py +++ b/ddtrace/contrib/internal/kafka/patch.py @@ -247,7 +247,7 @@ def _instrument_message(messages, pin, start_ns, instance, err): name=schematize_messaging_operation(kafkax.CONSUME, provider="kafka", direction=SpanDirection.PROCESSING), service=trace_utils.ext_service(pin, config.kafka), span_type=SpanTypes.WORKER, - child_of=ctx if ctx is not None else pin.tracer.context_provider.active(), + child_of=ctx if ctx is not None and ctx.trace_id is not None else pin.tracer.context_provider.active(), activate=True, ) as span: # reset span start time to before function call diff --git a/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml b/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml new file mode 100644 index 00000000000..df8cdcfe986 --- /dev/null +++ b/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + kafka: Fixes an issue with Kafka consumer spans not using the active trace context when distributed + tracing was enabled and no valid distributed context found was found within a consumed message. diff --git a/tests/contrib/kafka/test_kafka.py b/tests/contrib/kafka/test_kafka.py index 9bcf4ffc538..d49f85f26b2 100644 --- a/tests/contrib/kafka/test_kafka.py +++ b/tests/contrib/kafka/test_kafka.py @@ -885,6 +885,35 @@ def test_context_header_injection_works_no_client_added_headers(kafka_topic, pro assert propagation_asserted is True +def test_consumer_uses_active_context_when_no_valid_distributed_context_exists( + kafka_topic, producer, consumer, dummy_tracer +): + # use a random int in this string to prevent reading a message produced by a previous test run + test_string = "producer does not inject context test " + str(random.randint(0, 1000)) + test_key = "producer does not inject context test " + str(random.randint(0, 1000)) + PAYLOAD = bytes(test_string, encoding="utf-8") + + producer.produce(kafka_topic, PAYLOAD, key=test_key) + producer.flush() + + Pin.override(consumer, tracer=dummy_tracer) + + with dummy_tracer.trace("kafka consumer parent span") as parent_span: + with override_config("kafka", dict(distributed_tracing_enabled=True)): + message = None + while message is None or str(message.value()) != str(PAYLOAD): + message = consumer.poll() + + traces = dummy_tracer.pop_traces() + consume_span = traces[len(traces) - 1][-1] + + # assert consumer_span parent is our custom span + assert consume_span.name == "kafka.consume" + assert consume_span.parent_id == parent_span.span_id + + Pin.override(consumer, tracer=None) + + def test_span_has_dsm_payload_hash(dummy_tracer, consumer, producer, kafka_topic): Pin.override(producer, tracer=dummy_tracer) Pin.override(consumer, tracer=dummy_tracer)