Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

📝 Update Waypoint doc & SSE docstring #1118

Merged
merged 14 commits into from
Oct 21, 2024
24 changes: 19 additions & 5 deletions app/routes/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,30 @@ async def get_sse_subscribe_event_with_field_and_state(
auth: AcaPyAuthVerified = Depends(acapy_auth_verified),
) -> StreamingResponse:
"""
Subscribe to SSE events wait for a desired state with a field filter.
---
***This endpoint can't be called on the swagger UI, as it requires a stream response.***

Wait for a desired state to be reached for some event for this wallet and topic,
filtering for payloads that contain `field:field_id`.

example: `/{wallet_id}/credentials/connection_id/some-uuid/done` will stream a credential exchange event on a
specific connection with state done.
The field and field ID pair must be present in the payload (other than state) for the event to be streamed.
The stream will be closed after the event is returned.

Parameters:
-----------
wallet_id: The ID of the wallet subscribing to the events.
topic: The topic to which the wallet is subscribing.
field: The field to which the wallet is subscribing.
field_id: The ID of the field subscribing to the events.
desired_state: The desired state to be reached.
wallet_id:
The ID of the wallet subscribing to the events.
topic:
The topic to which the wallet is subscribing.
field:
The field to which the wallet is subscribing.
field_id:
The ID of the field subscribing to the events.
desired_state:
The desired state to be reached.
"""
logger.bind(
body={
Expand Down
30 changes: 30 additions & 0 deletions docs/NATS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# NATS
cl0ete marked this conversation as resolved.
Show resolved Hide resolved

**NATS** is an open-source messaging system designed for high-performance, lightweight, and reliable communication between
distributed applications. It supports **pub-sub** (publish-subscribe), **request-reply**, and **message queue** patterns,
allowing for flexible communication between microservices, IoT devices, and cloud-native systems.

## Key Features

- Simple: Text-based protocol with straightforward publish-subscribe semantics
- Fast: Written in Go, capable of millions of messages per second
- Lightweight: Small footprint, minimal dependencies
- Cloud Native: Built for modern distributed systems

## Core Concepts

- Publishers: Send messages to subjects
- Subscribers: Receive messages from subjects
- Subjects: Named channels for message routing
- Queue Groups: Load balance messages across subscribers

## Message Patterns

- Publish/Subscribe: One-to-many message distribution
- Request/Reply: Synchronous communication
- Queue Groups: Load balanced message processing
- Stream Processing: Persistent message streams (via NATS Streaming/JetStream)

## Consuming our NATS

Please contact us for help with connecting/authenticating to our NATS service
213 changes: 68 additions & 145 deletions docs/Webhooks.md
Original file line number Diff line number Diff line change
@@ -1,179 +1,102 @@
# Webhooks

The webhooks container serves as both a relay and storage for the webhooks,
enabling hooks to be retrieved at a later time. Furthermore, the webhooks
container processes the webhooks in two distinct ways.

Firstly, the hooks are converted into a shared format, which is shared with the
CloudAPI. This standardization is beneficial as the information and its format
in both the hooks and the CloudAPI are the same. This consistency allows for
easier client implementation as it only needs to anticipate a single data
structure from two endpoints, even if it's using v1 or v2 protocols.

Secondly, the webhooks are stored by topic and wallet ID. Consequently, the hooks
can be retrieved in these two ways - per wallet or per wallet per topic (but not
per topic - a choice made to reduce the data footprint).

There are also two ways to retrieve webhook data. The first method is the
classic HTTP request, potentially used multiple times to create a polling system.
Find the HTTP endpoints via the
[webhooks Swagger UI](http://webhooks.cloudapi.127.0.0.1.nip.io/docs) and create
your own polling mechanism. Alternatively, you can subscribe via the PubSub
mechanism (underlying Websockets) at the
[/pubsub endpoint](http://webhooks.cloudapi.127.0.0.1.nip.io/pubsub) by
specifying one or more topics. A brief example of how to do this in Python using
the [`fastapi_websocket_pubsub`](https://github.com/permitio/fastapi_websocket_pubsub)
package can be found in `webhooks/clients.example.py`.

>**_NOTE_**: The webhooks container is **NOT** intended to be directly exposed
to the world wide web, especially not via pubsub/websocket as there is NO
authentication mechanism in place. Exposing the websocket will leave anyone on
the internet able to read any webhook.

Valid topics are:
# Webhooks/Event Consumption

```python
topics = Literal[
"basic-messages",
"connections",
"proofs",
"credentials",
"endorsements",
"oob",
"revocation",
"issuer_cred_rev",
"problem_report",
]
```

A client can subscribe to the webhooks via the CloudAPI (as opposed to directly
via the webhooks container). This requires only targeting the `/webhooks`
endpoint and optionally targeting the `/webhooks/{topic}` sub-route by suffixing
a topic. Using the authentication mechanism (see section below), the app
automatically extracts the required info about the wallet (i.e., the wallet id
and JWT) and retrieves only the associated webhooks with a particular wallet.
Failing to authenticate will return a 403 HTTP Error.

## Implementing your webhook listener

You can (assuming you are within the docker network) use a pubsub client (see
also `webhooks/clients.example.py`):

```python
from fastapi_websocket_pubsub import PubSubClient
import asyncio
## Primary Method: NATS

The recommended and most efficient way to consume events is through our NATS implementation. NATS provides a robust,
scalable solution for event streaming and processing. For detailed information about consuming events from our NATS implementation,
please refer to this document: [NATS](NATS.md).

async def on_events(data, topic):
print(f"{topic}:\n{data}")
## Alternative: Waypoint Service

The Waypoint service provides a specialized Server-Sent Events (SSE) endpoint for cases where you need to wait for a specific
event that you know is coming. This is particularly useful when you need to track the state change of a specific entity
and have all the necessary filter information.

async def main():
# Create a client and subscribe to topics
topics = ["connections", "credentials", "proofs", "endorsements", "basic-messages"]
client = PubSubClient([*topics], callback=on_events)
The Waypoint service exposes a single SSE endpoint, via the main app, for event streaming:

client.start_client(f"ws://webhooks.cloudapi.127.0.0.1.nip.io/pubsub")
await client.wait_until_done()
- `GET` `/v1/sse/{wallet_id}/{topic}/{field}/{field_id}/{desired_state}`

This endpoint is designed for targeted event retrieval where you can specify exact filters to wait for a particular event.

asyncio.run(main())
```
### How to use

All webhooks are by default logged to the container's stdout, which can be handy
for debugging or development. You can easily pipe them into a file. On a Unix
machine, you can use this command to follow the webhook logs:
The endpoint requires specific filter parameters:

```bash
kubectl logs -f $(kubectl get pods \
-l app.kubernetes.io/instance=governance-webhooks-web \
-o jsonpath="{.items[0].metadata.name}")
```
- `wallet_id`: Identifier for the specific wallet
- `topic`: The event topic to subscribe to
- `field`: Filter field from the event payload
- `field_id`: Specific value for the filter field
- `desired_state`: The desired state of the event

### Non-Python options
For example, if you're waiting for a specific connection to reach the 'completed' state, you would use:

Listening to webhooks the subscriber way (not long polling via HTTP) is not
limited to the Python example given above. In fact, all one needs is a
websocket-based RPC client.
- `field`: "connection_id"
- `field_id`: "<your_connection_id>"
- `desired_state`: "completed"

Here are two examples:

```bash
websocat --exit-on-eof --text \
ws://webhooks.cloudapi.127.0.0.1.nip.io/pubsub \
exec:'{"request": {"method": "subscribe", "arguments": {"topics": ["proofs", "endorsements", "oob", "out_of_band", "connections", "basic-messages", "credentials"]}}}'
```
The stream will remain open until the desired event (matching the filters) is found and returned, at which point the stream
will be closed or timeout after 60 seconds.
cl0ete marked this conversation as resolved.
Show resolved Hide resolved

or
Valid topics include:

```bash
wscat -c ws://webhooks.cloudapi.127.0.0.1.nip.io \
-x '{"request": {"method": "subscribe", "arguments": {"topics": ["proofs", "endorsements", "oob", "out_of_band", "connections", "basic-messages", "credentials"]}}}' \
-w 99999
```python
topics = Literal[
"basic-messages",
"connections",
"credentials",
"credentials_indy",
"credentials_ld",
"endorsements",
"issuer_cred_rev",
"oob",
"problem_report",
"proofs",
"revocation",
]
```

How this works is that either procedure instantiates a client connecting to the
websocket endpoint exposed via the webhooks container (_NOTE:_ You might have to
change the URI according to your setup of the webhooks relay). Both examples do
pretty much the same. However, [wscat](https://github.com/websockets/wscat) is
written in JavaScript whereas [websocat](https://github.com/vi/websocat) is
implemented in Rust. Both examples are given to illustrate that it really does
not matter what language one wishes to implement a listener in. After having
established a connection to the exposed endpoint, the `exec:` parameter and `-x`
flag mean execute. Execute, in this case, refers to sending the JSON payload to
the webhooks relay. It requests the endpoint to add the connection as a
subscriber to the topics array of the arguments key. You can pass any arguments
supported by the webhooks relay (see above). Passing an empty array under topics
means 'end the subscription'. By adding the `wallet_id` in the header is the way
to only receive hooks for a specific wallet.

## Server Sent Events

There are five different endpoints for listening to server-sent events (SSE).

- `GET` `/v1/sse/{wallet_id}`
- `GET` `/v1/sse/{wallet_id}/{topic}`
- `GET` `/v1/sse/{wallet_id}/{topic}/{desired_state}`
- `GET` `/v1/sse/{wallet_id}/{topic}/{field}/{field_id}`
- `GET` `/v1/sse/{wallet_id}/{topic}/{field}/{field_id}/{desired_state}`
## Implementing Your Event Listener

Valid topics are same as noted above.
Here's an example of how to implement a SSE event listener using JavaScript:

The `field` and `field_id` in the endpoints above serve as filters, and refers to
any field in the webhook event payload (excluding `wallet_id` and `topic`). The
name of the field of interest is passed in `{field}`, and the corresponding ID
or text that one wants to wait for is passed as `{field_id}`. i.e., you can pass
`connection_id` as `{field}`, and the ID of the connection you want to see the
events for as `{field_id}`. This will return only webhook events that match the
specified filter.
```javascript
const EventSource = require('eventsource');

The routes that specify a `{desired_state}` will keep the stream open until a
single webhook event matches the specified filters, and then return that event.
The endpoints that don't specify a `{desired_state}`, will keep the stream open
and return all events matching the topic and optional field/field_id filters.
const wallet_id = '<your_wallet_id>';
const url = `http://cloudapi.127.0.0.1.nip.io/tenant-admin/v1/sse/${wallet_id}/proofs/connections/<some_id>/done`;

Here is an example Javascript implementation
const headers = {
'x-api-key': 'tenant.<tenant/wallet_token>',
};

```js
const EventSource = require('eventsource');
url = "http://cloudapi.127.0.0.1.nip.io/tenant-admin/v1/sse/<wallet_id>"
const eventSource = new EventSource(url, { headers });

const headers = {
'x-api-key':"tenant.<tenant/wallet_token>",
};
const eventSource = new EventSource(url,{headers});
// Event listener for incoming server-sent events
eventSource.onmessage = (event) => {
const eventData = JSON.parse(event.data);
// log event
console.log("EVENT ==> ", eventData)
console.log("EVENT ==> ", eventData);
};

// Event listener for handling errors
eventSource.onerror = (error) => {
console.error('EventSource error:', error);
// You can add error handling logic here if needed
// Add your error handling logic here
};

console.log("<==============>")
console.log("Listening for events...");
```

This script establishes a connection to the SSE endpoint and listens for incoming events. When an event is received,
it's logged to the console. Error handling is also implemented to manage any issues that may arise during the connection.

>**_NOTE_**: Ensure that you replace `<your_wallet_id>` with the actual wallet ID and use the appropriate `x-api-key`
for authentication.

## Authentication

The Waypoint service requires authentication to access the SSE endpoint. This is managed through the `x-api-key` header
in the request. The key should be in the format `tenant.<tenant/wallet_token>`. Failing to provide valid authentication
will result in a 403 HTTP Error.

cl0ete marked this conversation as resolved.
Show resolved Hide resolved
A tenant will only be able to listen to events that belong to their wallet and a tenant-admin is able to listen to all events
belonging to their group.
2 changes: 1 addition & 1 deletion docs/examples/4. Create Connection with Issuer.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Response:
}
```

Both of the tenants can listen to `Webhook/SSE events` to track the progress of
Both of the tenants can listen to `SSE events` to track the progress of
the connection being made. Once the `state` is `completed`, the connection is
made. This can also be asserted by fetching connection records for the holder or
issuer, and validating that their connection has transitioned to state:
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/6. Create Connection with Verifier.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Response:
}
```

Listen to SSE/Webhooks until this connection is in `state`: `completed`
Listen to SSE until this connection is in `state`: `completed`

```json
{
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/7. Verify Credential.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ curl -X 'POST' \
</details>

If the proof request is valid, then the verification will complete automatically. Once again, we wait for the exchange
to be completed by listening on waypoint. Here is an example webhook event for the topic `proofs` in the `done` state.
to be completed by listening on SSE. Here is an example webhook event for the topic `proofs` in the `done` state.

```json
{
Expand Down
Loading