diff --git a/python/test/nodeDiscoveryPubs_TEST.py b/python/test/nodeDiscoveryPubs_TEST.py new file mode 100644 index 00000000..f53f1b7a --- /dev/null +++ b/python/test/nodeDiscoveryPubs_TEST.py @@ -0,0 +1,44 @@ +from threading import Thread +from multiprocessing import Process +import time + + +def create_pubs(id_): + '''creates publishers at isolated discovery network, isolated network is created based on id_''' + import os + isolated_env = {'GZ_DISCOVERY_MSG_PORT':f'{11320+id_*10}', + 'GZ_DISCOVERY_SRV_PORT':f'{11321+id_*10}', + 'GZ_DISCOVERY_MULTICAST_IP': f'239.255.0.{10+id_}', + 'GZ_PARTITION':f'env{id_}', + } + + print("Exec the following in cli to listen to topics at isolated env", id_) + [print(f'export {key}={isolated_env[key]}') for key in isolated_env] + os.environ.update(isolated_env) + import gz.transport14 as transport + from gz.msgs11.empty_pb2 import Empty + node = transport.Node() + pub = node.advertise(f'/empty{id_}', Empty) + import time + while True: + print(f"publishing at env {id_}") + pub.publish(Empty()) + time.sleep(1.0) + + + +def pub_proc(n_threads): + '''process for creating multiple pubs''' + + pub_threads =[Thread(target=create_pubs,args=[i]) for i in range(n_threads)] + for pi in pub_threads: + pi.start() + time.sleep(2.0) + for pi in pub_threads: + pi.join() + + + +proc_pub = Thread(target=pub_proc, args=[2]) +proc_pub.start() +proc_pub.join() \ No newline at end of file diff --git a/python/test/nodeDiscoverySubs_TEST.py b/python/test/nodeDiscoverySubs_TEST.py new file mode 100644 index 00000000..f9600502 --- /dev/null +++ b/python/test/nodeDiscoverySubs_TEST.py @@ -0,0 +1,64 @@ +from threading import Thread +from multiprocessing import Process +import time + + + +def create_subs(id_): + '''creates subscribers at isolated discovery network, isolated network is created based on id_''' + import os + isolated_env = {'GZ_DISCOVERY_MSG_PORT':f'{11320+id_*10}', + 'GZ_DISCOVERY_SRV_PORT':f'{11321+id_*10}', + 'GZ_DISCOVERY_MULTICAST_IP': f'239.255.0.{10+id_}', + 'GZ_PARTITION':f'env{id_}', + } + + print("Exec the following in cli to listen to topics at isolated env", id_) + [print(f'export {key}={isolated_env[key]}') for key in isolated_env] + os.environ.update(isolated_env) + import gz.transport14 as transport + from gz.msgs11.empty_pb2 import Empty + node = transport.Node() + def cb(msg): + print(f"Recieved msg at env {id_}") + + sub = node.subscribe(Empty, f'/empty{id_}', cb) + + import time + while True: + time.sleep(1.0) + + + + +def sub_proc(n_threads): + '''process for creating multiple subs''' + # We reverse the order of env, this is to ensure + # that the mew environments do not keep the initial envf vars + # For example, consider the scenario where + # two publishers are created with ids 0,1 + # If the env var change did affect them, then each one should be publishing + # with different discovery ports + # If it didn't affect, then all of them should be publishing at same discovery port + # In order to verify that these publishers are indeed publishing at different ports + # We can reverse the order of env id, when creating subscribers. + # This ensures that the env var change indeed affected + # See below: + # 1. pub1 and pub2 created under same discovery (order of env creation 0->1) + # sub1 and sub2 created under same discovery (order of env creation 0->1) + # Since they are under same discovery, both pubs and subs will receive their msgs + # But its not easy to verify if they were created under different discovery + # 2. pub1 and pub2 created under different discovery (order of env creation 0->1) + # sub1 and sub2 created under different discovery (order of env creation 1->0) + # If they are under same discovery, both subs will NOT receive their msgs + + sub_threads =[Thread(target=create_subs,args=[i]) for i in reversed(range(n_threads))] + for si in sub_threads: + si.start() + time.sleep(2.0) + for si in sub_threads: + si.join() + +proc_sub = Thread(target=sub_proc, args=[2]) +proc_sub.start() +proc_sub.join() \ No newline at end of file diff --git a/src/NodeShared.cc b/src/NodeShared.cc index 6e6e8c45..78def74d 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -159,11 +159,19 @@ NodeShared *NodeShared::Instance() // is not shared between different processes. static std::shared_mutex mutex; - static std::unordered_map nodeSharedMap; + static std::unordered_map nodeSharedMap; // Get current process ID. auto pid = getProcessId(); - + // Get current thread ID, so we can create multiple NodeShared per Process + // , per thread + auto tid = std::this_thread::get_id(); + // Create a unique id using both pid and tid + std::stringstream ss; + ss << tid; + std::stringstream unique_ss; + unique_ss << std::hex << pid << "-" << ss.str(); + auto uid = unique_ss.str(); // Check if there's already a NodeShared instance for this process. // Use a shared_lock so multiple threads can read simultaneously. // This will only block if there's another thread locking exclusively @@ -173,7 +181,7 @@ NodeShared *NodeShared::Instance() try { std::shared_lock readLock(mutex); - return nodeSharedMap.at(pid); + return nodeSharedMap.at(uid); } catch (...) { @@ -182,7 +190,7 @@ NodeShared *NodeShared::Instance() // not an already constructed NodeShared instance for this process. std::lock_guard writeLock(mutex); - auto iter = nodeSharedMap.find(pid); + auto iter = nodeSharedMap.find(uid); if (iter != nodeSharedMap.end()) { // There's already an instance for this process, return it. @@ -190,7 +198,7 @@ NodeShared *NodeShared::Instance() } // No instance, construct a new one. - auto ret = nodeSharedMap.insert({pid, new NodeShared}); + auto ret = nodeSharedMap.insert({uid, new NodeShared}); assert(ret.second); // Insert operation should be successful. return ret.first->second; }