From d60b0e248d352485cdd041e5f77d03c7f5cbb1b3 Mon Sep 17 00:00:00 2001 From: Dan Yishai Date: Tue, 25 Jun 2024 13:39:08 +0300 Subject: [PATCH] dan/per-10153-change-local-facts-uploader-to-use-update-callbacks (#157) * Changed update subscriber to wait for OPA respond * Fixed report without update id * Removed sleep after wait * Changed decorator name --- horizon/config.py | 7 ---- horizon/facts/update_subscriber.py | 51 ++++++++++++------------------ 2 files changed, 21 insertions(+), 37 deletions(-) diff --git a/horizon/config.py b/horizon/config.py index a2d817d5..8a597267 100644 --- a/horizon/config.py +++ b/horizon/config.py @@ -168,13 +168,6 @@ class SidecarConfig(Confi): 10, description="The amount of time in seconds to wait for the local facts to be synced before timing out", ) - LOCAL_FACT_POST_ACK_SLEEP_S = confi.float( - "LOCAL_FACT_POST_ACK_SLEEP_S", - 1, - description="The amount of time to sleep after the ack of receiving the local facts." - "Used in order to close the gap between receiving the local fact message from the server" - "and putting the actual fact in the engine.", - ) # non configurable values ------------------------------------------------- diff --git a/horizon/facts/update_subscriber.py b/horizon/facts/update_subscriber.py index ecc8aee9..56ddd08d 100644 --- a/horizon/facts/update_subscriber.py +++ b/horizon/facts/update_subscriber.py @@ -1,12 +1,11 @@ import asyncio from collections import defaultdict +from functools import wraps from uuid import uuid4 -from fastapi_websocket_pubsub import ALL_TOPICS from loguru import logger from opal_client.data.updater import DataUpdater -from opal_common.schemas.data import DataUpdate -from tenacity import retry, wait_fixed, stop_after_delay +from opal_common.schemas.data import DataUpdate, DataUpdateReport from horizon.config import sidecar_config @@ -14,45 +13,38 @@ class DataUpdateSubscriber: def __init__(self, updater: DataUpdater): self._updater = updater + self._updater._should_send_reports = True self._notifier_id = uuid4().hex self._update_listeners: dict[str, asyncio.Event] = defaultdict(asyncio.Event) - self._register_callbacks() + self._inject_subscriber() - # Sometimes a request is sent before the client is created, so we retry the registration - @retry(wait=wait_fixed(1), stop=stop_after_delay(10), reraise=True) - def _register_callbacks(self) -> None: - """ - Register the on_message callback for incoming messages from all topics subscribed by the PubSub client. - """ - callbacks = self._updater._client._callbacks # noqa - if ALL_TOPICS not in callbacks: - callbacks[ALL_TOPICS] = [] - callbacks[ALL_TOPICS].append(self._on_message) + def _inject_subscriber(self): + reporter = self._updater.callbacks_reporter + reporter.report_update_results = self._reports_callback_decorator( + reporter.report_update_results + ) - async def _on_message(self, topic: str = "", data: dict | None = None) -> None: - """ - Callback for incoming messages from the PubSub client. - """ - if data is None: - logger.debug(f"Received message on topic {topic!r} without data") - return + def _reports_callback_decorator(self, func): + @wraps(func) + async def wrapper(report: DataUpdateReport, *args, **kwargs): + if report.update_id is not None: + self._resolve_listeners(report.update_id) + else: + logger.debug("Received report without update ID") + return await func(report, *args, **kwargs) - update_id = data.get("id") - if update_id is None: - logger.debug( - f"Received message on topic {topic!r} without an update ID: {data}" - ) - return + return wrapper + def _resolve_listeners(self, update_id: str) -> None: event = self._update_listeners.get(update_id) if event is not None: logger.debug( - f"Received message on topic {topic!r} with update ID {update_id!r}, resolving listener(s)" + f"Received acknowledgment for update ID {update_id!r}, resolving listener(s)" ) event.set() else: logger.debug( - f"Received message on topic {topic!r} with update ID {update_id!r}, but no listener found" + f"Received acknowledgment for update ID {update_id!r}, but no listener found" ) async def wait_for_message( @@ -71,7 +63,6 @@ async def wait_for_message( event.wait(), timeout=timeout, ) - await asyncio.sleep(sidecar_config.LOCAL_FACT_POST_ACK_SLEEP_S) return True except asyncio.TimeoutError: logger.warning(f"Timeout waiting for update id={update_id!r}")