From d79e392b86c79d73a91b7fe15b10a535d1110e9c Mon Sep 17 00:00:00 2001 From: aawsome <37850842+aawsome@users.noreply.github.com> Date: Sat, 23 Nov 2024 17:07:33 +0100 Subject: [PATCH] fix(filesystem): Use channels to communicate within webdav filesystem (#1361) Solves the webdav problem reported in #1181 --------- Signed-off-by: simonsan <14062932+simonsan@users.noreply.github.com> Co-authored-by: simonsan <14062932+simonsan@users.noreply.github.com> --- src/commands/webdav.rs | 6 +- src/commands/webdav/webdavfs.rs | 254 ++++++++++++++++++++++++-------- 2 files changed, 193 insertions(+), 67 deletions(-) diff --git a/src/commands/webdav.rs b/src/commands/webdav.rs index 90b79072..aa06d390 100644 --- a/src/commands/webdav.rs +++ b/src/commands/webdav.rs @@ -3,6 +3,9 @@ // ignore markdown clippy lints as we use doc-comments to generate clap help texts #![allow(clippy::doc_markdown)] +mod webdavfs; +use webdavfs::WebDavFS; + use std::net::ToSocketAddrs; use crate::{repository::CliIndexedRepo, status_err, Application, RusticConfig, RUSTIC_APP}; @@ -13,9 +16,6 @@ use dav_server::{warp::dav_handler, DavHandler}; use serde::{Deserialize, Serialize}; use rustic_core::vfs::{FilePolicy, IdenticalSnapshot, Latest, Vfs}; -use webdavfs::WebDavFS; - -mod webdavfs; #[derive(Clone, Command, Default, Debug, clap::Parser, Serialize, Deserialize, Merge)] #[serde(default, rename_all = "kebab-case", deny_unknown_fields)] diff --git a/src/commands/webdav/webdavfs.rs b/src/commands/webdav/webdavfs.rs index 0c3ebb54..87146ad7 100644 --- a/src/commands/webdav/webdavfs.rs +++ b/src/commands/webdav/webdavfs.rs @@ -3,7 +3,7 @@ use std::os::unix::ffi::OsStrExt; use std::{ fmt::{Debug, Formatter}, io::SeekFrom, - sync::{Arc, OnceLock}, + sync::OnceLock, time::SystemTime, }; @@ -16,6 +16,7 @@ use dav_server::{ }, }; use futures::FutureExt; +use tokio::sync::{mpsc, oneshot}; use rustic_core::{ repofile::Node, @@ -40,6 +41,102 @@ struct DavFsInner { file_policy: FilePolicy, } +impl DavFsInner { + /// Get a [`Node`] from the specified [`DavPath`]. + /// + /// # Arguments + /// + /// * `path` - The path to get the [`Tree`] at + /// + /// # Errors + /// + /// * If the [`Tree`] could not be found + /// + /// # Returns + /// + /// The [`Node`] at the specified path + /// + /// [`Tree`]: crate::repofile::Tree + fn node_from_path(&self, path: &DavPath) -> Result { + self.vfs + .node_from_path(&self.repo, &path.as_pathbuf()) + .map_err(|_| FsError::GeneralFailure) + } + + /// Get a list of [`Node`]s from the specified directory path. + /// + /// # Arguments + /// + /// * `path` - The path to get the [`Tree`] at + /// + /// # Errors + /// + /// * If the [`Tree`] could not be found + /// + /// # Returns + /// + /// The list of [`Node`]s at the specified path + /// + /// [`Tree`]: crate::repofile::Tree + fn dir_entries_from_path(&self, path: &DavPath) -> Result, FsError> { + self.vfs + .dir_entries_from_path(&self.repo, &path.as_pathbuf()) + .map_err(|_| FsError::GeneralFailure) + } + + fn open(&self, node: &Node, options: OpenOptions) -> Result { + if options.write + || options.append + || options.truncate + || options.create + || options.create_new + { + return Err(FsError::Forbidden); + } + + if matches!(self.file_policy, FilePolicy::Forbidden) { + return Err(FsError::Forbidden); + } + + let open = self + .repo + .open_file(node) + .map_err(|_err| FsError::GeneralFailure)?; + Ok(open) + } + + fn read_bytes( + &self, + file: OpenFile, + seek: usize, + count: usize, + ) -> Result<(Bytes, OpenFile), FsError> { + let data = self + .repo + .read_file_at(&file, seek, count) + .map_err(|_err| FsError::GeneralFailure)?; + Ok((data, file)) + } +} + +/// Messages used +#[allow(clippy::large_enum_variant)] +enum DavFsInnerCommand { + Node(DavPath, oneshot::Sender>), + DirEntries(DavPath, oneshot::Sender, FsError>>), + Open( + Node, + OpenOptions, + oneshot::Sender>, + ), + ReadBytes( + OpenFile, + usize, + usize, + oneshot::Sender>, + ), +} + impl Debug for DavFsInner { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { write!(f, "DavFS") @@ -50,12 +147,12 @@ impl Debug for DavFsInner { /// /// This is the main entry point for the DAV filesystem. /// It implements [`DavFileSystem`] and can be used to serve a [`Repository`] via DAV. -#[derive(Debug)] -pub struct WebDavFS { - inner: Arc>, +#[derive(Debug, Clone)] +pub struct WebDavFS { + send: mpsc::Sender, } -impl WebDavFS { +impl WebDavFS { /// Create a new [`WebDavFS`] instance. /// /// # Arguments @@ -67,16 +164,44 @@ impl WebDavFS { /// # Returns /// /// A new [`WebDavFS`] instance - pub(crate) fn new(repo: Repository, vfs: Vfs, file_policy: FilePolicy) -> Self { + pub(crate) fn new( + repo: Repository, + vfs: Vfs, + file_policy: FilePolicy, + ) -> Self { let inner = DavFsInner { repo, vfs, file_policy, }; - Self { - inner: Arc::new(inner), - } + let (send, mut rcv) = mpsc::channel(1); + + let _ = std::thread::spawn(move || -> Result<_, FsError> { + while let Some(task) = rcv.blocking_recv() { + match task { + DavFsInnerCommand::Node(path, res) => { + res.send(inner.node_from_path(&path)) + .map_err(|_err| FsError::GeneralFailure)?; + } + DavFsInnerCommand::DirEntries(path, res) => { + res.send(inner.dir_entries_from_path(&path)) + .map_err(|_err| FsError::GeneralFailure)?; + } + DavFsInnerCommand::Open(path, open_options, res) => { + res.send(inner.open(&path, open_options)) + .map_err(|_err| FsError::GeneralFailure)?; + } + DavFsInnerCommand::ReadBytes(file, seek, count, res) => { + res.send(inner.read_bytes(file, seek, count)) + .map_err(|_err| FsError::GeneralFailure)?; + } + } + } + Ok(()) + }); + + Self { send } } /// Get a [`Node`] from the specified [`DavPath`]. @@ -94,11 +219,13 @@ impl WebDavFS { /// The [`Node`] at the specified path /// /// [`Tree`]: crate::repofile::Tree - fn node_from_path(&self, path: &DavPath) -> Result { - self.inner - .vfs - .node_from_path(&self.inner.repo, &path.as_pathbuf()) - .map_err(|_| FsError::GeneralFailure) + async fn node_from_path(&self, path: &DavPath) -> Result { + let (send, rcv) = oneshot::channel(); + self.send + .send(DavFsInnerCommand::Node(path.clone(), send)) + .await + .map_err(|_err| FsError::GeneralFailure)?; + rcv.await.map_err(|_err| FsError::GeneralFailure)? } /// Get a list of [`Node`]s from the specified directory path. @@ -116,32 +243,46 @@ impl WebDavFS { /// The list of [`Node`]s at the specified path /// /// [`Tree`]: crate::repofile::Tree - fn dir_entries_from_path(&self, path: &DavPath) -> Result, FsError> { - self.inner - .vfs - .dir_entries_from_path(&self.inner.repo, &path.as_pathbuf()) - .map_err(|_| FsError::GeneralFailure) + async fn dir_entries_from_path(&self, path: &DavPath) -> Result, FsError> { + let (send, rcv) = oneshot::channel(); + self.send + .send(DavFsInnerCommand::DirEntries(path.clone(), send)) + .await + .map_err(|_err| FsError::GeneralFailure)?; + rcv.await.map_err(|_err| FsError::GeneralFailure)? } -} -impl Clone for WebDavFS { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } + async fn open(&self, node: &Node, options: OpenOptions) -> Result { + let (send, rcv) = oneshot::channel(); + self.send + .send(DavFsInnerCommand::Open(node.clone(), options, send)) + .await + .map_err(|_err| FsError::GeneralFailure)?; + rcv.await.map_err(|_err| FsError::GeneralFailure)? + } + async fn read_bytes( + &self, + file: OpenFile, + seek: usize, + count: usize, + ) -> Result<(Bytes, OpenFile), FsError> { + let (send, rcv) = oneshot::channel(); + self.send + .send(DavFsInnerCommand::ReadBytes(file, seek, count, send)) + .await + .map_err(|_err| FsError::GeneralFailure)?; + rcv.await.map_err(|_err| FsError::GeneralFailure)? } } -impl DavFileSystem - for WebDavFS -{ +impl DavFileSystem for WebDavFS { fn metadata<'a>(&'a self, davpath: &'a DavPath) -> FsFuture<'_, Box> { self.symlink_metadata(davpath) } fn symlink_metadata<'a>(&'a self, davpath: &'a DavPath) -> FsFuture<'_, Box> { async move { - let node = self.node_from_path(davpath)?; + let node = self.node_from_path(davpath).await?; let meta: Box = Box::new(DavFsMetaData(node)); Ok(meta) } @@ -154,7 +295,7 @@ impl FsFuture<'_, FsStream>> { async move { - let entries = self.dir_entries_from_path(davpath)?; + let entries = self.dir_entries_from_path(davpath).await?; let entry_iter = entries.into_iter().map(|e| { let entry: Box = Box::new(DavFsDirEntry(e)); Ok(entry) @@ -171,30 +312,13 @@ impl FsFuture<'_, Box> { async move { - if options.write - || options.append - || options.truncate - || options.create - || options.create_new - { - return Err(FsError::Forbidden); - } - - let node = self.node_from_path(path)?; - if matches!(self.inner.file_policy, FilePolicy::Forbidden) { - return Err(FsError::Forbidden); - } - - let open = self - .inner - .repo - .open_file(&node) - .map_err(|_err| FsError::GeneralFailure)?; + let node = self.node_from_path(path).await?; + let file = self.open(&node, options).await?; let file: Box = Box::new(DavFsFile { - node, - open, - fs: self.inner.clone(), + open: Some(file), seek: 0, + fs: self.clone(), + node, }); Ok(file) } @@ -234,27 +358,25 @@ impl DavDirEntry for DavFsDirEntry { /// A [`DavFile`] implementation for [`Node`]s. /// /// This is a read-only file. -struct DavFsFile { +struct DavFsFile { /// The [`Node`] this file is for node: Node, /// The [`OpenFile`] for this file - open: OpenFile, - - /// The [`DavFsInner`] this file belongs to - fs: Arc>, + open: Option, /// The current seek position seek: usize, + fs: WebDavFS, } -impl Debug for DavFsFile { +impl Debug for DavFsFile { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { write!(f, "DavFile") } } -impl DavFile for DavFsFile { +impl DavFile for DavFsFile { fn metadata(&mut self) -> FsFuture<'_, Box> { async move { let meta: Box = Box::new(DavFsMetaData(self.node.clone())); @@ -273,12 +395,16 @@ impl DavFile for D fn read_bytes(&mut self, count: usize) -> FsFuture<'_, Bytes> { async move { - let data = self + let (data, open) = self .fs - .repo - .read_file_at(&self.open, self.seek, count) - .map_err(|_err| FsError::GeneralFailure)?; + .read_bytes( + self.open.take().ok_or(FsError::GeneralFailure)?, + self.seek, + count, + ) + .await?; self.seek += data.len(); + self.open = Some(open); Ok(data) } .boxed() @@ -292,7 +418,7 @@ impl DavFile for D } SeekFrom::Current(delta) => { self.seek = usize::try_from( - i64::try_from(self.seek).expect("i64 wrapped around") + delta, + i64::try_from(self.seek).expect("i64 should not wrap around") + delta, ) .expect("usize overflow should not happen"); }