Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws s3 server_side_encryption configuration when upload object to s3 #5400

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
78ec724
add aws s3 sse
samoii Sep 5, 2024
183de31
Merge branch 'main' of github.com:samoii/quickwit into aws-s3-sse
samoii Sep 7, 2024
716d39b
Merge branch 'main' of github.com:samoii/quickwit into aws-s3-sse
samoii Sep 10, 2024
f28677f
Merge branch 'main' of github.com:samoii/quickwit into aws-s3-sse
samoii Sep 12, 2024
42045be
server_side_encryption use enum varialble
samoii Sep 12, 2024
38af88b
remove unused
samoii Sep 12, 2024
4aad3c9
Merge pull request #1 from samoii/fix-s3-sse
samoii Sep 12, 2024
b8e5b78
add kms key id variable
samoii Sep 13, 2024
4e39153
Merge branch 'main' of github.com:samoii/quickwit into fix-s3-sse
samoii Sep 14, 2024
6319147
add kms key id
samoii Sep 14, 2024
21d8add
Merge pull request #2 from samoii/fix-s3-sse
samoii Sep 14, 2024
e6f0c16
function apply sse
samoii Sep 14, 2024
5147505
add sse to multipart upload and edit shown log
samoii Sep 14, 2024
ad68a93
Merge branch 'main' of github.com:samoii/quickwit into fix-s3-sse
samoii Sep 20, 2024
2996aee
fix lint
samoii Sep 20, 2024
c5d50d2
fix lint
samoii Sep 20, 2024
202c6e9
fix lint
samoii Sep 20, 2024
5730b1e
fix clippy
samoii Sep 21, 2024
4243a5f
fix rustfmt
samoii Sep 21, 2024
d9c4b8b
Merge pull request #3 from samoii/fix-s3-sse
samoii Sep 21, 2024
6ef96ad
Merge branch 'main' of github.com:samoii/quickwit into aws-s3-sse
samoii Sep 29, 2024
6529325
Merge branch 'main' of github.com:samoii/quickwit into aws-s3-sse
samoii Oct 3, 2024
660db5a
Merge branch 'main' of github.com:samoii/quickwit into aws-s3-sse
samoii Oct 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ pub use crate::node_config::{
use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
pub use crate::storage_config::{
AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig,
S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs,
S3ServerSideEncryption, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig,
StorageConfigs,
};

/// Returns true if the ingest API v2 is enabled.
Expand Down
20 changes: 19 additions & 1 deletion quickwit/quickwit-config/src/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,13 @@ impl fmt::Debug for AzureStorageConfig {
.finish()
}
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum S3ServerSideEncryption {
Aes256,
AwsKms,
AwsKmsDsse,
}
#[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct S3StorageConfig {
Expand All @@ -334,6 +340,10 @@ pub struct S3StorageConfig {
pub disable_multi_object_delete: bool,
#[serde(default)]
pub disable_multipart_upload: bool,
#[serde(default)]
pub server_side_encryption: Option<S3ServerSideEncryption>,
#[serde(default)]
pub sse_kms_key_id: Option<String>,
}

impl S3StorageConfig {
Expand Down Expand Up @@ -384,6 +394,12 @@ impl S3StorageConfig {

impl fmt::Debug for S3StorageConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let display_sse_kms_key_id = match &self.server_side_encryption {
Some(S3ServerSideEncryption::AwsKms) | Some(S3ServerSideEncryption::AwsKmsDsse) => {
&self.sse_kms_key_id
}
_ => &None,
};
f.debug_struct("S3StorageConfig")
.field("access_key_id", &self.access_key_id)
.field(
Expand All @@ -397,6 +413,8 @@ impl fmt::Debug for S3StorageConfig {
"disable_multi_object_delete",
&self.disable_multi_object_delete,
)
.field("server_side_encryption", &self.server_side_encryption)
.field("sse_kms_key_id", &display_sse_kms_key_id)
.finish()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput;
use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::builders::ObjectIdentifierBuilder;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
use aws_sdk_s3::types::{
CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, ServerSideEncryption,
};
use aws_sdk_s3::Client as S3Client;
use base64::prelude::{Engine, BASE64_STANDARD};
use futures::{stream, StreamExt};
Expand All @@ -43,7 +45,7 @@ use quickwit_aws::retry::{aws_retry, AwsRetryable};
use quickwit_common::retry::{Retry, RetryParams};
use quickwit_common::uri::Uri;
use quickwit_common::{chunk_range, into_u64_range};
use quickwit_config::S3StorageConfig;
use quickwit_config::{S3ServerSideEncryption, S3StorageConfig};
use regex::Regex;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf};
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -91,6 +93,8 @@ pub struct S3CompatibleObjectStorage {
retry_params: RetryParams,
disable_multi_object_delete: bool,
disable_multipart_upload: bool,
server_side_encryption: Option<S3ServerSideEncryption>,
sse_kms_key_id: Option<String>,
}

impl fmt::Debug for S3CompatibleObjectStorage {
Expand Down Expand Up @@ -177,6 +181,8 @@ impl S3CompatibleObjectStorage {
let retry_params = RetryParams::aggressive();
let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete;
let disable_multipart_upload = s3_storage_config.disable_multipart_upload;
let server_side_encryption = s3_storage_config.server_side_encryption;
let sse_kms_key_id = s3_storage_config.sse_kms_key_id.clone();
Ok(Self {
s3_client,
uri: uri.clone(),
Expand All @@ -186,6 +192,8 @@ impl S3CompatibleObjectStorage {
retry_params,
disable_multi_object_delete,
disable_multipart_upload,
server_side_encryption,
sse_kms_key_id,
})
}

Expand All @@ -203,6 +211,8 @@ impl S3CompatibleObjectStorage {
retry_params: self.retry_params,
disable_multi_object_delete: self.disable_multi_object_delete,
disable_multipart_upload: self.disable_multipart_upload,
server_side_encryption: self.server_side_encryption,
sse_kms_key_id: self.sse_kms_key_id,
}
}

Expand Down Expand Up @@ -278,6 +288,26 @@ impl S3CompatibleObjectStorage {
.to_path_buf()
}

fn apply_server_side_encryption(
&self,
encryption: Option<S3ServerSideEncryption>,
kms_key_id: Option<String>,
) -> (Option<ServerSideEncryption>, Option<String>) {
let server_side_encryption = match encryption {
Some(S3ServerSideEncryption::Aes256) => Some(ServerSideEncryption::Aes256),
Some(S3ServerSideEncryption::AwsKms) => Some(ServerSideEncryption::AwsKms),
Some(S3ServerSideEncryption::AwsKmsDsse) => Some(ServerSideEncryption::AwsKmsDsse),
None => None,
};
let kms_key_id = match server_side_encryption {
Some(ServerSideEncryption::AwsKms) | Some(ServerSideEncryption::AwsKmsDsse) => {
kms_key_id
}
_ => None,
};
(server_side_encryption, kms_key_id)
}

async fn put_single_part_single_try<'a>(
&'a self,
bucket: &'a str,
Expand All @@ -289,21 +319,28 @@ impl S3CompatibleObjectStorage {
.byte_stream()
.await
.map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?;
self.s3_client
let mut put_object_request = self
.s3_client
.put_object()
.bucket(bucket)
.key(key)
.body(body)
.content_length(len as i64)
.send()
.await
.map_err(|sdk_error| {
if sdk_error.is_retryable() {
Retry::Transient(StorageError::from(sdk_error))
} else {
Retry::Permanent(StorageError::from(sdk_error))
}
})?;
.content_length(len as i64);
let (s3_sse, kms_key_id) = self
.apply_server_side_encryption(self.server_side_encryption, self.sse_kms_key_id.clone());
if let Some(encryption) = s3_sse {
put_object_request = put_object_request.server_side_encryption(encryption);
if let Some(kms_key_id) = kms_key_id {
put_object_request = put_object_request.ssekms_key_id(kms_key_id);
}
}
put_object_request.send().await.map_err(|sdk_error| {
if sdk_error.is_retryable() {
Retry::Transient(StorageError::from(sdk_error))
} else {
Retry::Permanent(StorageError::from(sdk_error))
}
})?;

crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
Expand All @@ -330,12 +367,22 @@ impl S3CompatibleObjectStorage {

async fn create_multipart_upload(&self, key: &str) -> StorageResult<MultipartUploadId> {
let upload_id = aws_retry(&self.retry_params, || async {
self.s3_client
let mut create_multipart_req = self
.s3_client
.create_multipart_upload()
.bucket(self.bucket.clone())
.key(key)
.send()
.await
.key(key);
let (s3_sse, kms_key_id) = self.apply_server_side_encryption(
self.server_side_encryption,
self.sse_kms_key_id.clone(),
);
if let Some(encryption) = s3_sse {
create_multipart_req = create_multipart_req.server_side_encryption(encryption);
if let Some(kms_key_id) = kms_key_id {
create_multipart_req = create_multipart_req.ssekms_key_id(kms_key_id);
}
}
create_multipart_req.send().await
})
.await?
.upload_id
Expand Down Expand Up @@ -956,6 +1003,8 @@ mod tests {
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
disable_multipart_upload: false,
server_side_encryption: None,
sse_kms_key_id: None,
};
assert_eq!(
s3_storage.relative_path("indexes/foo"),
Expand Down Expand Up @@ -1011,6 +1060,8 @@ mod tests {
retry_params: RetryParams::for_test(),
disable_multi_object_delete: true,
disable_multipart_upload: false,
server_side_encryption: None,
sse_kms_key_id: None,
};
let _ = s3_storage
.bulk_delete(&[Path::new("foo"), Path::new("bar")])
Expand Down Expand Up @@ -1052,6 +1103,8 @@ mod tests {
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
disable_multipart_upload: false,
server_side_encryption: None,
sse_kms_key_id: None,
};
let _ = s3_storage
.bulk_delete(&[Path::new("foo"), Path::new("bar")])
Expand Down Expand Up @@ -1134,6 +1187,8 @@ mod tests {
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
disable_multipart_upload: false,
server_side_encryption: None,
sse_kms_key_id: None,
};
let bulk_delete_error = s3_storage
.bulk_delete(&[
Expand Down Expand Up @@ -1227,6 +1282,8 @@ mod tests {
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
disable_multipart_upload: false,
server_side_encryption: None,
sse_kms_key_id: None,
};
s3_storage
.put(Path::new("my-path"), Box::new(vec![1, 2, 3]))
Expand Down
Loading