What's Changed
We have added a new high level API called SqsStream.consumeChunkAtLeastOnce
inspired by FS2 Kafka's consumeChunk API. This new API allows you to focus on processing a chunk of messages and we take of deleting the messages once you have processed them successfully. Failure to process messages will prevent the messages from being deleted from the SQS queue so you can receive it again (hence at-least-once). If your messages take long to process (i.e. past the visibility timeout), you can leverage the automatic extension feature (on by default) which will automatically submit visibility extension messages for the chunk of messages you are working on so that the messages won't reappear back on the queue for other consumers to process limiting the chance of multiple consumers working on the same message.
The new API looks lke this:
val consumerExample =
SqsStream.consumeChunkAtLeastOnce(
queueUrl = queueUrl,
settings = SqsStreamSettings.default
extensionSettings = SqsMessageLifetimeExtensionSettings.default
) { (messages: Chunk[Message.ReadOnly]) =>
ZIO.debug(s"working on $messages").unit
}
As long as your ZIO effect successfully completes, the API will take care of deleting the messages (using batching) and pulling the next set of messages to process.
We didn't forget about SqsStream
and performance improvements have been applied to preserve chunking. If you use autoDelete=true
(which is at-most-once delivery), we also perform batch deletes and reduce the number of API calls made.
We have also added SqsStream.deleteMessageBatchSink
to help facilitate at-least-once delivery semantics for users of SqsStream
so you can use SqsStream
with autoDelete=false
, process your messages as you see fit and then hook your stream to thedeleteMessageBatchSink
ZSink to delete messages from the queue.
Here is an example:
val consumerStreamExample =
SqsStream(queueUrl, SqsStreamSettings.default.withAutoDelete(false))
.tap(message => ZIO.debug(s"processing $message here")) // for illustration purposes only
.run(SqsStream.deleteMessageBatchSink(queueUrl))
We've also added extendMessageLifetime
and batch variants so that you can utilize these methods if message processing goes past the visibility timeout. For batch methods like deleteMessageBatch
and extendMessageLifetimeBatch
, we take care to perform retries since a batch request can come back with individual message failures that need to be retried. Note that if your extendMessageLifetimeBatch
is invoked too late (past the visibility timeout duration), you will get errors since you cannot keep the message invisible from the queue and you will be forced to reprocess the message. We make use of extendMessageLifetimeBatch
inside consumeChunkAtLeastOnce
Note: We've made minor breaking changes to SqsSettings
to adhere to the AWS defaults and not override them if you don't specify them. We've also removed the R
type parameter in Producer.make
since requirements are contravariant.
More information about testing the new changes against AWS SQS and AWS SQS FIFO can be found in #773
Full Changelog: v0.6.5...v0.7.0