Commit 5875bff

feat: reject events on retryable errors (#75)
* feat: reject events on retryable errors
* update: inline retryable errors, reject failure for each write
1 parent 1a75c83 commit 5875bff

2 files changed: +23 -7 lines changed


core/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournal.scala (+16 -7)

@@ -5,8 +5,8 @@
 package akka.persistence.dynamodb.journal
 
 import java.time.Instant
+import java.util.concurrent.CompletionException
 
-import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.util.Failure
@@ -41,6 +41,7 @@ import akka.serialization.SerializationExtension
 import akka.serialization.Serializers
 import akka.stream.scaladsl.Sink
 import com.typesafe.config.Config
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException
 
 /**
  * INTERNAL API
@@ -104,14 +105,14 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
 
   // if there are pending writes when an actor restarts we must wait for
   // them to complete before we can read the highest sequence number or we will miss it
-  private val writesInProgress = new java.util.HashMap[String, Future[Done]]()
+  private val writesInProgress = new java.util.HashMap[String, Future[Seq[Try[Unit]]]]()
 
   override def receivePluginInternal: Receive = { case WriteFinished(pid, f) =>
     writesInProgress.remove(pid, f)
   }
 
-  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
-    def atomicWrite(atomicWrite: AtomicWrite): Future[Done] = {
+  override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
+    def atomicWrite(atomicWrite: AtomicWrite): Future[Seq[Try[Unit]]] = {
       val serialized: Try[Seq[SerializedJournalItem]] = Try {
         atomicWrite.payload.map { pr =>
           val (event, tags) = pr.payload match {
@@ -166,7 +167,15 @@
                   ps.publish(pr, serialized.writeTimestamp)
                 }
               }
-              Done
+              Nil // successful writes
+            }
+            .recoverWith { case e: CompletionException =>
+              e.getCause match {
+                case error: ProvisionedThroughputExceededException => // reject retryable errors
+                  Future.successful(atomicWrite.payload.map(_ => Failure(error)))
+                case error => // otherwise journal failure
+                  Future.failed(error)
+              }
             }
 
         case Failure(exc) =>
@@ -175,7 +184,7 @@
     }
 
     val persistenceId = messages.head.persistenceId
-    val writeResult: Future[Done] =
+    val writeResult: Future[Seq[Try[Unit]]] =
       if (messages.size == 1)
         atomicWrite(messages.head)
       else {
@@ -189,7 +198,7 @@
     writeResult.onComplete { _ =>
       self ! WriteFinished(persistenceId, writeResult)
     }
-    writeResult.map(_ => Nil)(ExecutionContexts.parasitic)
+    writeResult
   }
 
   override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
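The core of the change above is how errors from the asynchronous DynamoDB client are classified. As a standalone sketch (the `RetryableWriteErrors` object and its `classify` helper are illustrative, not part of the plugin): a `Failure` entry in the returned sequence rejects the write, while a failed `Future` is treated as a journal failure.

```scala
import java.util.concurrent.CompletionException

import scala.concurrent.Future
import scala.util.{ Failure, Try }

import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException

// Illustrative helper, not part of the plugin: unwrap the CompletionException thrown by the
// async AWS SDK client and decide between per-write rejection and journal failure.
object RetryableWriteErrors {
  def classify(numberOfWrites: Int, e: CompletionException): Future[Seq[Try[Unit]]] =
    e.getCause match {
      case retryable: ProvisionedThroughputExceededException =>
        // retryable: reject each write in the batch instead of failing the journal
        Future.successful(Seq.fill(numberOfWrites)(Failure(retryable)))
      case other =>
        // non-retryable: fail the returned Future, treated as a journal failure
        Future.failed(other)
    }
}
```

The distinction matters because rejections surface to typed persistent actors as `EventRejectedException`, which can be supervised separately from journal failures (see the documentation change below).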

docs/src/main/paradox/journal.md (+7)

@@ -68,6 +68,13 @@ The events are serialized with @extref:[Akka Serialization](akka:serialization.h
 is stored in the `event_payload` column together with information about what serializer that was used in the
 `event_ser_id` and `event_ser_manifest` columns.
 
+## Retryable errors
+
+When persisting events, any DynamoDB errors that are considered retryable, such as when provisioned throughput capacity
+is exceeded, will cause events to be @extref:[rejected](akka:typed/persistence.html#journal-rejections) rather than
+marked as a journal failure. A supervision strategy for `EventRejectedException` failures can then be added to
+EventSourcedBehaviors, so that entities can be resumed on these retryable errors rather than stopped or restarted.
+
 ## Deletes
 
 The journal supports deletes through hard deletes, which means that journal entries are actually deleted from the
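For reference, a minimal sketch of attaching such a supervision strategy to an `EventSourcedBehavior`, as the added paragraph describes. The `MyEntity` command, event, and state types are illustrative only and not from this repository.

```scala
import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.{ EventRejectedException, PersistenceId }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }

object MyEntity {
  final case class Add(amount: Int)
  final case class Added(amount: Int)

  def apply(entityId: String): Behavior[Add] =
    Behaviors
      .supervise(
        EventSourcedBehavior[Add, Added, Int](
          persistenceId = PersistenceId.ofUniqueId(entityId),
          emptyState = 0,
          commandHandler = (_, cmd) => Effect.persist(Added(cmd.amount)),
          eventHandler = (state, event) => state + event.amount))
      // resume the entity when a write is rejected on a retryable error,
      // instead of the default behavior of stopping it
      .onFailure[EventRejectedException](SupervisorStrategy.resume)
}
```

Note that a rejected event is not stored, so the originating command typically needs to be retried by the caller.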
