Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
name: c-sharp
version: "8.0.4"
version: "8.1.0"
schema: 1
scm: github.com/pubnub/c-sharp
changelog:
- date: 2025-12-18
version: v8.1.0
changes:
- type: feature
text: "Added a config option to split multi-channel subscribes for keysets with channel sharding enabled."
- date: 2025-12-04
version: v8.0.4
changes:
Expand Down Expand Up @@ -972,7 +977,7 @@ features:
- QUERY-PARAM
supported-platforms:
-
version: Pubnub 'C#' 8.0.4
version: Pubnub 'C#' 8.1.0
platforms:
- Windows 10 and up
- Windows Server 2008 and up
Expand All @@ -983,7 +988,7 @@ supported-platforms:
- .Net Framework 4.6.1+
- .Net Framework 6.0
-
version: PubnubPCL 'C#' 8.0.4
version: PubnubPCL 'C#' 8.1.0
platforms:
- Xamarin.Android
- Xamarin.iOS
Expand All @@ -1003,7 +1008,7 @@ supported-platforms:
- .Net Core
- .Net 6.0
-
version: PubnubUWP 'C#' 8.0.4
version: PubnubUWP 'C#' 8.1.0
platforms:
- Windows Phone 10
- Universal Windows Apps
Expand All @@ -1027,7 +1032,7 @@ sdks:
distribution-type: source
distribution-repository: GitHub
package-name: Pubnub
location: https://github.com/pubnub/c-sharp/releases/tag/v8.0.4
location: https://github.com/pubnub/c-sharp/releases/tag/v8.1.0
requires:
-
name: ".Net"
Expand Down Expand Up @@ -1310,7 +1315,7 @@ sdks:
distribution-type: source
distribution-repository: GitHub
package-name: PubNubPCL
location: https://github.com/pubnub/c-sharp/releases/tag/v8.0.4
location: https://github.com/pubnub/c-sharp/releases/tag/v8.1.0
requires:
-
name: ".Net Core"
Expand Down Expand Up @@ -1669,7 +1674,7 @@ sdks:
distribution-type: source
distribution-repository: GitHub
package-name: PubnubUWP
location: https://github.com/pubnub/c-sharp/releases/tag/v8.0.4
location: https://github.com/pubnub/c-sharp/releases/tag/v8.1.0
requires:
-
name: "Universal Windows Platform Development"
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
v8.1.0 - December 18 2025
-----------------------------
- Added: added a config option to split multi-channel subscribes for keysets with channel sharding enabled.

v8.0.4 - December 04 2025
-----------------------------
- Modified: prevent resubscribe to previously subscribed entities.
Expand Down
83 changes: 61 additions & 22 deletions src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
private SubscribeManager2 manager;
private Dictionary<string, object> queryParam;
private Pubnub PubnubInstance;
private SubscribeEventEngine subscribeEventEngine;
private SubscribeEventEngineFactory subscribeEventEngineFactory;
private PresenceOperation<T> presenceOperation;
private HeartbeatOperation heartbeatOperation;
Expand Down Expand Up @@ -100,17 +99,24 @@
subscribeChannelGroupNames ??= new List<string>();

if (presenceSubscribeEnabled) {
List<string> presenceChannelNames = (this.subscribeChannelNames != null && this.subscribeChannelNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelNames[0]))
? this.subscribeChannelNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();
List<string> presenceChannelGroupNames = (this.subscribeChannelGroupNames != null && this.subscribeChannelGroupNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelGroupNames[0]))
? this.subscribeChannelGroupNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();

if (this.subscribeChannelNames != null && presenceChannelNames.Count > 0) {
this.subscribeChannelNames.AddRange(presenceChannelNames);
if (config.SplitSubscribeCalls)
{
config?.Logger.Error("Presence subscription is disabled when SplitSubscribeCalls = true!");
}
else
{
List<string> presenceChannelNames = (this.subscribeChannelNames != null && this.subscribeChannelNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelNames[0]))
? this.subscribeChannelNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();
List<string> presenceChannelGroupNames = (this.subscribeChannelGroupNames != null && this.subscribeChannelGroupNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelGroupNames[0]))
? this.subscribeChannelGroupNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();

if (this.subscribeChannelGroupNames != null && presenceChannelGroupNames.Count > 0) {
this.subscribeChannelGroupNames.AddRange(presenceChannelGroupNames);
if (this.subscribeChannelNames != null && presenceChannelNames.Count > 0) {
this.subscribeChannelNames.AddRange(presenceChannelNames);
}

if (this.subscribeChannelGroupNames != null && presenceChannelGroupNames.Count > 0) {
this.subscribeChannelGroupNames.AddRange(presenceChannelGroupNames);
}
}
}

Expand All @@ -123,6 +129,31 @@
Subscribe(channelNames, channelGroupNames, cursor, this.queryParam);
}

private SubscribeEventEngine InitSubscribeEvenEngine(string id)
{
SubscribeEventEngine subscribeEventEngine;
if (subscribeEventEngineFactory.HasEventEngine(id))
{
subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(id);
} else {
var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubTokenMgr, PubnubInstance);
subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(id, PubnubInstance, config, subscribeManager, this.EventEmitter, jsonLibrary, StatusEmitter);
subscribeEventEngine.OnStateTransition += (transitionResult) =>
{
SubscribeEventEngine_OnStateTransition(subscribeEventEngine, transitionResult);
};
subscribeEventEngine.OnEventQueued += (@event) =>
{
SubscribeEventEngine_OnEventQueued(subscribeEventEngine, @event);
};
subscribeEventEngine.OnEffectDispatch += (invocation) =>
{
SubscribeEventEngine_OnEffectDispatch(subscribeEventEngine, invocation);
};
}
return subscribeEventEngine;
}

private async void Subscribe(string[] channels, string[] channelGroups, SubscriptionCursor cursor, Dictionary<string, object> externalQueryParam)
{
try
Expand All @@ -131,16 +162,24 @@
throw new ArgumentException("Either Channel Or Channel Group or Both should be provided.");
}

if (subscribeEventEngineFactory.HasEventEngine(instanceId)) {
subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId);
} else {
var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubTokenMgr, PubnubInstance);
subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, this.EventEmitter, jsonLibrary, StatusEmitter);
subscribeEventEngine.OnStateTransition += SubscribeEventEngine_OnStateTransition;
subscribeEventEngine.OnEventQueued += SubscribeEventEngine_OnEventQueued;
subscribeEventEngine.OnEffectDispatch += SubscribeEventEngine_OnEffectDispatch;
if (config.SplitSubscribeCalls)
{
if (channelGroups is { Length: > 0 })
{
config.Logger?.Warn("Subscribing to channel groups is disabled when PNConfiguration.SplitSubscribeCalls = true!");
}
foreach (var channel in channels)
{
var subscribeEventEngine = InitSubscribeEvenEngine($"{instanceId}-{channel}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻

var channelSubscriptionCursor = cursor == null ? null : new SubscriptionCursor(cursor.Timetoken, cursor.Region);
subscribeEventEngine.Subscribe<T>(new []{channel}, new string[]{}, channelSubscriptionCursor);
}
}
else
{
var subscribeEventEngine = InitSubscribeEvenEngine(instanceId);
subscribeEventEngine.Subscribe<T>(channels, channelGroups, cursor);
}
subscribeEventEngine.Subscribe<T>(channels, channelGroups, cursor);
if (this.presenceOperation == null)
{
if (inputChannels.Any() || inputChannelGroups.Any())
Expand All @@ -164,7 +203,7 @@
}
}

private void SubscribeEventEngine_OnEffectDispatch(IEffectInvocation obj)
private void SubscribeEventEngine_OnEffectDispatch(SubscribeEventEngine subscribeEventEngine, IEffectInvocation obj)
{
try
{
Expand All @@ -177,7 +216,7 @@
}
}

private void SubscribeEventEngine_OnEventQueued(IEvent @event)
private void SubscribeEventEngine_OnEventQueued(SubscribeEventEngine subscribeEventEngine, IEvent @event)
{
try
{
Expand All @@ -199,7 +238,7 @@
}
}

private void SubscribeEventEngine_OnStateTransition(EventEngine.Core.TransitionResult obj)
private void SubscribeEventEngine_OnStateTransition(SubscribeEventEngine subscribeEventEngine, EventEngine.Core.TransitionResult obj)
{
try
{
Expand All @@ -212,7 +251,7 @@
}
}

private void MessageEmitter<T>(Pubnub pubnubInstance, PNMessageResult<T> messageResult)

Check warning on line 254 in src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs

View workflow job for this annotation

GitHub Actions / Acceptance tests

Type parameter 'T' has the same name as the type parameter from outer type 'SubscribeEndpoint<T>'

Check warning on line 254 in src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs

View workflow job for this annotation

GitHub Actions / Integration and Unit tests

Type parameter 'T' has the same name as the type parameter from outer type 'SubscribeEndpoint<T>'

Check warning on line 254 in src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs

View workflow job for this annotation

GitHub Actions / Integration and Unit tests

Type parameter 'T' has the same name as the type parameter from outer type 'SubscribeEndpoint<T>'

Check warning on line 254 in src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs

View workflow job for this annotation

GitHub Actions / Publish to NuGet

Type parameter 'T' has the same name as the type parameter from outer type 'SubscribeEndpoint<T>'
{
foreach (var listener in SubscribeListenerList)
{
Expand Down
22 changes: 17 additions & 5 deletions src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeAllEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,23 @@ public UnsubscribeAllEndpoint(PNConfiguration pubnubConfig, IJsonPluggableLibrar
private void UnsubscribeAll()
{
logger?.Trace($"{GetType().Name} Execute invoked");
if (subscribeEventEngineFactory.HasEventEngine(instanceId)) {
logger?.Trace($"EventEngine instance found.");
SubscribeEventEngine subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId);
subscribeEventEngine.UnsubscribeAll();
}

if (config.SplitSubscribeCalls)
{
var allEngines = subscribeEventEngineFactory.GetAllEngineInstances(instanceId);
foreach (var engine in allEngines)
{
engine.UnsubscribeAll();
}
}
else
{
if (subscribeEventEngineFactory.HasEventEngine(instanceId)) {
logger?.Trace($"EventEngine instance found.");
SubscribeEventEngine subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId);
subscribeEventEngine.UnsubscribeAll();
}
}

if (config.PresenceInterval > 0 && presenceEventEngineFactory.HasEventEngine(instanceId)) {
PresenceEventEngine presenceEventEngine = presenceEventEngineFactory.GetEventEngine(instanceId);
Expand Down
Loading
Loading