Skip to content

Commit

Permalink
Fix issue with asyncio.Queue which is not thread safe.
Browse files Browse the repository at this point in the history
  • Loading branch information
grossmj committed Nov 17, 2024
1 parent e83e12b commit 31a2cb9
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
5 changes: 3 additions & 2 deletions gns3server/compute/notification_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


import asyncio
from contextlib import contextmanager
from gns3server.utils.notification_queue import NotificationQueue

Expand All @@ -28,6 +28,7 @@ class NotificationManager:

def __init__(self):
self._listeners = set()
self._loop = asyncio.get_event_loop()

@contextmanager
def queue(self):
Expand All @@ -54,7 +55,7 @@ def emit(self, action, event, **kwargs):
"""

for listener in self._listeners:
listener.put_nowait((action, event, kwargs))
self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, kwargs))

@staticmethod
def reset():
Expand Down
9 changes: 5 additions & 4 deletions gns3server/controller/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
import asyncio
from contextlib import contextmanager

from gns3server.utils.notification_queue import NotificationQueue
Expand All @@ -32,6 +32,7 @@ def __init__(self, controller):
self._controller = controller
self._project_listeners = {}
self._controller_listeners = set()
self._loop = asyncio.get_event_loop()

@contextmanager
def project_queue(self, project_id):
Expand Down Expand Up @@ -73,7 +74,7 @@ def controller_emit(self, action, event):
"""

for controller_listener in self._controller_listeners:
controller_listener.put_nowait((action, event, {}))
self._loop.call_soon_threadsafe(controller_listener.put_nowait, (action, event, {}))

def project_has_listeners(self, project_id):
"""
Expand Down Expand Up @@ -134,7 +135,7 @@ def _send_event_to_project(self, project_id, action, event):
except KeyError:
return
for listener in project_listeners:
listener.put_nowait((action, event, {}))
self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, {}))

def _send_event_to_all_projects(self, action, event):
"""
Expand All @@ -146,4 +147,4 @@ def _send_event_to_all_projects(self, action, event):
"""
for project_listeners in self._project_listeners.values():
for listener in project_listeners:
listener.put_nowait((action, event, {}))
self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, {}))

0 comments on commit 31a2cb9

Please sign in to comment.