From 330dd41964175ebb98a6f995e83c431805abde6d Mon Sep 17 00:00:00 2001 From: Daniel Muehlbachler-Pietrzykowski Date: Fri, 17 Nov 2023 13:01:29 +0100 Subject: [PATCH] feat: implement listening to events endpoint using callbacks BREAKING CHANGE: adds new method to listen for events in the background --- examples/events/main.py | 89 ++++++++++++++++ {example => examples/streaming}/main.py | 0 onyx_client/client.py | 57 ++++++++++- onyx_client/utils/const.py | 2 + poetry.lock | 15 ++- pyproject.toml | 1 + tests/test_client.py | 131 ++++++++++++++++++++++++ 7 files changed, 293 insertions(+), 2 deletions(-) create mode 100644 examples/events/main.py rename {example => examples/streaming}/main.py (100%) diff --git a/examples/events/main.py b/examples/events/main.py new file mode 100644 index 0000000..a61b386 --- /dev/null +++ b/examples/events/main.py @@ -0,0 +1,89 @@ +import asyncio +import getopt +from sys import argv + +import aiohttp + +from onyx_client.client import create +from onyx_client.data.device_command import DeviceCommand + + +class LoggingClientSession(aiohttp.ClientSession): + """Used to intercept requests and to be logged.""" + + def __init__(self, enable_logging: bool = True): + """Initialize the custom logging session.""" + self.enable_logging = enable_logging + super().__init__() + + async def _request(self, method, url, **kwargs): + if self.enable_logging: + print(f"Starting request {method} {url} {kwargs}\n") + return await super()._request(method, url, **kwargs) + + +async def shutter_worker(queue, client, device_id): + """Worker processing our position commands.""" + while True: + position = await queue.get() + print( + await client.send_command( + device_id, DeviceCommand(properties={"target_position": position}) + ) + ) + await asyncio.sleep(15) + queue.task_done() + + +async def perform(fingerprint: str, access_token: str): + """Performs your actions.""" + + # open session and create client + session = LoggingClientSession() + client = create( + fingerprint=fingerprint, access_token=access_token, client_session=session + ) + + # verify API + print(await client.verify()) + print() + + # get all devices + devices = await client.devices(include_details=True) + print(devices) + print() + + # call the events API + def received(device): + print(device) + print(device.actual_position) + print(device.actual_position.animation) + + client.set_event_callback(received) + client.start() + + queue = asyncio.Queue() + queue.put_nowait(300) + await queue.join() + await session.close() + + +if __name__ == "__main__": + # process command line args + finger = "" + token = "" + opts, args = getopt.getopt(argv[1:], "hf:t:", ["fingerprint=", "token="]) + for opt, arg in opts: + if opt in ("-f", "--fingerprint"): + finger = arg + elif opt in ("-t", "--token"): + token = arg + + # check if args are not empty + if len(finger) == 0 or len(token) == 0: + print("No fingerprint and/or access token provided.") + exit(1) + + # we are async, so wait until everything completed + loop = asyncio.get_event_loop() + loop.run_until_complete(perform(finger, token)) diff --git a/example/main.py b/examples/streaming/main.py similarity index 100% rename from example/main.py rename to examples/streaming/main.py diff --git a/onyx_client/client.py b/onyx_client/client.py index e49eb0a..a43972e 100644 --- a/onyx_client/client.py +++ b/onyx_client/client.py @@ -2,8 +2,10 @@ import json import logging from typing import Optional +from random import uniform import aiohttp +import asyncio from onyx_client.configuration.configuration import Configuration from onyx_client.data.date_information import DateInformation @@ -14,7 +16,7 @@ from onyx_client.enum.device_type import DeviceType from onyx_client.group.group import Group from onyx_client.helpers.url import UrlHelper -from onyx_client.utils.const import API_VERSION +from onyx_client.utils.const import API_VERSION, MAX_BACKOFF_TIME from onyx_client.utils.filter import present from onyx_client.utils.mapper import init_device @@ -32,6 +34,11 @@ def __init__(self, config: Configuration, client_session: aiohttp.ClientSession) """Initialize the API client.""" self.config = config self.url_helper = UrlHelper(config, client_session) + self._shutdown = True + self._readLoopTask = None + self._eventLoop = asyncio.get_event_loop() + self._activeTasks = set() + self._event_callback = None async def supported_versions(self) -> Optional[SupportedVersions]: """Get all supported versions by the ONYX.CENTER.""" @@ -233,6 +240,54 @@ async def events(self, include_details: bool = False) -> Device: "Received unknown device data. Dropping device %s", key ) + def start(self, include_details: bool = False): + """Starts the event stream via callback.""" + self._shutdown = False + self._readLoopTask = self._create_internal_task( + self._read_loop(include_details), name="read_loop" + ) + + def stop(self): + """Stops the event stream via callback.""" + self._shutdown = True + + def set_event_callback(self, callback): + """Sets the event stream callback.""" + self._event_callback = callback + + def _create_internal_task(self, coro, name=None): + """Creates an internal task running in the background.""" + task = self._eventLoop.create_task(coro, name=name) + task.add_done_callback(self._complete_internal_task) + self._activeTasks.add(task) + + def _complete_internal_task(self, task): + """Removes an internal task that was running in the background.""" + self._activeTasks.remove(task) + if not task.cancelled(): + ex = task.exception() + _LOGGER.error("Unexpected exception: %r", ex) + raise ex + + async def _read_loop(self, include_details: bool = False): + """Streams data from the ONYX API endpoint and emits device updates. + Updates are emitted as events through the event_callback.""" + while not self._shutdown: + async for device in self.events(include_details): + if self._shutdown: + break + if self._event_callback is not None: + _LOGGER.info("Received device: %s", device) + self._event_callback(device) + else: + _LOGGER.warning("Received data but no callback is defined") + + # lost connection so reattempt connection with a backoff time + if not self._shutdown: + backoff = int(uniform(0, MAX_BACKOFF_TIME) * 60) + _LOGGER.error("Lost connection. Reconnection attempt in %ds", backoff) + await asyncio.sleep(backoff) + def create( config: Configuration = None, diff --git a/onyx_client/utils/const.py b/onyx_client/utils/const.py index 60335cb..4f13254 100644 --- a/onyx_client/utils/const.py +++ b/onyx_client/utils/const.py @@ -5,3 +5,5 @@ "Content-Type": "application/json", } API_VERSION = "v3" + +MAX_BACKOFF_TIME = 3 diff --git a/poetry.lock b/poetry.lock index c5db0fe..d632fa0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -147,6 +147,19 @@ files = [ {file = "async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028"}, ] +[[package]] +name = "asyncio" +version = "3.4.3" +description = "reference implementation of PEP 3156" +optional = false +python-versions = "*" +files = [ + {file = "asyncio-3.4.3-cp33-none-win32.whl", hash = "sha256:b62c9157d36187eca799c378e572c969f0da87cd5fc42ca372d92cdb06e7e1de"}, + {file = "asyncio-3.4.3-cp33-none-win_amd64.whl", hash = "sha256:c46a87b48213d7464f22d9a497b9eef8c1928b68320a2fa94240f969f6fec08c"}, + {file = "asyncio-3.4.3-py3-none-any.whl", hash = "sha256:c4d18b22701821de07bd6aea8b53d21449ec0ec5680645e5317062ea21817d2d"}, + {file = "asyncio-3.4.3.tar.gz", hash = "sha256:83360ff8bc97980e4ff25c964c7bd3923d333d177aa4f7fb736b019f26c7cb41"}, +] + [[package]] name = "attrs" version = "23.1.0" @@ -1079,4 +1092,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "cb08bb3070abda867bf47c15a63b2ce1668c46df8359b19f8433c889cb1560e2" +content-hash = "c0950e42342490dc036fba0270ed918005c04d31449d9e935ba97975f9cd8ff0" diff --git a/pyproject.toml b/pyproject.toml index b545096..8c0c3f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ classifiers = [ [tool.poetry.dependencies] python = "^3.11" aiohttp = "^3.8.5" +asyncio = "^3.4.0" [tool.poetry.dev-dependencies] pytest = "^7.2.1" diff --git a/tests/test_client.py b/tests/test_client.py index 139e4af..abc86eb 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -32,6 +32,11 @@ def test_create_client(mock_session): assert client.url_helper.client_session == mock_session assert client.url_helper.config.fingerprint == "finger" assert client.url_helper.config.access_token == "token" + assert client._shutdown + assert client._readLoopTask is None + assert client._eventLoop is not None + assert len(client._activeTasks) == 0 + assert client._event_callback is None @patch("aiohttp.ClientSession") @@ -46,6 +51,11 @@ def test_create_client_with_config(mock_session): assert client.config is not None assert client.config.fingerprint == "finger" assert client.config.access_token == "token" + assert client._shutdown + assert client._readLoopTask is None + assert client._eventLoop is not None + assert len(client._activeTasks) == 0 + assert client._event_callback is None @patch("aiohttp.ClientSession") @@ -56,6 +66,11 @@ def test_create_client_without_session(mock_session): assert client.url_helper.config.access_token == "token" assert client.config.fingerprint == "finger" assert client.config.access_token == "token" + assert client._shutdown + assert client._readLoopTask is None + assert client._eventLoop is not None + assert len(client._activeTasks) == 0 + assert client._event_callback is None class TestOnyxClient: @@ -74,6 +89,42 @@ def mock_response(self): def client(self, session) -> OnyxClient: yield OnyxClient(Configuration("finger", "token"), session) + def test_set_event_callback(self, client): + callback = 0 + client.set_event_callback(callback) + assert client._event_callback == callback + + def test__complete_internal_task(self, client): + class Task: + def cancelled(self): + return True + + callback = Task() + client._activeTasks.add(callback) + assert len(client._activeTasks) == 1 + assert callback in client._activeTasks + client._complete_internal_task(callback) + assert len(client._activeTasks) == 0 + + def test__complete_internal_task_cancelled(self, client): + class Task: + def cancelled(self): + return False + + def exception(self): + raise Exception("error") + + callback = Task() + client._activeTasks.add(callback) + assert len(client._activeTasks) == 1 + assert callback in client._activeTasks + with pytest.raises(Exception): + client._complete_internal_task(callback) + assert len(client._activeTasks) == 0 + + def test_stop(self, client): + assert client._shutdown + @pytest.mark.asyncio async def test_verify(self, mock_response, client): mock_response.get( @@ -816,6 +867,86 @@ async def test_events(self, mock_response, client): index += 1 assert index == 4 + @patch("onyx_client.client.OnyxClient._complete_internal_task") + @pytest.mark.asyncio + async def test_start(self, mock__complete_internal_task, mock_response, client): + mock_response.get( + f"{API_URL}/box/finger/api/{API_VERSION}/events", + status=200, + body='data: { "devices": { "device1":' + '{ "name": "device1", "type": "rollershutter" },' + '"device2": { "name": "device2" },' + '"device3": { "type": "rollershutter" } } }', + ) + + def callback(device): + if device.identifier == "device3": + client.stop() + + async def check_index(task): + assert task in client._activeTasks + assert client._shutdown + assert len(client._activeTasks) == 0 + await task + + mock__complete_internal_task.side_effect = check_index + client.set_event_callback(callback) + client.start() + assert len(client._activeTasks) == 1 + assert not client._shutdown + for task in client._activeTasks.copy(): + await task + + @pytest.mark.asyncio + async def test_start_break(self, mock_response, client): + mock_response.get( + f"{API_URL}/box/finger/api/{API_VERSION}/events", + status=200, + body='data: { "devices": { "device1":' + '{ "name": "device1", "type": "rollershutter" },' + '"device2": { "name": "device2" },' + '"device3": { "type": "rollershutter" } } }', + ) + + def callback(device): + if device.identifier == "device1": + client.stop() + + client.set_event_callback(callback) + client.start() + assert len(client._activeTasks) == 1 + assert not client._shutdown + for task in client._activeTasks.copy(): + await task + + @pytest.mark.asyncio + async def test_start_without_callback(self, mock_response, client): + mock_response.get( + f"{API_URL}/box/finger/api/{API_VERSION}/events", + status=200, + body='data: { "devices": { "device1":' + '{ "name": "device1", "type": "rollershutter" },' + '"device2": { "name": "device2" },' + '"device3": { "type": "rollershutter" } } }', + ) + + client.start() + assert not client._shutdown + assert len(client._activeTasks) == 1 + + @patch("onyx_client.client.OnyxClient.events") + @pytest.mark.asyncio + async def test_start_with_events_error(self, mock_events, client): + mock_events.side_effect = Exception("error") + client.start() + assert not client._shutdown + assert len(client._activeTasks) == 1 + for task in client._activeTasks.copy(): + with pytest.raises(Exception): + await task + assert len(client._activeTasks) == 0 + assert mock_events.called + @patch("onyx_client.client.OnyxClient.device") @pytest.mark.asyncio async def test_events_details(self, mock_device, mock_response, client):