Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#6279] feat (gvfs-fuse): Add gvfs-fuse integration tests for big files and open-file flag test cases #6280

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions clients/filesystem-fuse/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ doc-test:
unit-test: doc-test
cargo test --no-fail-fast --lib --all-features --workspace

test-it:
cargo test --test fuse_test

test-fuse-it:
@bash ./tests/bin/run_fuse_testers.sh test

Expand All @@ -71,5 +74,7 @@ test-s3:
test: doc-test
cargo test --no-fail-fast --all-targets --all-features --workspace

test-all: test test-s3 test-fuse-it

clean:
cargo clean
20 changes: 13 additions & 7 deletions clients/filesystem-fuse/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ pub trait FileWriter: Sync + Send {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use libc::{O_APPEND, O_CREAT, O_RDONLY};
use libc::{O_CREAT, O_RDONLY, O_WRONLY};
use std::collections::HashMap;
use std::path::Component;

Expand Down Expand Up @@ -461,7 +461,11 @@ pub(crate) mod tests {

// Test create file
let file_handle = self
.test_create_file(parent_file_id, "file1.txt".as_ref())
.test_create_file(
parent_file_id,
"file1.txt".as_ref(),
(O_CREAT | O_WRONLY) as u32,
)
.await;

// Test write file
Expand Down Expand Up @@ -545,11 +549,13 @@ pub(crate) mod tests {
self.files.insert(file_stat.file_id, file_stat);
}

async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) -> FileHandle {
let file = self
.fs
.create_file(root_file_id, name, (O_CREAT | O_APPEND) as u32)
.await;
async fn test_create_file(
&mut self,
root_file_id: u64,
name: &OsStr,
flags: u32,
) -> FileHandle {
let file = self.fs.create_file(root_file_id, name, flags).await;
assert!(file.is_ok());
let file = file.unwrap();
assert!(file.handle_id > 0);
Expand Down
2 changes: 1 addition & 1 deletion clients/filesystem-fuse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::signal;

#[tokio::main]
async fn main() -> fuse3::Result<()> {
tracing_subscriber::fmt().init();
tracing_subscriber::fmt::init();

// todo need inmprove the args parsing
let args: Vec<String> = std::env::args().collect();
Expand Down
35 changes: 18 additions & 17 deletions clients/filesystem-fuse/src/memory_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl PathFileSystem for MemoryFileSystem {
Ok(results)
}

async fn open_file(&self, path: &Path, _flags: OpenFileFlags) -> Result<OpenedFile> {
async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
let file_stat = self.stat(path).await?;
let mut opened_file = OpenedFile::new(file_stat);
match opened_file.file_stat.kind {
Expand All @@ -105,8 +105,17 @@ impl PathFileSystem for MemoryFileSystem {
.unwrap()
.data
.clone();
opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() }));
opened_file.writer = Some(Box::new(MemoryFileWriter { data: data }));
if flags.is_read() {
opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() }));
}
if flags.is_write() || flags.is_append() || flags.is_truncate() {
opened_file.writer = Some(Box::new(MemoryFileWriter { data: data.clone() }));
}

if flags.is_truncate() {
let mut data = data.lock().unwrap();
data.clear();
}
Ok(opened_file)
}
_ => Err(Errno::from(libc::EBADF)),
Expand All @@ -117,27 +126,19 @@ impl PathFileSystem for MemoryFileSystem {
self.open_file(path, flags).await
}

async fn create_file(&self, path: &Path, _flags: OpenFileFlags) -> Result<OpenedFile> {
let mut file_map = self.file_map.write().unwrap();
if file_map.contains_key(path) {
async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
if self.file_map.read().unwrap().contains_key(path) && flags.is_exclusive() {
return Err(Errno::from(libc::EEXIST));
}

let mut opened_file = OpenedFile::new(FileStat::new_file_filestat_with_path(path, 0));

let data = Arc::new(Mutex::new(Vec::new()));
file_map.insert(
opened_file.file_stat.path.clone(),
self.file_map.write().unwrap().insert(
path.to_path_buf(),
MemoryFile {
kind: RegularFile,
data: data.clone(),
data: Arc::new(Mutex::new(Vec::new())),
},
);

opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() }));
opened_file.writer = Some(Box::new(MemoryFileWriter { data: data }));

Ok(opened_file)
self.open_file(path, flags).await
}

async fn create_dir(&self, path: &Path) -> Result<FileStat> {
Expand Down
129 changes: 116 additions & 13 deletions clients/filesystem-fuse/src/open_dal_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use bytes::Bytes;
use fuse3::FileType::{Directory, RegularFile};
use fuse3::{Errno, FileType, Timestamp};
use log::error;
use opendal::{EntryMode, ErrorKind, Metadata, Operator};
use opendal::{Buffer, EntryMode, ErrorKind, Metadata, Operator};
use std::path::{Path, PathBuf};
use std::time::SystemTime;

Expand All @@ -37,6 +37,8 @@ pub(crate) struct OpenDalFileSystem {
impl OpenDalFileSystem {}

impl OpenDalFileSystem {
const WRITE_BUFFER_SIZE: usize = 5 * 1024 * 1024;

pub(crate) fn new(op: Operator, _config: &AppConfig, _fs_context: &FileSystemContext) -> Self {
Self { op: op }
}
Expand Down Expand Up @@ -120,14 +122,27 @@ impl PathFileSystem for OpenDalFileSystem {
.map_err(opendal_error_to_errno)?;
file.reader = Some(Box::new(FileReaderImpl { reader }));
}
if flags.is_write() || flags.is_create() || flags.is_append() || flags.is_truncate() {
if !flags.is_create() && flags.is_append() {
error!("The file system does not support open a exists file with the append mode");
return Err(Errno::from(libc::EBADF));
}

if flags.is_truncate() {
self.op
.write(&file_name, Buffer::new())
.await
.map_err(opendal_error_to_errno)?;
}

if flags.is_write() || flags.is_append() || flags.is_truncate() {
let writer = self
.op
.writer_with(&file_name)
.await
.map_err(opendal_error_to_errno)?;
file.writer = Some(Box::new(FileWriterImpl { writer }));
file.writer = Some(Box::new(FileWriterImpl::new(writer)));
}

Ok(file)
}

Expand All @@ -141,15 +156,17 @@ impl PathFileSystem for OpenDalFileSystem {

async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
let file_name = path.to_string_lossy().to_string();
if flags.is_exclusive() {
let meta = self.op.stat(&file_name).await;
if meta.is_ok() {
return Err(Errno::from(libc::EEXIST));
}
}

let mut writer = self
.op
.writer_with(&file_name)
self.op
.write(&file_name, Buffer::new())
.await
.map_err(opendal_error_to_errno)?;

writer.close().await.map_err(opendal_error_to_errno)?;

let file = self.open_file(path, flags).await?;
Ok(file)
}
Expand Down Expand Up @@ -210,19 +227,44 @@ impl FileReader for FileReaderImpl {

struct FileWriterImpl {
writer: opendal::Writer,
buffer: Vec<u8>,
}

impl FileWriterImpl {
fn new(writer: opendal::Writer) -> Self {
Self {
writer,
buffer: Vec::with_capacity(OpenDalFileSystem::WRITE_BUFFER_SIZE + 4096),
}
}
}

#[async_trait]
impl FileWriter for FileWriterImpl {
async fn write(&mut self, _offset: u64, data: &[u8]) -> Result<u32> {
self.writer
.write(data.to_vec())
.await
.map_err(opendal_error_to_errno)?;
if self.buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE {
let mut new_buffer: Vec<u8> =
Vec::with_capacity(OpenDalFileSystem::WRITE_BUFFER_SIZE + 4096);
new_buffer.append(&mut self.buffer);

self.writer
.write(new_buffer)
.await
.map_err(opendal_error_to_errno)?;
}
self.buffer.extend(data);
Ok(data.len() as u32)
}

async fn close(&mut self) -> Result<()> {
if !self.buffer.is_empty() {
let mut new_buffer: Vec<u8> = vec![];
new_buffer.append(&mut self.buffer);
self.writer
.write(new_buffer)
.await
.map_err(opendal_error_to_errno)?;
}
self.writer.close().await.map_err(opendal_error_to_errno)?;
Ok(())
}
Expand Down Expand Up @@ -260,10 +302,12 @@ fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType {
#[cfg(test)]
mod test {
use crate::config::AppConfig;
use crate::open_dal_filesystem::OpenDalFileSystem;
use crate::s3_filesystem::extract_s3_config;
use crate::s3_filesystem::tests::s3_test_config;
use crate::test_enable_with;
use crate::RUN_TEST_WITH_S3;
use bytes::Buf;
use opendal::layers::LoggingLayer;
use opendal::{services, Builder, Operator};

Expand Down Expand Up @@ -327,4 +371,63 @@ mod test {
}
}
}

#[tokio::test]
async fn s3_ut_test_s3_write() {
test_enable_with!(RUN_TEST_WITH_S3);
let config = s3_test_config();

let op = create_opendal(&config);
let path = "/s1/fileset1/gvfs_test/test_dir/test_file";
let mut writer = op.writer_with(path).await.unwrap();

let mut buffer: Vec<u8> = vec![];
let num_batch = 10 * 1024;
for i in 0..num_batch {
let data = vec![i as u8; num_batch];
buffer.extend(&data);

if buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE {
writer.write(buffer).await.unwrap();
buffer = vec![];
};
}

if !buffer.is_empty() {
writer.write(buffer).await.unwrap();
}
writer.close().await.unwrap();
}

#[tokio::test]
async fn s3_ut_test_s3_read() {
test_enable_with!(RUN_TEST_WITH_S3);
let config = s3_test_config();

let op = create_opendal(&config);
let path = "/s1/fileset1/test_dir/test_big_file";
let meta = op.stat(path).await;
if meta.is_err() {
println!("stat error: {:?}", meta.err());
return;
}
let reader = op.reader(path).await.unwrap();

let mut buffer = Vec::new();

let batch_size = 1024;
let mut start = 0;
let mut end = batch_size;
loop {
let buf = reader.read(start..end).await.unwrap();
if buf.is_empty() {
break;
}
buffer.extend_from_slice(buf.chunk());
start = end;
end += batch_size;
}

println!("Read {} bytes.", buffer.len());
}
}
11 changes: 10 additions & 1 deletion clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,22 @@ if [ "$1" == "test" ]; then
echo "Running tests..."
cd $CLIENT_FUSE_DIR
export RUN_TEST_WITH_FUSE=1
cargo test --test fuse_test fuse_it_
cargo test --test fuse_test fuse_it_ -- weak_consistency

elif [ "$1" == "start" ]; then
# Start the servers
echo "Starting servers..."
start_servers

elif [ "$1" == "restart" ]; then
# Stop the servers
echo "Stopping servers..."
stop_servers

# Start the servers
echo "Starting servers..."
start_servers

elif [ "$1" == "stop" ]; then
# Stop the servers
echo "Stopping servers..."
Expand Down
9 changes: 9 additions & 0 deletions clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ elif [ "$1" == "start" ]; then
echo "Starting servers..."
start_servers

elif [ "$1" == "restart" ]; then
# Stop the servers
echo "Stopping servers..."
stop_servers

# Start the servers
echo "Starting servers..."
start_servers

elif [ "$1" == "stop" ]; then
# Stop the servers
echo "Stopping servers..."
Expand Down
Loading
Loading