Describe the bug
Although the aws_sdk_sqs/queue_poller module does indeed delete a message only at the end of the code block passed to it while polling (assuming `:skip_delete` is false), `next unless @match_regexp.match?(key)` merely exits that block early; the block still returns normally, so the delete action occurs even though the message was never processed.
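A minimal sketch of the problem outside of Fluentd (the queue URL, regexp, and `handle` below are placeholders, not the plugin's code):

```ruby
require "aws-sdk-sqs"

def handle(body)
  puts body # stand-in for real processing
end

# Placeholder queue URL and regexp, for illustration only.
poller = Aws::SQS::QueuePoller.new(
  "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue"
)
match_regexp = %r{logs/host-a/}

poller.poll do |message|
  # `next` exits this block early, but the block still returns normally,
  # so QueuePoller goes on to delete the message from the queue even
  # though we never handled it.
  next unless match_regexp.match?(message.body)

  handle(message.body)
end
```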
My setup involves multiple Fluentd nodes pointing to one SQS queue. Fluentd stores events in S3 using the hostname as part of the path, and a regexp matching the hostname is used to pull events back into Fluentd, because I want the node that originally processed an event to process it a second time for sending to OpenSearch.
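To illustrate the routing (hostnames and keys below are made up):

```ruby
# Hypothetical object keys written by two Fluentd nodes (host-a, host-b):
keys = [
  "logs/host-a/port_25001/2023-03-29/18/4500-0f2a.json",
  "logs/host-b/port_25001/2023-03-29/18/4500-9c1e.json",
]

# On host-a, match_regexp should keep only host-a's own objects:
match_regexp = %r{logs/host-a/}
keys.select { |k| match_regexp.match?(k) }
# => ["logs/host-a/port_25001/2023-03-29/18/4500-0f2a.json"]
```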
To Reproduce
1. Using multiple Fluentd nodes, send events to S3 with each node's unique hostname as part of the object path.
2. Ingest the events using the S3 input plugin and an appropriately configured SQS queue, using match_regexp to match the hostname part of the path.
3. If a message for an event written by host A is picked up by host B, host B skips it because the regexp doesn't match, but the message is still deleted, so the event never reaches your ultimate destination.
Expected behavior
The expected behavior is that the event is not deleted but left on the queue; once its visibility timeout expires, the appropriate host can pick it up and process it.
Your Environment
- Fluentd version: 1.5.13
- fluent-plugin-s3 version: 1.7.2
- aws-sdk-s3 version: 1.119.1
- aws-sdk-sqs version: 1.53.0
- Operating system:
NAME="Ubuntu"
VERSION="20.04.6 LTS (Focal Fossa)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 20.04.6 LTS"
VERSION_ID="20.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=focal
UBUNTU_CODENAME=focal
- Kernel version: 5.15.0-1031-aws
Your Configuration
```
## ingest from UDP 25001 on localhost
<source>
  @type udp
  port 25001
  bind localhost
  tag port_25001
  <parse>
    @type json
  </parse>
</source>

## ingest from S3 via SQS
<source>
  @type s3
  tag from_s3
  s3_bucket BUCKET
  s3_region REGION
  add_object_metadata true
  store_as json
  match_regexp /logs/HOSTNAME/.*
  <sqs>
    queue_name QUEUE_NAME
  </sqs>
</source>

## send traffic from port 25001 to S3, storing it by using HOSTNAME as part of the key
<match port_25001>
  @type s3
  s3_bucket BUCKET
  s3_region REGION
  store_as json
  path /logs/HOSTNAME/${tag}/%Y-%m-%d/%H/%M%S
  s3_object_key_format %{path}-%{uuid_flush}.%{file_extension}
  auto_create_bucket false
  check_object false
  check_bucket false
  slow_flush_log_threshold 120s
  utc true
  <buffer tag,time>
    @type file
    timekey 5m
    timekey_wait 1m
    timekey_use_utc true
    path /data/var/s3_buffer
    flush_thread_count 16
    chunk_limit_size 32M
    queue_limit_length 16
    retry_max_interval 30
    retry_forever true
  </buffer>
</match>

## send items coming from S3 to stdout
<match from_s3>
  @type stdout
  format pretty_json
</match>
```
Your Error Log
There is no error log. The events just don't show up in the output if another host grabs the message in the queue.
Additional context
I have a patch that prevents this issue. You all may prefer a more nuanced approach, but it works for me:
```diff
--- lib/fluent/plugin/in_s3_orig.rb	2023-03-29 18:46:58.772216442 +0000
+++ lib/fluent/plugin/in_s3.rb	2023-03-29 18:19:44.867117653 +0000
@@ -211,7 +211,7 @@
           if @match_regexp
             raw_key = get_raw_key(body)
             key = CGI.unescape(raw_key)
-            next unless @match_regexp.match?(key)
+            throw :skip_delete unless @match_regexp.match?(key)
           end
           process(body)
         rescue => e
```
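For context on why `throw :skip_delete` works: QueuePoller wraps both the yielded block and the delete call in a `catch(:skip_delete)`, so the throw jumps past the delete and the message later becomes visible to other consumers again. A simplified sketch of that control flow (not the SDK's actual source):

```ruby
# Simplified sketch of how Aws::SQS::QueuePoller decides whether to
# delete a message -- not the SDK's actual source.
def handle_message(message)
  catch(:skip_delete) do
    yield(message)          # the plugin's block runs here; `next` still falls through
    delete_message(message) # skipped only if the block throws :skip_delete
  end
end
```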