diff --git a/.pubnub.yml b/.pubnub.yml index 9c6bbb633..80e24093e 100644 --- a/.pubnub.yml +++ b/.pubnub.yml @@ -1,8 +1,13 @@ name: c-sharp -version: "8.0.4" +version: "8.1.0" schema: 1 scm: github.com/pubnub/c-sharp changelog: + - date: 2025-12-18 + version: v8.1.0 + changes: + - type: feature + text: "Added a config option to split multi-channel subscribes for keysets with channel sharding enabled." - date: 2025-12-04 version: v8.0.4 changes: @@ -972,7 +977,7 @@ features: - QUERY-PARAM supported-platforms: - - version: Pubnub 'C#' 8.0.4 + version: Pubnub 'C#' 8.1.0 platforms: - Windows 10 and up - Windows Server 2008 and up @@ -983,7 +988,7 @@ supported-platforms: - .Net Framework 4.6.1+ - .Net Framework 6.0 - - version: PubnubPCL 'C#' 8.0.4 + version: PubnubPCL 'C#' 8.1.0 platforms: - Xamarin.Android - Xamarin.iOS @@ -1003,7 +1008,7 @@ supported-platforms: - .Net Core - .Net 6.0 - - version: PubnubUWP 'C#' 8.0.4 + version: PubnubUWP 'C#' 8.1.0 platforms: - Windows Phone 10 - Universal Windows Apps @@ -1027,7 +1032,7 @@ sdks: distribution-type: source distribution-repository: GitHub package-name: Pubnub - location: https://github.com/pubnub/c-sharp/releases/tag/v8.0.4 + location: https://github.com/pubnub/c-sharp/releases/tag/v8.1.0 requires: - name: ".Net" @@ -1310,7 +1315,7 @@ sdks: distribution-type: source distribution-repository: GitHub package-name: PubNubPCL - location: https://github.com/pubnub/c-sharp/releases/tag/v8.0.4 + location: https://github.com/pubnub/c-sharp/releases/tag/v8.1.0 requires: - name: ".Net Core" @@ -1669,7 +1674,7 @@ sdks: distribution-type: source distribution-repository: GitHub package-name: PubnubUWP - location: https://github.com/pubnub/c-sharp/releases/tag/v8.0.4 + location: https://github.com/pubnub/c-sharp/releases/tag/v8.1.0 requires: - name: "Universal Windows Platform Development" diff --git a/CHANGELOG b/CHANGELOG index 7e8fcb9ae..f74e49fbf 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,7 @@ +v8.1.0 - December 18 2025 +----------------------------- +- Added: added a config option to split multi-channel subscribes for keysets with channel sharding enabled. + v8.0.4 - December 04 2025 ----------------------------- - Modified: prevent resubscribe to previously subscribed entities. diff --git a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs index 4bb4551d6..ed2da63aa 100644 --- a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs +++ b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs @@ -23,7 +23,6 @@ public class SubscribeEndpoint: ISubscribeOperation private SubscribeManager2 manager; private Dictionary queryParam; private Pubnub PubnubInstance; - private SubscribeEventEngine subscribeEventEngine; private SubscribeEventEngineFactory subscribeEventEngineFactory; private PresenceOperation presenceOperation; private HeartbeatOperation heartbeatOperation; @@ -100,17 +99,24 @@ public void Execute() subscribeChannelGroupNames ??= new List(); if (presenceSubscribeEnabled) { - List presenceChannelNames = (this.subscribeChannelNames != null && this.subscribeChannelNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelNames[0])) - ? this.subscribeChannelNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List(); - List presenceChannelGroupNames = (this.subscribeChannelGroupNames != null && this.subscribeChannelGroupNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelGroupNames[0])) - ? this.subscribeChannelGroupNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List(); - - if (this.subscribeChannelNames != null && presenceChannelNames.Count > 0) { - this.subscribeChannelNames.AddRange(presenceChannelNames); + if (config.SplitSubscribeCalls) + { + config?.Logger.Error("Presence subscription is disabled when SplitSubscribeCalls = true!"); } + else + { + List presenceChannelNames = (this.subscribeChannelNames != null && this.subscribeChannelNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelNames[0])) + ? this.subscribeChannelNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List(); + List presenceChannelGroupNames = (this.subscribeChannelGroupNames != null && this.subscribeChannelGroupNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelGroupNames[0])) + ? this.subscribeChannelGroupNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List(); - if (this.subscribeChannelGroupNames != null && presenceChannelGroupNames.Count > 0) { - this.subscribeChannelGroupNames.AddRange(presenceChannelGroupNames); + if (this.subscribeChannelNames != null && presenceChannelNames.Count > 0) { + this.subscribeChannelNames.AddRange(presenceChannelNames); + } + + if (this.subscribeChannelGroupNames != null && presenceChannelGroupNames.Count > 0) { + this.subscribeChannelGroupNames.AddRange(presenceChannelGroupNames); + } } } @@ -123,6 +129,31 @@ public void Execute() Subscribe(channelNames, channelGroupNames, cursor, this.queryParam); } + private SubscribeEventEngine InitSubscribeEvenEngine(string id) + { + SubscribeEventEngine subscribeEventEngine; + if (subscribeEventEngineFactory.HasEventEngine(id)) + { + subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(id); + } else { + var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubTokenMgr, PubnubInstance); + subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(id, PubnubInstance, config, subscribeManager, this.EventEmitter, jsonLibrary, StatusEmitter); + subscribeEventEngine.OnStateTransition += (transitionResult) => + { + SubscribeEventEngine_OnStateTransition(subscribeEventEngine, transitionResult); + }; + subscribeEventEngine.OnEventQueued += (@event) => + { + SubscribeEventEngine_OnEventQueued(subscribeEventEngine, @event); + }; + subscribeEventEngine.OnEffectDispatch += (invocation) => + { + SubscribeEventEngine_OnEffectDispatch(subscribeEventEngine, invocation); + }; + } + return subscribeEventEngine; + } + private async void Subscribe(string[] channels, string[] channelGroups, SubscriptionCursor cursor, Dictionary externalQueryParam) { try @@ -131,16 +162,24 @@ private async void Subscribe(string[] channels, string[] channelGroups, Subscrip throw new ArgumentException("Either Channel Or Channel Group or Both should be provided."); } - if (subscribeEventEngineFactory.HasEventEngine(instanceId)) { - subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId); - } else { - var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubTokenMgr, PubnubInstance); - subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, this.EventEmitter, jsonLibrary, StatusEmitter); - subscribeEventEngine.OnStateTransition += SubscribeEventEngine_OnStateTransition; - subscribeEventEngine.OnEventQueued += SubscribeEventEngine_OnEventQueued; - subscribeEventEngine.OnEffectDispatch += SubscribeEventEngine_OnEffectDispatch; + if (config.SplitSubscribeCalls) + { + if (channelGroups is { Length: > 0 }) + { + config.Logger?.Warn("Subscribing to channel groups is disabled when PNConfiguration.SplitSubscribeCalls = true!"); + } + foreach (var channel in channels) + { + var subscribeEventEngine = InitSubscribeEvenEngine($"{instanceId}-{channel}"); + var channelSubscriptionCursor = cursor == null ? null : new SubscriptionCursor(cursor.Timetoken, cursor.Region); + subscribeEventEngine.Subscribe(new []{channel}, new string[]{}, channelSubscriptionCursor); + } + } + else + { + var subscribeEventEngine = InitSubscribeEvenEngine(instanceId); + subscribeEventEngine.Subscribe(channels, channelGroups, cursor); } - subscribeEventEngine.Subscribe(channels, channelGroups, cursor); if (this.presenceOperation == null) { if (inputChannels.Any() || inputChannelGroups.Any()) @@ -164,7 +203,7 @@ await heartbeatOperation.HeartbeatRequest( } } - private void SubscribeEventEngine_OnEffectDispatch(IEffectInvocation obj) + private void SubscribeEventEngine_OnEffectDispatch(SubscribeEventEngine subscribeEventEngine, IEffectInvocation obj) { try { @@ -177,7 +216,7 @@ private void SubscribeEventEngine_OnEffectDispatch(IEffectInvocation obj) } } - private void SubscribeEventEngine_OnEventQueued(IEvent @event) + private void SubscribeEventEngine_OnEventQueued(SubscribeEventEngine subscribeEventEngine, IEvent @event) { try { @@ -199,7 +238,7 @@ private void SubscribeEventEngine_OnEventQueued(IEvent @event) } } - private void SubscribeEventEngine_OnStateTransition(EventEngine.Core.TransitionResult obj) + private void SubscribeEventEngine_OnStateTransition(SubscribeEventEngine subscribeEventEngine, EventEngine.Core.TransitionResult obj) { try { diff --git a/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeAllEndpoint.cs b/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeAllEndpoint.cs index 40a798887..c21a85de5 100644 --- a/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeAllEndpoint.cs +++ b/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeAllEndpoint.cs @@ -27,11 +27,23 @@ public UnsubscribeAllEndpoint(PNConfiguration pubnubConfig, IJsonPluggableLibrar private void UnsubscribeAll() { logger?.Trace($"{GetType().Name} Execute invoked"); - if (subscribeEventEngineFactory.HasEventEngine(instanceId)) { - logger?.Trace($"EventEngine instance found."); - SubscribeEventEngine subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId); - subscribeEventEngine.UnsubscribeAll(); - } + + if (config.SplitSubscribeCalls) + { + var allEngines = subscribeEventEngineFactory.GetAllEngineInstances(instanceId); + foreach (var engine in allEngines) + { + engine.UnsubscribeAll(); + } + } + else + { + if (subscribeEventEngineFactory.HasEventEngine(instanceId)) { + logger?.Trace($"EventEngine instance found."); + SubscribeEventEngine subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId); + subscribeEventEngine.UnsubscribeAll(); + } + } if (config.PresenceInterval > 0 && presenceEventEngineFactory.HasEventEngine(instanceId)) { PresenceEventEngine presenceEventEngine = presenceEventEngineFactory.GetEventEngine(instanceId); diff --git a/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeEndpoint.cs b/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeEndpoint.cs index 2418b732d..cee3a4de3 100644 --- a/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeEndpoint.cs +++ b/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeEndpoint.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Globalization; using System.Linq; +using System.Threading.Tasks; namespace PubnubApi.EndPoint { @@ -15,7 +16,6 @@ public class UnsubscribeEndpoint : PubnubCoreBase, IUnsubscribeOperation private string[] subscribeChannelGroupNames; private Dictionary queryParam { get; set; } private Pubnub pubnubInstance { get; set; } - private SubscribeEventEngine subscribeEventEngine { get; set; } private SubscribeEventEngineFactory subscribeEventEngineFactory { get; set; } private PresenceEventEngineFactory presenceEventEngineFactory; private LeaveOperation leaveOperation; @@ -56,6 +56,81 @@ public void Execute() Unsubscribe(subscribeChannelNames, subscribeChannelGroupNames); } + private async Task UnsubscribeInSubscribeEventEngineInstance(SubscribeEventEngine subscribeEventEngine, string[] channels, string[] channelGroups) + { + channels ??= []; + channelGroups ??= []; + var uniqueChannelsToRemove = new List(); + var uniqueChannelGroupsToRemove = new List(); + var channelNamesToRemove = new List(channels); + channelNamesToRemove = + channelNamesToRemove.Concat(channelNamesToRemove.Where(c=>!c.EndsWith(Constants.Pnpres)).Select(c => $"{c}{Constants.Pnpres}")).ToList(); + var uniqueChannelNamesCount = subscribeEventEngine.Channels.Distinct().Count(); + foreach (var c in channelNamesToRemove) + { + if (subscribeEventEngine.Channels.Contains(c)) + { + subscribeEventEngine.Channels.Remove(c); + if (!subscribeEventEngine.Channels.Contains(c) && !c.EndsWith(Constants.Pnpres)) + uniqueChannelsToRemove.Add(c); + } + } + var uniqueChannelNamesCountAfterRemoval = subscribeEventEngine.Channels.Distinct().Count(); + bool isUniqueChannelCountChanged = uniqueChannelNamesCount != uniqueChannelNamesCountAfterRemoval; + + var channelGroupNamesToRemove = new List(channelGroups); + channelGroupNamesToRemove = + channelGroupNamesToRemove.Concat(channelGroupNamesToRemove.Where(cg=>!cg.EndsWith(Constants.Pnpres)).Select(c => $"{c}{Constants.Pnpres}")).ToList(); + var uniqueChannelGroupNamesCount = subscribeEventEngine.ChannelGroups.Distinct().Count(); + foreach (var cg in channelGroupNamesToRemove) + { + if (subscribeEventEngine.ChannelGroups.Contains(cg)) + { + subscribeEventEngine.ChannelGroups.Remove(cg); + if (!subscribeEventEngine.ChannelGroups.Contains(cg) && !cg.EndsWith(Constants.Pnpres)) + uniqueChannelGroupsToRemove.Add(cg); + } + } + var uniqueChannelGroupNamesCountAfterRemoval = subscribeEventEngine.ChannelGroups.Distinct().Count(); + bool isUniqueChannelGroupCountChanged = uniqueChannelGroupNamesCount != uniqueChannelGroupNamesCountAfterRemoval; + + var isSubscriptionChanged = isUniqueChannelCountChanged || isUniqueChannelGroupCountChanged; + if (isSubscriptionChanged) + { + subscribeEventEngine.Unsubscribe(subscribeEventEngine.Channels.ToArray(), subscribeEventEngine.ChannelGroups.ToArray()); + if (config.PresenceInterval > 0 && presenceEventEngineFactory.HasEventEngine(instanceId) && (uniqueChannelsToRemove.Count > 0 || uniqueChannelGroupsToRemove.Count > 0)) { + PresenceEventEngine presenceEventEngine = presenceEventEngineFactory.GetEventEngine(instanceId); + presenceEventEngine.EventQueue.Enqueue(new EventEngine.Presence.Events.LeftEvent() + { + Input = new EventEngine.Presence.Common.PresenceInput() + { Channels = uniqueChannelsToRemove.ToArray(), ChannelGroups = uniqueChannelGroupsToRemove.ToArray() } + }); + } + else + { + if(!config.SuppressLeaveEvents) + await leaveOperation.LeaveRequest( + uniqueChannelsToRemove.Distinct().ToArray(), + uniqueChannelGroupsToRemove.Distinct().ToArray() + ).ConfigureAwait(false); + } + if (config.MaintainPresenceState) { + if (ChannelLocalUserState.TryGetValue(PubnubInstance.InstanceId, out + var userState)) { + foreach (var channelName in uniqueChannelsToRemove ) { + userState.TryRemove(channelName, out _); + } + } + if (ChannelGroupLocalUserState.TryGetValue(PubnubInstance.InstanceId, out + var channelGroupUserState)) { + foreach (var channelGroupName in uniqueChannelGroupsToRemove) { + channelGroupUserState.TryRemove(channelGroupName, out _); + } + } + } + } + } + private async void Unsubscribe(string[] channels, string[] channelGroups) { if ((channels == null || channels.Length == 0) && (channelGroups == null || channelGroups.Length == 0)) { @@ -64,83 +139,28 @@ private async void Unsubscribe(string[] channels, string[] channelGroups) logger?.Trace($" Unsubscribe request for channels: {string.Join(",", channels ?? [])}, channelGroups: {string.Join(",", channelGroups ?? [])} "); - if (subscribeEventEngineFactory.HasEventEngine(instanceId)) { - subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId); - channels ??= []; - channelGroups ??= []; - var uniqueChannelsToRemove = new List(); - var uniqueChannelGroupsToRemove = new List(); - var channelNamesToRemove = new List(channels); - channelNamesToRemove = - channelNamesToRemove.Concat(channelNamesToRemove.Where(c=>!c.EndsWith(Constants.Pnpres)).Select(c => $"{c}{Constants.Pnpres}")).ToList(); - var uniqueChannelNamesCount = subscribeEventEngine.Channels.Distinct().Count(); - foreach (var c in channelNamesToRemove) - { - if (subscribeEventEngine.Channels.Contains(c)) - { - subscribeEventEngine.Channels.Remove(c); - if (!subscribeEventEngine.Channels.Contains(c) && !c.EndsWith(Constants.Pnpres)) - uniqueChannelsToRemove.Add(c); - } - } - var uniqueChannelNamesCountAfterRemoval = subscribeEventEngine.Channels.Distinct().Count(); - bool isUniqueChannelCountChanged = uniqueChannelNamesCount != uniqueChannelNamesCountAfterRemoval; - - var channelGroupNamesToRemove = new List(channelGroups); - channelGroupNamesToRemove = - channelGroupNamesToRemove.Concat(channelGroupNamesToRemove.Where(cg=>!cg.EndsWith(Constants.Pnpres)).Select(c => $"{c}{Constants.Pnpres}")).ToList(); - var uniqueChannelGroupNamesCount = subscribeEventEngine.ChannelGroups.Distinct().Count(); - foreach (var cg in channelGroupNamesToRemove) + if (config.SplitSubscribeCalls) + { + foreach (var channel in channels) { - if (subscribeEventEngine.ChannelGroups.Contains(cg)) - { - subscribeEventEngine.ChannelGroups.Remove(cg); - if (!subscribeEventEngine.ChannelGroups.Contains(cg) && !cg.EndsWith(Constants.Pnpres)) - uniqueChannelGroupsToRemove.Add(cg); + var id = $"{instanceId}-{channel}"; + if (subscribeEventEngineFactory.HasEventEngine(id)) { + var subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(id); + await UnsubscribeInSubscribeEventEngineInstance(subscribeEventEngine, new []{channel}, channelGroups); + } else { + logger?.Error($"Attempted unsubscribe without event engine instance for this channel"); } } - var uniqueChannelGroupNamesCountAfterRemoval = subscribeEventEngine.ChannelGroups.Distinct().Count(); - bool isUniqueChannelGroupCountChanged = uniqueChannelGroupNamesCount != uniqueChannelGroupNamesCountAfterRemoval; - - var isSubscriptionChanged = isUniqueChannelCountChanged || isUniqueChannelGroupCountChanged; - if (isSubscriptionChanged) - { - subscribeEventEngine.Unsubscribe(subscribeEventEngine.Channels.ToArray(), subscribeEventEngine.ChannelGroups.ToArray()); - if (config.PresenceInterval > 0 && presenceEventEngineFactory.HasEventEngine(instanceId) && (uniqueChannelsToRemove.Count > 0 || uniqueChannelGroupsToRemove.Count > 0)) { - PresenceEventEngine presenceEventEngine = presenceEventEngineFactory.GetEventEngine(instanceId); - presenceEventEngine.EventQueue.Enqueue(new EventEngine.Presence.Events.LeftEvent() - { - Input = new EventEngine.Presence.Common.PresenceInput() - { Channels = uniqueChannelsToRemove.ToArray(), ChannelGroups = uniqueChannelGroupsToRemove.ToArray() } - }); - } - else - { - if(!config.SuppressLeaveEvents) - await leaveOperation.LeaveRequest( - uniqueChannelsToRemove.Distinct().ToArray(), - uniqueChannelGroupsToRemove.Distinct().ToArray() - ).ConfigureAwait(false); - } - if (config.MaintainPresenceState) { - if (ChannelLocalUserState.TryGetValue(PubnubInstance.InstanceId, out - var userState)) { - foreach (var channelName in uniqueChannelsToRemove ) { - userState.TryRemove(channelName, out _); - } - } - if (ChannelGroupLocalUserState.TryGetValue(PubnubInstance.InstanceId, out - var channelGroupUserState)) { - foreach (var channelGroupName in uniqueChannelGroupsToRemove) { - channelGroupUserState.TryRemove(channelGroupName, out _); - } - } - } + } + else + { + if (subscribeEventEngineFactory.HasEventEngine(instanceId)) { + var subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId); + await UnsubscribeInSubscribeEventEngineInstance(subscribeEventEngine, channels, channelGroups); + } else { + logger?.Error($"Attempted unsubscribe without event engine"); } - } else { - logger?.Error($"Attempted unsubscribe without event engine"); } - } } } diff --git a/src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs b/src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs index 2b7e174d1..4866fbd3d 100644 --- a/src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs +++ b/src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs @@ -227,7 +227,20 @@ public void EmitEvent(object e) var userMetaData = eventData.UserMetadata; jsonFields.Add("userMetadata", userMetaData); - jsonFields.Add("publishTimetoken", GetTimetokenMetadata(eventData.PublishMetadata).Timetoken); + + var timetokenMetadata = GetTimetokenMetadata(eventData.PublishMetadata); + long publishTimetoken = 0L; + if (timetokenMetadata != null) + { + publishTimetoken = timetokenMetadata.Timetoken; + } + else + { + configuration?.Logger?.Warn( + $"GetTimetokenMetadata returned null for channel {currentMessageChannel}. Using default timetoken=0."); + } + + jsonFields.Add("publishTimetoken", publishTimetoken); jsonFields.Add("userId", eventData.IssuingClientId); jsonFields.Add("currentMessageChannelGroup", currentMessageChannelGroup); diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngineFactory.cs b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngineFactory.cs index 742a753c6..21177c653 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngineFactory.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngineFactory.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; using PubnubApi.EndPoint; using PubnubApi.EventEngine.Common; @@ -23,6 +25,19 @@ internal SubscribeEventEngine GetEventEngine(string instanceId) return subscribeEventEngine; } + internal List GetAllEngineInstances(string instanceId) + { + var engines = new List(); + foreach (var key in engineInstances.Keys) + { + if (key.Contains(instanceId)) + { + engines.Add(engineInstances[key]); + } + } + return engines; + } + internal SubscribeEventEngine InitializeEventEngine(string instanceId, Pubnub pubnubInstance, PNConfiguration pubnubConfiguration, diff --git a/src/Api/PubnubApi/NewtonsoftJsonDotNet.cs b/src/Api/PubnubApi/NewtonsoftJsonDotNet.cs index 4f3485aa7..ac02bdc30 100644 --- a/src/Api/PubnubApi/NewtonsoftJsonDotNet.cs +++ b/src/Api/PubnubApi/NewtonsoftJsonDotNet.cs @@ -299,35 +299,55 @@ private T DeserializeMessageToObjectBasedOnPlatform(List listObject) //Set Time PropertyInfo timeProp = specific.GetRuntimeProperty("Timetoken"); - long timetoken; - Int64.TryParse(listObject[2].ToString(), out timetoken); - timeProp.SetValue(message, timetoken, null); + if (timeProp != null) + { + long timetoken; + Int64.TryParse(listObject[2].ToString(), out timetoken); + timeProp.SetValue(message, timetoken, null); + } //Set Publisher PropertyInfo publisherProp = specific.GetRuntimeProperty("Publisher"); - string publisherValue = (listObject[3] != null) ? listObject[3].ToString() : ""; - publisherProp.SetValue(message, publisherValue, null); + if (publisherProp != null) + { + string publisherValue = (listObject[3] != null) ? listObject[3].ToString() : ""; + publisherProp.SetValue(message, publisherValue, null); + } // Set ChannelName PropertyInfo channelNameProp = specific.GetRuntimeProperty("Channel"); - channelNameProp.SetValue(message, listObject[5]?.ToString(), null); + if (channelNameProp != null) + { + channelNameProp.SetValue(message, listObject[5]?.ToString(), null); + } // Set ChannelGroup PropertyInfo subsciptionProp = specific.GetRuntimeProperty("Subscription"); - subsciptionProp.SetValue(message, listObject[4]?.ToString(), null); - + if (subsciptionProp != null) + { + subsciptionProp.SetValue(message, listObject[4]?.ToString(), null); + } + PropertyInfo customMessageType = specific.GetRuntimeProperty("CustomMessageType"); - customMessageType.SetValue(message, listObject[6], null); + if (customMessageType != null) + { + customMessageType.SetValue(message, listObject[6], null); + } + //Set Metadata list second position, index=1 if (listObject[1] != null) { PropertyInfo userMetadataProp = specific.GetRuntimeProperty("UserMetadata"); - userMetadataProp.SetValue(message, listObject[1], null); + if (userMetadataProp != null) + { + userMetadataProp.SetValue(message, listObject[1], null); + } } - + ret = (T)Convert.ChangeType(message, specific, CultureInfo.InvariantCulture); } + logger?.Trace("JsonNet Deserialized Messages successfully."); return ret; } @@ -377,32 +397,63 @@ public T DeserializeToObject(IDictionary jsonFields) } PropertyInfo timeProp = specific.GetRuntimeProperty("Timetoken"); - long timetoken; - Int64.TryParse(jsonFields["publishTimetoken"].ToString(), out timetoken); - timeProp.SetValue(message, timetoken, null); + if (timeProp != null) + { + long timetoken = 0; + if (jsonFields.ContainsKey("publishTimetoken") && jsonFields["publishTimetoken"] != null) + { + Int64.TryParse(jsonFields["publishTimetoken"].ToString(), out timetoken); + } + + timeProp.SetValue(message, timetoken, null); + } PropertyInfo publisherProp = specific.GetRuntimeProperty("Publisher"); - string publisherValue = (jsonFields["userId"] != null) ? jsonFields["userId"].ToString() : ""; - publisherProp.SetValue(message, publisherValue, null); + if (publisherProp != null) + { + string publisherValue = (jsonFields.ContainsKey("userId") && jsonFields["userId"] != null) + ? jsonFields["userId"].ToString() + : ""; + publisherProp.SetValue(message, publisherValue, null); + } PropertyInfo channelNameProp = specific.GetRuntimeProperty("Channel"); - channelNameProp.SetValue(message, jsonFields["channel"]?.ToString(), null); + if (channelNameProp != null) + { + string channelValue = (jsonFields.ContainsKey("channel") && jsonFields["channel"] != null) + ? jsonFields["channel"].ToString() + : null; + channelNameProp.SetValue(message, channelValue, null); + } PropertyInfo subsciptionProp = specific.GetRuntimeProperty("Subscription"); - subsciptionProp.SetValue(message, jsonFields["channelGroup"]?.ToString(), null); + if (subsciptionProp != null) + { + string subscriptionValue = + (jsonFields.ContainsKey("channelGroup") && jsonFields["channelGroup"] != null) + ? jsonFields["channelGroup"].ToString() + : null; + subsciptionProp.SetValue(message, subscriptionValue, null); + } if (jsonFields.ContainsKey("customMessageType")) { PropertyInfo customMessageType = specific.GetRuntimeProperty("CustomMessageType"); - customMessageType.SetValue(message, jsonFields["customMessageType"], null); + if (customMessageType != null) + { + customMessageType.SetValue(message, jsonFields["customMessageType"], null); + } } - if (jsonFields["userMetadata"] != null) + if (jsonFields.ContainsKey("userMetadata") && jsonFields["userMetadata"] != null) { PropertyInfo userMetadataProp = specific.GetRuntimeProperty("UserMetadata"); - userMetadataProp.SetValue(message, ConvertToDictionaryObject(jsonFields["userMetadata"]), null); + if (userMetadataProp != null) + { + userMetadataProp.SetValue(message, ConvertToDictionaryObject(jsonFields["userMetadata"]), null); + } } - + response = (T)Convert.ChangeType(message, specific, CultureInfo.InvariantCulture); } diff --git a/src/Api/PubnubApi/PNConfiguration.cs b/src/Api/PubnubApi/PNConfiguration.cs index 45044d967..cb7078bdb 100644 --- a/src/Api/PubnubApi/PNConfiguration.cs +++ b/src/Api/PubnubApi/PNConfiguration.cs @@ -50,6 +50,17 @@ public string SubscribeKey } } + /// + /// When enabled single multi-channel subscriptions will internally be changed into multiple single-channel ones. + /// Enable this option if your keyset has channel-based sharding enabled. + /// This option requires Event Engine to be enabled (it's on by default). + /// WARNING: Enabling this option will also: + ///
  • Disable subscribing to channel groups
  • + ///
  • Disable subscribing with WithPresence() (you can still explicitly subscribe to *-pnpres channels directly)
  • + ///
  • Generate more server requests, potentially increasing costs
  • + ///
    + public bool SplitSubscribeCalls { get; set; } = false; + public string PublishKey { get => publishKey; diff --git a/src/Api/PubnubApi/Properties/AssemblyInfo.cs b/src/Api/PubnubApi/Properties/AssemblyInfo.cs index 2cac81845..1607dc05f 100644 --- a/src/Api/PubnubApi/Properties/AssemblyInfo.cs +++ b/src/Api/PubnubApi/Properties/AssemblyInfo.cs @@ -11,8 +11,8 @@ [assembly: AssemblyProduct("Pubnub C# SDK")] [assembly: AssemblyCopyright("Copyright © 2021")] [assembly: AssemblyTrademark("")] -[assembly: AssemblyVersion("8.0.4")] -[assembly: AssemblyFileVersion("8.0.4")] +[assembly: AssemblyVersion("8.1.0")] +[assembly: AssemblyFileVersion("8.1.0")] // Setting ComVisible to false makes the types in this assembly not visible // to COM components. If you need to access a type in this assembly from // COM, set the ComVisible attribute to true on that type. diff --git a/src/Api/PubnubApi/Pubnub.cs b/src/Api/PubnubApi/Pubnub.cs index f1a33c4f5..2fee9d258 100644 --- a/src/Api/PubnubApi/Pubnub.cs +++ b/src/Api/PubnubApi/Pubnub.cs @@ -674,9 +674,23 @@ public UserId GetCurrentUserId() public List GetSubscribedChannels() { - if (pubnubConfig[InstanceId].EnableEventEngine) + var config = pubnubConfig[InstanceId]; + if (config.EnableEventEngine) { - return this.subscribeEventEngineFactory.GetEventEngine(InstanceId).Channels; + if (config.SplitSubscribeCalls) + { + var allEngines = this.subscribeEventEngineFactory.GetAllEngineInstances(InstanceId); + var channels = new List(); + foreach (var engine in allEngines) + { + channels.AddRange(engine.Channels); + } + return channels; + } + else + { + return this.subscribeEventEngineFactory.GetEventEngine(InstanceId).Channels; + } } OtherOperation endpoint = @@ -688,11 +702,17 @@ public List GetSubscribedChannels() public List GetSubscribedChannelGroups() { - if (pubnubConfig[InstanceId].EnableEventEngine) + var config = pubnubConfig[InstanceId]; + //Can't subscribe to channel groups with split subscribing + if (config.SplitSubscribeCalls) + { + config.Logger?.Warn("Subscribing to channel groups is disabled when PNConfiguration.SplitSubscribeCalls = true!"); + return new List(); + } + if (config.EnableEventEngine) { return this.subscribeEventEngineFactory.GetEventEngine(InstanceId).ChannelGroups; } - OtherOperation endpoint = new OtherOperation(pubnubConfig.ContainsKey(InstanceId) ? pubnubConfig[InstanceId] : null, JsonPluggableLibrary, pubnubUnitTest, tokenManager, this); @@ -738,87 +758,34 @@ public void SetAuthToken(string token) } } - public async Task Reconnect() + public async Task Reconnect(bool resetSubscribeTimetoken = false) { bool ret = false; - SubscribeEventEngine subscribeEventEngine = null; - if (pubnubConfig[InstanceId].EnableEventEngine) + var subscribeEventEngines = new List(); + var config = pubnubConfig[InstanceId]; + if (config.EnableEventEngine) { - if (subscribeEventEngineFactory.HasEventEngine(InstanceId)) - { - subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(InstanceId); - - subscribeEventEngine.EventQueue.Enqueue(new ReconnectEvent() - { - Channels = (subscribeEventEngine.CurrentState as SubscriptionState).Channels, - ChannelGroups = (subscribeEventEngine.CurrentState as SubscriptionState).ChannelGroups, - Cursor = (subscribeEventEngine.CurrentState as SubscriptionState).Cursor - }); - } - - if (presenceEventengineFactory.HasEventEngine(InstanceId)) - { - var presenceEventEngine = presenceEventengineFactory.GetEventEngine(InstanceId); - - presenceEventEngine.EventQueue.Enqueue(new EventEngine.Presence.Events.ReconnectEvent() - { - Input = new EventEngine.Presence.Common.PresenceInput() - { - Channels = (presenceEventEngine.CurrentState as EventEngine.Presence.States.APresenceState) - ?.Input.Channels, - ChannelGroups = - (presenceEventEngine.CurrentState as EventEngine.Presence.States.APresenceState)?.Input - .ChannelGroups - } - }); - } - else + if (config.SplitSubscribeCalls) { - if (subscribeEventEngine != null) - heartbeatOperation ??= new HeartbeatOperation( - pubnubConfig.ContainsKey(InstanceId) ? pubnubConfig[InstanceId] : null, - JsonPluggableLibrary, pubnubUnitTest, tokenManager, this); - - await heartbeatOperation.HeartbeatRequest( - (subscribeEventEngine?.CurrentState as SubscriptionState)?.Channels.ToArray(), - (subscribeEventEngine?.CurrentState as SubscriptionState)?.ChannelGroups.ToArray() - ).ConfigureAwait(false); + subscribeEventEngines = subscribeEventEngineFactory.GetAllEngineInstances(InstanceId); } - } - else - { - if (savedSubscribeOperation is SubscribeOperation) + else if(subscribeEventEngineFactory.HasEventEngine(InstanceId)) { - SubscribeOperation subscibeOperationInstance = savedSubscribeOperation as SubscribeOperation; - if (subscibeOperationInstance != null) - { - ret = subscibeOperationInstance.Retry(true, false); - } + subscribeEventEngines = [subscribeEventEngineFactory.GetEventEngine(InstanceId)]; } - } - - return ret; - } - public async Task Reconnect(bool resetSubscribeTimetoken) - { - bool ret = false; - SubscribeEventEngine subscribeEventEngine = null; - if (pubnubConfig[InstanceId].EnableEventEngine) - { - if (subscribeEventEngineFactory.HasEventEngine(InstanceId)) + foreach (var subscribeEventEngine in subscribeEventEngines) { - subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(InstanceId); subscribeEventEngine.EventQueue.Enqueue(new ReconnectEvent() { - Channels = (subscribeEventEngine.CurrentState as SubscriptionState)?.Channels, - ChannelGroups = (subscribeEventEngine.CurrentState as SubscriptionState)?.ChannelGroups, + Channels = (subscribeEventEngine.CurrentState as SubscriptionState).Channels, + ChannelGroups = (subscribeEventEngine.CurrentState as SubscriptionState).ChannelGroups, Cursor = resetSubscribeTimetoken ? null : (subscribeEventEngine.CurrentState as SubscriptionState)?.Cursor }); } - + if (presenceEventengineFactory.HasEventEngine(InstanceId)) { var presenceEventEngine = presenceEventengineFactory.GetEventEngine(InstanceId); @@ -837,14 +804,18 @@ public async Task Reconnect(bool resetSubscribeTimetoken) } else { - if (subscribeEventEngine != null) - heartbeatOperation ??= new HeartbeatOperation( - pubnubConfig.ContainsKey(InstanceId) ? pubnubConfig[InstanceId] : null, - JsonPluggableLibrary, pubnubUnitTest, tokenManager, this); - await heartbeatOperation.HeartbeatRequest( - (subscribeEventEngine?.CurrentState as SubscriptionState)?.Channels.ToArray(), - (subscribeEventEngine?.CurrentState as SubscriptionState)?.ChannelGroups.ToArray() - ).ConfigureAwait(false); + foreach (var subscribeEventEngine in subscribeEventEngines) + { + if (subscribeEventEngine != null) + heartbeatOperation ??= new HeartbeatOperation( + pubnubConfig.ContainsKey(InstanceId) ? pubnubConfig[InstanceId] : null, + JsonPluggableLibrary, pubnubUnitTest, tokenManager, this); + + await heartbeatOperation.HeartbeatRequest( + (subscribeEventEngine?.CurrentState as SubscriptionState)?.Channels.ToArray(), + (subscribeEventEngine?.CurrentState as SubscriptionState)?.ChannelGroups.ToArray() + ).ConfigureAwait(false); + } } } else @@ -854,7 +825,7 @@ await heartbeatOperation.HeartbeatRequest( SubscribeOperation subscibeOperationInstance = savedSubscribeOperation as SubscribeOperation; if (subscibeOperationInstance != null) { - ret = subscibeOperationInstance.Retry(true, resetSubscribeTimetoken); + ret = subscibeOperationInstance.Retry(true, false); } } } @@ -865,18 +836,27 @@ await heartbeatOperation.HeartbeatRequest( public async Task Disconnect() { bool ret = false; - SubscribeEventEngine subscribeEventEngine = null; + var subscribeEventEngines = new List(); + var config = pubnubConfig[InstanceId]; if (pubnubConfig[InstanceId].EnableEventEngine) { - if (subscribeEventEngineFactory.HasEventEngine(InstanceId)) + if (config.SplitSubscribeCalls) + { + subscribeEventEngines = subscribeEventEngineFactory.GetAllEngineInstances(InstanceId); + }else if (subscribeEventEngineFactory.HasEventEngine(InstanceId)) + { + subscribeEventEngines = [subscribeEventEngineFactory.GetEventEngine(InstanceId)]; + } + + foreach (var subscribeEventEngine in subscribeEventEngines) { - subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(InstanceId); subscribeEventEngine.EventQueue.Enqueue(new DisconnectEvent() { Channels = (subscribeEventEngine.CurrentState as SubscriptionState)?.Channels, ChannelGroups = (subscribeEventEngine.CurrentState as SubscriptionState)?.ChannelGroups }); } + if (presenceEventengineFactory.HasEventEngine(InstanceId)) { @@ -886,16 +866,19 @@ public async Task Disconnect() } else { - if (subscribeEventEngine != null && !(pubnubConfig.ContainsKey(InstanceId) && - pubnubConfig[InstanceId].SuppressLeaveEvents)) + foreach (var subscribeEventEngine in subscribeEventEngines) { - leaveOperation ??= new LeaveOperation( - pubnubConfig.ContainsKey(InstanceId) ? pubnubConfig[InstanceId] : null, - JsonPluggableLibrary, pubnubUnitTest, tokenManager, this); - await leaveOperation.LeaveRequest( - (subscribeEventEngine?.CurrentState as SubscriptionState)?.Channels.ToArray(), - (subscribeEventEngine?.CurrentState as SubscriptionState)?.ChannelGroups.ToArray()) - .ConfigureAwait(false); + if (subscribeEventEngine != null && !(pubnubConfig.ContainsKey(InstanceId) && + pubnubConfig[InstanceId].SuppressLeaveEvents)) + { + leaveOperation ??= new LeaveOperation( + pubnubConfig.ContainsKey(InstanceId) ? pubnubConfig[InstanceId] : null, + JsonPluggableLibrary, pubnubUnitTest, tokenManager, this); + await leaveOperation.LeaveRequest( + (subscribeEventEngine?.CurrentState as SubscriptionState)?.Channels.ToArray(), + (subscribeEventEngine?.CurrentState as SubscriptionState)?.ChannelGroups.ToArray()) + .ConfigureAwait(false); + } } } } diff --git a/src/Api/PubnubApi/PubnubApi.csproj b/src/Api/PubnubApi/PubnubApi.csproj index 5cc2b4ce7..c8aee7c14 100644 --- a/src/Api/PubnubApi/PubnubApi.csproj +++ b/src/Api/PubnubApi/PubnubApi.csproj @@ -14,7 +14,7 @@ Pubnub - 8.0.4 + 8.1.0 PubNub C# .NET - Web Data Push API Pandu Masabathula PubNub @@ -22,7 +22,7 @@ http://pubnub.s3.amazonaws.com/2011/powered-by-pubnub/pubnub-icon-600x600.png true https://github.com/pubnub/c-sharp/ - Prevent resubscribe to previously subscribed entities. + Added a config option to split multi-channel subscribes for keysets with channel sharding enabled. Web Data Push Real-time Notifications ESB Message Broadcasting Distributed Computing PubNub is a Massively Scalable Web Push Service for Web and Mobile Games. This is a cloud-based service for broadcasting messages to thousands of web and mobile clients simultaneously diff --git a/src/Api/PubnubApiPCL/PubnubApiPCL.csproj b/src/Api/PubnubApiPCL/PubnubApiPCL.csproj index b48b11efe..2f23d4608 100644 --- a/src/Api/PubnubApiPCL/PubnubApiPCL.csproj +++ b/src/Api/PubnubApiPCL/PubnubApiPCL.csproj @@ -14,7 +14,7 @@ PubnubPCL - 8.0.4 + 8.1.0 PubNub C# .NET - Web Data Push API Pandu Masabathula PubNub @@ -22,7 +22,7 @@ http://pubnub.s3.amazonaws.com/2011/powered-by-pubnub/pubnub-icon-600x600.png true https://github.com/pubnub/c-sharp/ - Prevent resubscribe to previously subscribed entities. + Added a config option to split multi-channel subscribes for keysets with channel sharding enabled. Web Data Push Real-time Notifications ESB Message Broadcasting Distributed Computing PubNub is a Massively Scalable Web Push Service for Web and Mobile Games. This is a cloud-based service for broadcasting messages to thousands of web and mobile clients simultaneously diff --git a/src/Api/PubnubApiUWP/PubnubApiUWP.csproj b/src/Api/PubnubApiUWP/PubnubApiUWP.csproj index f6f552c0b..113a10ce3 100644 --- a/src/Api/PubnubApiUWP/PubnubApiUWP.csproj +++ b/src/Api/PubnubApiUWP/PubnubApiUWP.csproj @@ -16,7 +16,7 @@ PubnubUWP - 8.0.4 + 8.1.0 PubNub C# .NET - Web Data Push API Pandu Masabathula PubNub @@ -24,7 +24,7 @@ http://pubnub.s3.amazonaws.com/2011/powered-by-pubnub/pubnub-icon-600x600.png true https://github.com/pubnub/c-sharp/ - Prevent resubscribe to previously subscribed entities. + Added a config option to split multi-channel subscribes for keysets with channel sharding enabled. Web Data Push Real-time Notifications ESB Message Broadcasting Distributed Computing PubNub is a Massively Scalable Web Push Service for Web and Mobile Games. This is a cloud-based service for broadcasting messages to thousands of web and mobile clients simultaneously diff --git a/src/Api/PubnubApiUnity/PubnubApiUnity.csproj b/src/Api/PubnubApiUnity/PubnubApiUnity.csproj index 6c1c07c27..a804de806 100644 --- a/src/Api/PubnubApiUnity/PubnubApiUnity.csproj +++ b/src/Api/PubnubApiUnity/PubnubApiUnity.csproj @@ -15,7 +15,7 @@ PubnubApiUnity - 8.0.4 + 8.1.0 PubNub C# .NET - Web Data Push API Pandu Masabathula PubNub diff --git a/src/UnitTests/PubnubApi.Tests/WhenSubscriptionSplittingIsEnabled.cs b/src/UnitTests/PubnubApi.Tests/WhenSubscriptionSplittingIsEnabled.cs new file mode 100644 index 000000000..7a4ac01c3 --- /dev/null +++ b/src/UnitTests/PubnubApi.Tests/WhenSubscriptionSplittingIsEnabled.cs @@ -0,0 +1,261 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using PubnubApi; + +namespace PubNubMessaging.Tests +{ + + [TestFixture] + public class WhenSubscriptionSplittingIsEnabled : TestHarness + { + private static Pubnub pubnub; + private static string authToken; + + [SetUp] + public static async Task Init() + { + PNConfiguration config = new PNConfiguration(new UserId("mytestuuid")) + { + PublishKey = PubnubCommon.PublishKey, + SubscribeKey = PubnubCommon.SubscribeKey, + SecretKey = PubnubCommon.SecretKey + }; + + pubnub = createPubNubInstance(config); + + if (string.IsNullOrEmpty(PubnubCommon.GrantToken)) + { + await GenerateTestGrantToken(pubnub); + } + + authToken = PubnubCommon.GrantToken; + + pubnub.Destroy(); + pubnub.PubnubUnitTest = null; + pubnub = null; + } + + [TearDown] + public static void Exit() + { + if (pubnub != null) + { + pubnub.UnsubscribeAll(); + pubnub.Destroy(); + pubnub.PubnubUnitTest = null; + pubnub = null; + } + } + + private static void InitPubnubForTest() + { + PNConfiguration config = new PNConfiguration(new UserId("mytestuuid")) + { + PublishKey = PubnubCommon.PublishKey, + SubscribeKey = PubnubCommon.SubscribeKey, + SplitSubscribeCalls = true + }; + if (PubnubCommon.PAMServerSideRun) + { + config.SecretKey = PubnubCommon.SecretKey; + } + pubnub = createPubNubInstance(config, authToken); + } + + [Test] + public static async Task ThenWithSplitSubscriptionSubscribeToMultipleShouldWork() + { + InitPubnubForTest(); + + var channels = new string[] + { + $"foo.1", + $"foo.2", + $"foo.3", + $"foo.4", + }; + var channelsWithConnection = new List(); + var connectedToAllReset = new ManualResetEvent(false); + + var eventListener = new SubscribeCallbackExt( + delegate (Pubnub pnObj, PNObjectEventResult eventResult) + { + }, + delegate (Pubnub pnObj, PNStatus status) + { + if (status.Category == PNStatusCategory.PNSubscriptionChangedCategory || + status.AffectedChannels.Count > 1) + { + Assert.Fail("Received more than 1 channel in status callback with subscription splitting enabled!"); + } + if (status.Category == PNStatusCategory.PNConnectedCategory) + { + channelsWithConnection.AddRange(status.AffectedChannels); + if (channels.All(x => channelsWithConnection.Contains(x))) + { + connectedToAllReset.Set(); + } + } + } + ); + pubnub.AddListener(eventListener); + pubnub.Subscribe().Channels(channels).Execute(); + var connectedToAll = connectedToAllReset.WaitOne(25000); + pubnub.RemoveListener(eventListener); + Assert.True(connectedToAll, "Split subscription didn't receive connected status for all channels!"); + var subscribedChannels = pubnub.GetSubscribedChannels(); + Assert.True(channels.All(x => subscribedChannels.Contains(x)), "Not all channels were present in subscription list!"); + } + + [Test] + public static async Task ThenWithSplitSubscriptionSubscribeToTenPlusChannelsShouldWork() + { + InitPubnubForTest(); + + var channels = new string[] + { + $"foo.1", + $"foo.2", + $"foo.3", + $"foo.5", + $"foo.6", + $"foo.7", + $"foo.8", + $"foo.9", + $"foo.10", + $"foo.11", + $"foo.12", + $"foo.13", + $"foo.14", + }; + var channelsWithConnection = new ConcurrentBag(); + var connectedToAllReset = new ManualResetEvent(false); + + var eventListener = new SubscribeCallbackExt( + delegate (Pubnub pnObj, PNObjectEventResult eventResult) + { + }, + delegate (Pubnub pnObj, PNStatus status) + { + if (status.Category == PNStatusCategory.PNSubscriptionChangedCategory || + status.AffectedChannels.Count > 1) + { + Assert.Fail("Received more than 1 channel in status callback with subscription splitting enabled!"); + } + if (status.Category == PNStatusCategory.PNConnectedCategory) + { + if (string.IsNullOrEmpty(status.AffectedChannels[0])) + { + ; + } + channelsWithConnection.Add(status.AffectedChannels[0]); + if (channelsWithConnection.Any(string.IsNullOrEmpty)) + { + ; + } + if (channels.All(x => channelsWithConnection.Contains(x))) + { + connectedToAllReset.Set(); + } + } + } + ); + pubnub.AddListener(eventListener); + pubnub.Subscribe().Channels(channels).Execute(); + var connectedToAll = connectedToAllReset.WaitOne(50000); + pubnub.RemoveListener(eventListener); + Assert.True(connectedToAll, "Split subscription didn't receive connected status for all channels!"); + var subscribedChannels = pubnub.GetSubscribedChannels(); + Assert.True(channels.All(x => subscribedChannels.Contains(x)), "Not all channels were present in subscription list!"); + } + + [Test] + public static async Task ThenWithSplitSubscriptionUnSubscribeShouldWork() + { + InitPubnubForTest(); + + var channels = new string[] + { + $"foo.1", + $"foo.2", + $"foo.3", + $"foo.4", + }; + var disconnectReset = new ManualResetEvent(false); + + var eventListener = new SubscribeCallbackExt( + delegate (Pubnub pnObj, PNObjectEventResult eventResult) + { + } + ,delegate (Pubnub pnObj, PNStatus status) + { + if (status.AffectedChannels.Count > 1) + { + Assert.Fail("Received more than 1 channel in status callback with subscription splitting enabled!"); + } + if (status.Category == PNStatusCategory.PNDisconnectedCategory) + { + disconnectReset.Set(); + } + } + ); + pubnub.AddListener(eventListener); + pubnub.Subscribe().Channels(channels).Execute(); + + await Task.Delay(5000); + + pubnub.Unsubscribe().Channels(new []{"foo.1"}).Execute(); + var receivedDisconnect = disconnectReset.WaitOne(25000); + pubnub.RemoveListener(eventListener); + Assert.True(receivedDisconnect, "Split subscription didn't receive disconnection status for channel!"); + Assert.False(pubnub.GetSubscribedChannels().Contains("foo.1"), "Subscribed channels contains disconnected channel!"); + } + + [Test] + public static async Task ThenWithSplitSubscriptionUnSubscribeAllShouldWork() + { + InitPubnubForTest(); + + var channels = new string[] + { + $"foo.1", + $"foo.2", + $"foo.3", + $"foo.4", + }; + var disconnectReset = new ManualResetEvent(false); + + var eventListener = new SubscribeCallbackExt( + delegate (Pubnub pnObj, PNObjectEventResult eventResult) + { + } + ,delegate (Pubnub pnObj, PNStatus status) + { + if (status.AffectedChannels.Count > 1) + { + Assert.Fail("Received more than 1 channel in status callback with subscription splitting enabled!"); + } + if (status.Category == PNStatusCategory.PNDisconnectedCategory) + { + disconnectReset.Set(); + } + } + ); + pubnub.AddListener(eventListener); + pubnub.Subscribe().Channels(channels).Execute(); + + await Task.Delay(5000); + + pubnub.UnsubscribeAll(); + var receivedDisconnect = disconnectReset.WaitOne(25000); + + pubnub.RemoveListener(eventListener); + Assert.True(receivedDisconnect, "Split subscription didn't receive disconnection status for channel!"); + } + } +} \ No newline at end of file diff --git a/src/UnitTests/PubnubApiPCL.Tests/PubnubApiPCL.Tests.csproj b/src/UnitTests/PubnubApiPCL.Tests/PubnubApiPCL.Tests.csproj index 050c6a416..a9dc4dfe8 100644 --- a/src/UnitTests/PubnubApiPCL.Tests/PubnubApiPCL.Tests.csproj +++ b/src/UnitTests/PubnubApiPCL.Tests/PubnubApiPCL.Tests.csproj @@ -98,6 +98,9 @@ + + WhenSubscriptionSplittingIsEnabled.cs +