Skip to content
1 change: 1 addition & 0 deletions include/fluent-bit/aws/flb_aws_compress.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#define FLB_AWS_COMPRESS_GZIP 1
#define FLB_AWS_COMPRESS_ARROW 2
#define FLB_AWS_COMPRESS_PARQUET 3
#define FLB_AWS_COMPRESS_ZSTD 4

/*
* Get compression type from compression keyword. The return value is used to identify
Expand Down
50 changes: 39 additions & 11 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,31 @@ static void remove_from_queue(struct upload_queue *entry);

static int blob_initialize_authorization_endpoint_upstream(struct flb_s3 *context);

static struct flb_aws_header content_encoding_header = {
.key = "Content-Encoding",
.key_len = 16,
.val = "gzip",
.val_len = 4,
};
/*
 * Map a compression type to the matching Content-Encoding header.
 * Returns a pointer to an immutable, function-local static header,
 * or NULL when no Content-Encoding applies to the given type.
 */
static struct flb_aws_header *get_content_encoding_header(int compression_type)
{
    static struct flb_aws_header gzip_header = {
        .key = "Content-Encoding",
        .key_len = 16,
        .val = "gzip",
        .val_len = 4,
    };

    static struct flb_aws_header zstd_header = {
        .key = "Content-Encoding",
        .key_len = 16,
        .val = "zstd",
        .val_len = 4,
    };

    if (compression_type == FLB_AWS_COMPRESS_GZIP) {
        return &gzip_header;
    }

    if (compression_type == FLB_AWS_COMPRESS_ZSTD) {
        return &zstd_header;
    }

    /* Other types (arrow, parquet, none) set no Content-Encoding */
    return NULL;
}

static struct flb_aws_header content_type_header = {
.key = "Content-Type",
Expand Down Expand Up @@ -158,11 +177,12 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
int n = 0;
int headers_len = 0;
struct flb_aws_header *s3_headers = NULL;
struct flb_aws_header *encoding_header = NULL;

if (ctx->content_type != NULL) {
headers_len++;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) {
headers_len++;
}
if (ctx->canned_acl != NULL) {
Expand Down Expand Up @@ -192,8 +212,15 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
s3_headers[n].val_len = strlen(ctx->content_type);
n++;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
s3_headers[n] = content_encoding_header;
if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) {
encoding_header = get_content_encoding_header(ctx->compression);

if (encoding_header == NULL) {
flb_errno();
flb_free(s3_headers);
return -1;
}
s3_headers[n] = *encoding_header;
n++;
}
if (ctx->canned_acl != NULL) {
Expand Down Expand Up @@ -1175,7 +1202,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
goto multipart;
}
else {
if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if ((ctx->use_put_object == FLB_FALSE && (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD))) {
flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, "
"the chunk was too small, using PutObject to upload", preCompress_size, body_size);
}
Expand Down Expand Up @@ -3998,10 +4025,11 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for S3 objects. 'gzip', 'arrow' and 'parquet' are the supported values. "
"Compression type for S3 objects. 'gzip', 'arrow', 'parquet' and 'zstd' are the supported values. "
"'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. "
"Defaults to no compression. "
"If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
"If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'."
},
{
FLB_CONFIG_MAP_STR, "content_type", NULL,
Expand Down
6 changes: 6 additions & 0 deletions src/aws/flb_aws_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <fluent-bit/aws/flb_aws_compress.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_zstd.h>

#include <stdint.h>

Expand All @@ -48,6 +49,11 @@ static const struct compression_option compression_options[] = {
"gzip",
&flb_gzip_compress
},
{
FLB_AWS_COMPRESS_ZSTD,
"zstd",
&flb_zstd_compress
},
#ifdef FLB_HAVE_ARROW
{
FLB_AWS_COMPRESS_ARROW,
Expand Down
46 changes: 46 additions & 0 deletions tests/internal/aws_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_zstd.h>

#include <fluent-bit/aws/flb_aws_compress.h>
#include "flb_tests_internal.h"
Expand Down Expand Up @@ -35,6 +36,9 @@ static void flb_aws_compress_test_cases(struct flb_aws_test_case *cases);
static void flb_aws_compress_truncate_b64_test_cases__gzip_decode(
struct flb_aws_test_case *cases,
size_t max_out_len);
static void flb_aws_compress_truncate_b64_test_cases__zstd_decode(
struct flb_aws_test_case *cases,
size_t max_out_len);

/** ------ Test Cases ------ **/
void test_compression_gzip()
Expand All @@ -53,6 +57,22 @@ void test_compression_gzip()
flb_aws_compress_test_cases(cases);
}

/* Exercise the 'zstd' keyword through the generic compression test loop. */
void test_compression_zstd()
{
    struct flb_aws_test_case zstd_cases[] = {
        {
            "zstd",                               /* compression keyword */
            "hello hello hello hello hello hello",/* input payload */
            "KLUv/SAjZQAAMGhlbGxvIAEAuUsR",       /* expected output (b64) */
            0                                     /* expected ret */
        },
        { 0 }                                     /* terminator */
    };

    flb_aws_compress_test_cases(zstd_cases);
}

void test_b64_truncated_gzip()
{
struct flb_aws_test_case cases[] =
Expand All @@ -70,6 +90,22 @@ struct flb_aws_test_case cases[] =
41);
}

/* Round-trip a zstd payload through the truncated-base64 test path. */
void test_b64_truncated_zstd()
{
    struct flb_aws_test_case truncation_cases[] = {
        {
            "zstd",                               /* compression keyword */
            "hello hello hello hello hello hello",/* input payload */
            "hello hello hello hello hello hello",/* expected round trip */
            0                                     /* expected ret */
        },
        { 0 }                                     /* terminator */
    };

    flb_aws_compress_truncate_b64_test_cases__zstd_decode(truncation_cases,
                                                          41);
}

void test_b64_truncated_gzip_truncation()
{
struct flb_aws_test_case cases[] =
Expand Down Expand Up @@ -202,7 +238,9 @@ struct flb_aws_test_case cases[] =

TEST_LIST = {
{ "test_compression_gzip", test_compression_gzip },
{ "test_compression_zstd", test_compression_zstd },
{ "test_b64_truncated_gzip", test_b64_truncated_gzip },
{ "test_b64_truncated_zstd", test_b64_truncated_zstd },
{ "test_b64_truncated_gzip_truncation", test_b64_truncated_gzip_truncation },
{ "test_b64_truncated_gzip_truncation_buffer_too_small",
test_b64_truncated_gzip_truncation_buffer_too_small },
Expand Down Expand Up @@ -231,6 +269,14 @@ static void flb_aws_compress_truncate_b64_test_cases__gzip_decode(
cases, max_out_len, &flb_gzip_uncompress);
}

/*
 * Drive the truncated-base64 test cases with zstd decompression.
 * The decompressor pointer is cast to the generic driver's parameter
 * type as requested in review, to avoid an incompatible-pointer-type
 * diagnostic (flb_zstd_uncompress's signature differs from the
 * driver's declared decompressor parameter — TODO confirm the
 * driver's prototype when the full test file is in view).
 */
static void flb_aws_compress_truncate_b64_test_cases__zstd_decode(
                                                        struct flb_aws_test_case *cases,
                                                        size_t max_out_len)
{
    flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE,
                                        cases, max_out_len,
                                        (int *) &flb_zstd_uncompress);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we needed to add type casting here:

diff --git a/tests/internal/aws_compress.c b/tests/internal/aws_compress.c
index 7bb9d24b0..f26536eed 100644
--- a/tests/internal/aws_compress.c
+++ b/tests/internal/aws_compress.c
@@ -274,7 +274,7 @@ static void flb_aws_compress_truncate_b64_test_cases__zstd_decode(
                                                         size_t max_out_len)
 {
    flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE,
-                                      cases, max_out_len, &flb_zstd_uncompress);
+                                       cases, max_out_len, (int *)&flb_zstd_uncompress);
 }
 
 /* General test case loop flb_aws_compress */

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done @cosmo0920. Sorry, I think I left it out.


/* General test case loop flb_aws_compress */
static void flb_aws_compress_general_test_cases(int test_type,
struct flb_aws_test_case *cases,
Expand Down
Loading