Skip to content

Commit 8698fd6

Browse files
authored
Merge pull request #1852 from YayBurritos/1849-include-messageid-tag-in-rabbitmqactivitysource-during-basicpublish
Closes#1849: include a message id tag in rabbitmqactivitysource for published messages
2 parents 44d8be3 + c1c54f1 commit 8698fd6

File tree

5 files changed

+48
-197
lines changed

5 files changed

+48
-197
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
6161
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
6262

6363
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
64-
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
64+
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length, basicProperties)
6565
: default;
6666

6767
ulong publishSequenceNumber = 0;
@@ -116,7 +116,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
116116
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
117117

118118
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
119-
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
119+
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length, basicProperties)
120120
: default;
121121

122122
ulong publishSequenceNumber = 0;

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public static bool UseRoutingKeyAsOperationName
6666
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
6767
};
6868

69-
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
69+
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, IReadOnlyBasicProperties basicProperties,
7070
ActivityContext linkedContext = default)
7171
{
7272
if (!s_publisherSource.HasListeners())
@@ -83,7 +83,7 @@ public static bool UseRoutingKeyAsOperationName
8383
ActivityKind.Producer, linkedContext);
8484
if (activity != null && activity.IsAllDataRequested)
8585
{
86-
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity);
86+
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, basicProperties, bodySize, activity);
8787
}
8888

8989
return activity;

projects/Test/SequentialIntegration/TestActivitySource.cs

Lines changed: 22 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -75,120 +75,6 @@ void AssertIntTagGreaterThanZero(Activity activity, string name)
7575
Assert.True(activity.GetTagItem(name) is int result && result > 0);
7676
}
7777

78-
[Theory]
79-
[InlineData(true, true)]
80-
[InlineData(true, false)]
81-
[InlineData(false, true)]
82-
[InlineData(false, false)]
83-
public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
84-
{
85-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
86-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
87-
var _activities = new List<Activity>();
88-
using ActivityListener activityListener = StartActivityListener(_activities);
89-
await Task.Delay(500);
90-
string queueName = $"{Guid.NewGuid()}";
91-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
92-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
93-
byte[] consumeBody = null;
94-
var consumer = new AsyncEventingBasicConsumer(_channel);
95-
var consumerReceivedTcs =
96-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
97-
consumer.ReceivedAsync += (o, a) =>
98-
{
99-
consumeBody = a.Body.ToArray();
100-
consumerReceivedTcs.SetResult(true);
101-
return Task.CompletedTask;
102-
};
103-
104-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
105-
await _channel.BasicPublishAsync("", q.QueueName, true, sendBody);
106-
107-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
108-
Assert.True(await consumerReceivedTcs.Task);
109-
110-
await _channel.BasicCancelAsync(consumerTag);
111-
await Task.Delay(500);
112-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true);
113-
}
114-
115-
[Theory]
116-
[InlineData(true, true)]
117-
[InlineData(true, false)]
118-
[InlineData(false, true)]
119-
[InlineData(false, false)]
120-
public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
121-
{
122-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
123-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
124-
var _activities = new List<Activity>();
125-
using ActivityListener activityListener = StartActivityListener(_activities);
126-
await Task.Delay(500);
127-
string queueName = $"{Guid.NewGuid()}";
128-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
129-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
130-
byte[] consumeBody = null;
131-
var consumer = new AsyncEventingBasicConsumer(_channel);
132-
var consumerReceivedTcs =
133-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
134-
consumer.ReceivedAsync += (o, a) =>
135-
{
136-
consumeBody = a.Body.ToArray();
137-
consumerReceivedTcs.SetResult(true);
138-
return Task.CompletedTask;
139-
};
140-
141-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
142-
CachedString exchange = new CachedString("");
143-
CachedString routingKey = new CachedString(q.QueueName);
144-
await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody);
145-
146-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
147-
Assert.True(await consumerReceivedTcs.Task);
148-
149-
await _channel.BasicCancelAsync(consumerTag);
150-
await Task.Delay(500);
151-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true);
152-
}
153-
154-
[Theory]
155-
[InlineData(true, true)]
156-
[InlineData(true, false)]
157-
[InlineData(false, true)]
158-
[InlineData(false, false)]
159-
public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
160-
{
161-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
162-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
163-
var _activities = new List<Activity>();
164-
using ActivityListener activityListener = StartActivityListener(_activities);
165-
await Task.Delay(500);
166-
string queueName = $"{Guid.NewGuid()}";
167-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
168-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
169-
byte[] consumeBody = null;
170-
var consumer = new AsyncEventingBasicConsumer(_channel);
171-
var consumerReceivedTcs =
172-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
173-
consumer.ReceivedAsync += (o, a) =>
174-
{
175-
consumeBody = a.Body.ToArray();
176-
consumerReceivedTcs.SetResult(true);
177-
return Task.CompletedTask;
178-
};
179-
180-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
181-
PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName);
182-
await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
183-
184-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
185-
Assert.True(await consumerReceivedTcs.Task);
186-
187-
await _channel.BasicCancelAsync(consumerTag);
188-
await Task.Delay(500);
189-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true);
190-
}
191-
19278
[Theory]
19379
[InlineData(true, true)]
19480
[InlineData(true, false)]
@@ -307,11 +193,15 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
307193
}
308194

309195
[Theory]
310-
[InlineData(true, true)]
311-
[InlineData(true, false)]
312-
[InlineData(false, true)]
313-
[InlineData(false, false)]
314-
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
196+
[InlineData(true, true, true)]
197+
[InlineData(true, true, false)]
198+
[InlineData(true, false, true)]
199+
[InlineData(true, false, false)]
200+
[InlineData(false, true, true)]
201+
[InlineData(false, true, false)]
202+
[InlineData(false, false, true)]
203+
[InlineData(false, false, false)]
204+
public async Task TestPublisherAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, bool useMessageId)
315205
{
316206
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
317207
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
@@ -321,18 +211,20 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
321211
string queue = $"queue-{Guid.NewGuid()}";
322212
const string msg = "for basic.get";
323213

214+
var basicProps = useMessageId ? new BasicProperties() { MessageId = Guid.NewGuid().ToString() } : new BasicProperties();
215+
324216
try
325217
{
326218
await _channel.QueueDeclareAsync(queue, false, false, false, null);
327-
await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg));
219+
await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg));
328220
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
329221
Assert.Equal(1u, ok.MessageCount);
330222
BasicGetResult res = await _channel.BasicGetAsync(queue, true);
331223
Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray()));
332224
ok = await _channel.QueueDeclarePassiveAsync(queue);
333225
Assert.Equal(0u, ok.MessageCount);
334226
await Task.Delay(500);
335-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false);
227+
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false, basicProps.MessageId);
336228
}
337229
finally
338230
{
@@ -345,7 +237,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
345237
[InlineData(true, false)]
346238
[InlineData(false, true)]
347239
[InlineData(false, false)]
348-
public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
240+
public async Task TestPublisherWithCachedStringsAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
349241
{
350242
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
351243
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
@@ -381,7 +273,7 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use
381273
[InlineData(true, false)]
382274
[InlineData(false, true)]
383275
[InlineData(false, false)]
384-
public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
276+
public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
385277
{
386278
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
387279
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
@@ -427,7 +319,7 @@ private static ActivityListener StartActivityListener(List<Activity> activities)
427319
}
428320

429321
private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, string queueName,
430-
List<Activity> activityList, bool isDeliver = false)
322+
List<Activity> activityList, bool isDeliver = false, string messageId = null)
431323
{
432324
string childName = isDeliver ? "deliver" : "fetch";
433325
Activity[] activities = activityList.ToArray();
@@ -480,6 +372,12 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePubli
480372
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
481373
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
482374
AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);
375+
376+
if (messageId is not null)
377+
{
378+
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessageId, messageId);
379+
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessageId, messageId);
380+
}
483381
}
484382
}
485383
}

projects/Test/SequentialIntegration/TestHeartbeats.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public override Task InitializeAsync()
5959

6060
[SkippableFact(Timeout = 35000)]
6161
[Trait("Category", "LongRunning")]
62-
public async Task TestThatHeartbeatWriterUsesConfigurableInterval()
62+
public async Task TestThatHeartbeatWriterUsesConfigurableIntervalAsync()
6363
{
6464
Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test");
6565

@@ -72,7 +72,7 @@ public async Task TestThatHeartbeatWriterUsesConfigurableInterval()
7272

7373
[SkippableFact]
7474
[Trait("Category", "LongRunning")]
75-
public async Task TestThatHeartbeatWriterWithTLSEnabled()
75+
public async Task TestThatHeartbeatWriterWithTLSEnabledAsync()
7676
{
7777
Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test");
7878

@@ -94,7 +94,7 @@ public async Task TestThatHeartbeatWriterWithTLSEnabled()
9494

9595
[SkippableFact(Timeout = 90000)]
9696
[Trait("Category", "LongRunning")]
97-
public async Task TestHundredsOfConnectionsWithRandomHeartbeatInterval()
97+
public async Task TestHundredsOfConnectionsWithRandomHeartbeatIntervalAsync()
9898
{
9999
Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test");
100100

0 commit comments

Comments
 (0)