diff --git a/.github/workflows/continuous-deploy.yml b/.github/workflows/continuous-deploy.yml index 4d9a8eabb..6a23fd595 100644 --- a/.github/workflows/continuous-deploy.yml +++ b/.github/workflows/continuous-deploy.yml @@ -609,6 +609,9 @@ jobs: nats --creds ./nats.creds \ --server ${{ secrets.NATS_URL }} \ stream rm acapy_events -f + nats --creds ./nats.creds \ + --server ${{ secrets.NATS_URL }} \ + stream rm cloudapi_aries_state_monitoring -f nats --creds ./nats.creds \ --server ${{ secrets.NATS_URL }} \ @@ -638,6 +641,19 @@ jobs: --replicas 3 \ --compression s2 + nats --creds ./nats.creds \ + --server ${{ secrets.NATS_URL }} \ + stream add cloudapi_aries_state_monitoring --subjects "cloudapi.aries.state_monitoring.*.*.>" \ + --defaults \ + --storage file \ + --replicas 3 \ + --compression s2 \ + --retention limits \ + --discard old \ + --max-age 1m \ + --dupe-window 1m \ + --max-msgs-per-subject 1000 + rm -f ./nats.creds - name: Helmfile Apply # Apply default helmfile (without RDS proxy) when resetting deployments. diff --git a/shared/constants.py b/shared/constants.py index 6b88e8f3a..1d97f1ce2 100644 --- a/shared/constants.py +++ b/shared/constants.py @@ -66,5 +66,7 @@ NATS_SERVER = os.getenv("NATS_SERVER", "nats://nats:4222") NATS_SUBJECT = os.getenv("NATS_SUBJECT", "cloudapi.aries.events") NATS_STREAM = os.getenv("NATS_STREAM", "cloudapi_aries_events") +NATS_STATE_STREAM = os.getenv("NATS_STATE_STREAM", "cloudapi_aries_state_monitoring") +NATS_STATE_SUBJECT = os.getenv("NATS_STATE_SUBJECT", "cloudapi.aries.state_monitoring") NATS_CREDS_FILE = os.getenv("NATS_CREDS_FILE", "") ENDORSER_DURABLE_CONSUMER = os.getenv("ENDORSER_DURABLE_CONSUMER", "endorser") diff --git a/tilt/cloudapi/nats/templates/job.yaml b/tilt/cloudapi/nats/templates/job.yaml index bf83619b7..fff3b04b1 100644 --- a/tilt/cloudapi/nats/templates/job.yaml +++ b/tilt/cloudapi/nats/templates/job.yaml @@ -40,10 +40,34 @@ spec: {{- range $stream, $config := .Values.postInstall.streams }} nats --server nats://{{ template "common.names.fullname" $ }}:{{ default 4222 $.Values.nats.service.ports.client }} stream add {{ $stream }} \ - --subjects {{ join "," $config.subjects | quote }} \ - {{- if $config.defaults }} - --defaults \ - {{- end }} - --storage {{ $config.storage }} \ - --replicas={{ $.Values.nats.replicaCount }} + --subjects {{ join "," $config.subjects | quote }} \ + {{- if $config.defaults }} + --defaults \ + {{- end }} + --storage {{ $config.storage }} \ + {{- if $config.compression }} + --compression {{ $config.compression }} \ + {{- end }} + {{- if $config.maxAge }} + --max-age {{ $config.maxAge }} \ + {{- end }} + {{- if $config.maxBytes }} + --max-bytes {{ $config.maxBytes }} \ + {{- end }} + {{- if $config.maxMsgs }} + --max-msgs {{ $config.maxMsgs }} \ + {{- end }} + {{- if $config.maxMsgsPerSubject }} + --max-msgs-per-subject {{ $config.maxMsgsPerSubject }} \ + {{- end }} + {{- if $config.retention }} + --retention {{ $config.retention }} \ + {{- end }} + {{- if $config.discard }} + --discard {{ $config.discard }} \ + {{- end }} + {{- if $config.duplicateWindow }} + --dupe-window {{ $config.duplicateWindow }} \ + {{- end }} + --replicas={{ $.Values.nats.replicaCount }} {{- end }} diff --git a/tilt/cloudapi/nats/values.yaml b/tilt/cloudapi/nats/values.yaml index bcd99f647..f1bf26a28 100644 --- a/tilt/cloudapi/nats/values.yaml +++ b/tilt/cloudapi/nats/values.yaml @@ -15,6 +15,16 @@ postInstall: - cloudapi.aries.events.*.* defaults: true storage: file + cloudapi_aries_state_monitoring: + subjects: + - cloudapi.aries.state_monitoring.*.*.> + defaults: true + storage: file + retention: limits + discard: old + maxAge: 1m + duplicateWindow: 1m + maxMsgsPerSubject: 1000 acapy_events: subjects: - acapy.> diff --git a/waypoint/routers/sse.py b/waypoint/routers/sse.py index 7ae49ff46..8204f8f14 100644 --- a/waypoint/routers/sse.py +++ b/waypoint/routers/sse.py @@ -52,6 +52,7 @@ async def nats_event_stream_generator( group_id=group_id, wallet_id=wallet_id, topic=topic, + state=desired_state, stop_event=stop_event, duration=SSE_TIMEOUT, look_back=look_back, @@ -65,7 +66,7 @@ async def nats_event_stream_generator( break payload = event.payload - if payload.get(field) == field_id and payload.get("state") == desired_state: + if payload.get(field) == field_id: logger.trace("Event found yielding event {}", event) yield event.model_dump_json() stop_event.set() diff --git a/waypoint/services/nats_service.py b/waypoint/services/nats_service.py index 22aceb40e..6b3650c5e 100644 --- a/waypoint/services/nats_service.py +++ b/waypoint/services/nats_service.py @@ -8,7 +8,7 @@ from nats.js.api import ConsumerConfig, DeliverPolicy from nats.js.client import JetStreamContext -from shared.constants import NATS_STREAM, NATS_SUBJECT +from shared.constants import NATS_STATE_STREAM, NATS_STATE_SUBJECT from shared.log_config import get_logger from shared.models.webhook_events import CloudApiWebhookEventGeneric @@ -25,7 +25,7 @@ def __init__(self, jetstream: JetStreamContext): self.js_context: JetStreamContext = jetstream async def _subscribe( - self, group_id: str, wallet_id: str, look_back: int + self, *, group_id: str, wallet_id: str, topic: str, state: str, look_back: int ) -> JetStreamContext.PullSubscription: try: logger.trace( @@ -35,14 +35,14 @@ async def _subscribe( ) group_id = group_id or "*" subscribe_kwargs = { - "subject": f"{NATS_SUBJECT}.{group_id}.{wallet_id}", - "stream": NATS_STREAM, + "subject": f"{NATS_STATE_SUBJECT}.{group_id}.{wallet_id}.{topic}.{state}", + "stream": NATS_STATE_STREAM, } # Get the current time in UTC current_time = datetime.now(timezone.utc) - # Subtract 30 seconds + # Subtract look_back time from the current time look_back_time = current_time - timedelta(seconds=look_back) # Format the time in the required format @@ -70,22 +70,29 @@ async def _subscribe( @asynccontextmanager async def process_events( self, + *, group_id: str, wallet_id: str, topic: str, + state: str, stop_event: asyncio.Event, duration: int = 10, look_back: int = 300, ): logger.debug( - "Processing events for group {} and wallet {} on topic {}", + "Processing events for group {} and wallet {} on topic {} with state {}", group_id, wallet_id, topic, + state, ) subscription = await self._subscribe( - group_id=group_id, wallet_id=wallet_id, look_back=look_back + group_id=group_id, + wallet_id=wallet_id, + topic=topic, + state=state, + look_back=look_back, ) async def event_generator(): @@ -101,9 +108,8 @@ async def event_generator(): try: messages = await subscription.fetch(batch=5, timeout=0.2) for message in messages: - if message.headers.get("event_topic") == topic: - event = orjson.loads(message.data) - yield CloudApiWebhookEventGeneric(**event) + event = orjson.loads(message.data) + yield CloudApiWebhookEventGeneric(**event) await message.ack() except TimeoutError: logger.trace("Timeout fetching messages continuing...") diff --git a/waypoint/tests/services/test_nats_service.py b/waypoint/tests/services/test_nats_service.py index f4e2d9f72..dc9a61805 100644 --- a/waypoint/tests/services/test_nats_service.py +++ b/waypoint/tests/services/test_nats_service.py @@ -9,7 +9,7 @@ from nats.js.api import ConsumerConfig, DeliverPolicy from nats.js.client import JetStreamContext -from shared.constants import NATS_STREAM, NATS_SUBJECT +from shared.constants import NATS_STATE_STREAM, NATS_STATE_SUBJECT from shared.models.webhook_events import CloudApiWebhookEventGeneric from shared.services.nats_jetstream import init_nats_client from waypoint.services.nats_service import NatsEventsProcessor @@ -60,11 +60,15 @@ async def test_nats_events_processor_subscribe( opt_start_time="2024-10-24T09:17:17.998149541Z", ) subscription = await processor._subscribe( # pylint: disable=protected-access - "group_id", "wallet_id", 300 + group_id="group_id", + wallet_id="wallet_id", + topic="proofs", + state="done", + look_back=300, ) mock_nats_client.pull_subscribe.assert_called_once_with( - subject=f"{NATS_SUBJECT}.group_id.wallet_id", - stream=NATS_STREAM, + subject=f"{NATS_STATE_SUBJECT}.group_id.wallet_id.proofs.done", + stream=NATS_STATE_STREAM, config=mock_config.return_value, ) assert isinstance(subscription, JetStreamContext.PullSubscription) @@ -79,7 +83,13 @@ async def test_nats_events_processor_subscribe_error( mock_nats_client.pull_subscribe.side_effect = exception with pytest.raises(exception): - await processor._subscribe("group_id", "wallet_id", 300) + await processor._subscribe( # pylint: disable=protected-access + group_id="group_id", + wallet_id="wallet_id", + topic="proofs", + state="done", + look_back=300, + ) @pytest.mark.anyio @@ -106,7 +116,12 @@ async def test_process_events( stop_event = asyncio.Event() async with processor.process_events( - group_id, "wallet_id", "test_topic", stop_event, duration=1 + group_id=group_id, + wallet_id="wallet_id", + topic="test_topic", + state="state", + stop_event=stop_event, + duration=1, ) as event_generator: events = [] async for event in event_generator: @@ -133,7 +148,12 @@ async def test_process_events_cancelled_error( with patch.object(mock_subscription, "fetch", side_effect=asyncio.CancelledError): async with processor.process_events( - "group_id", "wallet_id", "test_topic", stop_event, duration=1 + group_id="group_id", + wallet_id="wallet_id", + topic="test_topic", + state="state", + stop_event=stop_event, + duration=1, ) as event_generator: events = [] async for event in event_generator: @@ -155,7 +175,12 @@ async def test_process_events_timeout_error( stop_event = asyncio.Event() async with processor.process_events( - "group_id", "wallet_id", "test_topic", stop_event, duration=1 + group_id="group_id", + wallet_id="wallet_id", + topic="test_topic", + state="state", + stop_event=stop_event, + duration=1, ) as event_generator: events = [] async for event in event_generator: