From f34cb6a5ccf2aa544ad7120db43cbf2cb54d55f9 Mon Sep 17 00:00:00 2001 From: tt2468 Date: Thu, 30 Sep 2021 01:22:06 -0700 Subject: [PATCH] Update to 1.2.1 - Fixes concurrency issue caused by an unresolved bug in Python - Increases max message payload size to 16MB (from 8) - Removes some unneeded code --- setup.py | 2 +- simpleobsws.py | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/setup.py b/setup.py index 44c84f6..8055f2e 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setuptools.setup( name="simpleobsws", - version="1.2.0", + version="1.2.1", author="tt2468", author_email="tt2468@gmail.com", description="A simple obs-websocket library in async Python for people who just want JSON output.", diff --git a/simpleobsws.py b/simpleobsws.py index 26d7ee8..5424cfb 100644 --- a/simpleobsws.py +++ b/simpleobsws.py @@ -65,6 +65,14 @@ class EventRegistrationError(Exception): class NotIdentifiedError(Exception): pass +async def _wait_cond(cond): + async with cond: + await cond.wait() + +async def _wait_for_cond(cond, func): + async with cond: + await cond.wait_for(func) + class WebSocketClient: def __init__(self, url: str = "ws://localhost:4444", @@ -92,7 +100,7 @@ async def connect(self): self.recv_task = None self.identified = False self.hello_message = None - self.ws = await websockets.connect(self.url, max_size=2**23) + self.ws = await websockets.connect(self.url, max_size=2**24) self.recv_task = self.loop.create_task(self._ws_recv_task()) return True @@ -101,12 +109,9 @@ async def wait_until_identified(self, timeout: int = 10): log.debug('WebSocket session is not open. Returning early.') return False try: - async with self.cond: - await asyncio.wait_for(self.cond.wait_for(self.is_identified), timeout=timeout) + await asyncio.wait_for(_wait_for_cond(self.cond, self.is_identified), timeout=timeout) return True except asyncio.TimeoutError: - #if not self.ws.open: - # log.debug('WebSocket session is no longer open. Returning early.') return False @@ -141,8 +146,7 @@ async def call(self, request: Request, timeout: int = 15): try: self.waiters[request_id] = waiter await self.ws.send(json.dumps(request_payload)) - async with waiter.cond: - await asyncio.wait_for(waiter.cond.wait(), timeout=timeout) + await asyncio.wait_for(_wait_cond(waiter.cond), timeout=timeout) except asyncio.TimeoutError: raise MessageTimeout('The request with type {} timed out after {} seconds.'.format(request.requestType, timeout)) finally: @@ -191,8 +195,7 @@ async def call_batch(self, requests: list, timeout: int = 15, halt_on_failure: b try: self.waiters[request_batch_id] = waiter await self.ws.send(json.dumps(request_batch_payload)) - async with waiter.cond: - await asyncio.wait_for(waiter.cond.wait(), timeout=timeout) + await asyncio.wait_for(_wait_cond(waiter.cond), timeout=timeout) except asyncio.TimeoutError: raise MessageTimeout('The request batch timed out after {} seconds.'.format(timeout)) finally: @@ -272,7 +275,7 @@ async def _ws_recv_task(self): while self.ws.open: message = '' try: - message = await asyncio.wait_for(self.ws.recv(), timeout=5) + message = await self.ws.recv() if not message: continue incoming_payload = json.loads(message) @@ -318,7 +321,4 @@ async def _ws_recv_task(self): break except json.JSONDecodeError: continue - except asyncio.TimeoutError: - async with self.cond: - self.cond.notify_all() self.identified = False