From 5b29e0723af3212346b96bd0b6a3a07bf17e9cde Mon Sep 17 00:00:00 2001 From: roby2014 Date: Sat, 21 Sep 2024 13:37:02 +0100 Subject: [PATCH] feat: async serial --- mavlink-core/Cargo.toml | 3 +- .../src/async_connection/direct_serial.rs | 120 ++++++++++++++++++ mavlink-core/src/async_connection/mod.rs | 14 +- 3 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 mavlink-core/src/async_connection/direct_serial.rs diff --git a/mavlink-core/Cargo.toml b/mavlink-core/Cargo.toml index 5bf97b903c..75f4f449f5 100644 --- a/mavlink-core/Cargo.toml +++ b/mavlink-core/Cargo.toml @@ -29,6 +29,7 @@ serial = { version = "0.4", optional = true } tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs"], optional = true } sha2 = { version = "0.10", optional = true } async-trait = { version = "0.1.18", optional = true } +tokio-serial = { version = "5.4.4", default-features = false, optional = true } [features] "std" = ["byteorder/std"] @@ -41,7 +42,7 @@ async-trait = { version = "0.1.18", optional = true } "embedded" = ["dep:embedded-io", "dep:embedded-io-async"] "embedded-hal-02" = ["dep:nb", "dep:embedded-hal-02"] "serde" = ["dep:serde", "dep:serde_arrays"] -"tokio-1" = ["dep:tokio", "dep:async-trait"] +"tokio-1" = ["dep:tokio", "dep:async-trait", "dep:tokio-serial"] "signing" = ["dep:sha2"] default = ["std", "tcp", "udp", "direct-serial", "serde"] diff --git a/mavlink-core/src/async_connection/direct_serial.rs b/mavlink-core/src/async_connection/direct_serial.rs new file mode 100644 index 0000000000..d9b2fe1de2 --- /dev/null +++ b/mavlink-core/src/async_connection/direct_serial.rs @@ -0,0 +1,120 @@ +//! Async Serial MAVLINK connection + +use core::ops::DerefMut; +use std::io; + +use tokio::sync::Mutex; +use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream}; + +use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; + +#[cfg(not(feature = "signing"))] +use crate::{read_versioned_msg_async, write_versioned_msg_async}; +#[cfg(feature = "signing")] +use crate::{ + read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData, +}; + +use super::AsyncMavConnection; + +pub fn open(settings: &str) -> io::Result { + let settings_toks: Vec<&str> = settings.split(':').collect(); + if settings_toks.len() < 2 { + return Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Incomplete port settings", + )); + } + + let Ok(baud) = settings_toks[1].parse::() else { + return Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Invalid baud rate", + )); + }; + + let port_name = settings_toks[0]; + let mut port = tokio_serial::new(port_name, baud).open_native_async()?; + port.set_data_bits(tokio_serial::DataBits::Eight)?; + port.set_parity(tokio_serial::Parity::None)?; + port.set_stop_bits(tokio_serial::StopBits::One)?; + port.set_flow_control(tokio_serial::FlowControl::None)?; + + Ok(AsyncSerialConnection { + port: Mutex::new(AsyncPeekReader::new(port)), + sequence: Mutex::new(0), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + }) +} + +pub struct AsyncSerialConnection { + port: Mutex>, + sequence: Mutex, + protocol_version: MavlinkVersion, + #[cfg(feature = "signing")] + signing_data: Option, +} + +#[async_trait::async_trait] +impl AsyncMavConnection for AsyncSerialConnection { + async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> { + let mut port = self.port.lock().await; + + #[cfg(not(feature = "signing"))] + let result = read_versioned_msg_async(port.deref_mut(), self.protocol_version).await; + #[cfg(feature = "signing")] + let result = read_versioned_msg_async_signed( + port.deref_mut(), + self.protocol_version, + self.signing_data.as_ref(), + ) + .await; + result + } + + async fn send( + &self, + header: &MavHeader, + data: &M, + ) -> Result { + let mut port = self.port.lock().await; + let mut sequence = self.sequence.lock().await; + + let header = MavHeader { + sequence: *sequence, + system_id: header.system_id, + component_id: header.component_id, + }; + + *sequence = sequence.wrapping_add(1); + + #[cfg(not(feature = "signing"))] + let result = + write_versioned_msg_async(port.reader_mut(), self.protocol_version, header, data).await; + #[cfg(feature = "signing")] + let result = write_versioned_msg_async_signed( + port.reader_mut(), + self.protocol_version, + header, + data, + self.signing_data.as_ref(), + ) + .await; + result + } + + fn set_protocol_version(&mut self, version: MavlinkVersion) { + self.protocol_version = version; + } + + fn get_protocol_version(&self) -> MavlinkVersion { + self.protocol_version + } + + #[cfg(feature = "signing")] + fn setup_signing(&mut self, signing_data: Option) { + self.signing_data = signing_data.map(SigningData::from_config) + } +} diff --git a/mavlink-core/src/async_connection/mod.rs b/mavlink-core/src/async_connection/mod.rs index 35e2fa0e78..b2eb5c6757 100644 --- a/mavlink-core/src/async_connection/mod.rs +++ b/mavlink-core/src/async_connection/mod.rs @@ -8,6 +8,9 @@ mod tcp; #[cfg(feature = "udp")] mod udp; +#[cfg(feature = "direct-serial")] +mod direct_serial; + mod file; #[cfg(feature = "signing")] @@ -70,9 +73,9 @@ pub trait AsyncMavConnection { /// * `udpin::` to create a UDP server, listening for incoming packets /// * `udpout::` to create a UDP client /// * `udpbcast::` to create a UDP broadcast +/// * `serial::` to create a serial connection /// * `file:` to extract file data /// -/// Serial is currently not supported for async connections, use [`crate::connect`] instead. /// The type of the connection is determined at runtime based on the address type, so the /// connection is returned as a trait object. pub async fn connect_async( @@ -101,6 +104,15 @@ pub async fn connect_async( { protocol_err } + } else if cfg!(feature = "direct-serial") && address.starts_with("serial") { + #[cfg(feature = "direct-serial")] + { + Ok(Box::new(direct_serial::open(&address["serial:".len()..])?)) + } + #[cfg(not(feature = "direct-serial"))] + { + protocol_err + } } else if address.starts_with("file") { Ok(Box::new(file::open(&address["file:".len()..]).await?)) } else {