Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first cut: Cassandra flows with support for conditional writes #3165

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ object CassandraFlow {
)
.asJava

/**
 * A flow writing to Cassandra for every stream element, using a conditional
 * (e.g. `INSERT ... IF NOT EXISTS`) statement.
 *
 * Emits an applied result carrying the element when the write was applied, or
 * an unapplied result carrying the driver's result set when it was not.
 *
 * @param session Cassandra session
 * @param writeSettings settings to configure the write operation
 * @param cqlStatement raw CQL statement
 * @param statementBinder binds data from the stream element to the prepared statement
 */
def createConditional[T](
    session: CassandraSession,
    writeSettings: CassandraWriteSettings,
    cqlStatement: String,
    statementBinder: akka.japi.Function2[T, PreparedStatement, BoundStatement]
): Flow[T, ConditionalWriteResult[T], NotUsed] =
  scaladsl.CassandraFlow
    .createConditional(
      writeSettings,
      cqlStatement,
      // adapt the Java functional interface to the Scala function the scaladsl expects
      (element, prepared) => statementBinder.apply(element, prepared)
    )(session.delegate)
    .map(ConditionalWriteResultBuilder.fromEither)
    .asJava

/**
* A flow writing to Cassandra for every stream element, passing context along.
* The element (to be persisted) and the context are emitted unchanged.
Expand All @@ -59,6 +74,22 @@ object CassandraFlow {
.asJava
}

/**
 * A flow writing to Cassandra for every stream element with a conditional
 * statement, passing context along.
 *
 * Emits an applied result carrying the element when the write was applied, or
 * an unapplied result carrying the driver's result set when it was not; the
 * context is passed through unchanged.
 *
 * @param session Cassandra session
 * @param writeSettings settings to configure the write operation
 * @param cqlStatement raw CQL statement
 * @param statementBinder binds data from the stream element to the prepared statement
 */
def withContextConditional[T, Ctx](
    session: CassandraSession,
    writeSettings: CassandraWriteSettings,
    cqlStatement: String,
    statementBinder: akka.japi.Function2[T, PreparedStatement, BoundStatement]
): FlowWithContext[T, Ctx, ConditionalWriteResult[T], Ctx, NotUsed] =
  scaladsl.CassandraFlow
    .withContextConditional(
      writeSettings,
      cqlStatement,
      // adapt the Java functional interface to the Scala function the scaladsl expects
      (element, prepared) => statementBinder.apply(element, prepared)
    )(session.delegate)
    .map(ConditionalWriteResultBuilder.fromEither)
    .asJava


/**
* Creates a flow that uses [[com.datastax.oss.driver.api.core.cql.BatchStatement]] and groups the
* elements internally into batches using the `writeSettings` and per `groupingKey`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import akka.Done
import akka.NotUsed
import akka.actor.{ActorSystem, ClassicActorSystemProvider}
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.stream.alpakka.cassandra.CassandraServerMetaData
import akka.stream.alpakka.cassandra.{scaladsl, CqlSessionProvider}
Expand Down Expand Up @@ -157,6 +158,17 @@ final class CassandraSession(@InternalApi private[akka] val delegate: scaladsl.C
def executeWrite(stmt: String, bindValues: AnyRef*): CompletionStage[Done] =
delegate.executeWrite(stmt, bindValues: _*).toJava

/**
 * Execute one conditional statement. First you must `prepare` the statement
 * and bind its parameters.
 *
 * The returned `CompletionStage` is completed with an applied result when the
 * write was applied, or an unapplied result carrying the driver's result set
 * when it was not.
 */
def executeConditionalWrite(stmt: Statement[_]): CompletionStage[ConditionalWriteResult[Done]] = {
  val eitherResult = delegate.executeConditionalWrite(stmt)
  eitherResult
    .map(ConditionalWriteResultBuilder.fromEither)(ExecutionContexts.parasitic)
    .toJava
}

/**
 * Prepare, bind and execute one conditional statement in one go.
 *
 * The returned `CompletionStage` is completed with an applied result when the
 * write was applied, or an unapplied result carrying the driver's result set
 * when it was not.
 */
@varargs
def executeConditionalWrite(stmt: String, bindValues: AnyRef*): CompletionStage[ConditionalWriteResult[Done]] = {
  val eitherResult = delegate.executeConditionalWrite(stmt, bindValues: _*)
  eitherResult
    .map(ConditionalWriteResultBuilder.fromEither)(ExecutionContexts.parasitic)
    .toJava
}

/**
* Execute a select statement. First you must `prepare` the
* statement and bind its parameters.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.cassandra.javadsl

import com.datastax.oss.driver.api.core.cql.AsyncResultSet

import scala.util.Either

/**
 * Outcome of a conditional write to Cassandra.
 *
 * Exactly one of [[getContents]] and [[getResultSet]] yields a value,
 * depending on [[wasApplied]]; the other throws `NoSuchElementException`.
 */
trait ConditionalWriteResult[T] {
  /** `true` if the conditional write was applied by Cassandra. */
  def wasApplied: Boolean

  /** The stream element; only available when the write was applied. */
  @throws(classOf[NoSuchElementException])
  def getContents: T

  /** The driver's result set; only available when the write was NOT applied. */
  @throws(classOf[NoSuchElementException])
  def getResultSet: AsyncResultSet
}

/**
 * Result of a conditional write that Cassandra applied.
 * Carries the stream element; there is no result set to expose.
 */
final class AppliedConditionalWriteResult[T](contents: T) extends ConditionalWriteResult[T] {
  override def wasApplied: Boolean = true

  /** Returns the written element; never throws for an applied result. */
  override def getContents: T = contents

  @throws(classOf[NoSuchElementException])
  override def getResultSet: AsyncResultSet =
    throw new NoSuchElementException("No result set from Cassandra - conditional write was applied")
}

/**
 * Result of a conditional write that Cassandra did NOT apply.
 * Carries the driver's result set; there is no element to expose.
 */
final class UnappliedConditionalWriteResult[T](resultSet: AsyncResultSet) extends ConditionalWriteResult[T] {
  override def wasApplied: Boolean = false

  @throws(classOf[NoSuchElementException])
  override def getContents: T =
    throw new NoSuchElementException("Conditional write was not applied")

  /** Returns the driver's result set; never throws for an unapplied result. */
  override def getResultSet: AsyncResultSet = resultSet
}

/**
 * Factory for [[ConditionalWriteResult]] values; also converts the scaladsl
 * `Either`-based representation into the javadsl one.
 */
object ConditionalWriteResultBuilder {

  /** Wraps an element in an applied result. */
  def applied[T](contents: T): AppliedConditionalWriteResult[T] =
    new AppliedConditionalWriteResult(contents)

  /** Wraps a driver result set in an unapplied result. */
  def unapplied[T](asyncResultSet: AsyncResultSet): UnappliedConditionalWriteResult[T] =
    new UnappliedConditionalWriteResult(asyncResultSet)

  /** `Left` carries the element of an applied write; `Right` the result set of an unapplied one. */
  def fromEither[T](either: Either[T, AsyncResultSet]): ConditionalWriteResult[T] =
    either match {
      case Left(contents) => applied(contents)
      case Right(resultSet) => unapplied(resultSet)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.cassandra.CassandraWriteSettings
import akka.stream.scaladsl.{Flow, FlowWithContext}
import com.datastax.oss.driver.api.core.cql.{BatchStatement, BoundStatement, PreparedStatement}
import com.datastax.oss.driver.api.core.cql.{
AsyncResultSet,
BatchStatement,
BoundStatement,
PreparedStatement
}

import scala.jdk.CollectionConverters._
import scala.concurrent.Future
Expand Down Expand Up @@ -47,6 +52,25 @@ object CassandraFlow {
.mapMaterializedValue(_ => NotUsed)
}

/**
 * A flow writing to Cassandra for every stream element with a conditional
 * statement. Emits `Left(element)` when the write was applied and
 * `Right(resultSet)` when it was not.
 *
 * @param writeSettings settings to configure the write operation
 * @param cqlStatement raw CQL statement
 * @param statementBinder binds data from the stream element to the prepared statement
 * @param session implicit Cassandra session
 */
def createConditional[T](
    writeSettings: CassandraWriteSettings,
    cqlStatement: String,
    statementBinder: (T, PreparedStatement) => BoundStatement
)(implicit session: CassandraSession): Flow[T, Either[T, AsyncResultSet], NotUsed] =
  Flow
    .lazyFutureFlow { () =>
      // Prepare the statement once, then reuse it for every element.
      session
        .prepare(cqlStatement)
        .map { prepared =>
          Flow[T].mapAsync(writeSettings.parallelism) { element =>
            session
              .executeConditionalWrite(statementBinder(element, prepared))
              // On success replace Left(Done) with the element so callers get it back.
              .map(_.left.map(_ => element))(ExecutionContexts.parasitic)
          }
        }(session.ec)
    }
    .mapMaterializedValue(_ => NotUsed)

/**
* A flow writing to Cassandra for every stream element, passing context along.
* The element (to be persisted) and the context are emitted unchanged.
Expand Down Expand Up @@ -80,6 +104,28 @@ object CassandraFlow {
}
}

/**
 * A flow writing to Cassandra for every stream element with a conditional
 * statement, passing context along. Emits `Left(element)` when the write was
 * applied and `Right(resultSet)` when it was not; the context is passed
 * through unchanged.
 *
 * @param writeSettings settings to configure the write operation
 * @param cqlStatement raw CQL statement
 * @param statementBinder binds data from the stream element to the prepared statement
 * @param session implicit Cassandra session
 */
def withContextConditional[T, Ctx](
    writeSettings: CassandraWriteSettings,
    cqlStatement: String,
    statementBinder: (T, PreparedStatement) => BoundStatement
)(implicit session: CassandraSession): FlowWithContext[T, Ctx, Either[T, AsyncResultSet], Ctx, NotUsed] =
  FlowWithContext.fromTuples {
    Flow
      .lazyFutureFlow { () =>
        // Prepare the statement once, then reuse it for every element.
        session
          .prepare(cqlStatement)
          .map { prepared =>
            Flow[(T, Ctx)].mapAsync(writeSettings.parallelism) {
              case (element, ctx) =>
                session
                  .executeConditionalWrite(statementBinder(element, prepared))
                  // On success replace Left(Done) with the element; keep the context paired.
                  .map(_.left.map(_ => element) -> ctx)(ExecutionContexts.parasitic)
            }
          }(session.ec)
      }
      .mapMaterializedValue(_ => NotUsed)
  }

/**
* Creates a flow that uses [[com.datastax.oss.driver.api.core.cql.BatchStatement]] and groups the
* elements internally into batches using the `writeSettings` and per `groupingKey`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Either, Left, Right}

/**
* Data Access Object for Cassandra. The statements are expressed in
Expand Down Expand Up @@ -181,6 +182,44 @@ final class CassandraSession(system: akka.actor.ActorSystem,
bind(stmt, bindValues).flatMap(b => executeWrite(b))
}

/**
 * Execute one conditional statement. First you must `prepare` the
 * statement and bind its parameters.
 *
 * See <a href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/useInsertDataTOC.html">Inserting and updating data</a>.
 *
 * The configured write consistency level is used if a specific consistency
 * level has not been set on the `Statement`.
 *
 * The returned `Future` is completed with `Left(Done)` if the query was applied, and
 * with `Right` of the query-dependent result set if the query succeeded but was not
 * applied (e.g. for an INSERT ... IF NOT EXISTS query, the result would contain the
 * already existing row)
 */
def executeConditionalWrite(stmt: Statement[_]): Future[Either[Done, AsyncResultSet]] =
  for {
    cqlSession <- underlying()
    resultSet <- cqlSession.executeAsync(stmt).toScala
  } yield
    // wasApplied is the driver's flag for lightweight-transaction outcome;
    // reuse the shared Left(Done) instance rather than allocating per write.
    if (resultSet.wasApplied) CassandraSession.LeftDone
    else Right(resultSet)

/**
 * Prepare, bind and execute one conditional statement in one go.
 *
 * See <a href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/useInsertDataTOC.html">Inserting and updating data</a>.
 *
 * The configured write consistency level is used.
 *
 * The returned `Future` is completed with `Left(Done)` if the query was applied, and
 * with `Right` of the query-dependent result set if the query succeeded but was not
 * applied (e.g. for an INSERT ... IF NOT EXISTS query, the result would contain the
 * already existing row)
 */
def executeConditionalWrite(stmt: String, bindValues: AnyRef*): Future[Either[Done, AsyncResultSet]] =
  bind(stmt, bindValues).flatMap(bound => executeConditionalWrite(bound))

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -312,5 +351,8 @@ final class CassandraSession(system: akka.actor.ActorSystem,
else ps.bind(bindValues: _*)
}
}
}

/** Companion holding shared constants for [[CassandraSession]]. */
object CassandraSession {
  // Shared instance reused for every applied conditional write, avoiding a fresh
  // allocation per call. Explicit type annotation keeps the public member's type
  // stable instead of relying on inference; Nothing on the right is safe because
  // Either is covariant, so this widens to Either[Done, AsyncResultSet] at use sites.
  val LeftDone: Left[Done, Nothing] = Left(Done)
}
Loading