Add GCSHybrid data storage
- Adds support for uploading files to GCS
- Uses [object_store](https://docs.rs/object_store/latest/object_store/index.html) as an abstraction layer on top of GCS. (It also supports AWS S3, Azure Blob Storage, local files, and in-memory storage, so it might help simplify the code while adding support for more data stores.)
- Works with concatenation/parallel uploads, so it supports big files (a rough sketch of the multipart pattern follows below)
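A minimal sketch of the `object_store` multipart pattern this change builds on (the credential path, bucket name, and object key are placeholders, not values from this repository):

```rust
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::{path::Path, ObjectStore, WriteMultipart};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder credentials and bucket name.
    let service_account_key = std::fs::read_to_string("service-account.json")?;
    let store = GoogleCloudStorageBuilder::new()
        .with_service_account_key(service_account_key)
        .with_bucket_name("example-bucket")
        .build()?;

    // Multipart upload: chunks handed to the writer are uploaded
    // concurrently, which is what makes large files practical.
    let upload = store
        .put_multipart(&Path::from("uploads/example-object"))
        .await?;
    let mut writer = WriteMultipart::new(upload);
    writer.write(b"first chunk of data");
    writer.write(b"second chunk of data");
    writer.finish().await?;
    Ok(())
}
```

Swapping `GoogleCloudStorageBuilder` for another backend's builder leaves the rest of this code unchanged, which is the main appeal of the abstraction layer.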
reneklacan committed May 15, 2024
1 parent 977ed22 commit b503aa6
Showing 8 changed files with 419 additions and 31 deletions.
207 changes: 182 additions & 25 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -28,6 +28,7 @@ log = "0.4.20"
mime = "0.3.17"
mime_guess = "2.0.4"
mobc = "0.8.3"
object_store = { version = "0.10.1", features = ["gcp"] }
redis = { version = "0.25.3", features = ["tokio-comp", "connection-manager"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.192", features = ["derive"] }
18 changes: 18 additions & 0 deletions src/config.rs
@@ -153,6 +153,24 @@ pub struct DataStorageConfig {
/// This parameter is required for S3-based storages.
#[arg(long, env = "RUSTUS_S3_HEADERS")]
pub s3_headers: Option<String>,

/// Service account key for GCS.
///
/// This parameter is required for GCS-based storages.
#[arg(long, env = "RUSTUS_GCS_SERVICE_ACCOUNT_KEY")]
pub gcs_service_account_key: Option<String>,

/// Service account key path for GCS.
///
/// This parameter is used for GCS-based storages.
#[arg(long, env = "RUSTUS_GCS_SERVICE_ACCOUNT_KEY_PATH")]
pub gcs_service_account_key_path: Option<PathBuf>,

/// Bucket name for GCS.
///
/// This parameter is required for GCS-based storages.
#[arg(long, env = "RUSTUS_GCS_BUCKET")]
pub gcs_bucket: Option<String>,
}

#[derive(Parser, Clone, Debug)]
5 changes: 4 additions & 1 deletion src/data_storage/impls/file_storage.rs
@@ -134,7 +134,7 @@ impl Storage for FileStorage {
.open(path)
.await?;
let mut writer = tokio::io::BufWriter::new(file);
for part in parts_info {
for part in &parts_info {
if part.path.is_none() {
return Err(RustusError::FileNotFound);
}
@@ -151,6 +151,9 @@
writer.get_ref().sync_data().await?;
}
writer.into_inner().shutdown().await?;
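// Remove the part files now that they have been concatenated into the final file.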
for part in parts_info {
tokio::fs::remove_file(part.path.unwrap()).await?;
}
Ok(())
}

178 changes: 178 additions & 0 deletions src/data_storage/impls/gcs_hybrid.rs
@@ -0,0 +1,178 @@
use axum::response::{IntoResponse, Response};
use bytes::Bytes;
use object_store::{
gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder},
WriteMultipart,
};
use object_store::{path::Path, ObjectStore};
use std::path::PathBuf;
use tokio::io::AsyncReadExt;

use super::file_storage::FileStorage;
use crate::{
data_storage::base::Storage,
errors::{RustusError, RustusResult},
models::file_info::FileInfo,
utils::{dir_struct::substr_time, headers::HeaderMapExt, result::MonadLogger},
};

/// It handles uploads locally, and after the upload is
/// complete, it uploads the file to GCS.
#[derive(Debug)]
pub struct GCSHybridStorage {
store: GoogleCloudStorage,
local_storage: FileStorage,
dir_struct: String,
}

const UPLOAD_BUFFER_SIZE: usize = 1024 * 1024 * 20; // 20 MB

impl GCSHybridStorage {
/// Create new `GCSHybridStorage` instance.
///
/// # Panics
///
/// Might panic if credentials are invalid and cannot be parsed.
/// Or if bucket instance cannot be created.
#[allow(clippy::too_many_arguments)]
#[must_use]
pub fn new(
service_account_key: String,
bucket_name: &str,
data_dir: PathBuf,
dir_struct: String,
force_fsync: bool,
) -> Self {
let store = GoogleCloudStorageBuilder::new()
.with_service_account_key(service_account_key)
.with_bucket_name(bucket_name)
.build()
.mlog_err("Cannot create GCS storage")
.unwrap();

let local_storage = FileStorage::new(data_dir, dir_struct.clone(), force_fsync);

Self {
store,
local_storage,
dir_struct,
}
}

/// Upload the file to GCS.
///
/// This function is called to upload the completed file to GCS.
/// It streams the file directly from disk to GCS.
async fn upload_file(&self, file_info: &FileInfo) -> RustusResult<()> {
let file_path = match &file_info.path {
Some(path) => path.clone(),
None => return Err(RustusError::UnableToWrite("Cannot get upload path.".into())),
};

let key = self.get_gcs_key(file_info);
tracing::debug!(
"Starting uploading {} to GCS with key `{}`",
file_info.id,
key
);
let file = tokio::fs::File::open(file_path).await?;
let mut reader = tokio::io::BufReader::new(file);

let upload = self.store.put_multipart(&key).await.map_err(|_| {
RustusError::UnableToWrite("Failed to start upload of file to GCS.".into())
})?;
let mut write = WriteMultipart::new(upload);
let mut buffer = vec![0; UPLOAD_BUFFER_SIZE];

loop {
let bytes_read = reader.read(&mut buffer).await?;
if bytes_read == 0 {
break;
}
write.write(&buffer[..bytes_read]);
}

write
.finish()
.await
.map_err(|_| RustusError::UnableToWrite("Failed to upload file to GCS.".into()))?;

Ok(())
}

// Construct a GCS key which is used to upload files.
fn get_gcs_key(&self, file_info: &FileInfo) -> Path {
let base_path = substr_time(self.dir_struct.as_str(), file_info.created_at);
let trimmed_path = base_path.trim_end_matches(|c: char| c == '/');
Path::from(format!("{trimmed_path}/{}", file_info.id))
}
}

impl Storage for GCSHybridStorage {
fn get_name(&self) -> &'static str {
"gcs_hybrid"
}

async fn prepare(&mut self) -> RustusResult<()> {
Ok(())
}

async fn get_contents(&self, file_info: &FileInfo) -> RustusResult<Response> {
if file_info.length != Some(file_info.offset) {
tracing::debug!("File isn't uploaded. Returning from local storage.");
return self.local_storage.get_contents(file_info).await;
}
let stream = self
.store
.get(&self.get_gcs_key(file_info))
.await
.unwrap()
.into_stream();
let mut resp = axum::body::Body::from_stream(stream).into_response();
resp.headers_mut()
.generate_disposition(file_info.get_filename());
Ok(resp)
}

async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> {
self.local_storage.add_bytes(file_info, bytes).await?;

if !file_info.is_partial {
self.upload_file(file_info).await?;
self.remove_file(file_info).await?;
}

Ok(())
}

async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String> {
self.local_storage.create_file(file_info).await
}

async fn concat_files(
&self,
file_info: &FileInfo,
parts_info: Vec<FileInfo>,
) -> RustusResult<()> {
self.local_storage
.concat_files(file_info, parts_info)
.await?;
self.upload_file(file_info).await?;
self.local_storage.remove_file(file_info).await?;
Ok(())
}

async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> {
if Some(file_info.offset) == file_info.length {
self.store
.delete(&self.get_gcs_key(file_info))
.await
.map_err(|_| {
RustusError::UnableToRemove("Failed to delete file from GCS.".into())
})?;
} else {
self.local_storage.remove_file(file_info).await?;
}
Ok(())
}
}
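For orientation, a hypothetical construction of the new storage from inside the crate; the service-account path, bucket name, data directory, and directory-structure template are illustrative placeholders:

```rust
use std::path::PathBuf;

use crate::data_storage::impls::gcs_hybrid::GCSHybridStorage;
use crate::errors::RustusResult;

// Hypothetical wiring; every literal value here is a placeholder.
fn build_gcs_hybrid() -> RustusResult<GCSHybridStorage> {
    let service_account_key = std::fs::read_to_string("service-account.json")?;
    Ok(GCSHybridStorage::new(
        service_account_key,
        "example-bucket",
        PathBuf::from("./data"),
        "{year}/{month}/{day}".to_string(), // dir_struct template
        false,                              // force_fsync
    ))
}
```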
1 change: 1 addition & 0 deletions src/data_storage/impls/mod.rs
@@ -1,2 +1,3 @@
pub mod file_storage;
pub mod gcs_hybrid;
pub mod s3_hybrid;
38 changes: 34 additions & 4 deletions src/data_storage/mod.rs
@@ -6,7 +6,9 @@ use std::{

use crate::{config::Config, errors::RustusResult, from_str, utils::result::MonadLogger};

use self::impls::{file_storage::FileStorage, s3_hybrid::S3HybridStorage};
use self::impls::{
file_storage::FileStorage, gcs_hybrid::GCSHybridStorage, s3_hybrid::S3HybridStorage,
};

pub mod base;
pub mod impls;
@@ -17,14 +19,17 @@ pub enum AvailableStorages {
File,
#[strum(serialize = "hybrid-s3")]
S3Hybrid,
#[strum(serialize = "hybrid-gcs")]
GCSHybrid,
}

from_str!(AvailableStorages, "storages");

#[derive(Clone, Debug)]
#[derive(Debug)]
pub enum DataStorageImpl {
File(FileStorage),
S3Hybrid(S3HybridStorage),
GCSHybrid(GCSHybridStorage),
}

impl DataStorageImpl {
@@ -34,8 +39,8 @@ impl DataStorageImpl {
///
/// # Panics
///
/// Might panic if one of required fields is not set for `S3Hybrid` storage,
/// and `S3Hybrid` is selected as data storage.
/// Might panic if one of required fields is not set for `S3Hybrid` or `GCSHybrid` storages,
/// when they are selected as data storage.
#[must_use]
pub fn new(config: &Config) -> Self {
let data_conf = config.data_storage_config.clone();
@@ -71,6 +76,24 @@
data_conf.force_fsync,
))
}
AvailableStorages::GCSHybrid => {
let service_account_key = from_string_or_path(
&data_conf.gcs_service_account_key,
&data_conf.gcs_service_account_key_path,
);
Self::GCSHybrid(GCSHybridStorage::new(
service_account_key,
data_conf
.gcs_bucket
.clone()
.mlog_err("GCS bucket")
.unwrap()
.as_str(),
data_conf.data_dir.clone(),
data_conf.dir_structure.clone(),
data_conf.force_fsync,
))
}
}
}
}
@@ -80,13 +103,15 @@ impl base::Storage for DataStorageImpl {
match self {
Self::File(file) => file.get_name(),
Self::S3Hybrid(s3) => s3.get_name(),
Self::GCSHybrid(gcs) => gcs.get_name(),
}
}

async fn prepare(&mut self) -> RustusResult<()> {
match self {
Self::File(file) => file.prepare().await,
Self::S3Hybrid(s3) => s3.prepare().await,
Self::GCSHybrid(gcs) => gcs.prepare().await,
}
}

@@ -97,6 +122,7 @@
match self {
Self::File(file) => file.get_contents(file_info).await,
Self::S3Hybrid(s3) => s3.get_contents(file_info).await,
Self::GCSHybrid(gcs) => gcs.get_contents(file_info).await,
}
}

@@ -108,6 +134,7 @@
match self {
Self::File(file) => file.add_bytes(file_info, bytes).await,
Self::S3Hybrid(s3) => s3.add_bytes(file_info, bytes).await,
Self::GCSHybrid(gcs) => gcs.add_bytes(file_info, bytes).await,
}
}

@@ -118,6 +145,7 @@
match self {
Self::File(file) => file.create_file(file_info).await,
Self::S3Hybrid(s3) => s3.create_file(file_info).await,
Self::GCSHybrid(gcs) => gcs.create_file(file_info).await,
}
}

@@ -129,6 +157,7 @@
match self {
Self::File(file) => file.concat_files(file_info, parts_info).await,
Self::S3Hybrid(s3) => s3.concat_files(file_info, parts_info).await,
Self::GCSHybrid(gcs) => gcs.concat_files(file_info, parts_info).await,
}
}

@@ -139,6 +168,7 @@
match self {
Self::File(file) => file.remove_file(file_info).await,
Self::S3Hybrid(s3) => s3.remove_file(file_info).await,
Self::GCSHybrid(gcs) => gcs.remove_file(file_info).await,
}
}
}
2 changes: 1 addition & 1 deletion src/state.rs
@@ -6,7 +6,7 @@ use crate::{
notifiers::NotificationManager,
};

#[derive(Clone, Debug)]
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct RustusState {
pub config: Config,
