Skip to content

Commit

Permalink
Support for processing forward messages (#1028)
Browse files Browse the repository at this point in the history
Support for processing forward messages (#1028)

Signed-off-by: Naian <[email protected]>
  • Loading branch information
nain-F49FF806 authored Oct 27, 2023
1 parent 9846060 commit 939ec0e
Show file tree
Hide file tree
Showing 14 changed files with 341 additions and 113 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion agents/rust/mediator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ mediation = { path = "./mediation" }
test_utils = { path = "../../../tools/test_utils" }

[dev-dependencies]
reqwest = { version = "0.11.20", features = ["blocking"] }
chrono = "0.4.31"
reqwest = { version = "0.11.20", features = ["blocking"] }
43 changes: 9 additions & 34 deletions agents/rust/mediator/mediation/src/didcomm_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,15 @@ pub struct ForwardMsg {
pub message_data: String,
}

// impl ForwardMsg {
// pub fn default_alice() -> ForwardMsg {
// ForwardMsg {
// _type: type_uri::FORWARD.to_owned(),
// recipient_key: "Alice".to_owned(),
// message: "Hello!".to_owned(),
// }
// }
// pub fn new(recipient_key: &str, message: &str) -> ForwardMsg {
// ForwardMsg {
// _type: type_uri::FORWARD.to_string(),
// recipient_key: recipient_key.to_string(),
// message: message.to_string(),
// }
// }
// }
impl ForwardMsg {
pub fn new(recipient_key: &str, message: &str) -> ForwardMsg {
ForwardMsg {
_type: type_uri::FORWARD.to_string(),
recipient_key: recipient_key.to_string(),
message_data: message.to_string(),
}
}
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "@type")]
Expand Down Expand Up @@ -84,17 +77,6 @@ pub struct PickupStatusReqMsg {
pub recipient_key: Option<String>,
}

// impl PickupStatusReqMsg {
// pub fn new(recipient_key: Option<String>) -> PickupStatusReqMsg {
// PickupStatusReqMsg { recipient_key }
// }
// // pub fn custom_type(self, _type: String) -> PickupStatusReqMsg {
// // PickupStatusReqMsg {
// // recipient_key: self.recipient_key,
// // }
// // }
// }

#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug)]
pub struct PickupDeliveryReqMsg {
Expand Down Expand Up @@ -173,13 +155,6 @@ pub mod mediator_coord_structs {
},
}

// #[derive(Serialize, Deserialize, Debug)]
// pub struct MediateRequestData {
// #[serde(default)]
// pub auth_pubkey: String,
// pub did_doc: String,
// }

#[derive(Serialize, Deserialize, Debug)]
pub struct MediateDenyData {
pub reason: String,
Expand Down
20 changes: 8 additions & 12 deletions agents/rust/mediator/src/aries_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use messages::{
use serde_json::json;

use self::transports::AriesTransport;
use crate::utils::{prelude::*, structs::VeriKey};
use crate::utils::{prelude::*, structs::VerKey};

#[cfg(any(test, feature = "client"))]
pub mod client;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl AgentBuilder<IndySdkWallet> {

// Utils
impl<T: BaseWallet + 'static, P: MediatorPersistence> Agent<T, P> {
pub fn get_wallet_ref(&self) -> Arc<dyn BaseWallet> {
pub fn get_wallet_ref(&self) -> Arc<impl BaseWallet> {
self.wallet.clone()
}
pub fn get_persistence_ref(&self) -> Arc<impl MediatorPersistence> {
Expand Down Expand Up @@ -136,7 +136,7 @@ impl<T: BaseWallet + 'static, P: MediatorPersistence> Agent<T, P> {
pub async fn pack_didcomm(
&self,
message: &[u8],
our_vk: &VeriKey,
our_vk: &VerKey,
their_diddoc: &AriesDidDoc,
) -> Result<EncryptionEnvelope, String> {
EncryptionEnvelope::create(self.wallet.as_ref(), message, Some(our_vk), their_diddoc)
Expand All @@ -146,7 +146,7 @@ impl<T: BaseWallet + 'static, P: MediatorPersistence> Agent<T, P> {
pub async fn pack_and_send_didcomm(
&self,
message: &[u8],
our_vk: &VeriKey,
our_vk: &VerKey,
their_diddoc: &AriesDidDoc,
aries_transport: &mut impl AriesTransport,
) -> Result<(), String> {
Expand All @@ -163,14 +163,10 @@ impl<T: BaseWallet + 'static, P: MediatorPersistence> Agent<T, P> {
.map_err(string_from_std_error)
}

// pub async fn pack_message(&self, message: AriesMessage, recipient_vk: VeriKey, sender_vk:
// VeriKey) -> Value { todo!()
// }
/// Returns account details (account_name, our_signing_key, did_doc)
pub async fn auth_and_get_details(
&self,
sender_verkey: &Option<VeriKey>,
) -> Result<(String, VeriKey, AriesDidDoc), String> {
sender_verkey: &Option<VerKey>,
) -> Result<(String, VerKey, AriesDidDoc), String> {
let auth_pubkey = sender_verkey
.as_deref()
.ok_or("Anonymous sender can't be authenticated")?;
Expand Down Expand Up @@ -238,8 +234,8 @@ impl<T: BaseWallet + 'static, P: MediatorPersistence> Agent<T, P> {

pub async fn create_account(
&self,
their_vk: &VeriKey,
our_vk: &VeriKey,
their_vk: &VerKey,
our_vk: &VerKey,
did_doc: &AriesDidDoc,
) -> Result<(), String> {
self.persistence
Expand Down
6 changes: 3 additions & 3 deletions agents/rust/mediator/src/aries_agent/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use messages::{
};
use uuid::Uuid;

use crate::utils::structs::VeriKey;
use crate::utils::structs::VerKey;

pub async fn build_response_content(
wallet: &impl BaseWallet,
thread_id: String,
old_recipient_vk: VeriKey,
old_recipient_vk: VerKey,
new_recipient_did: String,
new_recipient_vk: VeriKey,
new_recipient_vk: VerKey,
new_service_endpoint: url::Url,
new_routing_keys: Vec<String>,
) -> VcxResult<Response> {
Expand Down
17 changes: 17 additions & 0 deletions agents/rust/mediator/src/didcomm_handlers/forward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use axum::{extract::State, Json};
use mediation::{didcomm_types::ForwardMsg, routes::forward::handle_forward};
use messages::msg_fields::protocols::routing::Forward;

use super::{utils::prelude::*, ArcAgent};

pub async fn handle_routing_forward(
agent: ArcAgent<impl BaseWallet + 'static, impl MediatorPersistence>,
forward: Forward,
) -> Result<(), String> {
info!("{:?}", forward);
let forward_msg_content_str = serde_json::to_string(&forward.content.msg).unwrap();
let forward_msg: ForwardMsg = ForwardMsg::new(&forward.content.to, &forward_msg_content_str);

let _ = handle_forward(State(agent.get_persistence_ref()), Json(forward_msg)).await;
Ok(())
}
9 changes: 8 additions & 1 deletion agents/rust/mediator/src/didcomm_handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ use std::fmt::Debug;
use axum::{body::Bytes, extract::State, Json};
use messages::AriesMessage;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::{json, Value};
use utils::prelude::*;

mod connection;
mod forward;
mod mediator_coord;
mod pickup;
mod utils;

use connection::handle_aries_connection;
use forward::handle_routing_forward;
use mediator_coord::handle_mediation_coord;
use pickup::handle_pickup_protocol;

Expand Down Expand Up @@ -39,6 +41,11 @@ pub async fn handle_aries<T: BaseWallet + 'static, P: MediatorPersistence>(
aries_message
{
handle_aries_connection(agent.clone(), conn).await?
} else if let GeneralAriesMessage::AriesVCXSupported(AriesMessage::Routing(forward)) =
aries_message
{
handle_routing_forward(agent.clone(), forward).await?;
return Ok(Json(json!({})));
} else {
let (account_name, our_signing_key, their_diddoc) =
agent.auth_and_get_details(&unpacked.sender_verkey).await?;
Expand Down
2 changes: 1 addition & 1 deletion agents/rust/mediator/src/utils/structs.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub type VeriKey = String;
pub type VerKey = String;
82 changes: 82 additions & 0 deletions agents/rust/mediator/tests/common/agent_and_transport_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::collections::VecDeque;

use aries_vcx::protocols::connection::invitee::{states::completed::Completed, InviteeConnection};
use aries_vcx_core::wallet::base_wallet::BaseWallet;
use diddoc_legacy::aries::diddoc::AriesDidDoc;
use mediation::storage::MediatorPersistence;
use mediator::{
aries_agent::{
transports::{AriesReqwest, AriesTransport},
Agent,
},
utils::{structs::VerKey, GenericStringError},
};
use messages::msg_fields::protocols::out_of_band::invitation::Invitation as OOBInvitation;
use reqwest::header::ACCEPT;

use super::prelude::*;

const ENDPOINT_ROOT: &str = "http://localhost:8005";

pub async fn didcomm_connection(
agent: &Agent<impl BaseWallet + 'static, impl MediatorPersistence>,
aries_transport: &mut impl AriesTransport,
) -> Result<InviteeConnection<Completed>> {
let client = reqwest::Client::new();
let base: Url = ENDPOINT_ROOT.parse().unwrap();
let endpoint_register = base.join("register").unwrap();

let oobi: OOBInvitation = client
.get(endpoint_register)
.header(ACCEPT, "application/json")
.send()
.await?
.error_for_status()?
.json()
.await?;
info!("Got invitation from register endpoint {:?}", oobi);

let state: InviteeConnection<Completed> =
agent.establish_connection(oobi, aries_transport).await?;

Ok(state)
}

/// Returns agent, aries transport for agent, agent's verkey, and mediator's diddoc.
pub async fn gen_mediator_connected_agent() -> Result<(
Agent<impl BaseWallet + 'static, impl MediatorPersistence>,
impl AriesTransport,
VerKey,
AriesDidDoc,
)> {
let agent = mediator::aries_agent::AgentBuilder::new_demo_agent().await?;
let mut aries_transport = AriesReqwest {
response_queue: VecDeque::new(),
client: reqwest::Client::new(),
};
let completed_connection = didcomm_connection(&agent, &mut aries_transport).await?;
let our_verkey: VerKey = completed_connection.pairwise_info().pw_vk.clone();
let their_diddoc = completed_connection.their_did_doc().clone();
Ok((agent, aries_transport, our_verkey, their_diddoc))
}

/// Sends message over didcomm connection and returns unpacked response message
pub async fn send_message_and_pop_response_message(
message_bytes: &[u8],
agent: &Agent<impl BaseWallet + 'static, impl MediatorPersistence>,
aries_transport: &mut impl AriesTransport,
our_verkey: &VerKey,
their_diddoc: &AriesDidDoc,
) -> Result<String> {
agent
.pack_and_send_didcomm(message_bytes, our_verkey, their_diddoc, aries_transport)
.await
.map_err(|err| GenericStringError { msg: err })?;
// unpack
let response = aries_transport.pop_aries_envelope()?;
let unpacked_response = agent
.unpack_didcomm(&serde_json::to_vec(&response).unwrap())
.await
.unwrap();
Ok(unpacked_response.message)
}
12 changes: 9 additions & 3 deletions agents/rust/mediator/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#![allow(dead_code)]
pub mod agent_and_transport_utils;

pub mod test_setup {
// inspired by
// https://stackoverflow.com/questions/58006033/how-to-run-setup-code-before-any-tests-run-in-rust
use std::sync::Once;

static INIT: Once = Once::new();
static INIT: std::sync::Once = std::sync::Once::new();
pub trait OneTimeInit {
// runs the initialization code if it hasn't been run yet, else does nothing
fn init(&self) {
Expand All @@ -14,6 +13,13 @@ pub mod test_setup {
// your custom initialization code goes here
fn one_time_setup_code(&self);
}

pub fn setup_env_logging() {
// default test setup code
let _ = dotenvy::dotenv();
let env = env_logger::Env::default().default_filter_or("info");
env_logger::init_from_env(env);
}
}

pub mod prelude {
Expand Down
22 changes: 5 additions & 17 deletions agents/rust/mediator/tests/mediator-aries-connection.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,19 @@
mod common;
use std::collections::VecDeque;

use common::{prelude::*, test_setup::OneTimeInit};
use mediator::aries_agent::transports::AriesReqwest;
use messages::msg_fields::protocols::out_of_band::invitation::Invitation as OOBInvitation;
use reqwest::header::ACCEPT;

const ENDPOINT_ROOT: &str = "http://localhost:8005";
use crate::common::{prelude::*, test_setup::setup_env_logging};

struct TestSetupAries;
impl OneTimeInit for TestSetupAries {
fn one_time_setup_code(&self) {
fn setup_logging() {
let env = env_logger::Env::default().default_filter_or("info");
env_logger::init_from_env(env);
}
fn load_dot_env() {
let _ = dotenvy::dotenv();
}
load_dot_env();
setup_logging();
}
}
static LOGGING_INIT: std::sync::Once = std::sync::Once::new();

const ENDPOINT_ROOT: &str = "http://localhost:8005";

#[tokio::test]
async fn didcomm_connection_succeeds() -> Result<()> {
TestSetupAries.init();
LOGGING_INIT.call_once(setup_env_logging);
let client = reqwest::Client::new();
let base: Url = ENDPOINT_ROOT.parse().unwrap();
let endpoint_register = base.join("register").unwrap();
Expand Down
Loading

0 comments on commit 939ec0e

Please sign in to comment.