diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java index c015a43e33..6a107d04a7 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java @@ -37,6 +37,13 @@ import javax.jms.MessageListener; import javax.jms.TextMessage; +import org.apache.pekko.Done; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.Props; +import org.apache.pekko.actor.Status; +import org.apache.pekko.japi.pf.ReceiveBuilder; +import org.apache.pekko.pattern.Patterns; +import org.apache.pekko.stream.javadsl.Sink; import org.apache.qpid.jms.JmsAcknowledgeCallback; import org.apache.qpid.jms.JmsMessageConsumer; import org.apache.qpid.jms.message.JmsMessage; @@ -63,20 +70,12 @@ import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure; import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus; import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory; -import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier; +import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.tracing.DittoTracing; import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName; import org.eclipse.ditto.internal.utils.tracing.span.TracingSpans; -import org.apache.pekko.Done; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.Props; -import org.apache.pekko.actor.Status; -import org.apache.pekko.japi.pf.ReceiveBuilder; -import org.apache.pekko.pattern.Patterns; -import org.apache.pekko.stream.javadsl.Sink; - /** * Actor which receives message from an AMQP source and forwards them to a {@code MessageMappingProcessorActor}. */ @@ -342,10 +341,11 @@ private void messageConsumerFailed(final Status.Failure failure) { handleAddressStatus(addressStatus); } - private void handleJmsMessage(final JmsMessage message) { + private void handleJmsMessage(final JmsMessage message) throws JMSException { Map headers = null; - String correlationId = null; - var startedSpan = TracingSpans.emptyStartedSpan(SpanOperationName.of("amqp_consume")); + var startedSpan = TracingSpans.emptyStartedSpan( + SpanOperationName.of("amqp_consume: " + message.getJMSDestination()) + ); try { recordIncomingForRateLimit(message.getJMSMessageID()); if (logger.isDebugEnabled()) { @@ -358,12 +358,16 @@ private void handleJmsMessage(final JmsMessage message) { ackType); } headers = extractHeadersMapFromJmsMessage(message); - correlationId = headers.get(DittoHeaderDefinition.CORRELATION_ID.getKey()); + final String correlationId = headers.get(DittoHeaderDefinition.CORRELATION_ID.getKey()); startedSpan = DittoTracing.newPreparedSpan(headers, startedSpan.getOperationName()) .correlationId(correlationId) .connectionId(connectionId) .start(); - headers = startedSpan.propagateContext(headers); + headers = startedSpan.propagateContext(DittoHeaders.of(headers) + .toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ); final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers); final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder) .withAuthorizationContext(source.getAuthorizationContext()) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java index 8c2e6ea4a8..760a734a3f 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; +import org.apache.pekko.kafka.ConsumerMessage; import org.eclipse.ditto.base.model.common.ByteBufferUtils; import org.eclipse.ditto.base.model.common.CharsetDeterminer; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; @@ -41,8 +42,6 @@ import org.eclipse.ditto.internal.utils.tracing.DittoTracing; import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName; -import org.apache.pekko.kafka.ConsumerMessage; - /** * Transforms incoming messages from Apache Kafka to {@link org.eclipse.ditto.connectivity.api.ExternalMessage}. */ @@ -108,11 +107,16 @@ public TransformationResult transform(final ConsumerRecord c final String correlationId = messageHeaders .getOrDefault(DittoHeaderDefinition.CORRELATION_ID.getKey(), UUID.randomUUID().toString()); - final var startedSpan = DittoTracing.newPreparedSpan(messageHeaders, SpanOperationName.of("kafka_consume")) - .correlationId(correlationId) + final var startedSpan = DittoTracing.newPreparedSpan(messageHeaders, + SpanOperationName.of("kafka_consume: " + consumerRecord.topic()) + ).correlationId(correlationId) .connectionId(connectionId) .start(); - messageHeaders = startedSpan.propagateContext(messageHeaders); + messageHeaders = startedSpan.propagateContext(DittoHeaders.of(messageHeaders) + .toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ); try { final String key = consumerRecord.key(); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/rabbitmq/RabbitMQConsumerActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/rabbitmq/RabbitMQConsumerActor.java index 34f67defbc..7cefa5bc21 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/rabbitmq/RabbitMQConsumerActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/rabbitmq/RabbitMQConsumerActor.java @@ -22,6 +22,11 @@ import javax.annotation.Nullable; +import org.apache.pekko.Done; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.Props; +import org.apache.pekko.japi.pf.ReceiveBuilder; +import org.apache.pekko.stream.javadsl.Sink; import org.eclipse.ditto.base.model.common.CharsetDeterminer; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; @@ -51,12 +56,6 @@ import com.rabbitmq.client.Delivery; import com.rabbitmq.client.Envelope; -import org.apache.pekko.Done; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.Props; -import org.apache.pekko.japi.pf.ReceiveBuilder; -import org.apache.pekko.stream.javadsl.Sink; - /** * Actor which receives message from an RabbitMQ source and forwards them to a {@code MessageMappingProcessorActor}. @@ -135,7 +134,9 @@ private void handleDelivery(final Delivery delivery) { final Envelope envelope = delivery.getEnvelope(); final byte[] body = delivery.getBody(); - var startedSpan = TracingSpans.emptyStartedSpan(SpanOperationName.of("rabbitmq_consume")); + var startedSpan = TracingSpans.emptyStartedSpan( + SpanOperationName.of("rabbitmq_consume: " + envelope.getExchange()) + ); Map headers = null; try { @Nullable final String correlationId = properties.getCorrelationId(); @@ -150,7 +151,11 @@ private void handleDelivery(final Delivery delivery) { .connectionId(connectionId) .correlationId(correlationId) .start(); - headers = startedSpan.propagateContext(headers); + headers = startedSpan.propagateContext(DittoHeaders.of(headers) + .toBuilder() + .removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()) + .build() + ); final ExternalMessageBuilder externalMessageBuilder = ExternalMessageFactory.newExternalMessageBuilder(headers);