Skip to content

Commit

Permalink
feat: implement listening to events endpoint using callbacks
Browse files Browse the repository at this point in the history
BREAKING CHANGE: adds new method to listen for events in the background
  • Loading branch information
muhlba91 committed Nov 17, 2023
1 parent be8125b commit 330dd41
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 2 deletions.
89 changes: 89 additions & 0 deletions examples/events/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import asyncio
import getopt
from sys import argv

import aiohttp

from onyx_client.client import create
from onyx_client.data.device_command import DeviceCommand


class LoggingClientSession(aiohttp.ClientSession):
"""Used to intercept requests and to be logged."""

def __init__(self, enable_logging: bool = True):
"""Initialize the custom logging session."""
self.enable_logging = enable_logging
super().__init__()

async def _request(self, method, url, **kwargs):
if self.enable_logging:
print(f"Starting request {method} {url} {kwargs}\n")
return await super()._request(method, url, **kwargs)


async def shutter_worker(queue, client, device_id):
"""Worker processing our position commands."""
while True:
position = await queue.get()
print(
await client.send_command(
device_id, DeviceCommand(properties={"target_position": position})
)
)
await asyncio.sleep(15)
queue.task_done()


async def perform(fingerprint: str, access_token: str):
"""Performs your actions."""

# open session and create client
session = LoggingClientSession()
client = create(
fingerprint=fingerprint, access_token=access_token, client_session=session
)

# verify API
print(await client.verify())
print()

# get all devices
devices = await client.devices(include_details=True)
print(devices)
print()

# call the events API
def received(device):
print(device)
print(device.actual_position)
print(device.actual_position.animation)

client.set_event_callback(received)
client.start()

queue = asyncio.Queue()
queue.put_nowait(300)
await queue.join()
await session.close()


if __name__ == "__main__":
# process command line args
finger = ""
token = ""
opts, args = getopt.getopt(argv[1:], "hf:t:", ["fingerprint=", "token="])
for opt, arg in opts:
if opt in ("-f", "--fingerprint"):
finger = arg
elif opt in ("-t", "--token"):
token = arg

# check if args are not empty
if len(finger) == 0 or len(token) == 0:
print("No fingerprint and/or access token provided.")
exit(1)

# we are async, so wait until everything completed
loop = asyncio.get_event_loop()
loop.run_until_complete(perform(finger, token))
File renamed without changes.
57 changes: 56 additions & 1 deletion onyx_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import json
import logging
from typing import Optional
from random import uniform

import aiohttp
import asyncio

from onyx_client.configuration.configuration import Configuration
from onyx_client.data.date_information import DateInformation
Expand All @@ -14,7 +16,7 @@
from onyx_client.enum.device_type import DeviceType
from onyx_client.group.group import Group
from onyx_client.helpers.url import UrlHelper
from onyx_client.utils.const import API_VERSION
from onyx_client.utils.const import API_VERSION, MAX_BACKOFF_TIME
from onyx_client.utils.filter import present
from onyx_client.utils.mapper import init_device

Expand All @@ -32,6 +34,11 @@ def __init__(self, config: Configuration, client_session: aiohttp.ClientSession)
"""Initialize the API client."""
self.config = config
self.url_helper = UrlHelper(config, client_session)
self._shutdown = True
self._readLoopTask = None
self._eventLoop = asyncio.get_event_loop()
self._activeTasks = set()
self._event_callback = None

async def supported_versions(self) -> Optional[SupportedVersions]:
"""Get all supported versions by the ONYX.CENTER."""
Expand Down Expand Up @@ -233,6 +240,54 @@ async def events(self, include_details: bool = False) -> Device:
"Received unknown device data. Dropping device %s", key
)

def start(self, include_details: bool = False):
"""Starts the event stream via callback."""
self._shutdown = False
self._readLoopTask = self._create_internal_task(
self._read_loop(include_details), name="read_loop"
)

def stop(self):
"""Stops the event stream via callback."""
self._shutdown = True

def set_event_callback(self, callback):
"""Sets the event stream callback."""
self._event_callback = callback

def _create_internal_task(self, coro, name=None):
"""Creates an internal task running in the background."""
task = self._eventLoop.create_task(coro, name=name)
task.add_done_callback(self._complete_internal_task)
self._activeTasks.add(task)

def _complete_internal_task(self, task):
"""Removes an internal task that was running in the background."""
self._activeTasks.remove(task)
if not task.cancelled():
ex = task.exception()
_LOGGER.error("Unexpected exception: %r", ex)
raise ex

async def _read_loop(self, include_details: bool = False):
"""Streams data from the ONYX API endpoint and emits device updates.
Updates are emitted as events through the event_callback."""
while not self._shutdown:
async for device in self.events(include_details):
if self._shutdown:
break
if self._event_callback is not None:
_LOGGER.info("Received device: %s", device)
self._event_callback(device)
else:
_LOGGER.warning("Received data but no callback is defined")

# lost connection so reattempt connection with a backoff time
if not self._shutdown:
backoff = int(uniform(0, MAX_BACKOFF_TIME) * 60)
_LOGGER.error("Lost connection. Reconnection attempt in %ds", backoff)
await asyncio.sleep(backoff)


def create(
config: Configuration = None,
Expand Down
2 changes: 2 additions & 0 deletions onyx_client/utils/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
"Content-Type": "application/json",
}
API_VERSION = "v3"

MAX_BACKOFF_TIME = 3
15 changes: 14 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ classifiers = [
[tool.poetry.dependencies]
python = "^3.11"
aiohttp = "^3.8.5"
asyncio = "^3.4.0"

[tool.poetry.dev-dependencies]
pytest = "^7.2.1"
Expand Down
131 changes: 131 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ def test_create_client(mock_session):
assert client.url_helper.client_session == mock_session
assert client.url_helper.config.fingerprint == "finger"
assert client.url_helper.config.access_token == "token"
assert client._shutdown
assert client._readLoopTask is None
assert client._eventLoop is not None
assert len(client._activeTasks) == 0
assert client._event_callback is None


@patch("aiohttp.ClientSession")
Expand All @@ -46,6 +51,11 @@ def test_create_client_with_config(mock_session):
assert client.config is not None
assert client.config.fingerprint == "finger"
assert client.config.access_token == "token"
assert client._shutdown
assert client._readLoopTask is None
assert client._eventLoop is not None
assert len(client._activeTasks) == 0
assert client._event_callback is None


@patch("aiohttp.ClientSession")
Expand All @@ -56,6 +66,11 @@ def test_create_client_without_session(mock_session):
assert client.url_helper.config.access_token == "token"
assert client.config.fingerprint == "finger"
assert client.config.access_token == "token"
assert client._shutdown
assert client._readLoopTask is None
assert client._eventLoop is not None
assert len(client._activeTasks) == 0
assert client._event_callback is None


class TestOnyxClient:
Expand All @@ -74,6 +89,42 @@ def mock_response(self):
def client(self, session) -> OnyxClient:
yield OnyxClient(Configuration("finger", "token"), session)

def test_set_event_callback(self, client):
callback = 0
client.set_event_callback(callback)
assert client._event_callback == callback

def test__complete_internal_task(self, client):
class Task:
def cancelled(self):
return True

callback = Task()
client._activeTasks.add(callback)
assert len(client._activeTasks) == 1
assert callback in client._activeTasks
client._complete_internal_task(callback)
assert len(client._activeTasks) == 0

def test__complete_internal_task_cancelled(self, client):
class Task:
def cancelled(self):
return False

def exception(self):
raise Exception("error")

callback = Task()
client._activeTasks.add(callback)
assert len(client._activeTasks) == 1
assert callback in client._activeTasks
with pytest.raises(Exception):
client._complete_internal_task(callback)
assert len(client._activeTasks) == 0

def test_stop(self, client):
assert client._shutdown

@pytest.mark.asyncio
async def test_verify(self, mock_response, client):
mock_response.get(
Expand Down Expand Up @@ -816,6 +867,86 @@ async def test_events(self, mock_response, client):
index += 1
assert index == 4

@patch("onyx_client.client.OnyxClient._complete_internal_task")
@pytest.mark.asyncio
async def test_start(self, mock__complete_internal_task, mock_response, client):
mock_response.get(
f"{API_URL}/box/finger/api/{API_VERSION}/events",
status=200,
body='data: { "devices": { "device1":'
'{ "name": "device1", "type": "rollershutter" },'
'"device2": { "name": "device2" },'
'"device3": { "type": "rollershutter" } } }',
)

def callback(device):
if device.identifier == "device3":
client.stop()

async def check_index(task):
assert task in client._activeTasks
assert client._shutdown
assert len(client._activeTasks) == 0
await task

mock__complete_internal_task.side_effect = check_index
client.set_event_callback(callback)
client.start()
assert len(client._activeTasks) == 1
assert not client._shutdown
for task in client._activeTasks.copy():
await task

@pytest.mark.asyncio
async def test_start_break(self, mock_response, client):
mock_response.get(
f"{API_URL}/box/finger/api/{API_VERSION}/events",
status=200,
body='data: { "devices": { "device1":'
'{ "name": "device1", "type": "rollershutter" },'
'"device2": { "name": "device2" },'
'"device3": { "type": "rollershutter" } } }',
)

def callback(device):
if device.identifier == "device1":
client.stop()

client.set_event_callback(callback)
client.start()
assert len(client._activeTasks) == 1
assert not client._shutdown
for task in client._activeTasks.copy():
await task

@pytest.mark.asyncio
async def test_start_without_callback(self, mock_response, client):
mock_response.get(
f"{API_URL}/box/finger/api/{API_VERSION}/events",
status=200,
body='data: { "devices": { "device1":'
'{ "name": "device1", "type": "rollershutter" },'
'"device2": { "name": "device2" },'
'"device3": { "type": "rollershutter" } } }',
)

client.start()
assert not client._shutdown
assert len(client._activeTasks) == 1

@patch("onyx_client.client.OnyxClient.events")
@pytest.mark.asyncio
async def test_start_with_events_error(self, mock_events, client):
mock_events.side_effect = Exception("error")
client.start()
assert not client._shutdown
assert len(client._activeTasks) == 1
for task in client._activeTasks.copy():
with pytest.raises(Exception):
await task
assert len(client._activeTasks) == 0
assert mock_events.called

@patch("onyx_client.client.OnyxClient.device")
@pytest.mark.asyncio
async def test_events_details(self, mock_device, mock_response, client):
Expand Down

0 comments on commit 330dd41

Please sign in to comment.