From 917c61b2490a595299531f85dd33e5c5c7186548 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 10 Dec 2024 10:06:48 -0500 Subject: [PATCH 1/5] fix parenting --- ddtrace/contrib/internal/kafka/patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/internal/kafka/patch.py b/ddtrace/contrib/internal/kafka/patch.py index 225c0f82877..b8e8fce007d 100644 --- a/ddtrace/contrib/internal/kafka/patch.py +++ b/ddtrace/contrib/internal/kafka/patch.py @@ -247,7 +247,7 @@ def _instrument_message(messages, pin, start_ns, instance, err): name=schematize_messaging_operation(kafkax.CONSUME, provider="kafka", direction=SpanDirection.PROCESSING), service=trace_utils.ext_service(pin, config.kafka), span_type=SpanTypes.WORKER, - child_of=ctx if ctx is not None else pin.tracer.context_provider.active(), + child_of=ctx if ctx is not None and ctx.trace_id is not None else pin.tracer.context_provider.active(), activate=True, ) as span: # reset span start time to before function call From 47b44c4a4908ec39a53272265be12df1ebdb8fb0 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 10 Dec 2024 15:39:25 -0500 Subject: [PATCH 2/5] add a test case --- tests/contrib/kafka/test_kafka.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/contrib/kafka/test_kafka.py b/tests/contrib/kafka/test_kafka.py index 9bcf4ffc538..0423986ca4a 100644 --- a/tests/contrib/kafka/test_kafka.py +++ b/tests/contrib/kafka/test_kafka.py @@ -885,6 +885,33 @@ def test_context_header_injection_works_no_client_added_headers(kafka_topic, pro assert propagation_asserted is True +def test_consumer_uses_active_context_when_no_valid_distributed_context_exists(kafka_topic, producer, consumer, dummy_tracer): + # use a random int in this string to prevent reading a message produced by a previous test run + test_string = "producer doesnt inject context test " + str(random.randint(0, 1000)) + test_key = "producer doesnt inject context test " + str(random.randint(0, 1000)) + PAYLOAD = bytes(test_string, encoding="utf-8") + + producer.produce(kafka_topic, PAYLOAD, key=test_key) + producer.flush() + + Pin.override(consumer, tracer=dummy_tracer) + + with dummy_tracer.trace("kafka consumer parent span") as parent_span: + with override_config("kafka", dict(distributed_tracing_enabled=True)): + message = None + while message is None or str(message.value()) != str(PAYLOAD): + message = consumer.poll() + + traces = dummy_tracer.pop_traces() + consume_span = traces[len(traces) - 1][-1] + + # assert consumer_span parent is our custom span + assert consume_span.name == "kafka.consume" + assert consume_span.parent_id == parent_span.span_id + + Pin.override(consumer, tracer=None) + + def test_span_has_dsm_payload_hash(dummy_tracer, consumer, producer, kafka_topic): Pin.override(producer, tracer=dummy_tracer) Pin.override(consumer, tracer=dummy_tracer) From f1bf7a184a06475e9bdd76331f068e2df523e76d Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 10 Dec 2024 15:47:20 -0500 Subject: [PATCH 3/5] add release note --- .../notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml | 5 +++++ tests/contrib/kafka/test_kafka.py | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml diff --git a/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml b/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml new file mode 100644 index 00000000000..70f28705671 --- /dev/null +++ b/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Fixes an issue with Kafka consumer spans not using the active trace context when distributed + tracing was enabled and no valid distributed context found was found within a consumed message. diff --git a/tests/contrib/kafka/test_kafka.py b/tests/contrib/kafka/test_kafka.py index 0423986ca4a..29cc64410ec 100644 --- a/tests/contrib/kafka/test_kafka.py +++ b/tests/contrib/kafka/test_kafka.py @@ -885,7 +885,9 @@ def test_context_header_injection_works_no_client_added_headers(kafka_topic, pro assert propagation_asserted is True -def test_consumer_uses_active_context_when_no_valid_distributed_context_exists(kafka_topic, producer, consumer, dummy_tracer): +def test_consumer_uses_active_context_when_no_valid_distributed_context_exists( + kafka_topic, producer, consumer, dummy_tracer +): # use a random int in this string to prevent reading a message produced by a previous test run test_string = "producer doesnt inject context test " + str(random.randint(0, 1000)) test_key = "producer doesnt inject context test " + str(random.randint(0, 1000)) From 43f9ccea53d462571d18c8e3a774a9816165798e Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 10 Dec 2024 15:59:33 -0500 Subject: [PATCH 4/5] fix spelling --- tests/contrib/kafka/test_kafka.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/contrib/kafka/test_kafka.py b/tests/contrib/kafka/test_kafka.py index 29cc64410ec..d49f85f26b2 100644 --- a/tests/contrib/kafka/test_kafka.py +++ b/tests/contrib/kafka/test_kafka.py @@ -889,8 +889,8 @@ def test_consumer_uses_active_context_when_no_valid_distributed_context_exists( kafka_topic, producer, consumer, dummy_tracer ): # use a random int in this string to prevent reading a message produced by a previous test run - test_string = "producer doesnt inject context test " + str(random.randint(0, 1000)) - test_key = "producer doesnt inject context test " + str(random.randint(0, 1000)) + test_string = "producer does not inject context test " + str(random.randint(0, 1000)) + test_key = "producer does not inject context test " + str(random.randint(0, 1000)) PAYLOAD = bytes(test_string, encoding="utf-8") producer.produce(kafka_topic, PAYLOAD, key=test_key) From 8a0d205b9608ec8286bbc93a9b66e1a977489962 Mon Sep 17 00:00:00 2001 From: William Conti <58711692+wconti27@users.noreply.github.com> Date: Wed, 11 Dec 2024 10:18:27 -0500 Subject: [PATCH 5/5] Update releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml Co-authored-by: Emmett Butler <723615+emmettbutler@users.noreply.github.com> --- .../notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml b/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml index 70f28705671..df8cdcfe986 100644 --- a/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml +++ b/releasenotes/notes/fix-kafka-consumer-parenting-29acfd08e05d2350.yaml @@ -1,5 +1,5 @@ --- fixes: - | - Fixes an issue with Kafka consumer spans not using the active trace context when distributed + kafka: Fixes an issue with Kafka consumer spans not using the active trace context when distributed tracing was enabled and no valid distributed context found was found within a consumed message.