diff --git a/app/routes/sse.py b/app/routes/sse.py index d6cffc07d5..2613c552f4 100644 --- a/app/routes/sse.py +++ b/app/routes/sse.py @@ -39,16 +39,30 @@ async def get_sse_subscribe_event_with_field_and_state( auth: AcaPyAuthVerified = Depends(acapy_auth_verified), ) -> StreamingResponse: """ + Subscribe to SSE events wait for a desired state with a field filter. + --- + ***This endpoint can't be called on the swagger UI, as it requires a stream response.*** + Wait for a desired state to be reached for some event for this wallet and topic, filtering for payloads that contain `field:field_id`. + example: `/{wallet_id}/credentials/connection_id/some-uuid/done` will stream a credential exchange event on a + specific connection with state done. + The field and field ID pair must be present in the payload (other than state) for the event to be streamed. + The stream will be closed after the event is returned. + Parameters: ----------- - wallet_id: The ID of the wallet subscribing to the events. - topic: The topic to which the wallet is subscribing. - field: The field to which the wallet is subscribing. - field_id: The ID of the field subscribing to the events. - desired_state: The desired state to be reached. + wallet_id: + The ID of the wallet subscribing to the events. + topic: + The topic to which the wallet is subscribing. + field: + The field to which the wallet is subscribing. + field_id: + The ID of the field subscribing to the events. + desired_state: + The desired state to be reached. """ logger.bind( body={ diff --git a/docs/NATS.md b/docs/NATS.md new file mode 100644 index 0000000000..595589c0fc --- /dev/null +++ b/docs/NATS.md @@ -0,0 +1,30 @@ +# NATS + +**NATS** is an open-source messaging system designed for high-performance, lightweight, and reliable communication between +distributed applications. It supports **pub-sub** (publish-subscribe), **request-reply**, and **message queue** patterns, +allowing for flexible communication between microservices, IoT devices, and cloud-native systems. + +## Key Features + +- Simple: Text-based protocol with straightforward publish-subscribe semantics +- Fast: Written in Go, capable of millions of messages per second +- Lightweight: Small footprint, minimal dependencies +- Cloud Native: Built for modern distributed systems + +## Core Concepts + +- Publishers: Send messages to subjects +- Subscribers: Receive messages from subjects +- Subjects: Named channels for message routing +- Queue Groups: Load balance messages across subscribers + +## Message Patterns + +- Publish/Subscribe: One-to-many message distribution +- Request/Reply: Synchronous communication +- Queue Groups: Load balanced message processing +- Stream Processing: Persistent message streams (via NATS Streaming/JetStream) + +## Consuming our NATS + +Please contact us for help with connecting/authenticating to our NATS service diff --git a/docs/Webhooks.md b/docs/Webhooks.md index 8ac3484c54..1a48e0741e 100644 --- a/docs/Webhooks.md +++ b/docs/Webhooks.md @@ -1,179 +1,102 @@ -# Webhooks - -The webhooks container serves as both a relay and storage for the webhooks, -enabling hooks to be retrieved at a later time. Furthermore, the webhooks -container processes the webhooks in two distinct ways. - -Firstly, the hooks are converted into a shared format, which is shared with the -CloudAPI. This standardization is beneficial as the information and its format -in both the hooks and the CloudAPI are the same. This consistency allows for -easier client implementation as it only needs to anticipate a single data -structure from two endpoints, even if it's using v1 or v2 protocols. - -Secondly, the webhooks are stored by topic and wallet ID. Consequently, the hooks -can be retrieved in these two ways - per wallet or per wallet per topic (but not -per topic - a choice made to reduce the data footprint). - -There are also two ways to retrieve webhook data. The first method is the -classic HTTP request, potentially used multiple times to create a polling system. -Find the HTTP endpoints via the -[webhooks Swagger UI](http://webhooks.cloudapi.127.0.0.1.nip.io/docs) and create -your own polling mechanism. Alternatively, you can subscribe via the PubSub -mechanism (underlying Websockets) at the -[/pubsub endpoint](http://webhooks.cloudapi.127.0.0.1.nip.io/pubsub) by -specifying one or more topics. A brief example of how to do this in Python using -the [`fastapi_websocket_pubsub`](https://github.com/permitio/fastapi_websocket_pubsub) -package can be found in `webhooks/clients.example.py`. - ->**_NOTE_**: The webhooks container is **NOT** intended to be directly exposed -to the world wide web, especially not via pubsub/websocket as there is NO -authentication mechanism in place. Exposing the websocket will leave anyone on -the internet able to read any webhook. - -Valid topics are: +# Webhooks/Event Consumption -```python -topics = Literal[ - "basic-messages", - "connections", - "proofs", - "credentials", - "endorsements", - "oob", - "revocation", - "issuer_cred_rev", - "problem_report", -] -``` - -A client can subscribe to the webhooks via the CloudAPI (as opposed to directly -via the webhooks container). This requires only targeting the `/webhooks` -endpoint and optionally targeting the `/webhooks/{topic}` sub-route by suffixing -a topic. Using the authentication mechanism (see section below), the app -automatically extracts the required info about the wallet (i.e., the wallet id -and JWT) and retrieves only the associated webhooks with a particular wallet. -Failing to authenticate will return a 403 HTTP Error. - -## Implementing your webhook listener - -You can (assuming you are within the docker network) use a pubsub client (see -also `webhooks/clients.example.py`): - -```python -from fastapi_websocket_pubsub import PubSubClient -import asyncio +## Primary Method: NATS +The recommended and most efficient way to consume events is through our NATS implementation. NATS provides a robust, +scalable solution for event streaming and processing. For detailed information about consuming events from our NATS implementation, +please refer to this document: [NATS](NATS.md). -async def on_events(data, topic): - print(f"{topic}:\n{data}") +## Alternative: Waypoint Service +The Waypoint service provides a specialized Server-Sent Events (SSE) endpoint for cases where you need to wait for a specific +event that you know is coming. This is particularly useful when you need to track the state change of a specific entity +and have all the necessary filter information. -async def main(): - # Create a client and subscribe to topics - topics = ["connections", "credentials", "proofs", "endorsements", "basic-messages"] - client = PubSubClient([*topics], callback=on_events) +The Waypoint service exposes a single SSE endpoint, via the main app, for event streaming: - client.start_client(f"ws://webhooks.cloudapi.127.0.0.1.nip.io/pubsub") - await client.wait_until_done() +- `GET` `/v1/sse/{wallet_id}/{topic}/{field}/{field_id}/{desired_state}` +This endpoint is designed for targeted event retrieval where you can specify exact filters to wait for a particular event. -asyncio.run(main()) -``` +### How to use -All webhooks are by default logged to the container's stdout, which can be handy -for debugging or development. You can easily pipe them into a file. On a Unix -machine, you can use this command to follow the webhook logs: +The endpoint requires specific filter parameters: -```bash -kubectl logs -f $(kubectl get pods \ - -l app.kubernetes.io/instance=governance-webhooks-web \ - -o jsonpath="{.items[0].metadata.name}") -``` +- `wallet_id`: Identifier for the specific wallet +- `topic`: The event topic to subscribe to +- `field`: Filter field from the event payload +- `field_id`: Specific value for the filter field +- `desired_state`: The desired state of the event -### Non-Python options +For example, if you're waiting for a specific connection to reach the 'completed' state, you would use: -Listening to webhooks the subscriber way (not long polling via HTTP) is not -limited to the Python example given above. In fact, all one needs is a -websocket-based RPC client. +- `field`: "connection_id" +- `field_id`: "" +- `desired_state`: "completed" -Here are two examples: - -```bash -websocat --exit-on-eof --text \ - ws://webhooks.cloudapi.127.0.0.1.nip.io/pubsub \ - exec:'{"request": {"method": "subscribe", "arguments": {"topics": ["proofs", "endorsements", "oob", "out_of_band", "connections", "basic-messages", "credentials"]}}}' -``` +The stream will remain open until the desired event (matching the filters) is found and returned, at which point the stream +will be closed or timeout after 60 seconds. -or +Valid topics include: -```bash -wscat -c ws://webhooks.cloudapi.127.0.0.1.nip.io \ - -x '{"request": {"method": "subscribe", "arguments": {"topics": ["proofs", "endorsements", "oob", "out_of_band", "connections", "basic-messages", "credentials"]}}}' \ - -w 99999 +```python +topics = Literal[ + "basic-messages", + "connections", + "credentials", + "credentials_indy", + "credentials_ld", + "endorsements", + "issuer_cred_rev", + "oob", + "problem_report", + "proofs", + "revocation", +] ``` -How this works is that either procedure instantiates a client connecting to the -websocket endpoint exposed via the webhooks container (_NOTE:_ You might have to -change the URI according to your setup of the webhooks relay). Both examples do -pretty much the same. However, [wscat](https://github.com/websockets/wscat) is -written in JavaScript whereas [websocat](https://github.com/vi/websocat) is -implemented in Rust. Both examples are given to illustrate that it really does -not matter what language one wishes to implement a listener in. After having -established a connection to the exposed endpoint, the `exec:` parameter and `-x` -flag mean execute. Execute, in this case, refers to sending the JSON payload to -the webhooks relay. It requests the endpoint to add the connection as a -subscriber to the topics array of the arguments key. You can pass any arguments -supported by the webhooks relay (see above). Passing an empty array under topics -means 'end the subscription'. By adding the `wallet_id` in the header is the way -to only receive hooks for a specific wallet. - -## Server Sent Events - -There are five different endpoints for listening to server-sent events (SSE). - -- `GET` `/v1/sse/{wallet_id}` -- `GET` `/v1/sse/{wallet_id}/{topic}` -- `GET` `/v1/sse/{wallet_id}/{topic}/{desired_state}` -- `GET` `/v1/sse/{wallet_id}/{topic}/{field}/{field_id}` -- `GET` `/v1/sse/{wallet_id}/{topic}/{field}/{field_id}/{desired_state}` +## Implementing Your Event Listener -Valid topics are same as noted above. +Here's an example of how to implement a SSE event listener using JavaScript: -The `field` and `field_id` in the endpoints above serve as filters, and refers to -any field in the webhook event payload (excluding `wallet_id` and `topic`). The -name of the field of interest is passed in `{field}`, and the corresponding ID -or text that one wants to wait for is passed as `{field_id}`. i.e., you can pass -`connection_id` as `{field}`, and the ID of the connection you want to see the -events for as `{field_id}`. This will return only webhook events that match the -specified filter. +```javascript +const EventSource = require('eventsource'); -The routes that specify a `{desired_state}` will keep the stream open until a -single webhook event matches the specified filters, and then return that event. -The endpoints that don't specify a `{desired_state}`, will keep the stream open -and return all events matching the topic and optional field/field_id filters. +const wallet_id = ''; +const url = `http://cloudapi.127.0.0.1.nip.io/tenant-admin/v1/sse/${wallet_id}/proofs/connections//done`; -Here is an example Javascript implementation +const headers = { + 'x-api-key': 'tenant.', +}; -```js -const EventSource = require('eventsource'); -url = "http://cloudapi.127.0.0.1.nip.io/tenant-admin/v1/sse/" +const eventSource = new EventSource(url, { headers }); -const headers = { - 'x-api-key':"tenant.", - }; -const eventSource = new EventSource(url,{headers}); // Event listener for incoming server-sent events eventSource.onmessage = (event) => { const eventData = JSON.parse(event.data); - // log event - console.log("EVENT ==> ", eventData) + console.log("EVENT ==> ", eventData); }; // Event listener for handling errors eventSource.onerror = (error) => { console.error('EventSource error:', error); - // You can add error handling logic here if needed + // Add your error handling logic here }; -console.log("<==============>") +console.log("Listening for events..."); ``` + +This script establishes a connection to the SSE endpoint and listens for incoming events. When an event is received, +it's logged to the console. Error handling is also implemented to manage any issues that may arise during the connection. + +>**_NOTE_**: Ensure that you replace `` with the actual wallet ID and use the appropriate `x-api-key` +for authentication. + +## Authentication + +The Waypoint service requires authentication to access the SSE endpoint. This is managed through the `x-api-key` header +in the request. The key should be in the format `tenant.`. Failing to provide valid authentication +will result in a 403 HTTP Error. + +A tenant will only be able to listen to events that belong to their wallet and a tenant-admin is able to listen to all events +belonging to their group. diff --git a/docs/examples/4. Create Connection with Issuer.md b/docs/examples/4. Create Connection with Issuer.md index 99a53b0a60..f4ae64d4ef 100644 --- a/docs/examples/4. Create Connection with Issuer.md +++ b/docs/examples/4. Create Connection with Issuer.md @@ -100,7 +100,7 @@ Response: } ``` -Both of the tenants can listen to `Webhook/SSE events` to track the progress of +Both of the tenants can listen to `SSE events` to track the progress of the connection being made. Once the `state` is `completed`, the connection is made. This can also be asserted by fetching connection records for the holder or issuer, and validating that their connection has transitioned to state: diff --git a/docs/examples/6. Create Connection with Verifier.md b/docs/examples/6. Create Connection with Verifier.md index 471c08e97f..0244484ec7 100644 --- a/docs/examples/6. Create Connection with Verifier.md +++ b/docs/examples/6. Create Connection with Verifier.md @@ -87,7 +87,7 @@ Response: } ``` -Listen to SSE/Webhooks until this connection is in `state`: `completed` +Listen to SSE until this connection is in `state`: `completed` ```json { diff --git a/docs/examples/7. Verify Credential.md b/docs/examples/7. Verify Credential.md index 0f78d8a7aa..c8419e0ba1 100644 --- a/docs/examples/7. Verify Credential.md +++ b/docs/examples/7. Verify Credential.md @@ -345,7 +345,7 @@ curl -X 'POST' \ If the proof request is valid, then the verification will complete automatically. Once again, we wait for the exchange -to be completed by listening on waypoint. Here is an example webhook event for the topic `proofs` in the `done` state. +to be completed by listening on SSE. Here is an example webhook event for the topic `proofs` in the `done` state. ```json {