Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GCSHybrid data storage #151

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 182 additions & 25 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ log = "0.4.20"
mime = "0.3.17"
mime_guess = "2.0.4"
mobc = "0.8.3"
object_store = { version = "0.10.1", features = ["gcp"] }
redis = { version = "0.25.3", features = ["tokio-comp", "connection-manager"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.192", features = ["derive"] }
Expand Down
24 changes: 24 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,30 @@ pub struct DataStorageConfig {
/// This parameter is required for S3-based storages.
#[arg(long, env = "RUSTUS_S3_HEADERS")]
pub s3_headers: Option<String>,

/// Service account key for GCS.
///
/// This parameter is optional for GCS-based storages. When both the key
/// and the key path are set, the key takes precedence.
#[arg(long, env = "RUSTUS_GCS_SERVICE_ACCOUNT_KEY")]
pub gcs_service_account_key: Option<String>,

/// Service account key path for GCS.
///
/// This parameter is used for GCS-based storages.
#[arg(long, env = "RUSTUS_GCS_SERVICE_ACCOUNT_KEY_PATH")]
pub gcs_service_account_key_path: Option<PathBuf>,

/// Application credentials path for GCS.
///
/// This parameter is used for GCS-based storages.
#[arg(long, env = "RUSTUS_GCS_APPLICATION_CREDENTIALS_PATH")]
pub gcs_application_credentials_path: Option<String>,

/// Bucket name for GCS.
///
/// This parameter is required for GCS-based storages.
#[arg(long, env = "RUSTUS_GCS_BUCKET")]
pub gcs_bucket: Option<String>,
}

#[derive(Parser, Clone, Debug)]
Expand Down
5 changes: 4 additions & 1 deletion src/data_storage/impls/file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl Storage for FileStorage {
.open(path)
.await?;
let mut writer = tokio::io::BufWriter::new(file);
for part in parts_info {
for part in &parts_info {
if part.path.is_none() {
return Err(RustusError::FileNotFound);
}
Expand All @@ -151,6 +151,9 @@ impl Storage for FileStorage {
writer.get_ref().sync_data().await?;
}
writer.into_inner().shutdown().await?;
for part in parts_info {
tokio::fs::remove_file(part.path.unwrap()).await?;
}
Ok(())
}

Expand Down
187 changes: 187 additions & 0 deletions src/data_storage/impls/gcs_hybrid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use axum::response::{IntoResponse, Response};
use bytes::Bytes;
use object_store::{
gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder},
WriteMultipart,
};
use object_store::{path::Path, ObjectStore};
use std::path::PathBuf;
use tokio::io::AsyncReadExt;

use super::file_storage::FileStorage;
use crate::{
data_storage::base::Storage,
errors::{RustusError, RustusResult},
models::file_info::FileInfo,
utils::{dir_struct::substr_time, headers::HeaderMapExt, result::MonadLogger},
};

/// Hybrid storage backed by the local file system and Google Cloud Storage.
///
/// It handles uploads locally, and after the upload is
/// complete, it uploads the file to GCS.
#[derive(Debug)]
pub struct GCSHybridStorage {
// GCS client used to put, get and delete objects.
store: GoogleCloudStorage,
// Local file storage that receives the data while an upload is in progress.
local_storage: FileStorage,
// Directory structure template; expanded with the file's creation time
// to build the prefix of the GCS object key.
dir_struct: String,
}

/// Size in bytes of the buffer used to read a local file while it is
/// streamed to GCS.
const UPLOAD_BUFFER_SIZE: usize = 1024 * 1024 * 20; // 20 MB

impl GCSHybridStorage {
/// Create new `GCSHybridStorage` instance.
///
/// # Panics
///
/// Might panic if credentials are invalid and cannot be parsed.
/// Or if bucket instance cannot be created.
#[allow(clippy::too_many_arguments)]
#[must_use]
pub fn new(
service_account_key: Option<String>,
application_credentials_path: Option<String>,
bucket_name: &str,
data_dir: PathBuf,
dir_struct: String,
force_fsync: bool,
) -> Self {
let mut store_builder = GoogleCloudStorageBuilder::new().with_bucket_name(bucket_name);

if let Some(path) = application_credentials_path {
store_builder = store_builder.with_application_credentials(path);
}

if let Some(key) = service_account_key {
store_builder = store_builder.with_service_account_key(key);
}

let store = store_builder
.build()
.mlog_err("Cannot create GCS storage")
.unwrap();

let local_storage = FileStorage::new(data_dir, dir_struct.clone(), force_fsync);

Self {
store,
local_storage,
dir_struct,
}
}

/// Upload file to GCS.
///
/// This function is called to upload file to GCS completely.
/// It streams file directly from disk to GCS.
async fn upload_file(&self, file_info: &FileInfo) -> RustusResult<()> {
let file_path = match &file_info.path {
Some(path) => path.clone(),
None => return Err(RustusError::UnableToWrite("Cannot get upload path.".into())),
};

let key = self.get_gcs_key(file_info);
tracing::debug!(
"Starting uploading {} to GCS with key `{}`",
file_info.id,
key
);
let file = tokio::fs::File::open(file_path).await?;
let mut reader = tokio::io::BufReader::new(file);

let upload = self.store.put_multipart(&key).await.map_err(|e| {
RustusError::UnableToWrite(format!("Failed to start upload of file to GCS: {e}"))
})?;
let mut write = WriteMultipart::new(upload);
let mut buffer = vec![0; UPLOAD_BUFFER_SIZE];

loop {
let bytes_read = reader.read(&mut buffer).await?;
if bytes_read == 0 {
break;
}
write.write(&buffer[..bytes_read]);
}

write
.finish()
.await
.map_err(|_| RustusError::UnableToWrite("Failed to upload file to GCS.".into()))?;

Ok(())
}

// Construct an GCS key which is used to upload files.
fn get_gcs_key(&self, file_info: &FileInfo) -> Path {
let base_path = substr_time(self.dir_struct.as_str(), file_info.created_at);
let trimmed_path = base_path.trim_end_matches(|c: char| c == '/');
Path::from(format!("{trimmed_path}/{}", file_info.id))
}
}

impl Storage for GCSHybridStorage {
fn get_name(&self) -> &'static str {
"gcs_hybrid"
}

async fn prepare(&mut self) -> RustusResult<()> {
self.local_storage.prepare().await
}

async fn get_contents(&self, file_info: &FileInfo) -> RustusResult<Response> {
if file_info.length != Some(file_info.offset) {
tracing::debug!("File isn't uploaded. Returning from local storage.");
return self.local_storage.get_contents(file_info).await;
}
let stream = self
.store
.get(&self.get_gcs_key(file_info))
.await
.unwrap()
.into_stream();
let mut resp = axum::body::Body::from_stream(stream).into_response();
resp.headers_mut()
.generate_disposition(file_info.get_filename());
Ok(resp)
}

async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> {
self.local_storage.add_bytes(file_info, bytes).await?;

if !file_info.is_partial {
self.upload_file(file_info).await?;
self.remove_file(file_info).await?;
}

Ok(())
}

async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String> {
self.local_storage.create_file(file_info).await
}

async fn concat_files(
&self,
file_info: &FileInfo,
parts_info: Vec<FileInfo>,
) -> RustusResult<()> {
self.local_storage
.concat_files(file_info, parts_info)
.await?;
self.upload_file(file_info).await?;
self.local_storage.remove_file(file_info).await?;
Ok(())
}

async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> {
if Some(file_info.offset) == file_info.length {
self.store
.delete(&self.get_gcs_key(file_info))
.await
.map_err(|_| {
RustusError::UnableToRemove("Failed to delete file from GCS.".into())
})?;
} else {
self.local_storage.remove_file(file_info).await?;
}
Ok(())
}
}
1 change: 1 addition & 0 deletions src/data_storage/impls/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod file_storage;
pub mod gcs_hybrid;
pub mod s3_hybrid;
53 changes: 46 additions & 7 deletions src/data_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use std::{

use crate::{config::Config, errors::RustusResult, from_str, utils::result::MonadLogger};

use self::impls::{file_storage::FileStorage, s3_hybrid::S3HybridStorage};
use self::impls::{
file_storage::FileStorage, gcs_hybrid::GCSHybridStorage, s3_hybrid::S3HybridStorage,
};

pub mod base;
pub mod impls;
Expand All @@ -17,14 +19,17 @@ pub enum AvailableStorages {
File,
#[strum(serialize = "hybrid-s3")]
S3Hybrid,
#[strum(serialize = "hybrid-gcs")]
GCSHybrid,
}

from_str!(AvailableStorages, "storages");

#[derive(Clone, Debug)]
#[derive(Debug)]
pub enum DataStorageImpl {
File(FileStorage),
S3Hybrid(S3HybridStorage),
GCSHybrid(GCSHybridStorage),
}

impl DataStorageImpl {
Expand All @@ -34,8 +39,8 @@ impl DataStorageImpl {
///
/// # Panics
///
/// Might panic if one of required fields is not set for `S3Hybrid` storage,
/// and `S3Hybrid` is selected as data storage.
/// Might panic if one of required fields is not set for `S3Hybrid` or `GCSHybrid` storages,
/// when they are selected as data storage.
#[must_use]
pub fn new(config: &Config) -> Self {
let data_conf = config.data_storage_config.clone();
Expand Down Expand Up @@ -71,6 +76,25 @@ impl DataStorageImpl {
data_conf.force_fsync,
))
}
AvailableStorages::GCSHybrid => {
let service_account_key = try_from_string_or_path(
&data_conf.gcs_service_account_key,
&data_conf.gcs_service_account_key_path,
);
Self::GCSHybrid(GCSHybridStorage::new(
service_account_key,
data_conf.gcs_application_credentials_path,
data_conf
.gcs_bucket
.clone()
.mlog_err("GCS bucket")
.unwrap()
.as_str(),
data_conf.data_dir.clone(),
data_conf.dir_structure.clone(),
data_conf.force_fsync,
))
}
}
}
}
Expand All @@ -80,13 +104,15 @@ impl base::Storage for DataStorageImpl {
match self {
Self::File(file) => file.get_name(),
Self::S3Hybrid(s3) => s3.get_name(),
Self::GCSHybrid(gcs) => gcs.get_name(),
}
}

async fn prepare(&mut self) -> RustusResult<()> {
match self {
Self::File(file) => file.prepare().await,
Self::S3Hybrid(s3) => s3.prepare().await,
Self::GCSHybrid(gcs) => gcs.prepare().await,
}
}

Expand All @@ -97,6 +123,7 @@ impl base::Storage for DataStorageImpl {
match self {
Self::File(file) => file.get_contents(file_info).await,
Self::S3Hybrid(s3) => s3.get_contents(file_info).await,
Self::GCSHybrid(gcs) => gcs.get_contents(file_info).await,
}
}

Expand All @@ -108,6 +135,7 @@ impl base::Storage for DataStorageImpl {
match self {
Self::File(file) => file.add_bytes(file_info, bytes).await,
Self::S3Hybrid(s3) => s3.add_bytes(file_info, bytes).await,
Self::GCSHybrid(gcs) => gcs.add_bytes(file_info, bytes).await,
}
}

Expand All @@ -118,6 +146,7 @@ impl base::Storage for DataStorageImpl {
match self {
Self::File(file) => file.create_file(file_info).await,
Self::S3Hybrid(s3) => s3.create_file(file_info).await,
Self::GCSHybrid(gcs) => gcs.create_file(file_info).await,
}
}

Expand All @@ -129,6 +158,7 @@ impl base::Storage for DataStorageImpl {
match self {
Self::File(file) => file.concat_files(file_info, parts_info).await,
Self::S3Hybrid(s3) => s3.concat_files(file_info, parts_info).await,
Self::GCSHybrid(gcs) => gcs.concat_files(file_info, parts_info).await,
}
}

Expand All @@ -139,21 +169,30 @@ impl base::Storage for DataStorageImpl {
match self {
Self::File(file) => file.remove_file(file_info).await,
Self::S3Hybrid(s3) => s3.remove_file(file_info).await,
Self::GCSHybrid(gcs) => gcs.remove_file(file_info).await,
}
}
}

/// Resolve a required value that is given either inline or as a path.
///
/// Delegates to `try_from_string_or_path` and treats a missing value as fatal.
///
/// # Panics
///
/// Panics if `try_from_string_or_path` returns `None` for the given pair.
fn from_string_or_path(variable: &Option<String>, path: &Option<PathBuf>) -> String {
    try_from_string_or_path(variable, path)
        .unwrap_or_else(|| panic!("can't find {variable:?} or path {path:?}"))
}

fn try_from_string_or_path(variable: &Option<String>, path: &Option<PathBuf>) -> Option<String> {
if let Some(variable) = variable {
variable.to_string()
Some(variable.to_string())
} else if let Some(path) = path {
let file =
File::open(path).unwrap_or_else(|_| panic!("failed to open path {}", path.display()));
let mut contents = String::new();
BufReader::new(file)
.read_to_string(&mut contents)
.unwrap_or_else(|_| panic!("failed to read from path {}", path.display()));
contents
Some(contents)
} else {
panic!("can't find {variable:?} or path {path:?}")
None
}
}
Loading
Loading