Skip to content

Commit 1968918

Browse files
committed
feat(pubsub): add support for subscription names to enable multiple subscriptions per topic
Introduces subscription name support across topic metadata, attributes, and routing logic to allow multiple distinct subscriptions to the same topic within a single application. Enhances topic attributes and subscription models with an optional subscription name to differentiate subscription handlers. Updates routing and subscription grouping to incorporate subscription names for precise subscription management. Adds example handlers and integration tests demonstrating multiple subscriptions to the same topic distinguished by subscription names. This improvement enables advanced pubsub scenarios where separate subscription handlers can independently process messages from the same topic without conflict. Signed-off-by: Anatoliy Kolodkin <[email protected]>
1 parent 99bc726 commit 1968918

File tree

9 files changed

+88
-13
lines changed

9 files changed

+88
-13
lines changed

examples/AspNetCore/ControllerSample/Controllers/SampleController.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,4 +294,30 @@ public ActionResult<Account> ExampleCustomTopicMetadata(Transaction transaction)
294294
{
295295
return Ok();
296296
}
297+
298+
/// <summary>
299+
/// Example demonstrating multiple subscriptions to the same topic using subscription names.
300+
/// This handler processes deposits for accounting purposes.
301+
/// </summary>
302+
[Topic("pubsub", "multisub-deposit", subscriptionName: "deposit-accounting-subscription")]
303+
[HttpPost("multisub/deposit/accounting")]
304+
public ActionResult<Account> MultiSubDepositAccounting(Transaction transaction)
305+
{
306+
logger.LogInformation("Accounting handler: Processing deposit {Id} for amount {Amount}",
307+
transaction.Id, transaction.Amount);
308+
return Ok(new { handler = "accounting", transactionId = transaction.Id });
309+
}
310+
311+
/// <summary>
312+
/// Example demonstrating multiple subscriptions to the same topic using subscription names.
313+
/// This handler processes deposits for notification purposes.
314+
/// </summary>
315+
[Topic("pubsub", "multisub-deposit", subscriptionName: "deposit-notification-subscription")]
316+
[HttpPost("multisub/deposit/notifications")]
317+
public ActionResult<Account> MultiSubDepositNotifications(Transaction transaction)
318+
{
319+
logger.LogInformation("Notification handler: Processing deposit {Id} for amount {Amount}",
320+
transaction.Id, transaction.Amount);
321+
return Ok(new { handler = "notifications", transactionId = transaction.Id });
322+
}
297323
}

examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,7 @@ public CustomTopicAttribute(string pubsubName, string name)
4141

4242
/// <inheritdoc/>
4343
public int Priority { get; }
44+
45+
/// <inheritdoc/>
46+
public string SubscriptionName { get; }
4447
}

src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute
7474
var originalTopicMetadata = e.Metadata.GetOrderedMetadata<IOriginalTopicMetadata>();
7575
var bulkSubscribeMetadata = e.Metadata.GetOrderedMetadata<IBulkSubscribeMetadata>();
7676

77-
var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload,
78-
string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata,
79-
string MetadataSeparator, RoutePattern RoutePattern, DaprTopicBulkSubscribe bulkSubscribe)>();
77+
var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload,
78+
string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata,
79+
string MetadataSeparator, RoutePattern RoutePattern, DaprTopicBulkSubscribe bulkSubscribe, string SubscriptionName)>();
8080

8181
for (int i = 0; i < topicMetadata.Count(); i++)
8282
{
@@ -109,13 +109,14 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute
109109
.ToDictionary(m => m.Key, m => m.Select(c => c.Value).Distinct().ToArray()),
110110
(topicMetadata[i] as IOwnedOriginalTopicMetadata)?.MetadataSeparator,
111111
e.RoutePattern,
112-
bulkSubscribe));
112+
bulkSubscribe,
113+
topicMetadata[i].SubscriptionName));
113114
}
114115

115116
return subs;
116117
})
117118
.Distinct()
118-
.GroupBy(e => new { e.PubsubName, e.Name })
119+
.GroupBy(e => new { e.PubsubName, e.Name, e.SubscriptionName })
119120
.Select(e => e.OrderBy(e => e.Priority))
120121
.Select(e =>
121122
{
@@ -155,7 +156,8 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute
155156
Topic = first.Name,
156157
PubsubName = first.PubsubName,
157158
Metadata = metadata.Count > 0 ? metadata : null,
158-
BulkSubscribe = first.bulkSubscribe
159+
BulkSubscribe = first.bulkSubscribe,
160+
Name = first.SubscriptionName
159161
};
160162

161163
if (first.DeadLetterTopic != null)

src/Dapr.AspNetCore/ITopicMetadata.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,11 @@ public interface ITopicMetadata
3737
/// The priority in which this rule should be evaluated (lower to higher).
3838
/// </summary>
3939
int Priority { get; }
40+
41+
/// <summary>
42+
/// Gets the subscription name. This is optional and allows multiple subscriptions
43+
/// to the same topic within a single application. If not specified, the subscription
44+
/// is identified by the combination of PubsubName and topic Name.
45+
/// </summary>
46+
string SubscriptionName { get; }
4047
}

src/Dapr.AspNetCore/Subscription.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ internal class Subscription
4444
/// Gets or sets the metadata.
4545
/// </summary>
4646
public Metadata Metadata { get; set; }
47-
47+
4848
/// <summary>
4949
/// Gets or sets the deadletter topic.
5050
/// </summary>
@@ -54,6 +54,12 @@ internal class Subscription
5454
/// Gets or sets the bulk subscribe options.
5555
/// </summary>
5656
public DaprTopicBulkSubscribe BulkSubscribe { get; set; }
57+
58+
/// <summary>
59+
/// Gets or sets the subscription name. This is optional and allows multiple subscriptions
60+
/// to the same topic within a single application.
61+
/// </summary>
62+
public string Name { get; set; }
5763
}
5864

5965
/// <summary>

src/Dapr.AspNetCore/TopicAttribute.cs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata, IOwn
2828
/// <param name="name">The topic name.</param>
2929
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
3030
/// <param name="metadataSeparator">Separator to use for metadata.</param>
31-
public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas = null, string metadataSeparator = null)
31+
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
32+
public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
3233
{
3334
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
3435
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
@@ -37,6 +38,7 @@ public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas =
3738
this.PubsubName = pubsubName;
3839
this.OwnedMetadatas = ownedMetadatas;
3940
this.MetadataSeparator = metadataSeparator;
41+
this.SubscriptionName = subscriptionName;
4042
}
4143

4244
/// <summary>
@@ -47,7 +49,8 @@ public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas =
4749
/// <param name="enableRawPayload">The enable/disable raw pay load flag.</param>
4850
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
4951
/// <param name="metadataSeparator">Separator to use for metadata.</param>
50-
public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null)
52+
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
53+
public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
5154
{
5255
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
5356
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
@@ -57,6 +60,7 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str
5760
this.EnableRawPayload = enableRawPayload;
5861
this.OwnedMetadatas = ownedMetadatas;
5962
this.MetadataSeparator = metadataSeparator;
63+
this.SubscriptionName = subscriptionName;
6064
}
6165

6266
/// <summary>
@@ -68,7 +72,8 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str
6872
/// <param name="priority">The priority of the rule (low-to-high values).</param>
6973
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
7074
/// <param name="metadataSeparator">Separator to use for metadata.</param>
71-
public TopicAttribute(string pubsubName, string name, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null)
75+
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
76+
public TopicAttribute(string pubsubName, string name, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
7277
{
7378
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
7479
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
@@ -79,6 +84,7 @@ public TopicAttribute(string pubsubName, string name, string match, int priority
7984
this.Priority = priority;
8085
this.OwnedMetadatas = ownedMetadatas;
8186
this.MetadataSeparator = metadataSeparator;
87+
this.SubscriptionName = subscriptionName;
8288
}
8389

8490
/// <summary>
@@ -91,7 +97,8 @@ public TopicAttribute(string pubsubName, string name, string match, int priority
9197
/// <param name="priority">The priority of the rule (low-to-high values).</param>
9298
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
9399
/// <param name="metadataSeparator">Separator to use for metadata.</param>
94-
public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null)
100+
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
101+
public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
95102
{
96103
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
97104
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
@@ -103,6 +110,7 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str
103110
this.Priority = priority;
104111
this.OwnedMetadatas = ownedMetadatas;
105112
this.MetadataSeparator = metadataSeparator;
113+
this.SubscriptionName = subscriptionName;
106114
}
107115

108116
/// <summary>
@@ -114,7 +122,8 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str
114122
/// <param name="enableRawPayload">The enable/disable raw pay load flag.</param>
115123
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
116124
/// <param name="metadataSeparator">Separator to use for metadata.</param>
117-
public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null)
125+
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
126+
public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
118127
{
119128
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
120129
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
@@ -125,6 +134,7 @@ public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bo
125134
this.EnableRawPayload = enableRawPayload;
126135
this.OwnedMetadatas = ownedMetadatas;
127136
this.MetadataSeparator = metadataSeparator;
137+
this.SubscriptionName = subscriptionName;
128138
}
129139

130140
/// <inheritdoc/>
@@ -150,4 +160,7 @@ public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bo
150160

151161
/// <inheritdoc/>
152162
public string DeadLetterTopic { get; set; }
163+
164+
/// <inheritdoc/>
165+
public string SubscriptionName { get; set; }
153166
}

test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,6 @@ public CustomTopicAttribute(string pubsubName, string name)
3030
public new string Match { get; }
3131

3232
public int Priority { get; }
33+
34+
public string SubscriptionName { get; }
3335
}

test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,4 +170,17 @@ public ActionResult<UserInfo> RequiresApiToken(UserInfo user)
170170
{
171171
return user;
172172
}
173+
174+
// Test subscription names - multiple subscriptions to same topic
175+
[Topic("pubsub", "H", subscriptionName: "subscription-h-1")]
176+
[HttpPost("/H-Handler1")]
177+
public void TopicHHandler1()
178+
{
179+
}
180+
181+
[Topic("pubsub", "H", subscriptionName: "subscription-h-2")]
182+
[HttpPost("/H-Handler2")]
183+
public void TopicHHandler2()
184+
{
185+
}
173186
}

test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public async Task SubscribeEndpoint_ReportsTopics()
4040
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);
4141

4242
json.ValueKind.ShouldBe(JsonValueKind.Array);
43-
json.GetArrayLength().ShouldBe(18);
43+
json.GetArrayLength().ShouldBe(20); // Updated from 18 to 20 to account for 2 new subscription name tests
4444

4545
var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload,
4646
string match, string metadata, string DeadLetterTopic, string bulkSubscribeMetadata)>();
@@ -131,6 +131,9 @@ public async Task SubscribeEndpoint_ReportsTopics()
131131
"{\"enabled\":true,\"maxMessagesCount\":500,\"maxAwaitDurationMs\":2000}"));
132132
subscriptions.ShouldContain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1", string.Empty, String.Empty));
133133
subscriptions.ShouldContain(("pubsub", "metadataseparatorbyemptytring", "topicmetadataseparatorattrbyemptytring", string.Empty, string.Empty, "n1=v1,", string.Empty, String.Empty));
134+
// Test subscription names - multiple subscriptions to same topic
135+
subscriptions.ShouldContain(("pubsub", "H", "H-Handler1", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
136+
subscriptions.ShouldContain(("pubsub", "H", "H-Handler2", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
134137
// Test priority route sorting
135138
var eTopic = subscriptions.FindAll(e => e.Topic == "E");
136139
eTopic.Count.ShouldBe(3);

0 commit comments

Comments
 (0)