From 19b3942d16ca9ed6c5b4109167e0930698185daf Mon Sep 17 00:00:00 2001 From: "t.gusev" Date: Mon, 27 Nov 2023 12:26:22 +0300 Subject: [PATCH 1/6] wip --- .../ergoplatform/dex/executor/amm/App.scala | 6 +- .../ergoplatform/ergo/domain/BoxAsset.scala | 5 ++ .../org/ergoplatform/ergo/domain/Output.scala | 40 +++++++---- .../ergoplatform/ergo/domain/SConstant.scala | 55 ++++++++++++-- .../org/ergoplatform/ergo/domain/sigma.scala | 72 +++++++++++++++++++ .../org/ergoplatform/dex/tracker/App.scala | 15 +++- .../dex/tracker/configs/ConfigBundle.scala | 3 +- .../dex/tracker/domain/Transaction.scala | 23 ++++++ .../tracker/processes/MempoolTracker.scala | 63 ++++++---------- .../tracker/streaming/KafkaMempoolEvent.scala | 18 +++++ .../dex/tracker/streaming/MempoolEvent.scala | 49 +++++++++++++ .../dex/tracker/streaming/package.scala | 9 +++ project/versions.scala | 2 +- 13 files changed, 290 insertions(+), 70 deletions(-) create mode 100644 modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/sigma.scala create mode 100644 modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/domain/Transaction.scala create mode 100644 modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/KafkaMempoolEvent.scala create mode 100644 modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/MempoolEvent.scala create mode 100644 modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/package.scala diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala index ba6c91f3..8d9b9ac6 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala @@ -78,9 +78,9 @@ object App extends EnvApp[AppContext] { Resource.eval(N2TV3.make[InitF, RunF](configs.exchange, configs.monetary, context)) implicit0(n2tInt: InterpreterV3[T2T_CFMM, RunF]) <- Resource.eval(T2TV3.make[InitF, RunF](configs.exchange, configs.monetary, context)) - implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) <-Resource.eval(CFMMInterpreter.make[InitF, RunF]) - implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF]) - executor <- Resource.eval(Executor.make[InitF, StreamF, RunF]) + implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) <- Resource.eval(CFMMInterpreter.make[InitF, RunF]) + implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF]) + executor <- Resource.eval(Executor.make[InitF, StreamF, RunF]) } yield List(executor.run, networkContextUpdater.run) -> ctx private def makeBackend( diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/BoxAsset.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/BoxAsset.scala index 2af183db..627243cf 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/BoxAsset.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/BoxAsset.scala @@ -4,11 +4,13 @@ import derevo.cats.show import derevo.circe.{decoder, encoder} import derevo.derive import doobie.util.Write +import org.ergoplatform.ErgoBox import org.ergoplatform.ergo.TokenId import shapeless.Lazy import tofu.logging.derivation.loggable import org.ergoplatform.ergo.services.explorer.models.{BoxAsset => ExplorerBoxAsset} import org.ergoplatform.ergo.services.node.models.{BoxAsset => NodeAsset} +import scorex.util.encode.Base16 @derive(show, encoder, decoder, loggable) final case class BoxAsset( @@ -19,6 +21,9 @@ final case class BoxAsset( object BoxAsset { implicit def write: Write[BoxAsset] = Lazy(implicitly[Write[BoxAsset]]).value + def fromErgo(id: ErgoBox.TokenId, amount: Long): BoxAsset = + BoxAsset(TokenId.fromStringUnsafe(Base16.encode(id)), amount) + def fromExplorer(a: ExplorerBoxAsset): BoxAsset = BoxAsset(a.tokenId, a.amount) diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala index 941766aa..bb1eb27f 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala @@ -4,14 +4,17 @@ import derevo.cats.show import derevo.circe.{decoder, encoder} import derevo.derive import org.ergoplatform.ErgoBox +import org.ergoplatform.ErgoBox.NonMandatoryRegisterId import org.ergoplatform.dex.domain.DexOperatorOutput +import org.ergoplatform.dex.protocol.ErgoTreeSerializer.default._ +import org.ergoplatform.ergo.domain.sigma.renderEvaluatedValue import org.ergoplatform.ergo.services.explorer.models.{Output => ExplorerOutput} import org.ergoplatform.ergo.services.node.models.{Output => NodeOutput} import org.ergoplatform.ergo.state.{Predicted, Traced} -import org.ergoplatform.ergo.{BoxId, SErgoTree, TokenId, TxId} -import scorex.crypto.authds.ADKey +import org.ergoplatform.ergo.{BoxId, SErgoTree, TxId} import scorex.util.ModifierId -import scorex.util.encode.Base16 +import sigmastate.SType +import sigmastate.Values.EvaluatedValue import tofu.logging.derivation.loggable @derive(show, encoder, decoder, loggable) @@ -27,6 +30,7 @@ final case class Output( ) object Output { + def predicted(output: Output, prevBoxId: BoxId): Traced[Predicted[DexOperatorOutput]] = Traced(Predicted(DexOperatorOutput(output)), prevBoxId) @@ -54,17 +58,25 @@ object Output { Map.empty // todo ) - def fromErgoBox(b: ErgoBox): Output = { - val bId = ADKey !@@ b.id + def fromErgoBox(box: ErgoBox, txId: ModifierId): Output = Output( - BoxId(Base16.encode(bId)), - TxId(ModifierId !@@ b.transactionId), - b.value, - b.index, - b.creationHeight, - SErgoTree.fromBytes(b.ergoTree.bytes), - b.additionalTokens.toMap.map { case (id, l) => BoxAsset(TokenId.fromBytes(id), l) }.toList, - Map.empty + BoxId.fromErgo(box.id), + TxId(ModifierId !@@ txId), + box.value, + box.index, + box.creationHeight, + serialize(box.ergoTree), + box.additionalTokens.toArray.toList.map { case (id, amount) => BoxAsset.fromErgo(id, amount) }, + parseRegisters(box.additionalRegisters) ) - } + + private def parseRegisters( + additionalRegisters: Map[NonMandatoryRegisterId, _ <: EvaluatedValue[_ <: SType]] + ): Map[RegisterId, SConstant] = + additionalRegisters.flatMap { case (k, v) => + for { + register <- RegisterId.withNameOption(s"R${k.number}") + sConstant <- renderEvaluatedValue(v).map { case (t, eval) => SConstant.fromRenderValue(t, eval) } + } yield (register, sConstant) + } } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala index adb483ca..57ab429b 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala @@ -3,33 +3,49 @@ package org.ergoplatform.ergo.domain import derevo.cats.show import derevo.circe.encoder import derevo.derive -import io.circe.Decoder +import io.circe.{Decoder, Encoder, Json} import org.ergoplatform.common.HexString import org.ergoplatform.ergo.PubKey import org.ergoplatform.ergo.domain.SigmaType.SimpleKindSigmaType._ import org.ergoplatform.ergo.domain.SigmaType._ import tofu.logging.derivation.loggable +import io.circe.syntax._ @derive(show, encoder, loggable) sealed trait SConstant object SConstant { - @derive(show, encoder, loggable) + @derive(loggable, show) final case class IntConstant(value: Int) extends SConstant - @derive(show, encoder, loggable) + @derive(loggable, show) final case class LongConstant(value: Long) extends SConstant - @derive(show, encoder, loggable) + @derive(loggable, show) final case class ByteaConstant(value: HexString) extends SConstant - @derive(show, encoder, loggable) + @derive(loggable, show) final case class SigmaPropConstant(value: PubKey) extends SConstant - @derive(show, encoder, loggable) + @derive(loggable, show) final case class UnresolvedConstant(raw: String) extends SConstant + @derive(loggable, show) + final case class IntsConstant(value: List[Int]) extends SConstant + + implicit val encoder: Encoder[SConstant] = { c => + val (renderedValue, sigmaType: SigmaType) = c match { + case IntConstant(value) => value.toString -> SInt + case LongConstant(value) => value.toString -> SLong + case ByteaConstant(value) => value.value.value -> SCollection(SByte) + case IntsConstant(value) => "[" ++ value.mkString(",") ++ "]" -> SCollection(SInt) + case SigmaPropConstant(value) => value.value.value.value -> SSigmaProp + case UnresolvedConstant(raw) => raw -> SAny + } + Json.obj("renderedValue" -> Json.fromString(renderedValue), "sigmaType" -> sigmaType.asJson) + } + implicit val decoder: Decoder[SConstant] = { c => c.downField("renderedValue").as[String].flatMap { value => c.downField("sigmaType").as[SigmaType].map { @@ -37,8 +53,35 @@ object SConstant { case SLong => LongConstant(value.toLong) case SSigmaProp => SigmaPropConstant(PubKey.unsafeFromString(value)) case SCollection(SByte) => ByteaConstant(HexString.unsafeFromString(value)) + case SCollection(SInt) => parseSInt(value) case _ => UnresolvedConstant(value) } } } + + def fromRenderValue(sType: SigmaType, value: String): SConstant = + sType match { + case SInt => IntConstant(value.toInt) + case SLong => LongConstant(value.toLong) + case SSigmaProp => SigmaPropConstant(PubKey.unsafeFromString(value)) + case SCollection(SByte) => ByteaConstant(HexString.unsafeFromString(value)) + case SCollection(SInt) => parseSInt(value) + case _ => UnresolvedConstant(value) + } + + def parseSInt(value: String): IntsConstant = { + val split = value.split(",") + if (split.length == 1) { + val splitHeadTail = split.headOption.map(_.drop(1).dropRight(1)).getOrElse("") + if (splitHeadTail.isEmpty) IntsConstant(List.empty) + else IntsConstant(List(splitHeadTail).map(_.toInt)) + } else { + val splitHead = split.headOption.map(_.drop(1)).getOrElse("") + val splitTail = split.lastOption.map(_.dropRight(1)).getOrElse("") + val splitList = split.drop(1).dropRight(1).toList + val splitTotal = (splitHead :: splitList) :+ splitTail + IntsConstant(splitTotal.map(_.toInt)) + } + + } } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/sigma.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/sigma.scala new file mode 100644 index 00000000..a7859b27 --- /dev/null +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/sigma.scala @@ -0,0 +1,72 @@ +package org.ergoplatform.ergo.domain + +import cats.Eval +import cats.data.OptionT +import cats.syntax.traverse._ +import scorex.util.encode.Base16 +import sigmastate.Values.{Constant, ConstantNode, EvaluatedValue, SigmaPropConstant} +import sigmastate._ +import sigmastate.basics.DLogProtocol.ProveDlogProp + +object sigma { + + @inline def renderEvaluatedValue[T <: SType](ev: EvaluatedValue[T]): Option[(SigmaType, String)] = { + def goRender[T0 <: SType](ev0: EvaluatedValue[T0]): OptionT[Eval, (SigmaType, String)] = + ev0.tpe match { + case SSigmaProp | SGroupElement => + ev0 match { + case SigmaPropConstant(ProveDlogProp(dlog)) => + OptionT.some(SigmaType.SimpleKindSigmaType.SSigmaProp -> Base16.encode(dlog.pkBytes)) + case ConstantNode(groupElem, SGroupElement) => + OptionT.some( + SigmaType.SimpleKindSigmaType.SGroupElement -> + Base16.encode(groupElem.asInstanceOf[SGroupElement.WrappedType].getEncoded.toArray) + ) + case _ => OptionT.none + } + case prim: SPrimType => + val typeTerm = prim.toString.replaceAll("\\$", "") + OptionT.fromOption[Eval](SigmaType.parse(typeTerm)).map(_ -> ev0.value.toString) + case tuple: STuple => + val typeTerm = tuple.toString.replaceAll("\\$", "") + OptionT.fromOption[Eval](SigmaType.parse(typeTerm)).flatMap { tp => + val untypedElems = ev0.value match { + case (a, b) => List(a, b) + case _ => ev0.value.asInstanceOf[tuple.WrappedType].toArray.toList + } + val elems = + untypedElems.zip(tuple.items).map { case (vl, tp) => + Constant[SType](vl.asInstanceOf[tp.WrappedType], tp) + } + elems.traverse(e => goRender(e).map(_._2)).map { xs => + tp -> ("[" + xs.mkString(",") + "]") + } + } + case SCollectionType(SByte) => + OptionT.some( + SigmaType.SCollection(SigmaType.SimpleKindSigmaType.SByte) -> + Base16.encode(ev0.value.asInstanceOf[SCollection[SByte.type]#WrappedType].toArray) + ) + case coll: SCollection[_] => + val typeTerm = coll.toString.replaceAll("\\$", "") + OptionT.fromOption[Eval](SigmaType.parse(typeTerm)).flatMap { tp => + val elems = ev0.value.asInstanceOf[coll.WrappedType].toArray.toList.map(Constant(_, coll.elemType)) + elems.traverse(e => goRender(e).map(_._2)).map { xs => + tp -> ("[" + xs.mkString(",") + "]") + } + } + case option: SOption[_] => + OptionT.fromOption[Eval](SigmaType.parse(option.toTermString)).flatMap { tp => + val elem = ev0.value.asInstanceOf[option.WrappedType].map(Constant(_, option.elemType)) + elem match { + case Some(value) => OptionT(Eval.defer(goRender(value).value)).map(r => tp -> r._2) + case None => OptionT.some(tp -> "null") + } + } + case _ => OptionT.none + } + + goRender(ev).value.value + } + +} diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala index 84e03510..4d30fb1d 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala @@ -1,17 +1,20 @@ package org.ergoplatform.dex.tracker import cats.effect.{Blocker, Resource} +import fs2.kafka.RecordDeserializer import fs2.kafka.serde._ import org.ergoplatform.ErgoAddressEncoder import org.ergoplatform.common.EnvApp import org.ergoplatform.common.cache.{MakeRedisTransaction, Redis} -import org.ergoplatform.common.streaming.Producer +import org.ergoplatform.common.streaming.{Consumer, Delayed, MakeKafkaConsumer, Producer} +import org.ergoplatform.dex.configs.ConsumerConfig import org.ergoplatform.dex.domain.amm.{CFMMOrder, CFMMPool, OrderId, PoolId} import org.ergoplatform.dex.tracker.configs.ConfigBundle import org.ergoplatform.dex.tracker.handlers.{lift, CFMMOpsHandler, CFMMPoolsHandler, SettledCFMMPoolsHandler} import org.ergoplatform.dex.tracker.processes.LedgerTracker.TrackerMode import org.ergoplatform.dex.tracker.processes.{LedgerTracker, MempoolTracker} import org.ergoplatform.dex.tracker.repositories.TrackerCache +import org.ergoplatform.dex.tracker.streaming.{MempoolConsumer, MempoolEvent} import org.ergoplatform.dex.tracker.validation.amm.CFMMRules import org.ergoplatform.ergo.modules.{ErgoNetwork, LedgerStreaming, MempoolStreaming} import org.ergoplatform.ergo.services.explorer.ErgoExplorerStreaming @@ -54,12 +57,13 @@ object App extends EnvApp[ConfigBundle] { Producer.make[InitF, StreamF, RunF, OrderId, Unconfirmed[CFMMOrder.AnyOrder]](configs.producers.unconfirmedAmmOrders) implicit0(producer4: Producer[PoolId, Unconfirmed[CFMMPool], StreamF]) <- Producer.make[InitF, StreamF, RunF, PoolId, Unconfirmed[CFMMPool]](configs.producers.unconfirmedAmmPools) + implicit0(consumerMempool: MempoolConsumer[StreamF, RunF]) = + makeConsumer[String, Option[MempoolEvent]](configs.mempoolTxConsumer) implicit0(backend: SttpBackend[RunF, Fs2Streams[RunF]]) <- makeBackend(configs, blocker) implicit0(explorer: ErgoExplorerStreaming[StreamF, RunF]) = ErgoExplorerStreaming.make[StreamF, RunF] implicit0(node: ErgoNode[RunF]) <- Resource.eval(ErgoNode.make[InitF, RunF]) implicit0(network: ErgoNetwork[RunF]) = ErgoNetwork.make[RunF] implicit0(ledger: LedgerStreaming[StreamF]) = LedgerStreaming.make[StreamF, RunF] - implicit0(mempool: MempoolStreaming[StreamF]) <- Resource.eval(MempoolStreaming.make[InitF, StreamF, RunF]) implicit0(cfmmRules: CFMMRules[RunF]) = CFMMRules.make[RunF](configs.tokenId) confirmedAmmOrderHandler <- Resource.eval(CFMMOpsHandler.make[InitF, StreamF, RunF, Confirmed](configs.tokenId)) unconfirmedAmmOrderHandler <- Resource.eval(CFMMOpsHandler.make[InitF, StreamF, RunF, Unconfirmed](configs.tokenId)) @@ -68,7 +72,7 @@ object App extends EnvApp[ConfigBundle] { implicit0(redis: Redis.Plain[RunF]) <- Redis.make[InitF, RunF](configs.redis) implicit0(cache: TrackerCache[RunF]) <- Resource.eval(TrackerCache.make[InitF, RunF]) ledgerTracker <- Resource.eval(LedgerTracker.make[InitF, StreamF, RunF](TrackerMode.Live, lift(confirmedAmmOrderHandler), confirmedAmmPoolsHandler)) - mempoolTracker <- Resource.eval(MempoolTracker.make[InitF, StreamF, RunF](unconfirmedAmmOrderHandler, unconfirmedAmmPoolsHandler)) + mempoolTracker <- Resource.eval(MempoolTracker.make[InitF, StreamF, RunF](consumerMempool, unconfirmedAmmOrderHandler, unconfirmedAmmPoolsHandler)) } yield (ledgerTracker, mempoolTracker, configs) // format: on @@ -80,4 +84,9 @@ object App extends EnvApp[ConfigBundle] { .eval(wr.concurrentEffect) .flatMap(implicit ce => AsyncHttpClientFs2Backend.resource[RunF](blocker)) .mapK(wr.runContextK(configs)) + + private def makeConsumer[K: RecordDeserializer[RunF, *], V: RecordDeserializer[RunF, *]](conf: ConsumerConfig) = { + implicit val maker = MakeKafkaConsumer.make[InitF, RunF, K, V] + Consumer.make[StreamF, RunF, K, V](conf) + } } diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/configs/ConfigBundle.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/configs/ConfigBundle.scala index dc1d0143..87b3a15d 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/configs/ConfigBundle.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/configs/ConfigBundle.scala @@ -22,7 +22,8 @@ final case class ConfigBundle( @promote mempoolTracking: MempoolTrackingConfig, @promote monetary: MonetaryConfig, redis: RedisConfig, - tokenId: TokenId + tokenId: TokenId, + mempoolTxConsumer: ConsumerConfig ) object ConfigBundle extends Context.Companion[ConfigBundle] with ConfigBundleCompanion[ConfigBundle] { diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/domain/Transaction.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/domain/Transaction.scala new file mode 100644 index 00000000..7921ee3e --- /dev/null +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/domain/Transaction.scala @@ -0,0 +1,23 @@ +package org.ergoplatform.dex.tracker.domain + +import cats.data.NonEmptyList +import derevo.circe.{decoder, encoder} +import derevo.derive +import org.ergoplatform.ErgoLikeTransaction +import org.ergoplatform.ergo.domain.Output +import org.ergoplatform.ergo.{BoxId, TxId} +import scorex.util.ModifierId +import tofu.logging.derivation.loggable + +@derive(encoder, decoder, loggable) +final case class Transaction(id: TxId, inputs: NonEmptyList[BoxId], outputs: NonEmptyList[Output]) + +object Transaction { + + def fromErgoLike(tx: ErgoLikeTransaction): Transaction = + Transaction( + TxId(ModifierId !@@ tx.id), + NonEmptyList.fromListUnsafe(tx.inputs.map(_.boxId).map(BoxId.fromErgo).toList), + NonEmptyList.fromListUnsafe(tx.outputs.toList.map(Output.fromErgoBox(_, tx.id))).sortBy(_.index) + ) +} diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/MempoolTracker.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/MempoolTracker.scala index 74b4495e..4def3206 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/MempoolTracker.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/MempoolTracker.scala @@ -1,71 +1,50 @@ package org.ergoplatform.dex.tracker.processes import cats.effect.{Clock, Timer} +import cats.instances.list._ import cats.{Defer, FlatMap, Monad, MonoidK} -import org.ergoplatform.common.data.TemporalFilter -import org.ergoplatform.dex.tracker.configs.MempoolTrackingConfig import org.ergoplatform.dex.tracker.handlers.BoxHandler -import org.ergoplatform.ergo.domain.Output -import org.ergoplatform.ergo.modules.MempoolStreaming -import org.ergoplatform.ergo.services.explorer.models.Transaction -import org.ergoplatform.ergo.services.node.ErgoNode +import org.ergoplatform.dex.tracker.streaming.{MempoolConsumer, MempoolEvent} import tofu.Catches -import tofu.concurrent.MakeRef import tofu.logging.{Logging, Logs} import tofu.streams.{Evals, Pace, ParFlatten} -import tofu.syntax.embed._ +import tofu.syntax.handle._ import tofu.syntax.logging._ import tofu.syntax.monadic._ -import tofu.syntax.handle._ import tofu.syntax.streams.all.{eval, _} -import cats.syntax.traverse._ - -import scala.concurrent.duration._ /** Tracks UTxOs from mempool. */ final class MempoolTracker[ F[_]: Monad: Evals[*[_], G]: ParFlatten: Pace: Defer: MonoidK: Catches, G[_]: Monad: Logging: Timer -](conf: MempoolTrackingConfig, filter: TemporalFilter[G], handlers: List[BoxHandler[F]])(implicit - mempool: MempoolStreaming[F] -) extends UtxoTracker[F] { - - def run: F[Unit] = { - def sync: F[Unit] = - for { - output <- mempool.streamUnspentOutputs - known <- eval(filter.probe(output.boxId)) - (n, mx) <- eval(filter.inspect) - _ <- eval(debug"MempoolFilter{N=$n, MX=$mx}") - _ <- if (!known) - eval(debug"Scanning unconfirmed output $output") >> - emits(handlers.map(_(output.pure[F]))).parFlattenUnbounded - else unit[F] - } yield () +](handlers: List[BoxHandler[F]], consumer: MempoolConsumer[F, G]) + extends UtxoTracker[F] { - sync.repeat - .throttled(conf.samplingInterval) + def run: F[Unit] = + consumer.stream + .evalMap { mempoolEvent => + mempoolEvent.message match { + case Some(MempoolEvent.MempoolApply(transaction)) => + eval(info"Scanning unconfirmed tx ${transaction.id}") >> + emits(transaction.outputs.map { out => + eval(debug"Scanning unconfirmed output ${out.boxId}") >> + emits(handlers.map(_(out.pure[F]))).parFlattenUnbounded + }).parFlattenUnbounded + case _ => eval(mempoolEvent.commit) + } + } .handleWith[Throwable](e => eval(warnCause"Mempool Tracker failed, restarting .." (e)) >> run) - } } object MempoolTracker { def make[ I[_]: FlatMap, - F[_]: Monad: Evals[*[_], G]: ParFlatten: Pace: Defer: MonoidK: MempoolTrackingConfig.Has: Catches, + F[_]: Monad: Evals[*[_], G]: ParFlatten: Pace: Defer: MonoidK: Catches, G[_]: Monad: Clock: Timer - ](handlers: BoxHandler[F]*)(implicit - mempool: MempoolStreaming[F], - logs: Logs[I, G], - makeRef: MakeRef[I, G] - ): I[UtxoTracker[F]] = + ](consumer: MempoolConsumer[F, G], handlers: BoxHandler[F]*)(implicit logs: Logs[I, G]): I[UtxoTracker[F]] = for { implicit0(l: Logging[G]) <- logs.forService[MempoolTracker[F, G]] - filter <- TemporalFilter.make[I, G](30.minutes, 12) - tracker = MempoolTrackingConfig.access - .map(conf => new MempoolTracker[F, G](conf, filter, handlers.toList): UtxoTracker[F]) - .embed - } yield tracker + } yield new MempoolTracker[F, G](handlers.toList, consumer): UtxoTracker[F] } diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/KafkaMempoolEvent.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/KafkaMempoolEvent.scala new file mode 100644 index 00000000..51d6433a --- /dev/null +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/KafkaMempoolEvent.scala @@ -0,0 +1,18 @@ +package org.ergoplatform.dex.tracker.streaming + +import derevo.circe.{decoder, encoder} +import derevo.derive + +@derive(encoder, decoder) +sealed trait KafkaMempoolEvent { + val tx: String +} + +object KafkaMempoolEvent { + + @derive(encoder, decoder) + final case class TxAccepted(tx: String) extends KafkaMempoolEvent + + @derive(encoder, decoder) + final case class TxWithdrawn(tx: String) extends KafkaMempoolEvent +} diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/MempoolEvent.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/MempoolEvent.scala new file mode 100644 index 00000000..83c6e25d --- /dev/null +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/MempoolEvent.scala @@ -0,0 +1,49 @@ +package org.ergoplatform.dex.tracker.streaming + +import cats.effect.Sync +import derevo.circe.decoder +import derevo.derive +import fs2.kafka.{Deserializer, RecordDeserializer} +import io.circe.parser.decode +import org.ergoplatform.ErgoLikeTransactionSerializer +import scorex.util.encode.Base64 +import cats.syntax.either._ +import org.ergoplatform.dex.tracker.domain.Transaction + +import scala.util.Try + +@derive(decoder) +sealed trait MempoolEvent { + val transaction: Transaction +} + +object MempoolEvent { + + @derive(decoder) + final case class MempoolApply(transaction: Transaction) extends MempoolEvent + + @derive(decoder) + final case class MempoolUnapply(transaction: Transaction) extends MempoolEvent + + implicit def mempoolEventDeserializer[F[_]: Sync]: RecordDeserializer[F, Option[MempoolEvent]] = + RecordDeserializer.lift(Deserializer.string.attempt.map { str => + str + .flatMap(decode[KafkaMempoolEvent](_)) + .toOption + .flatMap(fromKafkaEvent) + }) + + private def fromKafkaEvent(event: KafkaMempoolEvent): Option[MempoolEvent] = + Base64 + .decode(event.tx) + .flatMap { b => + Try(ErgoLikeTransactionSerializer.fromBytes(b)) + } + .toOption + .map { tx => + event match { + case KafkaMempoolEvent.TxAccepted(_) => MempoolApply(Transaction.fromErgoLike(tx)) + case KafkaMempoolEvent.TxWithdrawn(_) => MempoolUnapply(Transaction.fromErgoLike(tx)) + } + } +} \ No newline at end of file diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/package.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/package.scala new file mode 100644 index 00000000..5e0b2e0b --- /dev/null +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/package.scala @@ -0,0 +1,9 @@ +package org.ergoplatform.dex.tracker + +import fs2.kafka.types.KafkaOffset +import org.ergoplatform.common.streaming.Consumer + +package object streaming { + + type MempoolConsumer[S[_], F[_]] = Consumer.Aux[String, Option[MempoolEvent], KafkaOffset, S, F] +} diff --git a/project/versions.scala b/project/versions.scala index e22b3cb4..ff490e82 100644 --- a/project/versions.scala +++ b/project/versions.scala @@ -37,7 +37,7 @@ object versions { val LogbackVersion = "1.2.3" val Slf4jVersion = "1.7.25" - val PureConfigVersion = "0.14.1" + val PureConfigVersion = "0.14.0" val NewtypeVersion = "0.4.3" val RefinedVersion = "0.9.10" From c1b68170cdd9e16c08cec8525fd229b66439c359 Mon Sep 17 00:00:00 2001 From: "t.gusev" Date: Mon, 27 Nov 2023 13:21:57 +0300 Subject: [PATCH 2/6] compilation fixed --- .../main/scala/org/ergoplatform/ergo/domain/Output.scala | 6 +++--- .../main/scala/org/ergoplatform/ergo/domain/SConstant.scala | 6 +++--- .../org/ergoplatform/dex/tracker/domain/Transaction.scala | 2 +- project/versions.scala | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala index bb1eb27f..e3654001 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala @@ -58,10 +58,10 @@ object Output { Map.empty // todo ) - def fromErgoBox(box: ErgoBox, txId: ModifierId): Output = + def fromErgoBox(box: ErgoBox): Output = Output( BoxId.fromErgo(box.id), - TxId(ModifierId !@@ txId), + TxId(ModifierId !@@ box.transactionId), box.value, box.index, box.creationHeight, @@ -71,7 +71,7 @@ object Output { ) private def parseRegisters( - additionalRegisters: Map[NonMandatoryRegisterId, _ <: EvaluatedValue[_ <: SType]] + additionalRegisters: Map[NonMandatoryRegisterId, EvaluatedValue[SType]] ): Map[RegisterId, SConstant] = additionalRegisters.flatMap { case (k, v) => for { diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala index 57ab429b..5fc24210 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala @@ -11,7 +11,7 @@ import org.ergoplatform.ergo.domain.SigmaType._ import tofu.logging.derivation.loggable import io.circe.syntax._ -@derive(show, encoder, loggable) +@derive(show, loggable) sealed trait SConstant object SConstant { @@ -34,7 +34,7 @@ object SConstant { @derive(loggable, show) final case class IntsConstant(value: List[Int]) extends SConstant - implicit val encoder: Encoder[SConstant] = { c => + implicit val encoderSConstant: Encoder[SConstant] = { c => val (renderedValue, sigmaType: SigmaType) = c match { case IntConstant(value) => value.toString -> SInt case LongConstant(value) => value.toString -> SLong @@ -46,7 +46,7 @@ object SConstant { Json.obj("renderedValue" -> Json.fromString(renderedValue), "sigmaType" -> sigmaType.asJson) } - implicit val decoder: Decoder[SConstant] = { c => + implicit val decoderSConstant: Decoder[SConstant] = { c => c.downField("renderedValue").as[String].flatMap { value => c.downField("sigmaType").as[SigmaType].map { case SInt => IntConstant(value.toInt) diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/domain/Transaction.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/domain/Transaction.scala index 7921ee3e..22600811 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/domain/Transaction.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/domain/Transaction.scala @@ -18,6 +18,6 @@ object Transaction { Transaction( TxId(ModifierId !@@ tx.id), NonEmptyList.fromListUnsafe(tx.inputs.map(_.boxId).map(BoxId.fromErgo).toList), - NonEmptyList.fromListUnsafe(tx.outputs.toList.map(Output.fromErgoBox(_, tx.id))).sortBy(_.index) + NonEmptyList.fromListUnsafe(tx.outputs.toList.map(Output.fromErgoBox(_))).sortBy(_.index) ) } diff --git a/project/versions.scala b/project/versions.scala index ff490e82..e22b3cb4 100644 --- a/project/versions.scala +++ b/project/versions.scala @@ -37,7 +37,7 @@ object versions { val LogbackVersion = "1.2.3" val Slf4jVersion = "1.7.25" - val PureConfigVersion = "0.14.0" + val PureConfigVersion = "0.14.1" val NewtypeVersion = "0.4.3" val RefinedVersion = "0.9.10" From 1d297f56683082fdbbdbdde7a235c5aa9f34e7fd Mon Sep 17 00:00:00 2001 From: "t.gusev" Date: Mon, 27 Nov 2023 13:56:48 +0300 Subject: [PATCH 3/6] ledger stream added --- .../org/ergoplatform/dex/tracker/App.scala | 13 ++-- .../dex/tracker/configs/ConfigBundle.scala | 3 +- .../dex/tracker/processes/LedgerTracker.scala | 64 +++++++------------ .../tracker/processes/MempoolTracker.scala | 2 +- .../dex/tracker/streaming/KafkaTxEvent.scala | 18 ++++++ .../tracker/streaming/TransactionEvent.scala | 42 ++++++++++++ .../dex/tracker/streaming/package.scala | 2 + 7 files changed, 91 insertions(+), 53 deletions(-) create mode 100644 modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/KafkaTxEvent.scala create mode 100644 modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/TransactionEvent.scala diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala index 4d30fb1d..244e0de9 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala @@ -14,7 +14,7 @@ import org.ergoplatform.dex.tracker.handlers.{lift, CFMMOpsHandler, CFMMPoolsHan import org.ergoplatform.dex.tracker.processes.LedgerTracker.TrackerMode import org.ergoplatform.dex.tracker.processes.{LedgerTracker, MempoolTracker} import org.ergoplatform.dex.tracker.repositories.TrackerCache -import org.ergoplatform.dex.tracker.streaming.{MempoolConsumer, MempoolEvent} +import org.ergoplatform.dex.tracker.streaming.{MempoolConsumer, MempoolEvent, TransactionConsumer, TransactionEvent} import org.ergoplatform.dex.tracker.validation.amm.CFMMRules import org.ergoplatform.ergo.modules.{ErgoNetwork, LedgerStreaming, MempoolStreaming} import org.ergoplatform.ergo.services.explorer.ErgoExplorerStreaming @@ -59,19 +59,14 @@ object App extends EnvApp[ConfigBundle] { Producer.make[InitF, StreamF, RunF, PoolId, Unconfirmed[CFMMPool]](configs.producers.unconfirmedAmmPools) implicit0(consumerMempool: MempoolConsumer[StreamF, RunF]) = makeConsumer[String, Option[MempoolEvent]](configs.mempoolTxConsumer) - implicit0(backend: SttpBackend[RunF, Fs2Streams[RunF]]) <- makeBackend(configs, blocker) - implicit0(explorer: ErgoExplorerStreaming[StreamF, RunF]) = ErgoExplorerStreaming.make[StreamF, RunF] - implicit0(node: ErgoNode[RunF]) <- Resource.eval(ErgoNode.make[InitF, RunF]) - implicit0(network: ErgoNetwork[RunF]) = ErgoNetwork.make[RunF] - implicit0(ledger: LedgerStreaming[StreamF]) = LedgerStreaming.make[StreamF, RunF] + implicit0(consumerLedger: TransactionConsumer[StreamF, RunF]) = + makeConsumer[String, Option[TransactionEvent]](configs.ledgerTxConsumer) implicit0(cfmmRules: CFMMRules[RunF]) = CFMMRules.make[RunF](configs.tokenId) confirmedAmmOrderHandler <- Resource.eval(CFMMOpsHandler.make[InitF, StreamF, RunF, Confirmed](configs.tokenId)) unconfirmedAmmOrderHandler <- Resource.eval(CFMMOpsHandler.make[InitF, StreamF, RunF, Unconfirmed](configs.tokenId)) confirmedAmmPoolsHandler <- Resource.eval(SettledCFMMPoolsHandler.make[InitF, StreamF, RunF]) unconfirmedAmmPoolsHandler <- Resource.eval(CFMMPoolsHandler.make[InitF, StreamF, RunF, Unconfirmed]) - implicit0(redis: Redis.Plain[RunF]) <- Redis.make[InitF, RunF](configs.redis) - implicit0(cache: TrackerCache[RunF]) <- Resource.eval(TrackerCache.make[InitF, RunF]) - ledgerTracker <- Resource.eval(LedgerTracker.make[InitF, StreamF, RunF](TrackerMode.Live, lift(confirmedAmmOrderHandler), confirmedAmmPoolsHandler)) + ledgerTracker <- Resource.eval(LedgerTracker.make[InitF, StreamF, RunF](consumerLedger, lift(confirmedAmmOrderHandler), confirmedAmmPoolsHandler)) mempoolTracker <- Resource.eval(MempoolTracker.make[InitF, StreamF, RunF](consumerMempool, unconfirmedAmmOrderHandler, unconfirmedAmmPoolsHandler)) } yield (ledgerTracker, mempoolTracker, configs) // format: on diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/configs/ConfigBundle.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/configs/ConfigBundle.scala index 87b3a15d..6e141f8b 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/configs/ConfigBundle.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/configs/ConfigBundle.scala @@ -23,7 +23,8 @@ final case class ConfigBundle( @promote monetary: MonetaryConfig, redis: RedisConfig, tokenId: TokenId, - mempoolTxConsumer: ConsumerConfig + mempoolTxConsumer: ConsumerConfig, + ledgerTxConsumer: ConsumerConfig ) object ConfigBundle extends Context.Companion[ConfigBundle] with ConfigBundleCompanion[ConfigBundle] { diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/LedgerTracker.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/LedgerTracker.scala index 66d88417..6a17b8d2 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/LedgerTracker.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/LedgerTracker.scala @@ -3,14 +3,11 @@ package org.ergoplatform.dex.tracker.processes import cats.{Defer, FlatMap, Monad, MonoidK} import org.ergoplatform.dex.tracker.configs.LedgerTrackingConfig import org.ergoplatform.dex.tracker.handlers.SettledBoxHandler -import org.ergoplatform.dex.tracker.processes.LedgerTracker.TrackerMode -import org.ergoplatform.dex.tracker.repositories.TrackerCache -import org.ergoplatform.ergo.modules.{ErgoNetwork, LedgerStreaming} +import org.ergoplatform.dex.tracker.streaming.{TransactionConsumer, TransactionEvent} +import org.ergoplatform.ergo.domain.SettledOutput import tofu.Catches import tofu.logging.{Logging, Logs} import tofu.streams.{Evals, Pace, ParFlatten} -import tofu.syntax.context._ -import tofu.syntax.embed._ import tofu.syntax.handle._ import tofu.syntax.logging._ import tofu.syntax.monadic._ @@ -19,41 +16,28 @@ import tofu.syntax.streams.all._ final class LedgerTracker[ F[_]: Monad: Evals[*[_], G]: ParFlatten: Pace: Defer: MonoidK: Catches, G[_]: Monad: Logging -](mode: TrackerMode, cache: TrackerCache[G], conf: LedgerTrackingConfig, handlers: List[SettledBoxHandler[F]])(implicit - network: ErgoNetwork[G], - ledger: LedgerStreaming[F] -) extends UtxoTracker[F] { +](consumer: TransactionConsumer[F, G], handlers: List[SettledBoxHandler[F]]) + extends UtxoTracker[F] { def run: F[Unit] = - eval(info"Starting Ledger Tracker in mode [${mode.toString}] ..") >> - eval(cache.lastScannedBoxOffset).repeat - .flatMap { lastOffset => - eval(network.getNetworkInfo).flatMap { networkParams => - val offset = lastOffset max conf.initialOffset - val maxOffset = networkParams.maxBoxGix - val nextOffset = (offset + conf.batchSize) min maxOffset - val outputsStream = mode match { - case TrackerMode.Historical => ledger.streamOutputs(offset, conf.batchSize) - case TrackerMode.Live => ledger.streamUnspentOutputs(offset, conf.batchSize) - } - val scan = - eval(info"Requesting UTXO batch {offset=$offset, maxOffset=$maxOffset, batchSize=${conf.batchSize} ..") >> - outputsStream - .evalTap(out => trace"Scanning output $out") - .flatTap(out => emits(handlers.map(_(out.pure[F]))).parFlattenUnbounded) - .evalMap(out => cache.setLastScannedBoxOffset(out.gix)) - val finalizeOffset = eval(cache.setLastScannedBoxOffset(nextOffset)) - val pause = - eval(info"Upper limit {maxOffset=$maxOffset} was reached. Retrying in ${conf.retryDelay.toSeconds}s") >> - unit[F].delay(conf.retryDelay) - - emits(if (offset != maxOffset) List(scan, finalizeOffset) else List(pause)).flatten + eval(info"Starting Ledger Tracker ..") >> + consumer.stream + .evalMap { txEvent => + txEvent.message match { + case Some(TransactionEvent.TransactionApply(transaction, _, h)) => + eval(info"Scanning tx ${transaction.id}") >> + emits( + transaction.outputs + .map(out => SettledOutput(out, h, h)) + .map { out => + eval(debug"Scanning output ${out.output.boxId}") >> + emits(handlers.map(_(out.pure[F]))).parFlattenUnbounded + } + ).parFlattenUnbounded >> eval(txEvent.commit) + case _ => eval(txEvent.commit) } } - .handleWith[Throwable] { e => - val delay = conf.retryDelay - eval(warnCause"Tracker failed. Retrying in $delay ms" (e)) >> run.delay(delay) - } + .handleWith[Throwable](e => eval(warnCause"Ledger Tracker failed, restarting .." (e)) >> run) } object LedgerTracker { @@ -71,14 +55,10 @@ object LedgerTracker { I[_]: FlatMap, F[_]: Monad: Evals[*[_], G]: ParFlatten: Pace: Defer: MonoidK: LedgerTrackingConfig.Has: Catches, G[_]: Monad - ](mode: TrackerMode, handlers: SettledBoxHandler[F]*)(implicit - network: ErgoNetwork[G], - ledger: LedgerStreaming[F], - cache: TrackerCache[G], + ](consumer: TransactionConsumer[F, G], handlers: SettledBoxHandler[F]*)(implicit logs: Logs[I, G] ): I[UtxoTracker[F]] = logs.forService[LedgerTracker[F, G]].map { implicit l => - (context map - (conf => new LedgerTracker[F, G](mode, cache, conf, handlers.toList): UtxoTracker[F])).embed + new LedgerTracker[F, G](consumer, handlers.toList): UtxoTracker[F] } } diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/MempoolTracker.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/MempoolTracker.scala index 4def3206..cc8d7686 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/MempoolTracker.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/MempoolTracker.scala @@ -30,7 +30,7 @@ final class MempoolTracker[ emits(transaction.outputs.map { out => eval(debug"Scanning unconfirmed output ${out.boxId}") >> emits(handlers.map(_(out.pure[F]))).parFlattenUnbounded - }).parFlattenUnbounded + }).parFlattenUnbounded >> eval(mempoolEvent.commit) case _ => eval(mempoolEvent.commit) } } diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/KafkaTxEvent.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/KafkaTxEvent.scala new file mode 100644 index 00000000..2b85604d --- /dev/null +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/KafkaTxEvent.scala @@ -0,0 +1,18 @@ +package org.ergoplatform.dex.tracker.streaming + +import derevo.circe.{decoder, encoder} +import derevo.derive + +@derive(encoder, decoder) +sealed trait KafkaTxEvent { + val tx: String +} + +object KafkaTxEvent { + + @derive(encoder, decoder) + final case class AppliedEvent(timestamp: Long, tx: String, height: Int) extends KafkaTxEvent + + @derive(encoder, decoder) + final case class UnappliedEvent(tx: String) extends KafkaTxEvent +} diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/TransactionEvent.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/TransactionEvent.scala new file mode 100644 index 00000000..5f44a600 --- /dev/null +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/TransactionEvent.scala @@ -0,0 +1,42 @@ +package org.ergoplatform.dex.tracker.streaming + +import cats.effect.Sync +import derevo.derive +import fs2.kafka.{Deserializer, RecordDeserializer} +import io.circe.parser.decode +import org.ergoplatform.ErgoLikeTransactionSerializer +import org.ergoplatform.dex.tracker.domain.Transaction +import scorex.util.encode.Base64 +import tofu.logging.derivation.loggable + +import scala.util.Try + +@derive(loggable) +sealed trait TransactionEvent { + val transaction: Transaction + val timestamp: Long + val height: Int +} + +object TransactionEvent { + + @derive(loggable) + final case class TransactionApply(transaction: Transaction, timestamp: Long, height: Int) extends TransactionEvent + + @derive(loggable) + final case class TransactionUnapply(transaction: Transaction, timestamp: Long, height: Int) extends TransactionEvent + + implicit def transactionEventDeserializer[F[_]: Sync]: RecordDeserializer[F, Option[TransactionEvent]] = + RecordDeserializer.lift(Deserializer.string.map { str => + decode[KafkaTxEvent](str).toOption.flatMap(fromKafkaEvent) + }) + + private def fromKafkaEvent(event: KafkaTxEvent): Option[TransactionEvent] = + Base64.decode(event.tx).flatMap(b => Try(ErgoLikeTransactionSerializer.fromBytes(b))).toOption.map { tx => + event match { + case KafkaTxEvent.AppliedEvent(timestamp, _, height) => + TransactionApply(Transaction.fromErgoLike(tx), timestamp, height) + case KafkaTxEvent.UnappliedEvent(_) => TransactionUnapply(Transaction.fromErgoLike(tx), 0, 0) + } + } +} diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/package.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/package.scala index 5e0b2e0b..f01d2def 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/package.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/package.scala @@ -6,4 +6,6 @@ import org.ergoplatform.common.streaming.Consumer package object streaming { type MempoolConsumer[S[_], F[_]] = Consumer.Aux[String, Option[MempoolEvent], KafkaOffset, S, F] + + type TransactionConsumer[S[_], F[_]] = Consumer.Aux[String, Option[TransactionEvent], KafkaOffset, S, F] } From 4565b544e38c465b98b1f154dd7fd82131e900cc Mon Sep 17 00:00:00 2001 From: "t.gusev" Date: Mon, 27 Nov 2023 14:08:31 +0300 Subject: [PATCH 4/6] config --- modules/utxo-tracker/src/main/resources/application.conf | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/modules/utxo-tracker/src/main/resources/application.conf b/modules/utxo-tracker/src/main/resources/application.conf index 9c225e1f..4f495f6c 100644 --- a/modules/utxo-tracker/src/main/resources/application.conf +++ b/modules/utxo-tracker/src/main/resources/application.conf @@ -34,3 +34,11 @@ network.explorer-uri = "https://api.ergoplatform.com" network.node-uri = "http://localhost:9053" redis.uri = "redis://redis:6379" + +mempool-tx-consumer.group-id = "ergo-mempool" +mempool-tx-consumer.client-id = "ergo-mempool-1" +mempool-tx-consumer.topic-id = "dex.amm.cfmm.mempool.events" + +ledger-tx-consumer.group-id = "ergo-ledger" +ledger-tx-consumer.client-id = "ergo-ledger-1" +ledger-tx-consumer.topic-id = "dex.amm.cfmm.ledger.events" \ No newline at end of file From 92d4e4ea638c6c23eefbd4d67cbd29a6d7da849b Mon Sep 17 00:00:00 2001 From: "t.gusev" Date: Mon, 27 Nov 2023 15:22:23 +0300 Subject: [PATCH 5/6] fix --- .../org/ergoplatform/dex/tracker/App.scala | 4 +++- .../dex/tracker/streaming/MempoolEvent.scala | 2 +- .../parsers/amm/KafkaEventsParser.scala | 23 +++++++++++++++++++ 3 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 modules/utxo-tracker/src/test/scala/org/ergoplatform/dex/tracker/parsers/amm/KafkaEventsParser.scala diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala index 244e0de9..35d43b11 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala @@ -30,6 +30,8 @@ import tofu.lift.IsoK import tofu.syntax.unlift._ import zio.interop.catz._ import zio.{ExitCode, URIO, ZEnv} +import MempoolEvent._ +import TransactionEvent._ object App extends EnvApp[ConfigBundle] { @@ -67,7 +69,7 @@ object App extends EnvApp[ConfigBundle] { confirmedAmmPoolsHandler <- Resource.eval(SettledCFMMPoolsHandler.make[InitF, StreamF, RunF]) unconfirmedAmmPoolsHandler <- Resource.eval(CFMMPoolsHandler.make[InitF, StreamF, RunF, Unconfirmed]) ledgerTracker <- Resource.eval(LedgerTracker.make[InitF, StreamF, RunF](consumerLedger, lift(confirmedAmmOrderHandler), confirmedAmmPoolsHandler)) - mempoolTracker <- Resource.eval(MempoolTracker.make[InitF, StreamF, RunF](consumerMempool, unconfirmedAmmOrderHandler, unconfirmedAmmPoolsHandler)) + mempoolTracker <- Resource.eval(MempoolTracker.make[InitF, StreamF, RunF](consumerMempool, unconfirmedAmmOrderHandler, unconfirmedAmmPoolsHandler))) } yield (ledgerTracker, mempoolTracker, configs) // format: on diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/MempoolEvent.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/MempoolEvent.scala index 83c6e25d..01744815 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/MempoolEvent.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/streaming/MempoolEvent.scala @@ -33,7 +33,7 @@ object MempoolEvent { .flatMap(fromKafkaEvent) }) - private def fromKafkaEvent(event: KafkaMempoolEvent): Option[MempoolEvent] = + def fromKafkaEvent(event: KafkaMempoolEvent): Option[MempoolEvent] = Base64 .decode(event.tx) .flatMap { b => diff --git a/modules/utxo-tracker/src/test/scala/org/ergoplatform/dex/tracker/parsers/amm/KafkaEventsParser.scala b/modules/utxo-tracker/src/test/scala/org/ergoplatform/dex/tracker/parsers/amm/KafkaEventsParser.scala new file mode 100644 index 00000000..305d94a1 --- /dev/null +++ b/modules/utxo-tracker/src/test/scala/org/ergoplatform/dex/tracker/parsers/amm/KafkaEventsParser.scala @@ -0,0 +1,23 @@ +package org.ergoplatform.dex.tracker.parsers.amm + +import org.ergoplatform.dex.CatsPlatform +import org.scalatest.matchers.should +import org.scalatest.propspec.AnyPropSpec +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks +import io.circe.parser._ +import org.ergoplatform.dex.tracker.streaming.{KafkaMempoolEvent, MempoolEvent} + +class KafkaEventsParser extends AnyPropSpec with should.Matchers with ScalaCheckPropertyChecks with CatsPlatform { + + val mempoolString = """{"TxAccepted":{"tx":"Aq5Tjhs7D7DHKeeqUM12h8DdR21ZH0bxw0bANVo3BOtHAABD6tGobdar1ZpW4oXL+wfjH/RWqed1d2qAnOmxLmWt6DiZ4YxAxP64KVhuEQ0KbuJjiK0voKn5IN9bQx5UxeDiNCN6JmWpTICsTIgolnItJYa37X5mRRWk5AAAA5kW11EyWTyLB/4YvY1YO9oWUu7XVlz0Gkc43dkPyZLsMD85AmVyvLQGC1H6/JN4eiNrskN0S6uqmfzrgz1h4ZgD+vLLMp8ukNbSO1jZG7tsBGqhQyYcwh9S++KCS/y/BAPs4a2GnasoGZkDDwQABAIEAgQEBAQF/v//////////AQX+//////////8BBQAE0A8EAAQABAYFAAUABYDaxAnYGdYBsqVzAADWAuTGpwQE1gPbYwhyAdYE22MIp9YFsnIDcwEA1gaycgRzAgDWB7JyA3MDANYIsnIEcwQA1gmZcwWMcgYC1gqZmXMGjHIFAnIJ1gvBcgHWDMGn1g2ZcgtyDNYOkXINcwfWD4xyCALWEH5yDwbWEX5yDQbWEpmMcgcCcg/WE35yDAbWFHMI1hV+chIG1hZ+cgoG1hd+cgkG1hicchFyF9YZnHIVchfR7e3t7e3t7ZPCcgHCp5PkxnIBBARyApOycgNzCQCycgRzCgCTjHIFAYxyBgGTjHIHAYxyCAGTsXIDcwuVk3IKcwyVcg6SnJxyEHIRfnICBpx+8HISBpqcchN+chQGfpxyDX5yAgUGkpycchNyFX5yAgacfvByDQaanHIQfnIUBn6cchJ+cgIFBpXtcg6RchJzDZByFqGdchhyE51yGXIQ7ZJyGJxyFnITknIZnHIWchCRcgtzDtPmRQMAAQHPnqPBwv7//38CsfeSCgEExg/zqdeZ4S8ACM0CPF2XoH539G0EhJTqcSl5LLruN2mWL9df0b1Wyk+FL5rT5kUBAv+LAQEEqM2LAYCt4gQQBQQABAAONhACBKALCM0Ceb5mfvncu6xVoGKVzocLBwKb/NstzijZWfKBWxb4F5jqAtGSo5qMx6cBcwBzARABAgQC0ZaDAwGTo4zHsqVzAAABk8KypXMBAHRzAnMDgwEIze6sk7GlcwTT5kUAAA"}}""" + + property("Parse mempool tx correct") { + val a = parse(mempoolString).toOption.get + .as[KafkaMempoolEvent] + .toOption.get + + val b = MempoolEvent.fromKafkaEvent(a) + + println(b) + } +} From c84af9749d46f2e6e55dc93242d1ce55f58a5b9b Mon Sep 17 00:00:00 2001 From: "t.gusev" Date: Mon, 27 Nov 2023 15:28:08 +0300 Subject: [PATCH 6/6] fix --- .../org/ergoplatform/dex/executor/amm/App.scala | 1 + .../dex-core/src/main/scala/fs2/kafka/serde.scala | 13 ++++++++----- .../org/ergoplatform/dex/domain/amm/package.scala | 3 ++- .../org/ergoplatform/dex/domain/locks/types.scala | 1 + .../ergoplatform/dex/domain/orderbook/Order.scala | 3 ++- .../ergoplatform/dex/domain/orderbook/Trade.scala | 2 +- .../ergoplatform/dex/domain/orderbook/package.scala | 3 ++- .../main/scala/org/ergoplatform/dex/index/App.scala | 1 + .../scala/org/ergoplatform/dex/tracker/App.scala | 4 ++-- 9 files changed, 20 insertions(+), 11 deletions(-) diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala index 8d9b9ac6..cb12c9cb 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala @@ -4,6 +4,7 @@ import cats.Id import cats.effect.{Blocker, Resource} import fs2.kafka.RecordDeserializer import fs2.kafka.serde._ +import fs2.kafka.serde.ser._ import org.ergoplatform.ErgoAddressEncoder import org.ergoplatform.common.EnvApp import org.ergoplatform.common.streaming._ diff --git a/modules/dex-core/src/main/scala/fs2/kafka/serde.scala b/modules/dex-core/src/main/scala/fs2/kafka/serde.scala index 326f1c0f..f0ca7bca 100644 --- a/modules/dex-core/src/main/scala/fs2/kafka/serde.scala +++ b/modules/dex-core/src/main/scala/fs2/kafka/serde.scala @@ -16,10 +16,13 @@ object serde { Deserializer.lift(decoder.decode) } - implicit def serializerViaCirceEncoder[F[_]: Sync, A: Encoder]: RecordSerializer[F, A] = - RecordSerializer.lift { - Serializer.lift { a => - a.asJson.noSpacesSortKeys.getBytes(charset).pure + object ser { + + implicit def serializerViaCirceEncoder[F[_]: Sync, A: Encoder]: RecordSerializer[F, A] = + RecordSerializer.lift { + Serializer.lift { a => + a.asJson.noSpacesSortKeys.getBytes(charset).pure + } } - } + } } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala index b6a6c9b0..b630c146 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala @@ -5,7 +5,8 @@ import derevo.cats.show import derevo.circe.{decoder, encoder} import derevo.derive import doobie.{Get, Put} -import fs2.kafka.serde.{deserializerViaKafkaDecoder, serializerViaCirceEncoder} +import fs2.kafka.serde.deserializerViaKafkaDecoder +import fs2.kafka.serde.ser._ import fs2.kafka.{RecordDeserializer, RecordSerializer} import io.estatico.newtype.macros.newtype import org.ergoplatform.common.HexString diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/locks/types.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/locks/types.scala index f99a46f8..fedd9c16 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/locks/types.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/locks/types.scala @@ -6,6 +6,7 @@ import derevo.circe.{decoder, encoder} import derevo.derive import doobie.{Get, Put} import fs2.kafka.serde._ +import fs2.kafka.serde.ser._ import fs2.kafka.{RecordDeserializer, RecordSerializer} import io.estatico.newtype.macros.newtype import org.ergoplatform.ergo.BoxId diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/Order.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/Order.scala index 2034463a..f580c96a 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/Order.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/Order.scala @@ -11,6 +11,7 @@ import io.circe.{Decoder, Encoder} import io.estatico.newtype.ops._ import org.ergoplatform.dex.domain.PairId import org.ergoplatform.dex.protocol.instances._ +import fs2.kafka.serde.ser._ import org.ergoplatform.ergo.TokenId import tofu.logging.{Loggable, _} @@ -74,7 +75,7 @@ object Order { io.circe.derivation.deriveDecoder[AnyOrder] implicit def recordSerializer[F[_]: Sync]: RecordSerializer[F, AnyOrder] = - fs2.kafka.serde.serializerViaCirceEncoder + fs2.kafka.serde.ser.serializerViaCirceEncoder implicit def recordDeserializer[F[_]: Sync]: RecordDeserializer[F, AnyOrder] = fs2.kafka.serde.deserializerViaKafkaDecoder diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/Trade.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/Trade.scala index 03f986f7..1073bad6 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/Trade.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/Trade.scala @@ -41,7 +41,7 @@ object Trade { implicit def decoder: Decoder[AnyTrade] = io.circe.derivation.deriveDecoder implicit def recordSerializer[F[_]: Sync]: RecordSerializer[F, AnyTrade] = - fs2.kafka.serde.serializerViaCirceEncoder + fs2.kafka.serde.ser.serializerViaCirceEncoder implicit def recordDeserializer[F[_]: Sync]: RecordDeserializer[F, AnyTrade] = fs2.kafka.serde.deserializerViaKafkaDecoder diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/package.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/package.scala index 003eebd3..563c0214 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/package.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/orderbook/package.scala @@ -2,7 +2,8 @@ package org.ergoplatform.dex.domain import cats.effect.Sync import doobie.{Get, Put} -import fs2.kafka.serde.{deserializerViaKafkaDecoder, serializerViaCirceEncoder} +import fs2.kafka.serde.{deserializerViaKafkaDecoder} +import fs2.kafka.serde.ser._ import fs2.kafka.{RecordDeserializer, RecordSerializer} import io.circe.{Decoder, Encoder} import io.estatico.newtype.macros.newtype diff --git a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/App.scala b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/App.scala index cb4882dd..78c33b6f 100644 --- a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/App.scala +++ b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/App.scala @@ -4,6 +4,7 @@ import cats.effect.{Blocker, Clock, Resource} import fs2.Chunk import fs2.kafka.RecordDeserializer import fs2.kafka.serde._ +import fs2.kafka.serde.ser._ import org.ergoplatform.ErgoAddressEncoder import org.ergoplatform.common.EnvApp import org.ergoplatform.common.cache.{MakeRedisTransaction, Redis} diff --git a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala index 35d43b11..49116431 100644 --- a/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala +++ b/modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/App.scala @@ -2,7 +2,7 @@ package org.ergoplatform.dex.tracker import cats.effect.{Blocker, Resource} import fs2.kafka.RecordDeserializer -import fs2.kafka.serde._ +import fs2.kafka.serde.ser._ import org.ergoplatform.ErgoAddressEncoder import org.ergoplatform.common.EnvApp import org.ergoplatform.common.cache.{MakeRedisTransaction, Redis} @@ -69,7 +69,7 @@ object App extends EnvApp[ConfigBundle] { confirmedAmmPoolsHandler <- Resource.eval(SettledCFMMPoolsHandler.make[InitF, StreamF, RunF]) unconfirmedAmmPoolsHandler <- Resource.eval(CFMMPoolsHandler.make[InitF, StreamF, RunF, Unconfirmed]) ledgerTracker <- Resource.eval(LedgerTracker.make[InitF, StreamF, RunF](consumerLedger, lift(confirmedAmmOrderHandler), confirmedAmmPoolsHandler)) - mempoolTracker <- Resource.eval(MempoolTracker.make[InitF, StreamF, RunF](consumerMempool, unconfirmedAmmOrderHandler, unconfirmedAmmPoolsHandler))) + mempoolTracker <- Resource.eval(MempoolTracker.make[InitF, StreamF, RunF](consumerMempool, unconfirmedAmmOrderHandler, unconfirmedAmmPoolsHandler)) } yield (ledgerTracker, mempoolTracker, configs) // format: on