From 2430c0c9e960a61f0fccc9e0aca318c4b61f3977 Mon Sep 17 00:00:00 2001 From: Kai Koehn Date: Fri, 11 Sep 2020 14:12:11 +0200 Subject: [PATCH] Generates a unique subscription key per subscriber --- lib/resource_provider.js | 4 ++-- lib/subscription_manager.js | 28 ++++++++++++++++++---------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/lib/resource_provider.js b/lib/resource_provider.js index 252d121..ef12a75 100644 --- a/lib/resource_provider.js +++ b/lib/resource_provider.js @@ -33,7 +33,7 @@ class ResourceProvider { } getSubscriptionByTopicName(topicName) { - return SubscriptionManager.getInstance().getSubscriptionByTopicName(topicName).get(); + return SubscriptionManager.getInstance().getSubscriptionByTopicName(topicName, this._bridgeId).get(); } getClientByServiceName(serviceName) { @@ -126,7 +126,7 @@ class ResourceProvider { } hasSubscription(topicName) { - return SubscriptionManager.getInstance().getSubscriptionByTopicName(topicName) !== undefined; + return SubscriptionManager.getInstance().getSubscriptionByTopicName(topicName, this._bridgeId) !== undefined; } clean() { diff --git a/lib/subscription_manager.js b/lib/subscription_manager.js index 650d626..7b4e532 100644 --- a/lib/subscription_manager.js +++ b/lib/subscription_manager.js @@ -46,12 +46,18 @@ class SubscriptionManager { this._node = node; } - getSubscriptionByTopicName(topicName) { - return this._subscripions.get(topicName); + getSubscriptionByTopicName(topicName, bridgeId) { + const subscriptionKey = this.getSubscriptionKey(topicName, bridgeId) + return this._subscripions.get(subscriptionKey); + } + + getSubscriptionKey(topicName, bridgeId) { + return `${topicName}:${bridgeId}`; } createSubscription(messageType, topicName, bridgeId, callback) { - let handle = this._subscripions.get(topicName); + const subscriptionKey = this.getSubscriptionKey(topicName, bridgeId) + let handle = this._subscripions.get(subscriptionKey); if (!handle) { const defaultOpts = {} @@ -60,13 +66,13 @@ class SubscriptionManager { console.log(topicName, opts) let subscription = this._node.createSubscription(messageType, topicName, {enableTypedArray: false, ...opts}, (message) => { - this._subscripions.get(topicName).callbacks.forEach(callback => { + this._subscripions.get(subscriptionKey).callbacks.forEach(callback => { callback(topicName, message); }); }); handle = new HandleWithCallbacks(subscription, this._node.destroySubscription.bind(this._node)); handle.addCallback(bridgeId, callback); - this._subscripions.set(topicName, handle); + this._subscripions.set(subscriptionKey, handle); debug(`Subscription has been created, and the topic name is ${topicName}.`); return handle.get(); @@ -78,13 +84,15 @@ class SubscriptionManager { } destroySubscription(topicName, bridgeId) { - if (this._subscripions.has(topicName)) { - let handle = this._subscripions.get(topicName); + const subscriptionKey = this.getSubscriptionKey(topicName, bridgeId) + + if (this._subscripions.has(subscriptionKey)) { + let handle = this._subscripions.get(subscriptionKey); if (handle.hasCallbackForId(bridgeId)) { handle.removeCallback(bridgeId); handle.release(); if (handle.count === 0) { - this._subscripions.delete(topicName); + this._subscripions.delete(subscriptionKey); } } } @@ -101,9 +109,9 @@ class SubscriptionManager { } _removeInvalidHandle() { - this._subscripions.forEach((handle, topicName, map) => { + this._subscripions.forEach((handle, subscriptionKey, map) => { if (handle.count === 0) { - map.delete(topicName); + map.delete(subscriptionKey); } }); }