Skip to content

Commit 1accd58

Browse files
ff137henrymsiska
andauthored
✨ Re-add NATS Plugin after reset to upstream/main (#813)
* 🎉 NATS Plugin 🚧 (#666) * ⬆️ Update lock files Signed-off-by: ff137 <[email protected]> * 🚀 take on NATS plugin Copied from redis_events plugin, and replaced redis with nats-py (and json with orjson) Signed-off-by: ff137 <[email protected]> * ✅ basic unit tests for events Signed-off-by: ff137 <[email protected]> * 👷 skip nats_events integration testing Signed-off-by: ff137 <[email protected]> * ✨ read `NATS_CREDS_FILE` from env vars Signed-off-by: ff137 <[email protected]> * Update nats_events/README.md Co-authored-by: Henry Msiska <[email protected]> Signed-off-by: Mourits de Beer <[email protected]> * 🎨 rename nats topic to subject Signed-off-by: ff137 <[email protected]> * 🎨 replace f-string interpolation in logs Signed-off-by: ff137 <[email protected]> * ✨ use JetStream. Define streams on startup Signed-off-by: ff137 <[email protected]> * ✨ bind and re-use JetStream context Signed-off-by: ff137 <[email protected]> * ✨ handle jetstream publish ack Signed-off-by: ff137 <[email protected]> * 🎨 remove use of `cast` Signed-off-by: ff137 <[email protected]> * 🎨 more clear setup of jetstream context, instead of just nats Signed-off-by: ff137 <[email protected]> * ✅ fix tests Signed-off-by: ff137 <[email protected]> * 🎨 update NATS subject map to use `.`-pattern Signed-off-by: ff137 <[email protected]> * Update nats_events/README.md Co-authored-by: Henry Msiska <[email protected]> Signed-off-by: Mourits de Beer <[email protected]> * 🙈 don't ignore vscode settings Signed-off-by: ff137 <[email protected]> * 🔧 vscode cspell settings Signed-off-by: ff137 <[email protected]> * 🎨 Signed-off-by: ff137 <[email protected]> * 🚧 add some verbose logging for testing Signed-off-by: ff137 <[email protected]> * 🎨 update method for nats connection args Signed-off-by: ff137 <[email protected]> * 🚧 debug timeout error for defining stream, change stream name Signed-off-by: ff137 <[email protected]> * 🚧 debug on_startup connecting to jetstream Signed-off-by: ff137 <[email protected]> * 🚧 test replacing . in name with _ Signed-off-by: ff137 <[email protected]> * ✨ publish with retry -- correctly handle PubAck Signed-off-by: ff137 <[email protected]> * 🎨 log payload and no longer only write `with-state` records Signed-off-by: ff137 <[email protected]> * 🎨 fix payload log type Signed-off-by: ff137 <[email protected]> * 🎨 log levels Signed-off-by: ff137 <[email protected]> * ✅ fix tests Signed-off-by: ff137 <[email protected]> * ✨ handle OutboundMessage type (make it serialisable) Signed-off-by: ff137 <[email protected]> * 🎨 retry should log error instead of raising exception Signed-off-by: ff137 <[email protected]> * 🎨 handle optional target types Signed-off-by: ff137 <[email protected]> * ✅ fix tests Signed-off-by: ff137 <[email protected]> * 🎨 add words to cspell list Signed-off-by: ff137 <[email protected]> * 🚧 remove define stream Signed-off-by: ff137 <[email protected]> * 🎨 define one stream with many subjects Signed-off-by: ff137 <[email protected]> * ⬆️ Update lock files Signed-off-by: ff137 <[email protected]> * ⬆️ Update lock files Signed-off-by: ff137 <[email protected]> --------- Signed-off-by: ff137 <[email protected]> Signed-off-by: Mourits de Beer <[email protected]> Co-authored-by: Henry Msiska <[email protected]> Signed-off-by: ff137 <[email protected]> * 🎨 nats_events: update log levels (#733) * 🎨 Update log levels Signed-off-by: ff137 <[email protected]> * ⬆️ Upgrade uvicorn Signed-off-by: ff137 <[email protected]> * ⬆️ Upgrade fastapi-slim Signed-off-by: ff137 <[email protected]> * ⬆️ Upgrade pytest-asyncio Signed-off-by: ff137 <[email protected]> * ⬆️ Update lock files Signed-off-by: ff137 <[email protected]> * 🐛 trace -> debug Signed-off-by: ff137 <[email protected]> --------- Signed-off-by: ff137 <[email protected]> * Skip-nats-outbound-messages (#734) * ⚡ skip outbound message types, temporarily for performance reasons Signed-off-by: ff137 <[email protected]> * ⬆️ Update lock files Signed-off-by: ff137 <[email protected]> --------- Signed-off-by: ff137 <[email protected]> * ⬆️ Update dependencies Signed-off-by: ff137 <[email protected]> * ⬆️ Update lock file Signed-off-by: ff137 <[email protected]> * ⏪ Revert changes to other lock files Signed-off-by: ff137 <[email protected]> --------- Signed-off-by: ff137 <[email protected]> Signed-off-by: Mourits de Beer <[email protected]> Co-authored-by: Henry Msiska <[email protected]>
1 parent a898307 commit 1accd58

File tree

12 files changed

+4007
-1
lines changed

12 files changed

+4007
-1
lines changed

.github/dependabot.yml

+12
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,18 @@ updates:
169169
- dependency-name: "*"
170170
update-types: ["version-update:semver-major"]
171171

172+
# Maintain dependencies for Python Packages
173+
- package-ecosystem: "pip"
174+
directory: "/nats_events"
175+
schedule:
176+
interval: "weekly"
177+
day: "monday"
178+
time: "04:00"
179+
timezone: "Canada/Pacific"
180+
ignore:
181+
- dependency-name: "*"
182+
update-types: ["version-update:semver-major"]
183+
172184
# Maintain dependencies for Python Packages
173185
- package-ecosystem: "pip"
174186
directory: "/oid4vci/integration/afj_runner"

.github/workflows/pr-integration-tests.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ jobs:
4343
declare -a changed_dirs=()
4444
for dir in ./*/; do
4545
current_folder=$(basename "$dir")
46-
if [[ $current_folder == "plugin_globals" ]]; then
46+
if [[ $current_folder == "plugin_globals" || [[ $current_folder == "nats_events" ]]; then
4747
continue
4848
fi
4949
for changed_file in ${{ steps.changed-files.outputs.all_changed_files }}; do

.vscode/settings.json

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"cSpell.words": [
3+
"acapy",
4+
"actionmenu",
5+
"aiohttp",
6+
"basicmessage",
7+
"basicmessages",
8+
"cloudagent",
9+
"CREDS",
10+
"crids",
11+
"devcontainer",
12+
"didcomm",
13+
"jetstream",
14+
"keylist",
15+
"linedata",
16+
"Pydantic",
17+
"resp",
18+
"verkey"
19+
]
20+
}

nats_events/README.md

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# NATS Events Plugin
2+
3+
## Overview
4+
5+
Welcome to the NATS Events Plugin! This plugin is designed to facilitate event handling using NATS as the messaging system. It is currently under active development, and we are working hard to implement all the necessary features and ensure robust functionality.
6+
7+
## Status
8+
9+
**Under Construction** 🚧
10+
11+
Please note that this plugin is still in the early stages of development. The following components are yet to be implemented:
12+
13+
- **Inbound Transporters**: Mechanisms to receive and process incoming messages.
14+
- **Outbound Transporters**: Mechanisms to send messages to external systems.
15+
- **Documentation**: Comprehensive documentation to guide users on setup, configuration, and usage.
16+
- **Testing**: Thorough testing to ensure reliability and performance.
17+
18+
## Features
19+
20+
- **Event Handling**: Push events to NATS subjects.
21+
- **Dynamic Topic Mapping**: Map event patterns to NATS subjects (analogous to topics in other systems systems like Kafka) using templates.
22+
- **Metadata Enrichment**: Add metadata to events before publishing.

nats_events/nats_events/definition.py

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
"""Version definitions for this plugin."""
2+
3+
versions = [
4+
{
5+
"major_version": 1,
6+
"minimum_minor_version": 0,
7+
"current_minor_version": 0,
8+
"path": "v1_0",
9+
}
10+
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
"""ACA-Py Over NATS."""
2+
3+
import logging
4+
5+
from .config import get_config
6+
7+
LOGGER = logging.getLogger(__name__)
8+
9+
__all__ = ["get_config"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
"""NATS Queue configuration."""
2+
3+
import logging
4+
from typing import Any, Mapping, Optional
5+
6+
from pydantic import BaseModel
7+
8+
LOGGER = logging.getLogger(__name__)
9+
10+
EVENT_TOPIC_MAP = {
11+
"^acapy::webhook::(.*)$": "acapy.webhook.$wallet_id",
12+
"^acapy::record::([^:]*)::([^:]*)$": "acapy.record-with-state.$wallet_id",
13+
"^acapy::record::([^:])?": "acapy.record.$wallet_id",
14+
"acapy::basicmessage::received": "acapy.basicmessage.received",
15+
"acapy::problem_report": "acapy.problem_report",
16+
"acapy::ping::received": "acapy.ping.received",
17+
"acapy::ping::response_received": "acapy.ping.response_received",
18+
"acapy::actionmenu::received": "acapy.actionmenu.received",
19+
"acapy::actionmenu::get-active-menu": "acapy.actionmenu.get-active-menu",
20+
"acapy::actionmenu::perform-menu-action": "acapy.actionmenu.perform-menu-action",
21+
"acapy::keylist::updated": "acapy.keylist.updated",
22+
"acapy::revocation-notification::received": "acapy.revocation-notification.received",
23+
"acapy::revocation-notification-v2::received": "acapy.revocation-notification-v2.received", # noqa: E501
24+
"acapy::forward::received": "acapy.forward.received",
25+
"acapy::outbound-message::queued_for_delivery": "acapy.outbound-message.queued-for-delivery", # noqa: E501
26+
}
27+
28+
EVENT_WEBHOOK_TOPIC_MAP = {
29+
"acapy::basicmessage::received": "basicmessages",
30+
"acapy::problem_report": "problem_report",
31+
"acapy::ping::received": "ping",
32+
"acapy::ping::response_received": "ping",
33+
"acapy::actionmenu::received": "actionmenu",
34+
"acapy::actionmenu::get-active-menu": "get-active-menu",
35+
"acapy::actionmenu::perform-menu-action": "perform-menu-action",
36+
"acapy::keylist::updated": "keylist",
37+
}
38+
39+
40+
def _alias_generator(key: str) -> str:
41+
return key.replace("_", "-")
42+
43+
44+
class ConnectionConfig(BaseModel):
45+
"""Connection configuration model."""
46+
47+
connection_url: str
48+
49+
class Config:
50+
"""Pydantic config."""
51+
52+
alias_generator = _alias_generator
53+
populate_by_name = True
54+
55+
@classmethod
56+
def default(cls):
57+
"""Default connection configuration."""
58+
return cls(connection_url="nats://default:test1234@localhost:4222")
59+
60+
61+
class EventConfig(BaseModel):
62+
"""Event configuration model."""
63+
64+
event_topic_maps: Mapping[str, str] = EVENT_TOPIC_MAP
65+
event_webhook_topic_maps: Mapping[str, str] = EVENT_WEBHOOK_TOPIC_MAP
66+
deliver_webhook: bool = True
67+
68+
class Config:
69+
"""Pydantic config."""
70+
71+
alias_generator = _alias_generator
72+
populate_by_name = True
73+
74+
@classmethod
75+
def default(cls):
76+
"""Default event configuration."""
77+
return cls(
78+
event_topic_maps=EVENT_TOPIC_MAP,
79+
event_webhook_topic_maps=EVENT_WEBHOOK_TOPIC_MAP,
80+
deliver_webhook=True,
81+
)
82+
83+
84+
class InboundConfig(BaseModel):
85+
"""Inbound configuration model."""
86+
87+
acapy_inbound_topic: str = "acapy_inbound"
88+
acapy_direct_resp_topic: str = "acapy_inbound_direct_resp"
89+
90+
class Config:
91+
"""Pydantic config."""
92+
93+
alias_generator = _alias_generator
94+
populate_by_name = True
95+
96+
@classmethod
97+
def default(cls):
98+
"""Default inbound configuration."""
99+
return cls(
100+
acapy_inbound_topic="acapy_inbound",
101+
acapy_direct_resp_topic="acapy_inbound_direct_resp",
102+
)
103+
104+
105+
class OutboundConfig(BaseModel):
106+
"""Outbound configuration model."""
107+
108+
acapy_outbound_topic: str = "acapy_outbound"
109+
mediator_mode: bool = False
110+
111+
@classmethod
112+
def default(cls):
113+
"""Default outbound configuration."""
114+
return cls(
115+
acapy_outbound_topic="acapy_outbound",
116+
mediator_mode=False,
117+
)
118+
119+
120+
class NATSConfig(BaseModel):
121+
"""NATS configuration model."""
122+
123+
event: Optional[EventConfig] = EventConfig.default()
124+
inbound: Optional[InboundConfig] = InboundConfig.default()
125+
outbound: Optional[OutboundConfig] = OutboundConfig.default()
126+
connection: ConnectionConfig
127+
128+
@classmethod
129+
def default(cls):
130+
"""Default NATS configuration."""
131+
return cls(
132+
event=EventConfig.default(),
133+
inbound=InboundConfig.default(),
134+
outbound=OutboundConfig.default(),
135+
connection=ConnectionConfig.default(),
136+
)
137+
138+
139+
def process_config_dict(config_dict: dict) -> dict:
140+
"""Add connection to inbound, outbound, event and return updated config."""
141+
filter = ["inbound", "event", "outbound", "connection"]
142+
for key, value in config_dict.items():
143+
if key in filter:
144+
config_dict[key] = value
145+
return config_dict
146+
147+
148+
def get_config(settings: Mapping[str, Any]) -> NATSConfig:
149+
"""Retrieve producer configuration from settings."""
150+
try:
151+
LOGGER.debug("Constructing config from: %s", settings.get("plugin_config"))
152+
config_dict = settings["plugin_config"].get("nats_queue", {})
153+
LOGGER.debug("Retrieved: %s", config_dict)
154+
config_dict = process_config_dict(config_dict)
155+
config = NATSConfig(**config_dict)
156+
except KeyError:
157+
LOGGER.warning("Using default configuration")
158+
config = NATSConfig.default()
159+
160+
LOGGER.debug("Returning config: %s", config.model_dump_json(indent=2))
161+
LOGGER.debug(
162+
"Returning config(aliases): %s", config.model_dump_json(by_alias=True, indent=2)
163+
)
164+
return config

0 commit comments

Comments
 (0)