Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

i1784 remove invalid tx from mempool and delivery tracker #1803

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/scala/org/ergoplatform/local/CleanupWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class CleanupWorker(nodeViewHolderRef: ActorRef,
val toEliminate = validatePool(validator, mempool)
if (toEliminate.nonEmpty) {
log.info(s"${toEliminate.size} transactions from mempool were invalidated")
nodeViewHolderRef ! EliminateTransactions(toEliminate)
nodeViewHolderRef ! EliminateTransactions(toEliminate, EliminateTransactions.FromPoolValidation)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ object CandidateGenerator extends ScorexLogging {
emissionTxs ++ prioritizedTransactions ++ poolTxs
)

val eliminateTransactions = EliminateTransactions(toEliminate)
val eliminateTransactions = EliminateTransactions(toEliminate, EliminateTransactions.FromMiningValidation)

if (txs.isEmpty) {
throw new IllegalArgumentException(
Expand Down
48 changes: 34 additions & 14 deletions src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import org.ergoplatform.modifiers.history.extension.Extension
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock}
import org.ergoplatform.nodeView.ErgoNodeViewHolder.BlockAppliedTransactions
import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoHistoryReader}
import org.ergoplatform.nodeView.mempool.ErgoMemPool
import org.ergoplatform.nodeView.mempool.ErgoMemPool.ProcessingOutcome
Expand All @@ -23,15 +22,15 @@ import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages._
import scorex.core.consensus.ProgressInfo
import scorex.core.settings.ScorexSettings
import scorex.core.utils.{NetworkTimeProvider, ScorexEncoding}
import scorex.core.validation.RecoverableModifierError
import scorex.core.validation.{MalformedModifierError, RecoverableModifierError}
import scorex.util.ScorexLogging
import spire.syntax.all.cfor
import java.io.File

import java.io.File
import org.ergoplatform.modifiers.history.{ADProofs, HistoryModifierSerializer}


import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.EliminateTransactions.ValidationSource
import org.ergoplatform.nodeView.history.ErgoHistory.Height
import scorex.core.transaction.Transaction

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -240,18 +239,27 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
log.error("Reporting modifier failed", ex)
f
case (success@Success(updateInfo), modToApply) =>
def reportInvalidModifier(ex: Throwable): Try[UpdateInformation] =
history.reportModifierIsInvalid(modToApply, progressInfo).map { case (newHis, newProgressInfo) =>
context.system.eventStream.publish(SemanticallyFailedModification(modToApply, ex))
UpdateInformation(newHis, updateInfo.state, Some(modToApply), Some(newProgressInfo), updateInfo.suffix)
}
if (updateInfo.failedMod.isEmpty) {
updateInfo.state.applyModifier(modToApply, estimatedTip())(lm => pmodModify(lm.pmod, local = true)) match {
case Success(stateAfterApply) =>
history.reportModifierIsValid(modToApply).map { newHis =>
context.system.eventStream.publish(SemanticallySuccessfulModifier(modToApply))
UpdateInformation(newHis, stateAfterApply, None, None, updateInfo.suffix :+ modToApply)
}
case Failure(e) =>
history.reportModifierIsInvalid(modToApply, progressInfo).map { case (newHis, newProgressInfo) =>
context.system.eventStream.publish(SemanticallyFailedModification(modToApply, e))
UpdateInformation(newHis, updateInfo.state, Some(modToApply), Some(newProgressInfo), updateInfo.suffix)
case Failure(ex: MalformedModifierError) if ex.modifierTypeId == Transaction.ModifierTypeId =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how can transaction appear here at all?

Failed modifier should be a block I guess

Copy link
Collaborator Author

@pragmaxim pragmaxim Aug 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works like this :

I assume that even though block is considered invalid, we wanted to flag invalidate this particular TX in mempool, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not so simple, as history.reportModifierIsInvalid is expecting block ,not transaction

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We call reportModifierIsInvalid(modToApply) in both cases, just in case of MalformedModifierError with TxType modifier we EliminateTransactions so it gets removed from MemPool

// invalid transaction in a block will likely be in mempool and should be removed
logger.warn(s"Invalidating transaction ${ex.modifierId} in mempool due to ${ex.getMessage}", ex)
reportInvalidModifier(ex).map { updateInformation =>
self ! EliminateTransactions(List(ex.modifierId), EliminateTransactions.FromBlockValidation)
updateInformation
}
case Failure(ex) =>
reportInvalidModifier(ex)
}
} else success
}
Expand Down Expand Up @@ -390,7 +398,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
case Success(state) =>
log.info(s"State database read, state synchronized")
val wallet = ErgoWallet.readOrGenerate(
history.getReader.asInstanceOf[ErgoHistoryReader],
history.getReader,
settings,
state.parameters)
log.info("Wallet database read")
Expand Down Expand Up @@ -585,12 +593,17 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
txs.foreach(txModify)
case LocallyGeneratedTransaction(tx) =>
sender() ! txModify(tx)
case EliminateTransactions(ids) =>
case EliminateTransactions(ids, source) =>
val immediateFailure = source match {
case EliminateTransactions.FromBlockValidation => true
case _ => false
}
val updatedPool = memoryPool().filter(tx => !ids.contains(tx.id))
updateNodeView(updatedMempool = Some(updatedPool))
ids.foreach { id =>
val e = new Exception("Became invalid")
context.system.eventStream.publish(FailedTransaction(id, e, immediateFailure = false))
context.system.eventStream.publish(
FailedTransaction(id, new Exception(s"Tx $id rejection coming : $source"), immediateFailure)
)
}
}

Expand Down Expand Up @@ -661,7 +674,14 @@ object ErgoNodeViewHolder {

case class LocallyGeneratedModifier(pmod: BlockSection)

case class EliminateTransactions(ids: Seq[scorex.util.ModifierId])
case class EliminateTransactions(ids: Seq[scorex.util.ModifierId], source: ValidationSource)

object EliminateTransactions {
sealed trait ValidationSource
case object FromMiningValidation extends ValidationSource
case object FromBlockValidation extends ValidationSource
case object FromPoolValidation extends ValidationSource
}

case object IsChainHealthy
sealed trait HealthCheckResult
Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/org/ergoplatform/nodeView/state/DigestState.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package org.ergoplatform.nodeView.state

import java.io.File

import org.ergoplatform.ErgoBox
import org.ergoplatform.ErgoLikeContext.Height
import org.ergoplatform.modifiers.history.ADProofs
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.modifiers.{ErgoFullBlock, BlockSection}
import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock}
import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.LocallyGeneratedModifier
import org.ergoplatform.nodeView.state.ErgoState.ModifierProcessing
import org.ergoplatform.settings._
Expand All @@ -16,6 +15,7 @@ import org.ergoplatform.wallet.boxes.ErgoBoxSerializer
import scorex.db.{ByteArrayWrapper, LDBVersionedStore}
import scorex.core._
import scorex.core.utils.ScorexEncoding
import scorex.core.validation.MalformedModifierError
import scorex.crypto.authds.ADDigest
import scorex.util.ScorexLogging

Expand Down Expand Up @@ -52,11 +52,13 @@ class DigestState protected(override val version: VersionTag,
(transactions.flatMap(_.outputs) ++ boxesFromProofs).map(o => (ByteArrayWrapper(o.id), o)).toMap
}

def checkBoxExistence(id: ErgoBox.BoxId): Try[ErgoBox] =
def checkBoxExistence(tx: ErgoTransaction, id: ErgoBox.BoxId): Try[ErgoBox] =
knownBoxesTry.flatMap { knownBoxes =>
knownBoxes
.get(ByteArrayWrapper(id))
.fold[Try[ErgoBox]](Failure(new Exception(s"Box with id ${Algos.encode(id)} not found")))(Success(_))
.fold[Try[ErgoBox]](
Failure(new MalformedModifierError(s"Box with id ${Algos.encode(id)} not found", tx.id, tx.modifierTypeId))
)(Success(_))
}

ErgoState.execTransactions(transactions, currentStateContext)(checkBoxExistence)
Expand Down
21 changes: 13 additions & 8 deletions src/main/scala/org/ergoplatform/nodeView/state/ErgoState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.ergoplatform.settings.ValidationRules._
import org.ergoplatform.settings.{ChainSettings, Constants, ErgoSettings, LaunchParameters}
import org.ergoplatform.wallet.interpreter.ErgoInterpreter
import scorex.core.validation.ValidationResult.Valid
import scorex.core.validation.{ModifierValidator, ValidationResult}
import scorex.core.validation.{MalformedModifierError, ModifierValidator, ValidationResult}
import scorex.core.{VersionTag, idToVersion}
import scorex.crypto.authds.avltree.batch.{Insert, Lookup, Remove}
import scorex.crypto.authds.{ADDigest, ADValue}
Expand Down Expand Up @@ -91,12 +91,12 @@ object ErgoState extends ScorexLogging {
* Tries to validate and execute transactions.
* @param transactions to be validated and executed
* @param currentStateContext to be used for tx execution
* @param checkBoxExistence function to provide ErgoBox by BoxId
* @param checkBoxExistence function to provide ErgoBox by BoxId and ErgoTransaction
* @return Result of transactions execution with total cost inside
*/
def execTransactions(transactions: Seq[ErgoTransaction],
currentStateContext: ErgoStateContext)
(checkBoxExistence: ErgoBox.BoxId => Try[ErgoBox]): ValidationResult[Long] = {
(checkBoxExistence: (ErgoTransaction, ErgoBox.BoxId) => Try[ErgoBox]): ValidationResult[Long] = {
val verifier: ErgoInterpreter = ErgoInterpreter(currentStateContext.currentParameters)

def preAllocatedBuilder[T: ClassTag](sizeHint: Int): mutable.ArrayBuilder[T] = {
Expand All @@ -107,15 +107,16 @@ object ErgoState extends ScorexLogging {

@tailrec
def collectBoxesById(
tx: ErgoTransaction,
remainingBoxIds: Iterator[ErgoBox.BoxId],
resultingBoxes: Try[mutable.ArrayBuilder[ErgoBox]]
): Try[IndexedSeq[ErgoBox]] = {
if (!remainingBoxIds.hasNext) {
resultingBoxes.map(_.result())
} else {
checkBoxExistence(remainingBoxIds.next()) match {
checkBoxExistence(tx, remainingBoxIds.next()) match {
case Success(box) =>
collectBoxesById(remainingBoxIds, resultingBoxes.map(_ += box))
collectBoxesById(tx, remainingBoxIds, resultingBoxes.map(_ += box))
case Failure(ex) =>
Failure(ex)
}
Expand All @@ -132,9 +133,9 @@ object ErgoState extends ScorexLogging {
val validCostResult = costResult.asInstanceOf[Valid[Long]]
val tx = transactions(i)
val boxesToSpendTry: Try[IndexedSeq[ErgoBox]] =
collectBoxesById(tx.inputs.iterator.map(_.boxId), Success(preAllocatedBuilder(tx.inputs.length)))
collectBoxesById(tx, tx.inputs.iterator.map(_.boxId), Success(preAllocatedBuilder(tx.inputs.length)))
lazy val dataBoxesTry: Try[IndexedSeq[ErgoBox]] =
collectBoxesById(tx.dataInputs.iterator.map(_.boxId), Success(preAllocatedBuilder(tx.inputs.length)))
collectBoxesById(tx, tx.dataInputs.iterator.map(_.boxId), Success(preAllocatedBuilder(tx.inputs.length)))
lazy val boxes: Try[(IndexedSeq[ErgoBox], IndexedSeq[ErgoBox])] = dataBoxesTry.flatMap(db => boxesToSpendTry.map(bs => (db, bs)))
costResult = tx.validateStateless()
.validateNoFailure(txBoxesToSpend, boxesToSpendTry, tx.id, tx.modifierTypeId)
Expand Down Expand Up @@ -166,7 +167,11 @@ object ErgoState extends ScorexLogging {
toInsert.remove(wrappedBoxId) match {
case None =>
if (toRemove.put(wrappedBoxId, Remove(i.boxId)).nonEmpty) {
throw new IllegalArgumentException(s"Tx : ${tx.id} is double-spending input id : $wrappedBoxId")
throw new MalformedModifierError(
s"Tx : ${tx.id} is double-spending input id : $wrappedBoxId",
tx.id,
tx.modifierTypeId
)
}
case _ => // old value removed, do nothing
}
Expand Down
50 changes: 41 additions & 9 deletions src/main/scala/org/ergoplatform/nodeView/state/UtxoState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.ergoplatform.ErgoLikeContext.Height
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.history.ADProofs
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.modifiers.state.StateChanges
import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock}
import org.ergoplatform.settings.Algos.HF
import org.ergoplatform.settings.ValidationRules.{fbDigestIncorrect, fbOperationFailed}
Expand All @@ -17,13 +18,15 @@ import scorex.core._
import scorex.core.transaction.Transaction
import scorex.core.transaction.state.TransactionValidation
import scorex.core.utils.ScorexEncoding
import scorex.core.validation.{ModifierValidator}
import scorex.core.validation.{MalformedModifierError, ModifierValidator}
import scorex.crypto.authds.avltree.batch._
import scorex.crypto.authds.{ADDigest, ADValue}
import scorex.crypto.hash.Digest32
import scorex.db.{ByteArrayWrapper, LDBVersionedStore}
import scorex.util.ModifierId

import scala.collection.breakOut
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

/**
Expand Down Expand Up @@ -79,20 +82,49 @@ class UtxoState(override val persistentProver: PersistentBatchAVLProver[Digest32
import cats.implicits._
val createdOutputs = transactions.flatMap(_.outputs).map(o => (ByteArrayWrapper(o.id), o)).toMap

def checkBoxExistence(id: ErgoBox.BoxId): Try[ErgoBox] = createdOutputs
def checkBoxExistence(tx: ErgoTransaction, id: ErgoBox.BoxId): Try[ErgoBox] = createdOutputs
.get(ByteArrayWrapper(id))
.orElse(boxById(id))
.fold[Try[ErgoBox]](Failure(new Exception(s"Box with id ${Algos.encode(id)} not found")))(Success(_))
.fold[Try[ErgoBox]](
Failure(new MalformedModifierError(s"Box with id ${Algos.encode(id)} not found", tx.id, tx.modifierTypeId))
)(Success(_))

def performStateChangingOperations(stateChanges: StateChanges): Try[List[Option[ADValue]]] = {
// finding just one suspected transaction makes it invalid
def transactionError(isTxSuspected: ErgoTransaction => Boolean, cause: Throwable): Throwable = {
val suspectedTxs = transactions.filter(isTxSuspected)
if (suspectedTxs.size == 1)
new MalformedModifierError(cause.getMessage, suspectedTxs.head.id, Transaction.ModifierTypeId)
else {
cause
}
}

// performing operations may fail in which case we can track that failed operation back to its transaction
def performOperation(operation: Operation): Try[Option[ADValue]] =
persistentProver.performOneOperation(operation).recoverWith {
case NonFatal(ex) =>
operation match {
case Insert(outputBoxId, _) =>
Failure(transactionError(_.outputs.map(_.id).contains(outputBoxId), ex))
case Remove(inputBoxId) =>
Failure(transactionError(_.inputs.map(_.boxId).contains(inputBoxId), ex))
case Lookup(inputBoxId) =>
Failure(transactionError(_.dataInputs.map(_.boxId).contains(inputBoxId), ex))
case _ =>
Failure(ex) // cannot happen as we process only insert,remove,lookup operations
}
}

val results: List[Try[Option[ADValue]]] = stateChanges.operations.map(performOperation)(breakOut)
Traverse[List].sequence(results)
}

val txProcessing = ErgoState.execTransactions(transactions, currentStateContext)(checkBoxExistence)
if (txProcessing.isValid) {
val resultTry =
ErgoState.stateChanges(transactions).map { stateChanges =>
val mods = stateChanges.operations
Traverse[List].sequence(mods.map(persistentProver.performOneOperation).toList).map(_ => ())
}
val operationsProcessing = ErgoState.stateChanges(transactions).flatMap(performStateChangingOperations)
ModifierValidator(stateContext.validationSettings)
.validateNoFailure(fbOperationFailed, resultTry, Transaction.ModifierTypeId)
.validateNoFailure(fbOperationFailed, operationsProcessing, Transaction.ModifierTypeId)
.validateEquals(fbDigestIncorrect, expectedDigest, persistentProver.digest, headerId, Header.modifierTypeId)
.result
.toTry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,28 +163,28 @@ class ErgoStateSpecification extends ErgoPropertyTest {
val expectedCost = 535995

// successful validation
ErgoState.execTransactions(txs, stateContext)(id => Try(boxes(ByteArrayWrapper(id)))) shouldBe Valid(expectedCost)
ErgoState.execTransactions(txs, stateContext)((_, id) => Try(boxes(ByteArrayWrapper(id)))) shouldBe Valid(expectedCost)

// cost limit exception expected when crossing MaxBlockCost
val tooManyTxs = txs ++ generateTxs
assert(
ErgoState.execTransactions(tooManyTxs, stateContext)(id => Try(boxes(ByteArrayWrapper(id)))).errors.head.message.contains(
ErgoState.execTransactions(tooManyTxs, stateContext)((_, id) => Try(boxes(ByteArrayWrapper(id)))).errors.head.message.contains(
"Estimated execution cost 23533 exceeds the limit 23009"
)
)

// missing box in state
ErgoState.execTransactions(txs, stateContext)(_ => Failure(new RuntimeException)).errors.head.message shouldBe
ErgoState.execTransactions(txs, stateContext)((_, _) => Failure(new RuntimeException)).errors.head.message shouldBe
"Every input of the transaction should be in UTXO. null"

// tx validation should kick in and detect block height violation
val invalidTx = invalidErgoTransactionGen.sample.get
assert(
ErgoState.execTransactions(txs :+ invalidTx, stateContext)(id => Try(boxes.getOrElse(ByteArrayWrapper(id), invalidTx.outputs.head)))
ErgoState.execTransactions(txs :+ invalidTx, stateContext)((_, id) => Try(boxes.getOrElse(ByteArrayWrapper(id), invalidTx.outputs.head)))
.errors.head.message.startsWith("Transaction outputs should have creationHeight not exceeding block height.")
)

// no transactions are valid
assert(ErgoState.execTransactions(Seq.empty, stateContext)(id => Try(boxes(ByteArrayWrapper(id)))).isValid)
assert(ErgoState.execTransactions(Seq.empty, stateContext)((_, id) => Try(boxes(ByteArrayWrapper(id)))).isValid)
}
}
Loading