Skip to content

Commit

Permalink
Merge branch 'v2' into oded/per-10154-remove-unsupported-local-api-in…
Browse files Browse the repository at this point in the history
…-sidecar
  • Loading branch information
obsd authored Jun 26, 2024
2 parents 0ffad1f + d60b0e2 commit 6f18172
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 37 deletions.
7 changes: 0 additions & 7 deletions horizon/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,6 @@ class SidecarConfig(Confi):
10,
description="The amount of time in seconds to wait for the local facts to be synced before timing out",
)
LOCAL_FACT_POST_ACK_SLEEP_S = confi.float(
"LOCAL_FACT_POST_ACK_SLEEP_S",
1,
description="The amount of time to sleep after the ack of receiving the local facts."
"Used in order to close the gap between receiving the local fact message from the server"
"and putting the actual fact in the engine.",
)

# non configurable values -------------------------------------------------

Expand Down
51 changes: 21 additions & 30 deletions horizon/facts/update_subscriber.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,50 @@
import asyncio
from collections import defaultdict
from functools import wraps
from uuid import uuid4

from fastapi_websocket_pubsub import ALL_TOPICS
from loguru import logger
from opal_client.data.updater import DataUpdater
from opal_common.schemas.data import DataUpdate
from tenacity import retry, wait_fixed, stop_after_delay
from opal_common.schemas.data import DataUpdate, DataUpdateReport

from horizon.config import sidecar_config


class DataUpdateSubscriber:
def __init__(self, updater: DataUpdater):
self._updater = updater
self._updater._should_send_reports = True
self._notifier_id = uuid4().hex
self._update_listeners: dict[str, asyncio.Event] = defaultdict(asyncio.Event)
self._register_callbacks()
self._inject_subscriber()

# Sometimes a request is sent before the client is created, so we retry the registration
@retry(wait=wait_fixed(1), stop=stop_after_delay(10), reraise=True)
def _register_callbacks(self) -> None:
"""
Register the on_message callback for incoming messages from all topics subscribed by the PubSub client.
"""
callbacks = self._updater._client._callbacks # noqa
if ALL_TOPICS not in callbacks:
callbacks[ALL_TOPICS] = []
callbacks[ALL_TOPICS].append(self._on_message)
def _inject_subscriber(self):
reporter = self._updater.callbacks_reporter
reporter.report_update_results = self._reports_callback_decorator(
reporter.report_update_results
)

async def _on_message(self, topic: str = "", data: dict | None = None) -> None:
"""
Callback for incoming messages from the PubSub client.
"""
if data is None:
logger.debug(f"Received message on topic {topic!r} without data")
return
def _reports_callback_decorator(self, func):
@wraps(func)
async def wrapper(report: DataUpdateReport, *args, **kwargs):
if report.update_id is not None:
self._resolve_listeners(report.update_id)
else:
logger.debug("Received report without update ID")
return await func(report, *args, **kwargs)

update_id = data.get("id")
if update_id is None:
logger.debug(
f"Received message on topic {topic!r} without an update ID: {data}"
)
return
return wrapper

def _resolve_listeners(self, update_id: str) -> None:
event = self._update_listeners.get(update_id)
if event is not None:
logger.debug(
f"Received message on topic {topic!r} with update ID {update_id!r}, resolving listener(s)"
f"Received acknowledgment for update ID {update_id!r}, resolving listener(s)"
)
event.set()
else:
logger.debug(
f"Received message on topic {topic!r} with update ID {update_id!r}, but no listener found"
f"Received acknowledgment for update ID {update_id!r}, but no listener found"
)

async def wait_for_message(
Expand All @@ -71,7 +63,6 @@ async def wait_for_message(
event.wait(),
timeout=timeout,
)
await asyncio.sleep(sidecar_config.LOCAL_FACT_POST_ACK_SLEEP_S)
return True
except asyncio.TimeoutError:
logger.warning(f"Timeout waiting for update id={update_id!r}")
Expand Down

0 comments on commit 6f18172

Please sign in to comment.