Sometimes things can go wrong. Benthos supports a range of processors such as `http` and `lambda` that have the potential to fail if their retry attempts are exhausted. When this happens the data is not dropped but instead continues through the pipeline mostly unchanged. The content remains the same, but a metadata flag is added to the message that can be referred to later in the pipeline using the `processor_failed` condition.

This behaviour allows you to define in your config whether failed messages should be dropped, recovered with further processing, routed to a dead-letter queue, or any combination thereof.
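As a sketch of how such a failure can arise, a pipeline containing an `http` processor might look like the following. The endpoint URL is hypothetical and the exact field layout may vary between Benthos versions:

```yaml
pipeline:
  processors:
    - http:
        request:
          url: http://example.com/enrich # Hypothetical endpoint
          verb: POST
          retries: 3
    # If all attempts fail the message continues from here with its
    # failure flag set rather than being dropped.
```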
It's possible to define a list of processors which should be skipped for messages that failed a previous stage using the `try` processor:

```yaml
- type: try
  try:
    - type: foo
    - type: bar # Skipped if foo failed
    - type: baz # Skipped if foo or bar failed
```
Failed messages can be fed into their own processor steps with a `catch` processor:

```yaml
- catch:
    - type: foo # Recover here
```
Once messages finish the catch block they will have their failure flags removed and are treated like regular messages. If this behaviour is not desired then it is possible to simulate a catch block with a `conditional` processor placed within a `for_each` processor:

```yaml
- for_each:
    - conditional:
        condition:
          type: processor_failed
        processors:
          - type: foo # Recover here
```
When an error occurs there will occasionally be useful information stored within the error flag that can be exposed with the interpolation function `error`. This allows you to expose the information with processors.

For example, when catching failed processors you can log the messages:

```yaml
- catch:
    - log:
        message: "Processing failed due to: ${!error}"
```
Or perhaps augment the message payload with the error message:

```yaml
- catch:
    - json:
        operator: set
        path: meta.error
        value: ${!error}
```
It's possible to reattempt a processor for a particular message until it is successful with a `while` processor:

```yaml
- for_each:
    - while:
        at_least_once: true
        max_loops: 0 # Set this greater than zero to cap the number of attempts
        condition:
          type: processor_failed
        processors:
          - type: catch # Wipe any previous error
          - type: foo # Attempt this processor until success
```
This loop will block the pipeline and prevent the failing message from being acknowledged. It is therefore usually a good idea in practice to cap the number of attempts, or otherwise build an exit strategy into your condition, so that the pipeline can unblock itself without intervention.
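For example, a bounded variant of the loop above that gives up after three attempts could be sketched as follows (`foo` remains a placeholder processor):

```yaml
- for_each:
    - while:
        at_least_once: true
        max_loops: 3 # Give up after three attempts
        condition:
          type: processor_failed
        processors:
          - type: catch # Wipe any previous error
          - type: foo # Retried up to three times
    # Messages still flagged after the loop can then be recovered
    # or routed to a dead letter queue.
```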
In order to filter out any failed messages from your pipeline you can simply use a `filter_parts` processor:

```yaml
- filter_parts:
    not:
      type: processor_failed
```
This will remove any failed messages from a batch.
It is possible to send failed messages to different destinations using either a `group_by` processor with a `switch` output, or a `broker` output with `filter_parts` processors.

```yaml
pipeline:
  processors:
    - group_by:
        - condition:
            type: processor_failed

output:
  switch:
    outputs:
      - output:
          type: foo # Dead letter queue
        condition:
          type: processor_failed
      - output:
          type: bar # Everything else
```
Note that the `group_by` processor is only necessary when messages are batched.
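When messages are not batched, the same routing can be achieved with the `switch` output alone (`foo` and `bar` are placeholder outputs):

```yaml
output:
  switch:
    outputs:
      - output:
          type: foo # Dead letter queue
        condition:
          type: processor_failed
      - output:
          type: bar # Everything else
```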
Alternatively, using a `broker` output looks like this:

```yaml
output:
  broker:
    pattern: fan_out
    outputs:
      - type: foo # Dead letter queue
        processors:
          - filter_parts:
              type: processor_failed
      - type: bar # Everything else
        processors:
          - filter_parts:
              not:
                type: processor_failed
```