Skip to content

Commit 42798da

Browse files
committed
QOL Improvements
1 parent 40bda16 commit 42798da

File tree

5 files changed

+165
-34
lines changed

5 files changed

+165
-34
lines changed

zio-sqs/src/main/scala/zio/sqs/SqsStream.scala

+17-9
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,26 @@ import zio.aws.core.GenericAwsError
1010

1111
object SqsStream {
1212

13+
/**
14+
* Creates a consumer stream that pulls messages from the SQS queue.
15+
* Note that if you set autoDelete to true, the message will be deleted from SQS before you can process the message (so if you fail to process the message, it will be deleted).
16+
*
17+
* @param queueUrl is the SQS queue URL (example: https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue)
18+
* @param settings are the settings for reading messages from the queue
19+
* @return a stream of messages from the queue
20+
*/
1321
def apply(
1422
queueUrl: String,
15-
settings: SqsStreamSettings = SqsStreamSettings()
23+
settings: SqsStreamSettings = SqsStreamSettings.default
1624
): ZStream[Sqs, Throwable, Message.ReadOnly] = {
1725

1826
val request = ReceiveMessageRequest(
1927
queueUrl = queueUrl,
20-
attributeNames = Some(settings.attributeNames),
21-
messageAttributeNames = Some(settings.messageAttributeNames.map(MessageAttributeName.apply(_))),
22-
maxNumberOfMessages = Some(settings.maxNumberOfMessages),
23-
visibilityTimeout = Some(settings.visibilityTimeout.getOrElse(30)),
24-
waitTimeSeconds = Some(settings.waitTimeSeconds.getOrElse(20))
28+
attributeNames = Option(settings.attributeNames).filter(_.nonEmpty),
29+
messageAttributeNames = Option(settings.messageAttributeNames.map(MessageAttributeName.apply(_))),
30+
maxNumberOfMessages = settings.maxNumberOfMessages,
31+
visibilityTimeout = settings.visibilityTimeout,
32+
waitTimeSeconds = settings.waitTimeSeconds
2533
)
2634

2735
ZStream
@@ -60,9 +68,9 @@ object SqsStream {
6068
)(process: Chunk[Message.ReadOnly] => Task[Unit]): RIO[Sqs, Unit] = {
6169
val request = ReceiveMessageRequest(
6270
queueUrl = queueUrl,
63-
attributeNames = Option(settings.attributeNames),
64-
messageAttributeNames = Option(settings.messageAttributeNames.map(MessageAttributeName.apply(_))),
65-
maxNumberOfMessages = Option(settings.maxNumberOfMessages),
71+
attributeNames = Option(settings.attributeNames).filter(_.nonEmpty),
72+
messageAttributeNames = Option(settings.messageAttributeNames.map(MessageAttributeName.apply(_))).filter(_.nonEmpty),
73+
maxNumberOfMessages = settings.maxNumberOfMessages,
6674
visibilityTimeout = settings.visibilityTimeout,
6775
waitTimeSeconds = settings.waitTimeSeconds
6876
)

zio-sqs/src/main/scala/zio/sqs/SqsStreamSettings.scala

+56-9
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,69 @@ package zio.sqs
33
import zio.aws.sqs.model._
44
import zio._
55

6-
case class SqsStreamSettings(
7-
attributeNames: List[QueueAttributeName] = Nil,
8-
maxNumberOfMessages: Int = 1,
9-
messageAttributeNames: List[String] = Nil,
10-
visibilityTimeout: Option[Int] = Some(30),
11-
waitTimeSeconds: Option[Int] = Some(20),
12-
autoDelete: Boolean = true,
13-
stopWhenQueueEmpty: Boolean = false
14-
)
6+
final case class SqsStreamSettings(
7+
attributeNames: List[QueueAttributeName],
8+
maxNumberOfMessages: Option[Int],
9+
messageAttributeNames: List[String],
10+
visibilityTimeout: Option[Int],
11+
waitTimeSeconds: Option[Int],
12+
autoDelete: Boolean,
13+
stopWhenQueueEmpty: Boolean
14+
) {
15+
def withAttributeName(attributeName: QueueAttributeName): SqsStreamSettings =
16+
copy(attributeNames = attributeName :: attributeNames)
17+
18+
def withAttributeNames(attributeNames: List[QueueAttributeName]): SqsStreamSettings =
19+
copy(attributeNames = attributeNames)
20+
21+
def withMaxNumberOfMessages(maxNumberOfMessages: Int): SqsStreamSettings =
22+
copy(maxNumberOfMessages = Some(maxNumberOfMessages))
23+
24+
def withMessageAttributeName(name: String): SqsStreamSettings =
25+
copy(messageAttributeNames = name :: messageAttributeNames)
26+
27+
def withMessageAttributeNames(names: List[String]): SqsStreamSettings =
28+
copy(messageAttributeNames = messageAttributeNames ::: names)
29+
30+
def withVisibilityTimeout(seconds: Int): SqsStreamSettings =
31+
copy(visibilityTimeout = Some(seconds))
32+
33+
def withWaitTimeSeconds(seconds: Int): SqsStreamSettings =
34+
copy(waitTimeSeconds = Some(seconds))
35+
36+
def withAutoDelete(autoDelete: Boolean): SqsStreamSettings =
37+
copy(autoDelete = autoDelete)
38+
39+
def withStopWhenQueueEmpty(stopWhenQueueEmpty: Boolean): SqsStreamSettings =
40+
copy(stopWhenQueueEmpty = stopWhenQueueEmpty)
41+
}
42+
object SqsStreamSettings {
43+
val default = SqsStreamSettings(
44+
attributeNames = Nil,
45+
maxNumberOfMessages = None,
46+
messageAttributeNames = Nil,
47+
visibilityTimeout = None,
48+
waitTimeSeconds = None,
49+
autoDelete = false,
50+
stopWhenQueueEmpty = false
51+
)
52+
}
1553

1654
final case class SqsMessageLifetimeExtensionSettings(
1755
automaticExtension: Boolean,
1856
maximumRetries: Int,
1957
overrideInitialDelay: Option[Duration],
2058
overrideRepeatSchedule: Option[Schedule[Any, Any, Any]]
2159
) {
60+
def withAutomaticExtension(automaticExtension: Boolean): SqsMessageLifetimeExtensionSettings =
61+
copy(automaticExtension = automaticExtension)
62+
63+
def withMaximumRetries(maximumRetries: Int): SqsMessageLifetimeExtensionSettings =
64+
copy(maximumRetries = maximumRetries)
65+
66+
def withOverrideInitialDelay(overrideInitialDelay: Duration): SqsMessageLifetimeExtensionSettings =
67+
copy(overrideInitialDelay = Some(overrideInitialDelay))
68+
2269
def schedule(settings: SqsStreamSettings): Schedule[Any, Any, Any] =
2370
overrideRepeatSchedule.getOrElse(
2471
Schedule.spaced(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package examples
2+
3+
import zio._
4+
import zio.sqs.producer._
5+
import zio.sqs.serialization.Serializer
6+
import zio.aws.netty.NettyHttpClient
7+
import zio.aws.sqs.Sqs
8+
import zio.aws.core.config._
9+
import software.amazon.awssdk.regions.Region
10+
import software.amazon.awssdk.auth.credentials._
11+
import zio.sqs._
12+
import zio.aws.sqs.model.Message
13+
14+
object AtLeastOnceExample extends ZIOAppDefault {
15+
16+
val producerLayer: RLayer[Sqs, Producer[String]] =
17+
ZLayer.scoped(
18+
Producer.make(
19+
queueUrl = "https://sqs.us-east-1.amazonaws.com/000/calq.fifo",
20+
serializer = Serializer.serializeString,
21+
settings = ProducerSettings(parallelism = 1)
22+
)
23+
)
24+
25+
val producerExample =
26+
ZIO.serviceWithZIO[Producer[String]] { producer =>
27+
producer.produceBatch(
28+
(200 to 300).map(i =>
29+
ProducerEvent(
30+
data = s"Message $i",
31+
attributes = Map.empty,
32+
groupId = Some(
33+
if (i % 2 == 0) "even"
34+
else "odd"
35+
),
36+
deduplicationId = None
37+
)
38+
)
39+
)
40+
}
41+
42+
val consumerExample =
43+
SqsStream.consumeChunkAtLeastOnce(
44+
queueUrl = "https://sqs.us-east-1.amazonaws.com/000/calq.fifo",
45+
settings = SqsStreamSettings.default.copy(
46+
maxNumberOfMessages = Option(10),
47+
visibilityTimeout = Some(5),
48+
waitTimeSeconds = Some(20)
49+
),
50+
extensionSettings = SqsMessageLifetimeExtensionSettings.default,
51+
consumerParallelism = 10
52+
) { (messages: Chunk[Message.ReadOnly]) =>
53+
ZIO.debug(messages.map(_.body.getOrElse(""))) *> ZIO.sleep(14.seconds)
54+
}
55+
56+
override val run: ZIO[Environment with ZIOAppArgs with Scope, Any, Any] =
57+
(producerExample *> consumerExample)
58+
.provide(
59+
producerLayer,
60+
Sqs.live,
61+
NettyHttpClient.default,
62+
AwsConfig.configured(),
63+
ZLayer.succeed(
64+
CommonAwsConfig(
65+
region = Option(Region.US_EAST_1),
66+
credentialsProvider = StaticCredentialsProvider.create(
67+
AwsBasicCredentials.create("key", "secret")
68+
),
69+
endpointOverride = None,
70+
commonClientConfig = None
71+
)
72+
)
73+
)
74+
75+
}

zio-sqs/src/test/scala/examples/TestApp.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ object TestApp extends zio.ZIOAppDefault {
3636
}
3737
_ <- SqsStream(
3838
queueUrl,
39-
SqsStreamSettings(stopWhenQueueEmpty = true, waitTimeSeconds = Some(3))
39+
SqsStreamSettings.default.withStopWhenQueueEmpty(true).withWaitTimeSeconds(3)
4040
).foreach(msg => ZIO.succeed(println(msg.body)))
4141
} yield ()
4242

zio-sqs/src/test/scala/zio/sqs/ZioSqsSpec.scala

+16-15
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ object ZioSqsSpec extends ZIOSpecDefault {
1717
override def spec =
1818
suite("ZioSqsSpec")(
1919
test("send messages") {
20-
val settings: SqsStreamSettings = SqsStreamSettings(stopWhenQueueEmpty = true)
20+
val settings: SqsStreamSettings = SqsStreamSettings.default.withStopWhenQueueEmpty(true)
2121

2222
for {
2323
messages <- gen.runHead.someOrFailException
@@ -27,7 +27,10 @@ object ZioSqsSpec extends ZIOSpecDefault {
2727
},
2828
test("delete messages manually") {
2929
val settings: SqsStreamSettings =
30-
SqsStreamSettings(stopWhenQueueEmpty = true, autoDelete = false, waitTimeSeconds = Some(1))
30+
SqsStreamSettings.default
31+
.withStopWhenQueueEmpty(true)
32+
.withAutoDelete(false)
33+
.withWaitTimeSeconds(1)
3134

3235
for {
3336
messages <- gen.runHead.someOrFailException
@@ -43,7 +46,7 @@ object ZioSqsSpec extends ZIOSpecDefault {
4346
} yield assert(list)(isEmpty)
4447
},
4548
test("delete messages automatically") {
46-
val settings: SqsStreamSettings = SqsStreamSettings(stopWhenQueueEmpty = true, waitTimeSeconds = Some(1))
49+
val settings: SqsStreamSettings = SqsStreamSettings.default.withStopWhenQueueEmpty(true).withWaitTimeSeconds(1)
4750

4851
for {
4952
messages <- gen.runHead.someOrFailException
@@ -58,12 +61,11 @@ object ZioSqsSpec extends ZIOSpecDefault {
5861
} yield assert(list)(isEmpty)
5962
},
6063
test("consumeChunkAtLeastOnce will not delete messages if there is an error encountered when processing") {
61-
val settings = SqsStreamSettings(
62-
stopWhenQueueEmpty = true,
63-
waitTimeSeconds = Some(1),
64-
visibilityTimeout = Some(2),
65-
maxNumberOfMessages = 10000
66-
)
64+
val settings = SqsStreamSettings.default
65+
.withStopWhenQueueEmpty(true)
66+
.withWaitTimeSeconds(1)
67+
.withVisibilityTimeout(2)
68+
.withMaxNumberOfMessages(10000)
6769

6870
val program =
6971
for {
@@ -87,12 +89,11 @@ object ZioSqsSpec extends ZIOSpecDefault {
8789
ZIO.scoped(program)
8890
} @@ TestAspect.withLiveClock,
8991
test("consumeChunkAtLeastOnce will automatically extend message lifetime and delete messages after successful processing") {
90-
val settings = SqsStreamSettings(
91-
stopWhenQueueEmpty = true,
92-
waitTimeSeconds = Some(1),
93-
visibilityTimeout = Some(2),
94-
maxNumberOfMessages = 10000
95-
)
92+
val settings = SqsStreamSettings.default
93+
.withStopWhenQueueEmpty(true)
94+
.withWaitTimeSeconds(1)
95+
.withVisibilityTimeout(2)
96+
.withMaxNumberOfMessages(10000)
9697

9798
val program = for {
9899
_ <- serverResource

0 commit comments

Comments
 (0)