-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
📝 Update Waypoint doc & SSE docstring (#1118)
* rename webhooks doc and updated for waypoint service * change webhook to sse * update sse endpoint docstrings * minor updates * explain stream fill close when event found * tweaks to docstrings * Rename back to webhooks and refer to Nats * add empty nats file for now * minor edits * sort nats summary * sort list * reword and add section * minor edits
- Loading branch information
Showing
6 changed files
with
120 additions
and
153 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# NATS | ||
|
||
**NATS** is an open-source messaging system designed for high-performance, lightweight, and reliable communication between | ||
distributed applications. It supports **pub-sub** (publish-subscribe), **request-reply**, and **message queue** patterns, | ||
allowing for flexible communication between microservices, IoT devices, and cloud-native systems. | ||
|
||
## Key Features | ||
|
||
- Simple: Text-based protocol with straightforward publish-subscribe semantics | ||
- Fast: Written in Go, capable of millions of messages per second | ||
- Lightweight: Small footprint, minimal dependencies | ||
- Cloud Native: Built for modern distributed systems | ||
|
||
## Core Concepts | ||
|
||
- Publishers: Send messages to subjects | ||
- Subscribers: Receive messages from subjects | ||
- Subjects: Named channels for message routing | ||
- Queue Groups: Load balance messages across subscribers | ||
|
||
## Message Patterns | ||
|
||
- Publish/Subscribe: One-to-many message distribution | ||
- Request/Reply: Synchronous communication | ||
- Queue Groups: Load balanced message processing | ||
- Stream Processing: Persistent message streams (via NATS Streaming/JetStream) | ||
|
||
## Consuming our NATS | ||
|
||
Please contact us for help with connecting/authenticating to our NATS service |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,179 +1,102 @@ | ||
# Webhooks | ||
|
||
The webhooks container serves as both a relay and storage for the webhooks, | ||
enabling hooks to be retrieved at a later time. Furthermore, the webhooks | ||
container processes the webhooks in two distinct ways. | ||
|
||
Firstly, the hooks are converted into a shared format, which is shared with the | ||
CloudAPI. This standardization is beneficial as the information and its format | ||
in both the hooks and the CloudAPI are the same. This consistency allows for | ||
easier client implementation as it only needs to anticipate a single data | ||
structure from two endpoints, even if it's using v1 or v2 protocols. | ||
|
||
Secondly, the webhooks are stored by topic and wallet ID. Consequently, the hooks | ||
can be retrieved in these two ways - per wallet or per wallet per topic (but not | ||
per topic - a choice made to reduce the data footprint). | ||
|
||
There are also two ways to retrieve webhook data. The first method is the | ||
classic HTTP request, potentially used multiple times to create a polling system. | ||
Find the HTTP endpoints via the | ||
[webhooks Swagger UI](http://webhooks.cloudapi.127.0.0.1.nip.io/docs) and create | ||
your own polling mechanism. Alternatively, you can subscribe via the PubSub | ||
mechanism (underlying Websockets) at the | ||
[/pubsub endpoint](http://webhooks.cloudapi.127.0.0.1.nip.io/pubsub) by | ||
specifying one or more topics. A brief example of how to do this in Python using | ||
the [`fastapi_websocket_pubsub`](https://github.com/permitio/fastapi_websocket_pubsub) | ||
package can be found in `webhooks/clients.example.py`. | ||
|
||
>**_NOTE_**: The webhooks container is **NOT** intended to be directly exposed | ||
to the world wide web, especially not via pubsub/websocket as there is NO | ||
authentication mechanism in place. Exposing the websocket will leave anyone on | ||
the internet able to read any webhook. | ||
|
||
Valid topics are: | ||
# Webhooks/Event Consumption | ||
|
||
```python | ||
topics = Literal[ | ||
"basic-messages", | ||
"connections", | ||
"proofs", | ||
"credentials", | ||
"endorsements", | ||
"oob", | ||
"revocation", | ||
"issuer_cred_rev", | ||
"problem_report", | ||
] | ||
``` | ||
|
||
A client can subscribe to the webhooks via the CloudAPI (as opposed to directly | ||
via the webhooks container). This requires only targeting the `/webhooks` | ||
endpoint and optionally targeting the `/webhooks/{topic}` sub-route by suffixing | ||
a topic. Using the authentication mechanism (see section below), the app | ||
automatically extracts the required info about the wallet (i.e., the wallet id | ||
and JWT) and retrieves only the associated webhooks with a particular wallet. | ||
Failing to authenticate will return a 403 HTTP Error. | ||
|
||
## Implementing your webhook listener | ||
|
||
You can (assuming you are within the docker network) use a pubsub client (see | ||
also `webhooks/clients.example.py`): | ||
|
||
```python | ||
from fastapi_websocket_pubsub import PubSubClient | ||
import asyncio | ||
## Primary Method: NATS | ||
|
||
The recommended and most efficient way to consume events is through our NATS implementation. NATS provides a robust, | ||
scalable solution for event streaming and processing. For detailed information about consuming events from our NATS implementation, | ||
please refer to this document: [NATS](NATS.md). | ||
|
||
async def on_events(data, topic): | ||
print(f"{topic}:\n{data}") | ||
## Alternative: Waypoint Service | ||
|
||
The Waypoint service provides a specialized Server-Sent Events (SSE) endpoint for cases where you need to wait for a specific | ||
event that you know is coming. This is particularly useful when you need to track the state change of a specific entity | ||
and have all the necessary filter information. | ||
|
||
async def main(): | ||
# Create a client and subscribe to topics | ||
topics = ["connections", "credentials", "proofs", "endorsements", "basic-messages"] | ||
client = PubSubClient([*topics], callback=on_events) | ||
The Waypoint service exposes a single SSE endpoint, via the main app, for event streaming: | ||
|
||
client.start_client(f"ws://webhooks.cloudapi.127.0.0.1.nip.io/pubsub") | ||
await client.wait_until_done() | ||
- `GET` `/v1/sse/{wallet_id}/{topic}/{field}/{field_id}/{desired_state}` | ||
|
||
This endpoint is designed for targeted event retrieval where you can specify exact filters to wait for a particular event. | ||
|
||
asyncio.run(main()) | ||
``` | ||
### How to use | ||
|
||
All webhooks are by default logged to the container's stdout, which can be handy | ||
for debugging or development. You can easily pipe them into a file. On a Unix | ||
machine, you can use this command to follow the webhook logs: | ||
The endpoint requires specific filter parameters: | ||
|
||
```bash | ||
kubectl logs -f $(kubectl get pods \ | ||
-l app.kubernetes.io/instance=governance-webhooks-web \ | ||
-o jsonpath="{.items[0].metadata.name}") | ||
``` | ||
- `wallet_id`: Identifier for the specific wallet | ||
- `topic`: The event topic to subscribe to | ||
- `field`: Filter field from the event payload | ||
- `field_id`: Specific value for the filter field | ||
- `desired_state`: The desired state of the event | ||
|
||
### Non-Python options | ||
For example, if you're waiting for a specific connection to reach the 'completed' state, you would use: | ||
|
||
Listening to webhooks the subscriber way (not long polling via HTTP) is not | ||
limited to the Python example given above. In fact, all one needs is a | ||
websocket-based RPC client. | ||
- `field`: "connection_id" | ||
- `field_id`: "<your_connection_id>" | ||
- `desired_state`: "completed" | ||
|
||
Here are two examples: | ||
|
||
```bash | ||
websocat --exit-on-eof --text \ | ||
ws://webhooks.cloudapi.127.0.0.1.nip.io/pubsub \ | ||
exec:'{"request": {"method": "subscribe", "arguments": {"topics": ["proofs", "endorsements", "oob", "out_of_band", "connections", "basic-messages", "credentials"]}}}' | ||
``` | ||
The stream will remain open until the desired event (matching the filters) is found and returned, at which point the stream | ||
will be closed or timeout after 60 seconds. | ||
|
||
or | ||
Valid topics include: | ||
|
||
```bash | ||
wscat -c ws://webhooks.cloudapi.127.0.0.1.nip.io \ | ||
-x '{"request": {"method": "subscribe", "arguments": {"topics": ["proofs", "endorsements", "oob", "out_of_band", "connections", "basic-messages", "credentials"]}}}' \ | ||
-w 99999 | ||
```python | ||
topics = Literal[ | ||
"basic-messages", | ||
"connections", | ||
"credentials", | ||
"credentials_indy", | ||
"credentials_ld", | ||
"endorsements", | ||
"issuer_cred_rev", | ||
"oob", | ||
"problem_report", | ||
"proofs", | ||
"revocation", | ||
] | ||
``` | ||
|
||
How this works is that either procedure instantiates a client connecting to the | ||
websocket endpoint exposed via the webhooks container (_NOTE:_ You might have to | ||
change the URI according to your setup of the webhooks relay). Both examples do | ||
pretty much the same. However, [wscat](https://github.com/websockets/wscat) is | ||
written in JavaScript whereas [websocat](https://github.com/vi/websocat) is | ||
implemented in Rust. Both examples are given to illustrate that it really does | ||
not matter what language one wishes to implement a listener in. After having | ||
established a connection to the exposed endpoint, the `exec:` parameter and `-x` | ||
flag mean execute. Execute, in this case, refers to sending the JSON payload to | ||
the webhooks relay. It requests the endpoint to add the connection as a | ||
subscriber to the topics array of the arguments key. You can pass any arguments | ||
supported by the webhooks relay (see above). Passing an empty array under topics | ||
means 'end the subscription'. By adding the `wallet_id` in the header is the way | ||
to only receive hooks for a specific wallet. | ||
|
||
## Server Sent Events | ||
|
||
There are five different endpoints for listening to server-sent events (SSE). | ||
|
||
- `GET` `/v1/sse/{wallet_id}` | ||
- `GET` `/v1/sse/{wallet_id}/{topic}` | ||
- `GET` `/v1/sse/{wallet_id}/{topic}/{desired_state}` | ||
- `GET` `/v1/sse/{wallet_id}/{topic}/{field}/{field_id}` | ||
- `GET` `/v1/sse/{wallet_id}/{topic}/{field}/{field_id}/{desired_state}` | ||
## Implementing Your Event Listener | ||
|
||
Valid topics are same as noted above. | ||
Here's an example of how to implement a SSE event listener using JavaScript: | ||
|
||
The `field` and `field_id` in the endpoints above serve as filters, and refers to | ||
any field in the webhook event payload (excluding `wallet_id` and `topic`). The | ||
name of the field of interest is passed in `{field}`, and the corresponding ID | ||
or text that one wants to wait for is passed as `{field_id}`. i.e., you can pass | ||
`connection_id` as `{field}`, and the ID of the connection you want to see the | ||
events for as `{field_id}`. This will return only webhook events that match the | ||
specified filter. | ||
```javascript | ||
const EventSource = require('eventsource'); | ||
|
||
The routes that specify a `{desired_state}` will keep the stream open until a | ||
single webhook event matches the specified filters, and then return that event. | ||
The endpoints that don't specify a `{desired_state}`, will keep the stream open | ||
and return all events matching the topic and optional field/field_id filters. | ||
const wallet_id = '<your_wallet_id>'; | ||
const url = `http://cloudapi.127.0.0.1.nip.io/tenant-admin/v1/sse/${wallet_id}/proofs/connections/<some_id>/done`; | ||
|
||
Here is an example Javascript implementation | ||
const headers = { | ||
'x-api-key': 'tenant.<tenant/wallet_token>', | ||
}; | ||
|
||
```js | ||
const EventSource = require('eventsource'); | ||
url = "http://cloudapi.127.0.0.1.nip.io/tenant-admin/v1/sse/<wallet_id>" | ||
const eventSource = new EventSource(url, { headers }); | ||
|
||
const headers = { | ||
'x-api-key':"tenant.<tenant/wallet_token>", | ||
}; | ||
const eventSource = new EventSource(url,{headers}); | ||
// Event listener for incoming server-sent events | ||
eventSource.onmessage = (event) => { | ||
const eventData = JSON.parse(event.data); | ||
// log event | ||
console.log("EVENT ==> ", eventData) | ||
console.log("EVENT ==> ", eventData); | ||
}; | ||
|
||
// Event listener for handling errors | ||
eventSource.onerror = (error) => { | ||
console.error('EventSource error:', error); | ||
// You can add error handling logic here if needed | ||
// Add your error handling logic here | ||
}; | ||
|
||
console.log("<==============>") | ||
console.log("Listening for events..."); | ||
``` | ||
|
||
This script establishes a connection to the SSE endpoint and listens for incoming events. When an event is received, | ||
it's logged to the console. Error handling is also implemented to manage any issues that may arise during the connection. | ||
|
||
>**_NOTE_**: Ensure that you replace `<your_wallet_id>` with the actual wallet ID and use the appropriate `x-api-key` | ||
for authentication. | ||
|
||
## Authentication | ||
|
||
The Waypoint service requires authentication to access the SSE endpoint. This is managed through the `x-api-key` header | ||
in the request. The key should be in the format `tenant.<tenant/wallet_token>`. Failing to provide valid authentication | ||
will result in a 403 HTTP Error. | ||
|
||
A tenant will only be able to listen to events that belong to their wallet and a tenant-admin is able to listen to all events | ||
belonging to their group. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters