From bb44194a2e2e314e8fe72e964d53c0927b91357f Mon Sep 17 00:00:00 2001 From: Muhamad Azamy Date: Mon, 18 Sep 2023 16:29:12 +0200 Subject: [PATCH] WIP: refactor for separate stores --- build.rs | 6 - schema/schema.capnp | 97 --------- src/cache/mod.rs | 14 +- src/fs/mod.rs | 202 +++++++------------ src/fungi/meta.rs | 476 ++++++++++++++++++++++++++++++++++++++++++++ src/fungi/mod.rs | 393 +----------------------------------- src/lib.rs | 8 +- src/main.rs | 45 ++--- src/meta/backend.rs | 37 ---- src/meta/inode.rs | 109 ---------- src/meta/mod.rs | 278 -------------------------- src/meta/types.rs | 218 -------------------- src/store/mod.rs | 67 +++++++ src/store/router.rs | 52 +++++ src/store/zdb.rs | 0 15 files changed, 702 insertions(+), 1300 deletions(-) delete mode 100644 schema/schema.capnp create mode 100644 src/fungi/meta.rs delete mode 100644 src/meta/backend.rs delete mode 100644 src/meta/inode.rs delete mode 100644 src/meta/mod.rs delete mode 100644 src/meta/types.rs create mode 100644 src/store/mod.rs create mode 100644 src/store/router.rs create mode 100644 src/store/zdb.rs diff --git a/build.rs b/build.rs index 0fa496d..9eae9bb 100644 --- a/build.rs +++ b/build.rs @@ -3,10 +3,4 @@ fn main() { "cargo:rustc-env=GIT_VERSION={}", git_version::git_version!(args = ["--tags", "--always", "--dirty=-modified"]) ); - - capnpc::CompilerCommand::new() - .src_prefix("schema") - .file("schema/schema.capnp") - .run() - .expect("schema compiler command"); } diff --git a/schema/schema.capnp b/schema/schema.capnp deleted file mode 100644 index 32b69a5..0000000 --- a/schema/schema.capnp +++ /dev/null @@ -1,97 +0,0 @@ -@0xae9223e76351538a; - -struct FileBlock { - hash @0: Data; # File hash stored as key on the backend - key @1: Data; # Encryption key -} - -struct File { - # blocksize in bytes = blocksize * 4 KB, blocksize is same for all parts of file - # max blocksize = 128 MB - blockSize @0: UInt16; - blocks @1: List(FileBlock); # list of the hashes of the blocks -} - -struct Link { - target @0: Text; # Path to target -} - -struct Special { - type @0: Type; - - # 0 => socket (S_IFSOCK) - # 1 => block device (S_IFBLK) - # 2 => char. device (S_IFCHR) - # 3 => fifo pipe (S_IFIFO) - enum Type { - socket @0; - block @1; - chardev @2; - fifopipe @3; - unknown @4; - } - - data @1: Data; # data relevant for type of item -} - -struct SubDir { - key @0: Text; # Key ID of the subdirectory -} - -struct Inode { - name @0: Text; - size @1: UInt64; # in bytes - - attributes: union { - dir @2: SubDir; - file @3: File; - link @4: Link; - special @5: Special; - } - - aclkey @6: Text; # is pointer to ACL # FIXME: need to be int - modificationTime @7: UInt32; - creationTime @8: UInt32; -} - -struct Dir { - name @0: Text; # name of dir - - location @1: Text; # location in filesystem = namespace - contents @2: List(Inode); # list of the contents (files, links, specials) - parent @3: Text; # directory key of parent - - # directory's metadata - - size @4: UInt64; # in bytes - aclkey @5: Text; # is pointer to ACL # FIXME: need to be int - modificationTime @6: UInt32; - creationTime @7: UInt32; -} - -struct UserGroup { - name @0: Text; - iyoId @1: Text; # itsyou.online id - iyoInt @2: UInt64; # itsyouonline unique id per user or group, is globally unique -} - - -struct ACI { - # for backwards compatibility with posix - uname @0: Text; - gname @1: Text; - mode @2: UInt16; - - rights @3 :List(Right); - struct Right { - # text e.g. rwdl- (admin read write delete list -), freely to be chosen - # admin means all rights (e.g. on / = namespace or filesystem level all rights for everything) - # - means remove all previous ones (is to stop recursion), if usergroupid=0 then is for all users & all groups - right @0: Text; - usergroupid @1: UInt16; - } - - id @4: UInt32; - uid @5 :Int64 = -1; - gid @6 :Int64 = -1; -} diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 5a9cbd2..b907b78 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -1,4 +1,4 @@ -use crate::meta::types::FileBlock; +use crate::fungi::meta::Block; use anyhow::{Context, Result}; use bb8_redis::redis::aio::Connection; @@ -127,21 +127,21 @@ impl Cache { //con. let result: Vec = con.get(id).await?; if result.is_empty() { - bail!("invalid chunk length downloaded"); + anyhow::bail!("invalid chunk length downloaded"); } let key = unsafe { std::str::from_utf8_unchecked(key) }; let mut decoder = snap::raw::Decoder::new(); let result = match decoder.decompress_vec(&xxtea::decrypt(&result, key)) { Ok(data) => data, - Err(_) => bail!("invalid chunk"), + Err(_) => anyhow::bail!("invalid chunk"), }; Ok(result) } // download given an open file, writes the content of the chunk to the file - async fn download(&self, file: &mut File, block: &FileBlock) -> Result { + async fn download(&self, file: &mut File, block: &Block) -> Result { let data = self.get_data(&block.hash, &block.key).await?; file.write_all(&data).await?; @@ -151,7 +151,7 @@ impl Cache { async fn prepare(&self, id: &[u8]) -> Result { let name = id.hex(); if name.len() < 4 { - bail!("invalid chunk hash"); + anyhow::bail!("invalid chunk hash"); } let path = self.root.join(&name[0..2]).join(&name[2..4]); fs::create_dir_all(&path).await?; @@ -170,7 +170,7 @@ impl Cache { /// get a file block either from cache or from remote if it's already /// not cached - pub async fn get(&self, block: &FileBlock) -> Result<(u64, File)> { + pub async fn get(&self, block: &Block) -> Result<(u64, File)> { let mut file = self.prepare(&block.hash).await?; // TODO: locking must happen here so no // other processes start downloading the same chunk @@ -198,7 +198,7 @@ impl Cache { /// direct downloads all the file blocks from remote and write it to output #[allow(dead_code)] - pub async fn direct(&self, blocks: &[FileBlock], out: &mut File) -> Result<()> { + pub async fn direct(&self, blocks: &[Block], out: &mut File) -> Result<()> { use tokio::io::copy; for (index, block) in blocks.iter().enumerate() { let (_, mut chunk) = self.get(block).await?; diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 50bcdd2..a84d13f 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -2,13 +2,12 @@ #![deny(clippy::unimplemented, clippy::todo)] use crate::cache; -use crate::meta; -use crate::meta::inode::Inode; -use crate::meta::Entry; -use crate::meta::Metadata; +use crate::fungi::{ + meta::{FileType, Inode}, + Reader, +}; use anyhow::{ensure, Result}; -use meta::types::EntryKind; use polyfuse::reply::FileAttr; use polyfuse::{ op, @@ -33,13 +32,13 @@ type BlockSize = u64; #[derive(Clone)] pub struct Filesystem { - meta: meta::Metadata, + meta: Reader, cache: cache::Cache, lru: Arc>>, } impl Filesystem { - pub fn new(meta: meta::Metadata, cache: cache::Cache) -> Filesystem { + pub fn new(meta: Reader, cache: cache::Cache) -> Filesystem { Filesystem { meta, cache, @@ -96,25 +95,24 @@ impl Filesystem { } async fn readlink(&self, req: &Request, op: op::Readlink<'_>) -> Result<()> { - let entry = self.meta.entry(op.ino()).await?; - let link = match entry.kind { - EntryKind::Link(l) => l, - _ => { - return Ok(req.reply_error(libc::ENOLINK)?); - } - }; + let link = self.meta.inode(op.ino()).await?; + if !link.mode.is(FileType::Link) { + return Ok(req.reply_error(libc::ENOLINK)?); + } - req.reply(link.target)?; - Ok(()) + if let Some(target) = link.data { + req.reply(target); + return Ok(()); + } + + return Ok(req.reply_error(libc::ENOLINK)?); } async fn read(&self, req: &Request, op: op::Read<'_>) -> Result<()> { - let entry = self.meta.entry(op.ino()).await?; - let file_metadata = match entry.kind { - EntryKind::File(file) => file, - _ => { - return Ok(req.reply_error(libc::EISDIR)?); - } + let entry = self.meta.inode(op.ino()).await?; + + if !entry.mode.is(FileType::Regular) { + return Ok(req.reply_error(libc::EISDIR)?); }; let offset = op.offset() as usize; @@ -122,7 +120,9 @@ impl Filesystem { let chunk_size = CHUNK_SIZE; // file.block_size as usize; let chunk_index = offset / chunk_size; - if chunk_index >= file_metadata.blocks.len() || op.size() == 0 { + let blocks = self.meta.blocks(op.ino()).await?; + + if chunk_index >= blocks.len() || op.size() == 0 { // reading after the end of the file let data: &[u8] = &[]; return Ok(req.reply(data)?); @@ -133,7 +133,7 @@ impl Filesystem { let mut buf: Vec = vec![0; size]; let mut total = 0; - 'blocks: for block in file_metadata.blocks.iter().skip(chunk_index) { + 'blocks: for block in blocks.iter().skip(chunk_index) { // hash works as a key inside the LRU let hash = block.hash; @@ -198,32 +198,25 @@ impl Filesystem { } async fn getattr(&self, req: &Request, op: op::Getattr<'_>) -> Result<()> { - let entry = self.meta.entry(op.ino()).await?; + let entry = self.meta.inode(op.ino()).await?; + let mut attr = AttrOut::default(); let fill = attr.attr(); - if entry.fill(&self.meta, fill).await.is_err() { - req.reply_error(libc::ENOENT)?; - } + entry.fill(fill); - if op.ino() == 1 { - fill.mode(libc::S_IFDIR | 0o755); - } req.reply(attr)?; Ok(()) } async fn readdir(&self, req: &Request, op: op::Readdir<'_>) -> Result<()> { - let entry = self.meta.entry(op.ino()).await?; + let root = self.meta.inode(op.ino()).await?; - let dir = match entry.kind { - EntryKind::Dir(dir) => dir, - _ => { - req.reply_error(libc::ENOTDIR)?; - return Ok(()); - } - }; + if !root.mode.is(FileType::Dir) { + req.reply_error(libc::ENOTDIR)?; + return Ok(()); + } let mut out = ReaddirOut::new(op.size() as usize); let mut offset = op.offset(); @@ -233,7 +226,7 @@ impl Filesystem { "..".as_ref(), match op.ino() { 1 => 1, - _ => self.meta.dir_inode(dir.parent).await?.ino(), + _ => root.parent, }, libc::DT_DIR as u32, 2, @@ -244,30 +237,21 @@ impl Filesystem { offset -= 2; } - for (i, entry) in dir.entries.iter().enumerate().skip(offset as usize) { + let children = self.meta.children(root.ino, op.size(), offset).await?; + for (i, entry) in children.into_iter().enumerate() { let offset = i as u64 + 3; - let full = match &entry.kind { - EntryKind::SubDir(sub) => { - let inode = self.meta.dir_inode(&sub.key).await?; - out.entry( - entry.node.name.as_ref(), - inode.ino(), - libc::DT_DIR as u32, - offset, - ) + + let full = match entry.mode.file_type() { + FileType::Dir => { + //let inode = self.meta.dir_inode(&sub.key).await?; + out.entry(entry.name.as_ref(), entry.ino, libc::DT_DIR as u32, offset) + } + FileType::Regular => { + out.entry(entry.name.as_ref(), entry.ino, libc::DT_REG as u32, offset) + } + FileType::Link => { + out.entry(entry.name.as_ref(), entry.ino, libc::DT_LNK as u32, offset) } - EntryKind::File(_) => out.entry( - entry.node.name.as_ref(), - entry.node.inode.ino(), - libc::DT_REG as u32, - offset, - ), - EntryKind::Link(_) => out.entry( - entry.node.name.as_ref(), - entry.node.inode.ino(), - libc::DT_LNK as u32, - offset, - ), _ => { warn!("unkonwn entry"); false @@ -283,28 +267,31 @@ impl Filesystem { } async fn lookup(&self, req: &Request, op: op::Lookup<'_>) -> Result<()> { - let parent = self.meta.entry(op.parent()).await?; - - let dir = match parent.kind { - EntryKind::Dir(dir) => dir, - _ => { + let name = match op.name().to_str() { + Some(name) => name, + None => { req.reply_error(libc::ENOENT)?; return Ok(()); } }; - for entry in dir.entries.iter() { - if entry.node.name.as_bytes() == op.name().as_bytes() { - let mut out = EntryOut::default(); - let inode = entry.fill(&self.meta, out.attr()).await?; - out.ino(inode.ino()); - out.ttl_attr(TTL); - out.ttl_entry(TTL); - return Ok(req.reply(out)?); + let node = self.meta.lookup(op.parent(), name).await?; + + let node = match node { + Some(node) => node, + None => { + req.reply_error(libc::ENOENT)?; + return Ok(()); } - } + }; + let mut out = EntryOut::default(); + + node.fill(out.attr()); + out.ino(node.ino); + out.ttl_attr(TTL); + out.ttl_entry(TTL); - Ok(req.reply_error(libc::ENOENT)?) + return Ok(req.reply(out)?); } } @@ -346,60 +333,25 @@ impl AsyncSession { } } -#[async_trait::async_trait] trait AttributeFiller { - async fn fill(&self, meta: &Metadata, attr: &mut FileAttr) -> Result; + fn fill(&self, attr: &mut FileAttr); } -#[async_trait::async_trait] -impl AttributeFiller for Entry { - async fn fill(&self, meta: &Metadata, attr: &mut FileAttr) -> Result { +impl AttributeFiller for Inode { + fn fill(&self, attr: &mut FileAttr) { use std::time::Duration; - let mode = match meta.aci(&self.node.acl).await { - Ok(aci) => { - attr.uid(aci.user); - attr.gid(aci.group); - aci.mode & 0o777 - } - Err(_) => 0o444, - }; + attr.mode(self.mode.mode()); - let inode = self.node.inode; - attr.ino(inode.ino()); - attr.ctime(Duration::from_secs(self.node.creation as u64)); - attr.mtime(Duration::from_secs(self.node.modification as u64)); - attr.size(self.node.size); - - let inode = match &self.kind { - EntryKind::Unknown => bail!("unkown entry"), - EntryKind::Dir(_) => { - attr.nlink(2); - attr.mode(libc::S_IFDIR | mode); - inode - } - EntryKind::SubDir(sub) => { - let inode = meta.dir_inode(&sub.key).await?; - // reset inode - attr.ino(inode.ino()); - attr.nlink(2); - attr.mode(libc::S_IFDIR | mode); - inode - } - EntryKind::File(_) => { - attr.nlink(1); - attr.mode(libc::S_IFREG | mode); - attr.blksize(4 * 1024); - inode - } - EntryKind::Link(link) => { - attr.nlink(1); - attr.size(link.target.len() as u64); - attr.mode(libc::S_IFLNK | 0o555); - inode - } - }; + attr.ino(self.ino); + attr.ctime(Duration::from_secs(self.ctime as u64)); + attr.mtime(Duration::from_secs(self.mtime as u64)); + attr.size(self.size); - Ok(inode) + match self.mode.file_type() { + FileType::Dir => attr.nlink(2), + FileType::Regular => attr.blksize(4 * 1024), + _ => (), + }; } } diff --git a/src/fungi/meta.rs b/src/fungi/meta.rs new file mode 100644 index 0000000..70ba3eb --- /dev/null +++ b/src/fungi/meta.rs @@ -0,0 +1,476 @@ +use std::path::Path; + +use sqlx::{sqlite::SqliteRow, FromRow, Row, SqlitePool}; + +const HASH_LEN: usize = 16; +const KEY_LEN: usize = 16; +const TYPE_MASK: u32 = nix::libc::S_IFMT; + +#[repr(u32)] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FileType { + Regular = nix::libc::S_IFREG, + Dir = nix::libc::S_IFDIR, + Link = nix::libc::S_IFLNK, + Block = nix::libc::S_IFBLK, + Char = nix::libc::S_IFCHR, + Socket = nix::libc::S_IFSOCK, + FIFO = nix::libc::S_IFIFO, + Unknown = 0, +} + +impl From for FileType { + fn from(value: u32) -> Self { + match value { + nix::libc::S_IFREG => Self::Regular, + nix::libc::S_IFDIR => Self::Dir, + nix::libc::S_IFLNK => Self::Link, + nix::libc::S_IFBLK => Self::Block, + nix::libc::S_IFCHR => Self::Char, + nix::libc::S_IFSOCK => Self::Socket, + nix::libc::S_IFIFO => Self::FIFO, + _ => Self::Unknown, + } + } +} + +static SCHEMA: &'static str = include_str!("../../schema/schema.sql"); + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("failed to execute query: {0}")] + SqlError(#[from] sqlx::Error), + + #[error("invalid hash length")] + InvalidHash, + + #[error("invalid key length")] + InvalidKey, + + #[error("io error: {0}")] + IO(#[from] std::io::Error), +} + +pub type Result = std::result::Result; +pub type Ino = u64; + +#[derive(Debug, Clone, Default)] +pub struct Mode(u32); + +impl From for Mode { + fn from(value: u32) -> Self { + Self(value) + } +} + +impl Mode { + pub fn new(t: FileType, perm: u32) -> Self { + Self(t as u32 | (perm & !TYPE_MASK)) + } + + pub fn file_type(&self) -> FileType { + (self.0 as u32 & TYPE_MASK).into() + } + + pub fn permissions(&self) -> u32 { + self.0 & !TYPE_MASK + } + + pub fn mode(&self) -> u32 { + self.0 + } + + pub fn is(&self, typ: FileType) -> bool { + self.file_type() == typ + } +} + +#[derive(Debug, Clone, Default)] +pub struct Inode { + pub ino: Ino, + pub parent: Ino, + pub name: String, + pub size: u64, + pub uid: u32, + pub gid: u32, + pub mode: Mode, + pub rdev: u32, + pub ctime: i64, + pub mtime: i64, + pub data: Option>, +} + +impl FromRow<'_, SqliteRow> for Inode { + fn from_row(row: &'_ SqliteRow) -> std::result::Result { + Ok(Self { + ino: row.get::("ino") as Ino, + parent: row.get::("parent") as Ino, + name: row.get("name"), + size: row.get::("size") as u64, + uid: row.get("uid"), + gid: row.get("uid"), + mode: row.get::("mode").into(), + rdev: row.get("rdev"), + ctime: row.get("ctime"), + mtime: row.get("mtime"), + data: row.get("data"), + }) + } +} + +#[derive(Debug, Clone, Default)] +pub struct Block { + pub hash: [u8; HASH_LEN], + pub key: [u8; KEY_LEN], +} + +impl FromRow<'_, SqliteRow> for Block { + fn from_row(row: &'_ SqliteRow) -> std::result::Result { + let hash: &[u8] = row.get("hash"); + if hash.len() != HASH_LEN { + return Err(sqlx::Error::Decode(Box::new(Error::InvalidHash))); + } + + let key: &[u8] = row.get("key"); + + if hash.len() != KEY_LEN { + return Err(sqlx::Error::Decode(Box::new(Error::InvalidKey))); + } + + let mut block = Self::default(); + block.hash.copy_from_slice(hash); + block.key.copy_from_slice(key); + + Ok(block) + } +} + +#[derive(Debug, Clone, Default)] +pub struct Route { + pub start: u8, + pub end: u8, + pub url: String, +} + +impl FromRow<'_, SqliteRow> for Route { + fn from_row(row: &'_ SqliteRow) -> std::result::Result { + Ok(Self { + start: row.get("start"), + end: row.get("end"), + url: row.get("url"), + }) + } +} + +#[derive(Debug, Clone)] +pub enum Tag<'a> { + Version, + Description, + Author, + Custom(&'a str), +} + +impl<'a> Tag<'a> { + fn key(&self) -> &str { + match self { + Self::Version => "version", + Self::Description => "description", + Self::Author => "author", + Self::Custom(a) => a, + } + } +} + +#[derive(Clone)] +pub struct Reader { + pool: SqlitePool, +} + +impl Reader { + pub async fn new>(path: P) -> Result { + let con = format!("sqlite://{}", path.as_ref().to_str().unwrap()); + let pool = SqlitePool::connect(&con).await?; + + Ok(Self { pool }) + } + + pub async fn inode(&self, ino: Ino) -> Result { + let inode: Inode = sqlx::query_as(r#"select inode.*, extra.data + from inode left join extra on inode.ino = extra.ino + where inode.ino = ?;"#) + .bind(ino as i64).fetch_one(&self.pool).await?; + + Ok(inode) + } + + pub async fn children(&self, parent: Ino, limit: u32, offset: u64) -> Result> { + let results: Vec = sqlx::query_as( + r#"select inode.*, extra.data + from inode left join extra on inode.ino = extra.ino + where inode.parent = ? limit ? offset ?;"#, + ) + .bind(parent as i64) + .bind(limit) + .bind(offset as i64) + .fetch_all(&self.pool) + .await?; + + Ok(results) + } + + pub async fn lookup>(&self, parent: Ino, name: S) -> Result> { + let inode: Option = sqlx::query_as(r#"select inode.*, extra.data + from inode left join extra on inode.ino = extra.ino + where inode.parent = ? and inode.name = ?;"#) + .bind(parent as i64) + .bind(name.as_ref()) + .fetch_optional(&self.pool).await?; + Ok(inode) + } + + pub async fn blocks(&self, ino: Ino) -> Result> { + let results: Vec = sqlx::query_as("select hash, key from block where ino = ?;") + .bind(ino as i64) + .fetch_all(&self.pool) + .await?; + + Ok(results) + } + + pub async fn tag(&self, tag: Tag<'_>) -> Result> { + let value: Option<(String,)> = sqlx::query_as("select value from tag where key = ?;") + .bind(tag.key()) + .fetch_optional(&self.pool) + .await?; + + Ok(value.map(|v| v.0)) + } + + pub async fn routes(&self) -> Result> { + let results: Vec = sqlx::query_as("select start, end, url from route;") + .fetch_all(&self.pool) + .await?; + + Ok(results) + } +} + +#[derive(Clone)] +pub struct Writer { + pool: SqlitePool, +} + +impl Writer { + /// create a new mkondo writer + pub async fn new>(path: P) -> Result { + tokio::fs::OpenOptions::default() + .write(true) + .truncate(true) + .create(true) + .open(&path) + .await?; + + let con = format!("sqlite://{}", path.as_ref().to_str().unwrap()); + let pool = SqlitePool::connect(&con).await?; + + sqlx::query(SCHEMA).execute(&pool).await?; + + Ok(Self { pool }) + } + + /// inode add an inode to the flist + pub async fn inode(&self, inode: Inode) -> Result { + let result = sqlx::query( + r#"insert into inode (parent, name, size, uid, gid, mode, rdev, ctime, mtime) + values (?, ?, ?, ?, ?, ?, ?, ?, ?);"#, + ) + .bind(inode.parent as i64) + .bind(inode.name) + .bind(inode.size as i64) + .bind(inode.uid) + .bind(inode.gid) + .bind(inode.mode.0) + .bind(inode.rdev) + .bind(inode.ctime) + .bind(inode.mtime) + .execute(&self.pool) + .await?; + + let ino = result.last_insert_rowid() as Ino; + if let Some(data) = &inode.data { + sqlx::query("insert into extra(ino, data) values (?, ?)") + .bind(ino as i64) + .bind(data) + .execute(&self.pool) + .await?; + } + + Ok(ino) + } + + pub async fn block(&self, ino: Ino, hash: &[u8; HASH_LEN], key: &[u8; KEY_LEN]) -> Result<()> { + sqlx::query("insert into block (ino, hash, key) values (?, ?, ?)") + .bind(ino as i64) + .bind(&hash[..]) + .bind(&key[..]) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn route>(&self, start: u8, end: u8, url: U) -> Result<()> { + sqlx::query("insert into route (start, end, url) values (?, ?, ?)") + .bind(start) + .bind(end) + .bind(url.as_ref()) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn tag>(&self, tag: Tag<'_>, value: V) -> Result<()> { + sqlx::query("insert into tag (key, value) values (?, ?);") + .bind(tag.key()) + .bind(value.as_ref()) + .execute(&self.pool) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_inode() { + const PATH: &str = "/tmp/inode.fl"; + let meta = Writer::new(PATH).await.unwrap(); + + let ino = meta + .inode(Inode { + name: "/".into(), + data: Some("target".into()), + ..Inode::default() + }) + .await + .unwrap(); + + assert_eq!(ino, 1); + + let meta = Reader::new(PATH).await.unwrap(); + let inode = meta.inode(ino).await.unwrap(); + + assert_eq!(inode.name, "/"); + assert!(inode.data.is_some()); + assert_eq!(inode.data.unwrap().as_slice(), "target".as_bytes()); + } + + #[tokio::test] + async fn test_get_children() { + const PATH: &str = "/tmp/children.fl"; + let meta = Writer::new(PATH).await.unwrap(); + + let ino = meta + .inode(Inode { + name: "/".into(), + data: Some("target".into()), + ..Inode::default() + }) + .await + .unwrap(); + + for name in ["bin", "etc", "usr"] { + meta.inode(Inode { + parent: ino, + name: name.into(), + ..Inode::default() + }) + .await + .unwrap(); + } + let meta = Reader::new(PATH).await.unwrap(); + let children = meta.children(ino, 10, 0).await.unwrap(); + + assert_eq!(children.len(), 3); + assert_eq!(children[0].name, "bin"); + + let child = meta.lookup(ino, "bin").await.unwrap(); + assert!(child.is_some()); + assert_eq!(child.unwrap().name, "bin"); + + let child = meta.lookup(ino, "wrong").await.unwrap(); + assert!(child.is_none()); + } + + #[tokio::test] + async fn test_get_block() { + const PATH: &str = "/tmp/block.fl"; + let meta = Writer::new(PATH).await.unwrap(); + let hash: [u8; HASH_LEN] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; + let key1: [u8; KEY_LEN] = [1; KEY_LEN]; + let key2: [u8; KEY_LEN] = [2; KEY_LEN]; + + meta.block(1, &hash, &key1).await.unwrap(); + meta.block(1, &hash, &key2).await.unwrap(); + + let meta = Reader::new(PATH).await.unwrap(); + + let blocks = meta.blocks(1).await.unwrap(); + assert_eq!(blocks.len(), 2); + assert_eq!(blocks[0].hash, hash); + assert_eq!(blocks[0].key, key1); + assert_eq!(blocks[1].key, key2); + } + + #[tokio::test] + async fn test_get_tag() { + const PATH: &str = "/tmp/tag.fl"; + let meta = Writer::new(PATH).await.unwrap(); + meta.tag(Tag::Version, "0.1").await.unwrap(); + meta.tag(Tag::Author, "azmy").await.unwrap(); + meta.tag(Tag::Custom("custom"), "value").await.unwrap(); + + let meta = Reader::new(PATH).await.unwrap(); + + assert!(matches!( + meta.tag(Tag::Version).await.unwrap().as_deref(), + Some("0.1") + )); + + assert!(matches!( + meta.tag(Tag::Custom("custom")).await.unwrap().as_deref(), + Some("value") + )); + + assert!(matches!( + meta.tag(Tag::Custom("unknown")).await.unwrap(), + None + )); + } + + #[tokio::test] + async fn test_get_routes() { + const PATH: &str = "/tmp/route.fl"; + let meta = Writer::new(PATH).await.unwrap(); + + meta.route(0, 128, "zdb://hub1.grid.tf").await.unwrap(); + meta.route(129, 255, "zdb://hub2.grid.tf").await.unwrap(); + + let meta = Reader::new(PATH).await.unwrap(); + + let routes = meta.routes().await.unwrap(); + assert_eq!(routes.len(), 2); + assert_eq!(routes[0].start, 0); + assert_eq!(routes[0].end, 128); + assert_eq!(routes[0].url, "zdb://hub1.grid.tf"); + } + + #[test] + fn test_mode() { + let m = Mode::new(FileType::Regular, 0754); + + assert_eq!(m.permissions(), 0754); + assert_eq!(m.file_type(), FileType::Regular); + } +} diff --git a/src/fungi/mod.rs b/src/fungi/mod.rs index cfe9e1e..666150c 100644 --- a/src/fungi/mod.rs +++ b/src/fungi/mod.rs @@ -1,392 +1,3 @@ -use std::path::Path; +pub mod meta; -use sqlx::{sqlite::SqliteRow, FromRow, Row, SqlitePool}; - -const HASH_LEN: usize = 16; -const KEY_LEN: usize = 16; - -static SCHEMA: &'static str = include_str!("../../schema/schema.sql"); - -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error("failed to execute query: {0}")] - SqlError(#[from] sqlx::Error), - - #[error("invalid hash length")] - InvalidHash, - - #[error("invalid key length")] - InvalidKey, - - #[error("io error: {0}")] - IO(#[from] std::io::Error), -} - -pub type Ino = u64; -pub type Mode = u32; -pub type Result = std::result::Result; - -#[derive(Debug, Clone, Default)] -pub struct Inode { - pub ino: Ino, - pub parent: Ino, - pub name: String, - pub size: usize, - pub uid: u32, - pub gid: u32, - pub mode: Mode, - pub rdev: u32, - pub ctime: i64, - pub mtime: i64, - pub data: Option>, -} - -impl FromRow<'_, SqliteRow> for Inode { - fn from_row(row: &'_ SqliteRow) -> std::result::Result { - Ok(Self { - ino: row.get::("ino") as Ino, - parent: row.get::("parent") as Ino, - name: row.get("name"), - size: row.get::("size") as usize, - uid: row.get("uid"), - gid: row.get("uid"), - mode: row.get("mode"), - rdev: row.get("rdev"), - ctime: row.get("ctime"), - mtime: row.get("mtime"), - data: row.get("data"), - }) - } -} - -#[derive(Debug, Clone, Default)] -pub struct Block { - pub hash: [u8; HASH_LEN], - pub key: [u8; KEY_LEN], -} - -impl FromRow<'_, SqliteRow> for Block { - fn from_row(row: &'_ SqliteRow) -> std::result::Result { - let hash: &[u8] = row.get("hash"); - if hash.len() != HASH_LEN { - return Err(sqlx::Error::Decode(Box::new(Error::InvalidHash))); - } - - let key: &[u8] = row.get("key"); - - if hash.len() != KEY_LEN { - return Err(sqlx::Error::Decode(Box::new(Error::InvalidKey))); - } - - let mut block = Self::default(); - block.hash.copy_from_slice(hash); - block.key.copy_from_slice(key); - - Ok(block) - } -} - -#[derive(Debug, Clone, Default)] -pub struct Route { - pub start: u8, - pub end: u8, - pub url: String, -} - -impl FromRow<'_, SqliteRow> for Route { - fn from_row(row: &'_ SqliteRow) -> std::result::Result { - Ok(Self { - start: row.get("start"), - end: row.get("end"), - url: row.get("url"), - }) - } -} - -#[derive(Debug, Clone)] -pub enum Tag<'a> { - Version, - Description, - Author, - Custom(&'a str), -} - -impl<'a> Tag<'a> { - fn key(&self) -> &str { - match self { - Self::Version => "version", - Self::Description => "description", - Self::Author => "author", - Self::Custom(a) => a, - } - } -} - -#[derive(Clone)] -pub struct Reader { - pool: SqlitePool, -} - -impl Reader { - pub async fn new>(path: P) -> Result { - let con = format!("sqlite://{}", path.as_ref().to_str().unwrap()); - let pool = SqlitePool::connect(&con).await?; - - Ok(Self { pool }) - } - - pub async fn inode(&self, ino: Ino) -> Result { - let inode: Inode = sqlx::query_as(r#"select inode.*, extra.data - from inode left join extra on inode.ino = extra.ino - where inode.ino = ?;"#) - .bind(ino as i64).fetch_one(&self.pool).await?; - - Ok(inode) - } - - pub async fn children(&self, parent: Ino, limit: u32, offset: u32) -> Result> { - let results: Vec = sqlx::query_as( - r#"select inode.*, extra.data - from inode left join extra on inode.ino = extra.ino - where inode.parent = ? limit ? offset ?;"#, - ) - .bind(parent as i64) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await?; - - Ok(results) - } - - pub async fn blocks(&self, ino: Ino) -> Result> { - let results: Vec = sqlx::query_as("select hash, key from block where ino = ?;") - .bind(ino as i64) - .fetch_all(&self.pool) - .await?; - - Ok(results) - } - - pub async fn tag(&self, tag: Tag<'_>) -> Result> { - let value: Option<(String,)> = sqlx::query_as("select value from tag where key = ?;") - .bind(tag.key()) - .fetch_optional(&self.pool) - .await?; - - Ok(value.map(|v| v.0)) - } - - pub async fn routes(&self) -> Result> { - let results: Vec = sqlx::query_as("select start, end, url from route;") - .fetch_all(&self.pool) - .await?; - - Ok(results) - } -} - -#[derive(Clone)] -pub struct Writer { - pool: SqlitePool, -} - -impl Writer { - /// create a new mkondo writer - pub async fn new>(path: P) -> Result { - tokio::fs::OpenOptions::default() - .write(true) - .truncate(true) - .create(true) - .open(&path) - .await?; - - let con = format!("sqlite://{}", path.as_ref().to_str().unwrap()); - let pool = SqlitePool::connect(&con).await?; - - sqlx::query(SCHEMA).execute(&pool).await?; - - Ok(Self { pool }) - } - - /// inode add an inode to the flist - pub async fn inode(&self, inode: Inode) -> Result { - let result = sqlx::query( - r#"insert into inode (parent, name, size, uid, gid, mode, rdev, ctime, mtime) - values (?, ?, ?, ?, ?, ?, ?, ?, ?);"#, - ) - .bind(inode.parent as i64) - .bind(inode.name) - .bind(inode.size as i64) - .bind(inode.uid) - .bind(inode.gid) - .bind(inode.mode) - .bind(inode.rdev) - .bind(inode.ctime) - .bind(inode.mtime) - .execute(&self.pool) - .await?; - - let ino = result.last_insert_rowid() as Ino; - if let Some(data) = &inode.data { - sqlx::query("insert into extra(ino, data) values (?, ?)") - .bind(ino as i64) - .bind(data) - .execute(&self.pool) - .await?; - } - - Ok(ino) - } - - pub async fn block(&self, ino: Ino, hash: &[u8; HASH_LEN], key: &[u8; KEY_LEN]) -> Result<()> { - sqlx::query("insert into block (ino, hash, key) values (?, ?, ?)") - .bind(ino as i64) - .bind(&hash[..]) - .bind(&key[..]) - .execute(&self.pool) - .await?; - Ok(()) - } - - pub async fn route>(&self, start: u8, end: u8, url: U) -> Result<()> { - sqlx::query("insert into route (start, end, url) values (?, ?, ?)") - .bind(start) - .bind(end) - .bind(url.as_ref()) - .execute(&self.pool) - .await?; - Ok(()) - } - - pub async fn tag>(&self, tag: Tag<'_>, value: V) -> Result<()> { - sqlx::query("insert into tag (key, value) values (?, ?);") - .bind(tag.key()) - .bind(value.as_ref()) - .execute(&self.pool) - .await?; - Ok(()) - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[tokio::test] - async fn test_inode() { - const PATH: &str = "/tmp/inode.fl"; - let meta = Writer::new(PATH).await.unwrap(); - - let ino = meta - .inode(Inode { - name: "/".into(), - data: Some("target".into()), - ..Inode::default() - }) - .await - .unwrap(); - - assert_eq!(ino, 1); - - let meta = Reader::new(PATH).await.unwrap(); - let inode = meta.inode(ino).await.unwrap(); - - assert_eq!(inode.name, "/"); - assert!(inode.data.is_some()); - assert_eq!(inode.data.unwrap().as_slice(), "target".as_bytes()); - } - - #[tokio::test] - async fn test_get_children() { - const PATH: &str = "/tmp/children.fl"; - let meta = Writer::new(PATH).await.unwrap(); - - let ino = meta - .inode(Inode { - name: "/".into(), - data: Some("target".into()), - ..Inode::default() - }) - .await - .unwrap(); - - for name in ["bin", "etc", "usr"] { - meta.inode(Inode { - parent: ino, - name: name.into(), - ..Inode::default() - }) - .await - .unwrap(); - } - let meta = Reader::new(PATH).await.unwrap(); - let children = meta.children(ino, 10, 0).await.unwrap(); - - assert_eq!(children.len(), 3); - assert_eq!(children[0].name, "bin"); - } - - #[tokio::test] - async fn test_get_block() { - const PATH: &str = "/tmp/block.fl"; - let meta = Writer::new(PATH).await.unwrap(); - let hash: [u8; HASH_LEN] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; - let key1: [u8; KEY_LEN] = [1; KEY_LEN]; - let key2: [u8; KEY_LEN] = [2; KEY_LEN]; - - meta.block(1, &hash, &key1).await.unwrap(); - meta.block(1, &hash, &key2).await.unwrap(); - - let meta = Reader::new(PATH).await.unwrap(); - - let blocks = meta.blocks(1).await.unwrap(); - assert_eq!(blocks.len(), 2); - assert_eq!(blocks[0].hash, hash); - assert_eq!(blocks[0].key, key1); - assert_eq!(blocks[1].key, key2); - } - - #[tokio::test] - async fn test_get_tag() { - const PATH: &str = "/tmp/tag.fl"; - let meta = Writer::new(PATH).await.unwrap(); - meta.tag(Tag::Version, "0.1").await.unwrap(); - meta.tag(Tag::Author, "azmy").await.unwrap(); - meta.tag(Tag::Custom("custom"), "value").await.unwrap(); - - let meta = Reader::new(PATH).await.unwrap(); - - assert!(matches!( - meta.tag(Tag::Version).await.unwrap().as_deref(), - Some("0.1") - )); - - assert!(matches!( - meta.tag(Tag::Custom("custom")).await.unwrap().as_deref(), - Some("value") - )); - - assert!(matches!( - meta.tag(Tag::Custom("unknown")).await.unwrap(), - None - )); - } - - #[tokio::test] - async fn test_get_routes() { - const PATH: &str = "/tmp/route.fl"; - let meta = Writer::new(PATH).await.unwrap(); - - meta.route(0, 128, "zdb://hub1.grid.tf").await.unwrap(); - meta.route(129, 255, "zdb://hub2.grid.tf").await.unwrap(); - - let meta = Reader::new(PATH).await.unwrap(); - - let routes = meta.routes().await.unwrap(); - assert_eq!(routes.len(), 2); - assert_eq!(routes[0].start, 0); - assert_eq!(routes[0].end, 128); - assert_eq!(routes[0].url, "zdb://hub1.grid.tf"); - } -} +pub use meta::{Reader, Writer}; diff --git a/src/lib.rs b/src/lib.rs index 5e71f34..b589cd4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,13 +8,10 @@ use anyhow::{Context, Result}; use std::fs; use std::path::Path; -pub mod schema_capnp { - include!(concat!(env!("OUT_DIR"), "/schema_capnp.rs")); -} - pub mod cache; pub mod fungi; -pub mod meta; +pub mod store; +/* use cache::Cache; use meta::{EntryKind, Metadata}; @@ -93,3 +90,4 @@ pub async fn extract>(meta: &Metadata, cache: &Cache, root: P) -> meta.walk(&mut visitor).await } +*/ diff --git a/src/main.rs b/src/main.rs index e0181fe..10b56d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,4 @@ #[macro_use] -extern crate anyhow; -#[macro_use] -extern crate thiserror; -#[macro_use] extern crate log; use cache::{ConnectionInfo, IntoConnectionInfo}; use nix::sys::signal::{self, Signal}; @@ -14,7 +10,7 @@ use clap::{ArgAction, Parser}; mod cache; mod fs; -mod meta; +mod fungi; pub mod schema_capnp { include!(concat!(env!("OUT_DIR"), "/schema_capnp.rs")); } @@ -98,7 +94,7 @@ fn main() -> Result<()> { wait_child(target, pid_file); return Ok(()); } - daemonize::Outcome::Parent(Err(err)) => bail!("failed to daemonize: {}", err), + daemonize::Outcome::Parent(Err(err)) => anyhow::bail!("failed to daemonize: {}", err), _ => {} } } @@ -145,36 +141,31 @@ fn wait_child(target: String, mut pid_file: tempfile::NamedTempFile) { } async fn app(opts: Options) -> Result<()> { - let mgr = meta::Metadata::open(opts.meta) + let mgr = fungi::Reader::new(opts.meta) .await .context("failed to initialize metadata database")?; - let cache_info: ConnectionInfo = match mgr - .backend() - .await - .context("failed to get backend information")? - { - None => opts.storage_url.into_connection_info()?, - Some(backend) => backend.into_connection_info()?, - }; + // TODO: backend support with routers + let cache_info: ConnectionInfo = opts.storage_url.into_connection_info()?; info!("backend: {}", cache_info); let cache = cache::Cache::new(cache_info, opts.cache) .await .context("failed to initialize cache")?; - //print tags - match mgr.tags().await { - Ok(tags) => { - debug!("flist has {} tags", tags.len()); - for (k, v) in tags.iter() { - debug!("[tag][{}]: {}", k, v); - } - } - Err(err) => { - error!("failed to extract flist tags: {}", err); - } - } + //TODO: print tags + + // match mgr.tags().await { + // Ok(tags) => { + // debug!("flist has {} tags", tags.len()); + // for (k, v) in tags.iter() { + // debug!("[tag][{}]: {}", k, v); + // } + // } + // Err(err) => { + // error!("failed to extract flist tags: {}", err); + // } + // } let filesystem = fs::Filesystem::new(mgr, cache); filesystem.mount(opts.target).await diff --git a/src/meta/backend.rs b/src/meta/backend.rs deleted file mode 100644 index c23fa3c..0000000 --- a/src/meta/backend.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::cache::{ConnectionInfo, IntoConnectionInfo}; -use bb8_redis::redis::{ - ConnectionAddr, ConnectionInfo as RedisConnectionInfo, - RedisConnectionInfo as InnerConnectionInfo, -}; -use serde::{Deserialize, Serialize}; - -/// Backend is the backend (storage) information -/// stored in the metadata (flist) -#[derive(Debug, Serialize, Deserialize)] -pub struct Backend { - host: String, - port: u16, - socket: Option, - namespace: Option, - password: Option, -} - -impl Backend { - pub fn load(data: &[u8]) -> Result { - serde_json::from_slice(data) - } -} - -impl IntoConnectionInfo for Backend { - fn into_connection_info(self) -> anyhow::Result { - let redis = RedisConnectionInfo { - addr: ConnectionAddr::Tcp(self.host, self.port), - redis: InnerConnectionInfo { - password: self.password, - ..Default::default() - }, - }; - - Ok(ConnectionInfo::new(redis, self.namespace)) - } -} diff --git a/src/meta/inode.rs b/src/meta/inode.rs deleted file mode 100644 index 564d440..0000000 --- a/src/meta/inode.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::fmt; - -#[derive(Debug, Clone, Copy, Eq)] -pub struct Mask(u8); - -impl std::cmp::PartialEq for Mask { - fn eq(&self, other: &Mask) -> bool { - self.0 == other.0 - } -} - -impl Mask { - /// get i node mask - pub fn from(max: u64) -> Mask { - let mut hold = max; - let mut width: u8 = 0; - while hold != 0 { - width += 1; - hold >>= 8; - } - // width is how many bytes can hold the max - // number of directories - Mask(width) - } - - /// split an inode into (dir, index) - pub fn split(&self, i: u64) -> (u64, u64) { - let index: u64 = i >> (self.0 * 8); - let shift = (std::mem::size_of::() - self.0 as usize) * 8; - let dir: u64 = (i << shift) >> shift; - - (dir, index) - } - - pub fn merge(&self, dir: u64, index: u64) -> u64 { - // to build an id of the dir+entry we now the width of the - // mask, right? so we can shift the index to the lift - // to make a free space at the right to the directory id - - index << (self.0 * 8) | dir - } -} - -#[derive(Debug, Clone, Copy, Eq)] -pub struct Inode(Mask, u64); - -impl fmt::Display for Inode { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "0x{:016x}", self.1) - } -} - -impl std::cmp::PartialEq for Inode { - fn eq(&self, other: &Inode) -> bool { - self.1 == other.1 - } -} - -impl Inode { - pub fn new(mask: Mask, ino: u64) -> Inode { - Inode(mask, ino) - } - - pub fn ino(&self) -> u64 { - self.1 - } - - /// split this value into (dir, index) - pub fn split(&self) -> (u64, u64) { - self.0.split(self.1) - } - - /// dir inode of this inode (parent). - /// Same value in case index part is 0 - pub fn dir(&self) -> Inode { - let (dir, _) = self.split(); - Inode::new(self.0, dir) - } - - /// gets the inode value of an entry under this inode directory - pub fn at(&self, index: u64) -> Inode { - let value = self.0.merge(self.dir().ino(), index); - Self::new(self.0, value) - } -} - -#[cfg(test)] -mod test { - use super::*; - #[test] - fn mask() { - let mask = Mask::from(0xff); - assert_eq!(mask.0, 1); - let inode = mask.merge(0xf1, 1000); - let (dir, index) = mask.split(inode); - assert_eq!(0xf1, dir); - assert_eq!(1000, index); - } - - #[test] - fn mask_big() { - let mask = Mask::from(0xffff); - assert_eq!(2, mask.0); - let inode = mask.merge(0xabcd, 0x1234); - let (dir, index) = mask.split(inode); - assert_eq!(0xabcd, dir); - assert_eq!(0x1234, index); - } -} diff --git a/src/meta/mod.rs b/src/meta/mod.rs deleted file mode 100644 index 7c3215d..0000000 --- a/src/meta/mod.rs +++ /dev/null @@ -1,278 +0,0 @@ -mod backend; -pub mod inode; -pub mod types; - -use anyhow::{Context, Result}; -pub use backend::Backend; -use flate2::read::GzDecoder; -use inode::Inode; -use sqlx::sqlite::SqlitePool; -pub use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use tar::Archive; -use tokio::sync::Mutex; -pub use types::{Entry, EntryKind}; - -/// 16 byte blake2b hash of the empty string -const ROOT_HASH: &str = "cae66941d9efbd404e4d88758ea67670"; - -#[derive(Error, Debug)] -pub enum MetaError { - #[error("error not found")] - EntryNotFound, -} - -#[allow(dead_code)] -pub enum Walk { - Skip, - Continue, -} - -#[async_trait::async_trait] -pub trait WalkVisitor: Send + Sync { - async fn visit + Send + Sync>(&mut self, path: P, entry: &Entry) - -> Result; -} - -#[derive(Clone, Debug)] -pub struct Metadata { - pool: SqlitePool, - mask: inode::Mask, - lru: Arc>>>, - acis: Arc>>, -} - -impl Metadata { - // new creates a new metadata given a .sqlite3 db file - async fn new>(p: P) -> Result { - let con = format!("sqlite://{}", p.as_ref().to_str().unwrap()); - let pool = SqlitePool::connect(&con) - .await - .context("failed to open metadata database")?; - - let (max,): (i64,) = sqlx::query_as("select max(rowid) from entries") - .fetch_one(&pool) - .await?; - - let mask = inode::Mask::from(max as u64); - let lru = Arc::new(Mutex::new(lru::LruCache::new(512))); - let acis = Arc::new(Mutex::new(lru::LruCache::new(10))); - Ok(Metadata { - pool, - mask, - lru, - acis, - }) - } - - pub async fn open>(p: P) -> Result { - let p = p.as_ref(); - if !p.exists() { - bail!("provided metadata path does not exist"); - } - - if p.is_dir() { - return Self::new(p.join("flistdb.sqlite3")).await; - } - - let ext = match p.extension() { - Some(ext) => ext, - None => std::ffi::OsStr::new(""), - }; - - if ext == "sqlite3" { - Self::new(p).await - } else { - // extract the flist - // create directory for extracted meta - let dir = p.with_file_name(format!("{}.d", p.file_name().unwrap().to_str().unwrap())); - std::fs::create_dir_all(&dir)?; - let tar_gz = std::fs::File::open(p)?; - let tar = GzDecoder::new(tar_gz); - let mut archive = Archive::new(tar); - archive.unpack(&dir)?; - - Self::new(dir.join("flistdb.sqlite3")).await - } - } - - pub async fn root(&self) -> Result> { - self.dir_by_key(ROOT_HASH).await - } - - #[async_recursion::async_recursion] - async fn walk_dir(&self, p: &Path, entry: &Entry, cb: &mut F) -> Result<()> - where - F: WalkVisitor, - { - if let Walk::Skip = cb.visit(p, entry).await? { - return Ok(()); - } - - let dir = match entry.kind { - EntryKind::Dir(ref dir) => dir, - _ => return Ok(()), - }; - - for entry in dir.entries.iter() { - let path = p.join(&entry.node.name); - match entry.kind { - EntryKind::SubDir(ref sub) => { - let dir = self.dir_by_key(&sub.key).await?; - self.walk_dir(path.as_path(), &dir, cb).await?; - } - _ => { - if let Walk::Skip = cb.visit(path.as_path(), entry).await? { - break; - } - } - }; - } - Ok(()) - } - - #[allow(dead_code)] - pub async fn walk(&self, cb: &mut F) -> Result<()> - where - F: WalkVisitor, - { - let root = self.root().await?; - let path: PathBuf = "/".into(); - self.walk_dir(path.as_path(), &root, cb).await - } - - fn inode(&self, ino: u64) -> Inode { - Inode::new(self.mask, ino) - } - - pub async fn entry(&self, i: u64) -> Result { - let inode = self.inode(i); - let (dir_ino, index) = inode.split(); - let dir = self.dir_by_inode(dir_ino).await?; - if index == 0 { - return Ok((*dir).clone()); - } - - let dir_kind = match &dir.kind { - EntryKind::Dir(dir) => dir, - _ => bail!("invalid directory kind"), - }; - - let index = index as usize - 1; - if index >= dir_kind.entries.len() { - bail!(MetaError::EntryNotFound); - } - - let entry = &dir_kind.entries[index]; - if let EntryKind::SubDir(ref sub) = entry.kind { - let dir = self.dir_by_key(&sub.key).await?; - // probably need to be cached - return Ok((*dir).clone()); - } - - Ok(entry.clone()) - } - - async fn dir_by_key>(&self, key: S) -> Result> { - let mut lru = self.lru.lock().await; - if let Some(dir) = lru.get(key.as_ref()) { - return Ok(dir.clone()); - } - - let (id, data): (i64, Vec) = - sqlx::query_as("select rowid, value from entries where key = ?") - .bind(key.as_ref()) - .fetch_one(&self.pool) - .await - .context("failed to find directory")?; - let id = id as u64; - - // that's only place where we create a directory - // so we can cache it in lru now. - let dir = types::Dir::from(key.as_ref(), inode::Inode::new(self.mask, id), data)?; - let dir = Arc::new(dir); - lru.put(key.as_ref().into(), dir.clone()); - - Ok(dir) - } - - async fn dir_by_inode(&self, ino: u64) -> Result> { - if ino == 1 { - return self.root().await; - } - - let (key,): (String,) = sqlx::query_as("select key from entries where rowid = ?") - .bind(ino as i64) - .fetch_one(&self.pool) - .await - .context("failed to find directory")?; - - self.dir_by_key(key).await - } - - #[cfg(feature = "build-binary")] - #[allow(dead_code)] // to avoid build warnings when building the binary with build-binary feature - pub(crate) async fn dir_inode>(&self, key: S) -> Result { - let (id,): (i64,) = sqlx::query_as("select rowid from entries where key = ?") - .bind(key.as_ref()) - .fetch_one(&self.pool) - .await?; - - Ok(Inode::new(self.mask, id as u64)) - } - - pub async fn aci>(&self, key: S) -> Result { - let mut acis = self.acis.lock().await; - if let Some(aci) = acis.get(key.as_ref()) { - return Ok(aci.clone()); - } - let (data,): (Vec,) = sqlx::query_as("select value from entries where key = ?") - .bind(key.as_ref()) - .fetch_one(&self.pool) - .await - .context("failed to find aci")?; - - // that's only place where we create a directory - // so we can cache it in lru now. - let aci = types::Aci::new(data)?; - acis.put(key.as_ref().into(), aci.clone()); - Ok(aci) - } - - pub async fn tags(&self) -> Result> { - let data: Vec<(String, String)> = match sqlx::query_as("select key, value from metadata;") - .fetch_all(&self.pool) - .await - { - Ok(data) => data, - Err(sqlx::Error::Database(_)) => Vec::default(), - Err(e) => return Err(anyhow!(e)), - }; - - Ok(data.into_iter().collect()) - } - - pub async fn tag>(&self, key: S) -> Result> { - let value: Option<(String,)> = - match sqlx::query_as("select value from metadata where key = ?;") - .bind(key.as_ref()) - .fetch_optional(&self.pool) - .await - { - Ok(data) => data, - Err(sqlx::Error::Database(_)) => None, - Err(err) => return Err(anyhow!(err)), - }; - - Ok(value.map(|v| v.0)) - } - - pub async fn backend(&self) -> Result> { - if let Some(data) = self.tag("backend").await? { - return Ok(Some(Backend::load(data.as_bytes())?)); - } - - Ok(None) - } -} diff --git a/src/meta/types.rs b/src/meta/types.rs deleted file mode 100644 index a9706dd..0000000 --- a/src/meta/types.rs +++ /dev/null @@ -1,218 +0,0 @@ -use super::inode::Inode; -use crate::schema_capnp; -use anyhow::Result; -use capnp::{message, serialize}; -use nix::unistd::{Group, User}; -use std::convert::TryInto; -use std::sync::Arc; - -#[derive(Debug, Clone)] -pub struct Node { - pub inode: Inode, - pub name: String, - pub size: u64, - pub acl: String, - pub modification: u32, - pub creation: u32, -} - -#[derive(Debug, Clone)] -pub struct SubDir { - pub key: String, -} - -#[derive(Debug, Clone)] -pub struct FileBlock { - pub hash: [u8; 16], - pub key: [u8; 16], -} - -#[derive(Debug, Clone)] -pub struct File { - pub block_size: u16, - pub blocks: Vec, -} - -#[derive(Debug, Clone)] -pub struct Link { - pub target: String, -} - -#[derive(Debug, Clone)] -pub struct Dir { - pub key: String, - pub parent: String, - // we use arch for shallow clone of directory - pub entries: Arc>, -} - -impl Dir { - pub fn from>(key: S, inode: Inode, data: Vec) -> Result { - let mut raw: &[u8] = data.as_ref(); - - let msg = serialize::read_message(&mut raw, message::ReaderOptions::default())?; - - let root = msg.get_root::()?; - let name: String = root.get_name()?.into(); - let parent: String = root.get_parent()?.into(); - let size = root.get_size(); - let modification = root.get_modification_time(); - let creation = root.get_creation_time(); - let entries = Dir::entries(inode, root)?; - - Ok(Entry { - node: Node { - inode, - name, - size, - acl: "".into(), - modification, - creation, - }, - kind: EntryKind::Dir(Dir { - parent, - key: key.as_ref().into(), - entries: Arc::new(entries), - }), - }) - } - - fn entries(ino: Inode, dir: schema_capnp::dir::Reader) -> Result> { - /* - This definitely needs refactoring - */ - use schema_capnp::inode::attributes::Which; - - let mut entries: Vec = vec![]; - let mut x = 0; - - for entry in dir.get_contents()? { - x += 1; - let inode = ino.at(x); - let node = Node { - inode, - //parent: inode, - name: String::from(entry.get_name()?), - size: entry.get_size(), - acl: String::from(entry.get_aclkey()?), - modification: entry.get_modification_time(), - creation: entry.get_creation_time(), - }; - - let attrs = entry.get_attributes(); - let kind = match attrs.which()? { - Which::Dir(d) => { - let key = String::from(d?.get_key()?); - EntryKind::SubDir(SubDir { key }) - } - Which::File(f) => { - let f = f?; - - EntryKind::File(File { - block_size: f.get_block_size(), - blocks: match f.get_blocks() { - Ok(blocks) => { - let mut result = vec![]; - for block in blocks { - result.push(FileBlock { - hash: block - .get_hash()? - .try_into() - .expect("block hash is 16 bytes"), - key: block - .get_key()? - .try_into() - .expect("block encryption key is 16 bytes"), - }); - } - result - } - Err(err) => return Err(anyhow!(err)), - }, - }) - } - Which::Link(l) => { - let l = l?; - EntryKind::Link(Link { - target: String::from(l.get_target()?), - }) - } - _ => EntryKind::Unknown, - }; - - if let EntryKind::Unknown = kind { - continue; - } - - entries.push(Entry { node, kind }); - } - - Ok(entries) - } -} - -#[derive(Debug, Clone)] -pub struct Entry { - pub node: Node, - pub kind: EntryKind, -} - -#[derive(Debug, Clone)] -pub enum EntryKind { - Unknown, - Dir(Dir), - SubDir(SubDir), - File(File), - Link(Link), -} - -#[derive(Clone)] -pub struct Aci { - pub user: u32, - pub group: u32, - pub mode: u32, -} - -impl Aci { - pub fn new(data: Vec) -> Result { - let mut raw: &[u8] = data.as_ref(); - let msg = serialize::read_message(&mut raw, message::ReaderOptions::default())?; - - let root = msg.get_root::()?; - let mut uid = root.get_uid(); - let mut gid = root.get_gid(); - let mode = root.get_mode(); - - if uid == -1 { - // backward compatibility with older flist - uid = if root.has_uname() { - let uname = root.get_uname().unwrap(); - match User::from_name(uname) { - Ok(Some(user)) => user.uid.as_raw() as i64, - _ => 1000, - } - } else { - 1000 - }; - } - - if gid == -1 { - // backward compatibility with older flist - gid = if root.has_gname() { - let gname = root.get_gname().unwrap(); - match Group::from_name(gname) { - Ok(Some(group)) => group.gid.as_raw() as i64, - _ => 1000, - } - } else { - 1000 - }; - } - - Ok(Aci { - user: uid as u32, - group: gid as u32, - mode: mode as u32, - }) - } -} diff --git a/src/store/mod.rs b/src/store/mod.rs new file mode 100644 index 0000000..fa5b750 --- /dev/null +++ b/src/store/mod.rs @@ -0,0 +1,67 @@ +mod router; +pub mod zdb; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("key not found")] + KeyNotFound, + #[error("invalid key")] + InvalidKey, + #[error("key is not routable")] + KeyNotRoutable, + #[error("store is not available")] + Unavailable, + + // TODO: better display for the Box> + #[error("multiple error: {0:?}")] + Multiple(Box>), + #[error("io error: {0}")] + IO(#[from] std::io::Error), + #[error("other: {0}")] + Other(#[from] anyhow::Error), +} + +pub type Result = std::result::Result; + +#[async_trait::async_trait] +pub trait Store: Send + Sync + 'static { + async fn get(&self, key: &[u8]) -> Result>; + async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()>; +} + +pub type Router = router::Router>; + +#[async_trait::async_trait] +impl Store for Router { + async fn get(&self, key: &[u8]) -> Result> { + if key.len() == 0 { + return Err(Error::InvalidKey); + } + let mut errors = Vec::default(); + for store in self.route(key[0]) { + match store.get(key).await { + Ok(object) => return Ok(object), + Err(err) => errors.push(err), + }; + } + + if errors.is_empty() { + return Err(Error::KeyNotRoutable); + } + + // return aggregated errors + return Err(Error::Multiple(Box::new(errors))); + } + + async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> { + if key.len() == 0 { + return Err(Error::InvalidKey); + } + + for store in self.route(key[0]) { + store.set(key, blob).await?; + } + + Ok(()) + } +} diff --git a/src/store/router.rs b/src/store/router.rs new file mode 100644 index 0000000..760f626 --- /dev/null +++ b/src/store/router.rs @@ -0,0 +1,52 @@ +/// route implements a naive prefix router by going through the complete set of +/// available routers and find that ones that matches this given prefix +use std::ops::Range; + +#[derive(Default)] +pub struct Router { + routes: Vec<(Range, T)>, +} + +impl Router { + pub fn new() -> Self { + Self { + routes: Vec::default(), + } + } + + pub fn add(&mut self, start: u8, end: u8, route: T) { + self.routes.push((start..end, route)); + } + + pub fn route(&self, i: u8) -> impl Iterator { + self.routes + .iter() + .filter(move |f| f.0.contains(&i)) + .map(|v| &v.1) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test() { + let mut router = Router::default(); + + router.add(0, 255, "a"); + router.add(0, 255, "b"); + router.add(0, 128, "c"); + + let paths: Vec<&str> = router.route(200).map(|v| *v).collect(); + assert_eq!(paths.len(), 2); + assert_eq!(paths[0], "a"); + assert_eq!(paths[1], "b"); + + let paths: Vec<&str> = router.route(0).map(|v| *v).collect(); + assert_eq!(paths.len(), 3); + assert_eq!(paths[0], "a"); + assert_eq!(paths[1], "b"); + assert_eq!(paths[2], "c"); + } +} diff --git a/src/store/zdb.rs b/src/store/zdb.rs new file mode 100644 index 0000000..e69de29