Skip to content

Commit

Permalink
fix(filesystem): Use channels to communicate within webdav filesystem (
Browse files Browse the repository at this point in the history
…#1361)

Solves the webdav problem reported in #1181

---------

Signed-off-by: simonsan <[email protected]>
Co-authored-by: simonsan <[email protected]>
  • Loading branch information
aawsome and simonsan authored Nov 23, 2024
1 parent e96325d commit d79e392
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 67 deletions.
6 changes: 3 additions & 3 deletions src/commands/webdav.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// ignore markdown clippy lints as we use doc-comments to generate clap help texts
#![allow(clippy::doc_markdown)]

mod webdavfs;
use webdavfs::WebDavFS;

use std::net::ToSocketAddrs;

use crate::{repository::CliIndexedRepo, status_err, Application, RusticConfig, RUSTIC_APP};
Expand All @@ -13,9 +16,6 @@ use dav_server::{warp::dav_handler, DavHandler};
use serde::{Deserialize, Serialize};

use rustic_core::vfs::{FilePolicy, IdenticalSnapshot, Latest, Vfs};
use webdavfs::WebDavFS;

mod webdavfs;

#[derive(Clone, Command, Default, Debug, clap::Parser, Serialize, Deserialize, Merge)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
Expand Down
254 changes: 190 additions & 64 deletions src/commands/webdav/webdavfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::os::unix::ffi::OsStrExt;
use std::{
fmt::{Debug, Formatter},
io::SeekFrom,
sync::{Arc, OnceLock},
sync::OnceLock,
time::SystemTime,
};

Expand All @@ -16,6 +16,7 @@ use dav_server::{
},
};
use futures::FutureExt;
use tokio::sync::{mpsc, oneshot};

use rustic_core::{
repofile::Node,
Expand All @@ -40,6 +41,102 @@ struct DavFsInner<P, S> {
file_policy: FilePolicy,
}

impl<P, S: IndexedFull> DavFsInner<P, S> {
/// Get a [`Node`] from the specified [`DavPath`].
///
/// # Arguments
///
/// * `path` - The path to get the [`Tree`] at
///
/// # Errors
///
/// * If the [`Tree`] could not be found
///
/// # Returns
///
/// The [`Node`] at the specified path
///
/// [`Tree`]: crate::repofile::Tree
fn node_from_path(&self, path: &DavPath) -> Result<Node, FsError> {
self.vfs
.node_from_path(&self.repo, &path.as_pathbuf())
.map_err(|_| FsError::GeneralFailure)
}

/// Get a list of [`Node`]s from the specified directory path.
///
/// # Arguments
///
/// * `path` - The path to get the [`Tree`] at
///
/// # Errors
///
/// * If the [`Tree`] could not be found
///
/// # Returns
///
/// The list of [`Node`]s at the specified path
///
/// [`Tree`]: crate::repofile::Tree
fn dir_entries_from_path(&self, path: &DavPath) -> Result<Vec<Node>, FsError> {
self.vfs
.dir_entries_from_path(&self.repo, &path.as_pathbuf())
.map_err(|_| FsError::GeneralFailure)
}

fn open(&self, node: &Node, options: OpenOptions) -> Result<OpenFile, FsError> {
if options.write
|| options.append
|| options.truncate
|| options.create
|| options.create_new
{
return Err(FsError::Forbidden);
}

if matches!(self.file_policy, FilePolicy::Forbidden) {
return Err(FsError::Forbidden);
}

let open = self
.repo
.open_file(node)
.map_err(|_err| FsError::GeneralFailure)?;
Ok(open)
}

fn read_bytes(
&self,
file: OpenFile,
seek: usize,
count: usize,
) -> Result<(Bytes, OpenFile), FsError> {
let data = self
.repo
.read_file_at(&file, seek, count)
.map_err(|_err| FsError::GeneralFailure)?;
Ok((data, file))
}
}

/// Messages used
#[allow(clippy::large_enum_variant)]
enum DavFsInnerCommand {
Node(DavPath, oneshot::Sender<Result<Node, FsError>>),
DirEntries(DavPath, oneshot::Sender<Result<Vec<Node>, FsError>>),
Open(
Node,
OpenOptions,
oneshot::Sender<Result<OpenFile, FsError>>,
),
ReadBytes(
OpenFile,
usize,
usize,
oneshot::Sender<Result<(Bytes, OpenFile), FsError>>,
),
}

impl<P, S> Debug for DavFsInner<P, S> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(f, "DavFS")
Expand All @@ -50,12 +147,12 @@ impl<P, S> Debug for DavFsInner<P, S> {
///
/// This is the main entry point for the DAV filesystem.
/// It implements [`DavFileSystem`] and can be used to serve a [`Repository`] via DAV.
#[derive(Debug)]
pub struct WebDavFS<P, S> {
inner: Arc<DavFsInner<P, S>>,
#[derive(Debug, Clone)]
pub struct WebDavFS {
send: mpsc::Sender<DavFsInnerCommand>,
}

impl<P, S: IndexedFull> WebDavFS<P, S> {
impl WebDavFS {
/// Create a new [`WebDavFS`] instance.
///
/// # Arguments
Expand All @@ -67,16 +164,44 @@ impl<P, S: IndexedFull> WebDavFS<P, S> {
/// # Returns
///
/// A new [`WebDavFS`] instance
pub(crate) fn new(repo: Repository<P, S>, vfs: Vfs, file_policy: FilePolicy) -> Self {
pub(crate) fn new<P: Send + 'static, S: IndexedFull + Send + 'static>(
repo: Repository<P, S>,
vfs: Vfs,
file_policy: FilePolicy,
) -> Self {
let inner = DavFsInner {
repo,
vfs,
file_policy,
};

Self {
inner: Arc::new(inner),
}
let (send, mut rcv) = mpsc::channel(1);

let _ = std::thread::spawn(move || -> Result<_, FsError> {
while let Some(task) = rcv.blocking_recv() {
match task {
DavFsInnerCommand::Node(path, res) => {
res.send(inner.node_from_path(&path))
.map_err(|_err| FsError::GeneralFailure)?;
}
DavFsInnerCommand::DirEntries(path, res) => {
res.send(inner.dir_entries_from_path(&path))
.map_err(|_err| FsError::GeneralFailure)?;
}
DavFsInnerCommand::Open(path, open_options, res) => {
res.send(inner.open(&path, open_options))
.map_err(|_err| FsError::GeneralFailure)?;
}
DavFsInnerCommand::ReadBytes(file, seek, count, res) => {
res.send(inner.read_bytes(file, seek, count))
.map_err(|_err| FsError::GeneralFailure)?;
}
}
}
Ok(())
});

Self { send }
}

/// Get a [`Node`] from the specified [`DavPath`].
Expand All @@ -94,11 +219,13 @@ impl<P, S: IndexedFull> WebDavFS<P, S> {
/// The [`Node`] at the specified path
///
/// [`Tree`]: crate::repofile::Tree
fn node_from_path(&self, path: &DavPath) -> Result<Node, FsError> {
self.inner
.vfs
.node_from_path(&self.inner.repo, &path.as_pathbuf())
.map_err(|_| FsError::GeneralFailure)
async fn node_from_path(&self, path: &DavPath) -> Result<Node, FsError> {
let (send, rcv) = oneshot::channel();
self.send
.send(DavFsInnerCommand::Node(path.clone(), send))
.await
.map_err(|_err| FsError::GeneralFailure)?;
rcv.await.map_err(|_err| FsError::GeneralFailure)?
}

/// Get a list of [`Node`]s from the specified directory path.
Expand All @@ -116,32 +243,46 @@ impl<P, S: IndexedFull> WebDavFS<P, S> {
/// The list of [`Node`]s at the specified path
///
/// [`Tree`]: crate::repofile::Tree
fn dir_entries_from_path(&self, path: &DavPath) -> Result<Vec<Node>, FsError> {
self.inner
.vfs
.dir_entries_from_path(&self.inner.repo, &path.as_pathbuf())
.map_err(|_| FsError::GeneralFailure)
async fn dir_entries_from_path(&self, path: &DavPath) -> Result<Vec<Node>, FsError> {
let (send, rcv) = oneshot::channel();
self.send
.send(DavFsInnerCommand::DirEntries(path.clone(), send))
.await
.map_err(|_err| FsError::GeneralFailure)?;
rcv.await.map_err(|_err| FsError::GeneralFailure)?
}
}

impl<P, S: IndexedFull> Clone for WebDavFS<P, S> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
async fn open(&self, node: &Node, options: OpenOptions) -> Result<OpenFile, FsError> {
let (send, rcv) = oneshot::channel();
self.send
.send(DavFsInnerCommand::Open(node.clone(), options, send))
.await
.map_err(|_err| FsError::GeneralFailure)?;
rcv.await.map_err(|_err| FsError::GeneralFailure)?
}
async fn read_bytes(
&self,
file: OpenFile,
seek: usize,
count: usize,
) -> Result<(Bytes, OpenFile), FsError> {
let (send, rcv) = oneshot::channel();
self.send
.send(DavFsInnerCommand::ReadBytes(file, seek, count, send))
.await
.map_err(|_err| FsError::GeneralFailure)?;
rcv.await.map_err(|_err| FsError::GeneralFailure)?
}
}

impl<P: Debug + Send + Sync + 'static, S: IndexedFull + Debug + Send + Sync + 'static> DavFileSystem
for WebDavFS<P, S>
{
impl DavFileSystem for WebDavFS {
fn metadata<'a>(&'a self, davpath: &'a DavPath) -> FsFuture<'_, Box<dyn DavMetaData>> {
self.symlink_metadata(davpath)
}

fn symlink_metadata<'a>(&'a self, davpath: &'a DavPath) -> FsFuture<'_, Box<dyn DavMetaData>> {
async move {
let node = self.node_from_path(davpath)?;
let node = self.node_from_path(davpath).await?;
let meta: Box<dyn DavMetaData> = Box::new(DavFsMetaData(node));
Ok(meta)
}
Expand All @@ -154,7 +295,7 @@ impl<P: Debug + Send + Sync + 'static, S: IndexedFull + Debug + Send + Sync + 's
_meta: ReadDirMeta,
) -> FsFuture<'_, FsStream<Box<dyn DavDirEntry>>> {
async move {
let entries = self.dir_entries_from_path(davpath)?;
let entries = self.dir_entries_from_path(davpath).await?;
let entry_iter = entries.into_iter().map(|e| {
let entry: Box<dyn DavDirEntry> = Box::new(DavFsDirEntry(e));
Ok(entry)
Expand All @@ -171,30 +312,13 @@ impl<P: Debug + Send + Sync + 'static, S: IndexedFull + Debug + Send + Sync + 's
options: OpenOptions,
) -> FsFuture<'_, Box<dyn DavFile>> {
async move {
if options.write
|| options.append
|| options.truncate
|| options.create
|| options.create_new
{
return Err(FsError::Forbidden);
}

let node = self.node_from_path(path)?;
if matches!(self.inner.file_policy, FilePolicy::Forbidden) {
return Err(FsError::Forbidden);
}

let open = self
.inner
.repo
.open_file(&node)
.map_err(|_err| FsError::GeneralFailure)?;
let node = self.node_from_path(path).await?;
let file = self.open(&node, options).await?;
let file: Box<dyn DavFile> = Box::new(DavFsFile {
node,
open,
fs: self.inner.clone(),
open: Some(file),
seek: 0,
fs: self.clone(),
node,
});
Ok(file)
}
Expand Down Expand Up @@ -234,27 +358,25 @@ impl DavDirEntry for DavFsDirEntry {
/// A [`DavFile`] implementation for [`Node`]s.
///
/// This is a read-only file.
struct DavFsFile<P, S> {
struct DavFsFile {
/// The [`Node`] this file is for
node: Node,

/// The [`OpenFile`] for this file
open: OpenFile,

/// The [`DavFsInner`] this file belongs to
fs: Arc<DavFsInner<P, S>>,
open: Option<OpenFile>,

/// The current seek position
seek: usize,
fs: WebDavFS,
}

impl<P, S> Debug for DavFsFile<P, S> {
impl Debug for DavFsFile {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(f, "DavFile")
}
}

impl<P: Debug + Send + Sync, S: IndexedFull + Debug + Send + Sync> DavFile for DavFsFile<P, S> {
impl DavFile for DavFsFile {
fn metadata(&mut self) -> FsFuture<'_, Box<dyn DavMetaData>> {
async move {
let meta: Box<dyn DavMetaData> = Box::new(DavFsMetaData(self.node.clone()));
Expand All @@ -273,12 +395,16 @@ impl<P: Debug + Send + Sync, S: IndexedFull + Debug + Send + Sync> DavFile for D

fn read_bytes(&mut self, count: usize) -> FsFuture<'_, Bytes> {
async move {
let data = self
let (data, open) = self
.fs
.repo
.read_file_at(&self.open, self.seek, count)
.map_err(|_err| FsError::GeneralFailure)?;
.read_bytes(
self.open.take().ok_or(FsError::GeneralFailure)?,
self.seek,
count,
)
.await?;
self.seek += data.len();
self.open = Some(open);
Ok(data)
}
.boxed()
Expand All @@ -292,7 +418,7 @@ impl<P: Debug + Send + Sync, S: IndexedFull + Debug + Send + Sync> DavFile for D
}
SeekFrom::Current(delta) => {
self.seek = usize::try_from(
i64::try_from(self.seek).expect("i64 wrapped around") + delta,
i64::try_from(self.seek).expect("i64 should not wrap around") + delta,
)
.expect("usize overflow should not happen");
}
Expand Down

0 comments on commit d79e392

Please sign in to comment.