diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 7d30c0ea97..a3eb68cb64 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -56,6 +56,9 @@ ergo { # Use external miner, native miner is used if set to `false` useExternalMiner = true + # Block candidate is regenerated periodically to include new transactions + blockCandidateGenerationInterval = 20s + # How many internal miner threads to spawn (used mainly for testing) internalMinersCount = 1 diff --git a/src/main/scala/org/ergoplatform/mining/CandidateGenerator.scala b/src/main/scala/org/ergoplatform/mining/CandidateGenerator.scala index a42b50e6db..49005b6a0d 100644 --- a/src/main/scala/org/ergoplatform/mining/CandidateGenerator.scala +++ b/src/main/scala/org/ergoplatform/mining/CandidateGenerator.scala @@ -52,6 +52,9 @@ class CandidateGenerator( import org.ergoplatform.mining.CandidateGenerator._ + private val candidateGenInterval = + ergoSettings.nodeSettings.blockCandidateGenerationInterval + /** retrieve Readers once on start and then get updated by events */ override def preStart(): Unit = { log.info("CandidateGenerator is starting") @@ -87,7 +90,8 @@ class CandidateGenerator( context.become( initialized( CandidateGeneratorState( - cache = None, + cachedCandidate = None, + cachedPreviousCandidate = None, solvedBlock = None, h, s, @@ -114,7 +118,16 @@ class CandidateGenerator( case ChangedState(s: UtxoStateReader) => context.become(initialized(state.copy(sr = s))) case ChangedMempool(mp: ErgoMemPoolReader) => - context.become(initialized(state.copy(mpr = mp))) + if (hasCandidateExpired( + state.cachedCandidate, + state.solvedBlock, + candidateGenInterval + )) { + context.become(initialized(state.copy(cachedCandidate = None, cachedPreviousCandidate = None, mpr = mp))) + self ! GenerateCandidate(txsToInclude = Seq.empty, reply = false) + } else { + context.become(initialized(state.copy(mpr = mp))) + } case _: NodeViewChange => // Just ignore all other NodeView Changes @@ -126,11 +139,11 @@ class CandidateGenerator( log.info( s"Preparing new candidate on getting new block at ${header.height}" ) - if (needNewCandidate(state.cache, header)) { + if (needNewCandidate(state.cachedCandidate, header)) { if (needNewSolution(state.solvedBlock, header.id)) - context.become(initialized(state.copy(cache = None, solvedBlock = None))) + context.become(initialized(state.copy(cachedCandidate = None, cachedPreviousCandidate = None, solvedBlock = None))) else - context.become(initialized(state.copy(cache = None))) + context.become(initialized(state.copy(cachedCandidate = None, cachedPreviousCandidate = None))) self ! GenerateCandidate(txsToInclude = Seq.empty, reply = false) } else { context.become(initialized(state)) @@ -138,8 +151,8 @@ class CandidateGenerator( case gen @ GenerateCandidate(txsToInclude, reply) => val senderOpt = if (reply) Some(sender()) else None - if (cachedFor(state.cache, txsToInclude)) { - senderOpt.foreach(_ ! StatusReply.success(state.cache.get)) + if (cachedFor(state.cachedCandidate, txsToInclude)) { + senderOpt.foreach(_ ! StatusReply.success(state.cachedCandidate.get)) } else { val start = System.currentTimeMillis() CandidateGenerator.generateCandidate( @@ -163,7 +176,7 @@ class CandidateGenerator( log.info(s"Generated new candidate in $generationTook ms") context.become( initialized( - state.copy(cache = Some(candidate), avgGenTime = generationTook.millis) + state.copy(cachedCandidate = Some(candidate), cachedPreviousCandidate = state.cachedCandidate, avgGenTime = generationTook.millis) ) ) senderOpt.foreach(_ ! StatusReply.success(candidate)) @@ -181,7 +194,7 @@ class CandidateGenerator( } case preSolution: AutolykosSolution - if state.solvedBlock.isEmpty && state.cache.nonEmpty => + if state.solvedBlock.isEmpty && state.cachedCandidate.nonEmpty => // Inject node pk if it is not externally set (in Autolykos 2) val solution = if (CryptoFacade.isInfinityPoint(preSolution.pk)) { @@ -190,7 +203,10 @@ class CandidateGenerator( preSolution } val result: StatusReply[Unit] = { - val newBlock = completeBlock(state.cache.get.candidateBlock, solution) + val newBlock = state.cachedPreviousCandidate + .map(prevCandidate => completeBlock( prevCandidate.candidateBlock, solution)) + .filter(block => ergoSettings.chainSettings.powScheme.validate(block.header).isSuccess) + .getOrElse(completeBlock( state.cachedCandidate.get.candidateBlock, solution)) log.info(s"New block mined, header: ${newBlock.header}") ergoSettings.chainSettings.powScheme .validate(newBlock.header) @@ -200,8 +216,8 @@ class CandidateGenerator( context.become(initialized(state.copy(solvedBlock = Some(newBlock)))) StatusReply.success(()) case Failure(exception) => - log.warn(s"Removing candidate due to invalid block", exception) - context.become(initialized(state.copy(cache = None))) + log.warn(s"Removing candidates due to invalid block", exception) + context.become(initialized(state.copy(cachedCandidate = None, cachedPreviousCandidate = None))) StatusReply.error( new Exception(s"Invalid block mined: ${exception.getMessage}", exception) ) @@ -242,7 +258,8 @@ object CandidateGenerator extends ScorexLogging { /** Local state of candidate generator to avoid mutable vars */ case class CandidateGeneratorState( - cache: Option[Candidate], + cachedCandidate: Option[Candidate], + cachedPreviousCandidate: Option[Candidate], solvedBlock: Option[ErgoFullBlock], hr: ErgoHistoryReader, sr: UtxoStateReader, @@ -297,6 +314,33 @@ object CandidateGenerator extends ScorexLogging { solvedBlock.nonEmpty && !solvedBlock.map(_.parentId).contains(bestFullBlockId) } + /** Regenerate candidate to let new transactions in, miners are polling for candidate in ~ 100ms + * interval so they switch to it. + * If blockCandidateGenerationInterval elapsed since last block generation, + * then new tx in mempool is a reasonable trigger of candidate regeneration + */ + def hasCandidateExpired( + cachedCandidate: Option[Candidate], + solvedBlock: Option[ErgoFullBlock], + candidateGenInterval: FiniteDuration + ): Boolean = { + def candidateAge(c: Candidate): FiniteDuration = + (System.currentTimeMillis() - c.candidateBlock.timestamp).millis + // non-empty solved block means we wait for newly mined block to be applied + if (solvedBlock.isDefined) { + false + } else { + cachedCandidate match { + // if current candidate is older than candidateGenInterval + case Some(c) if candidateGenInterval.compare(candidateAge(c)) <= 0 => + log.info(s"Regenerating block candidate") + true + case _ => + false + } + } + } + /** Calculate average mining time from latest block header timestamps */ def getBlockMiningTimeAvg( timestamps: IndexedSeq[Header.Timestamp] diff --git a/src/main/scala/org/ergoplatform/settings/NodeConfigurationSettings.scala b/src/main/scala/org/ergoplatform/settings/NodeConfigurationSettings.scala index fdfbbc962f..ae45758b59 100644 --- a/src/main/scala/org/ergoplatform/settings/NodeConfigurationSettings.scala +++ b/src/main/scala/org/ergoplatform/settings/NodeConfigurationSettings.scala @@ -33,6 +33,7 @@ case class NodeConfigurationSettings(override val stateType: StateType, mining: Boolean, maxTransactionCost: Int, maxTransactionSize: Int, + blockCandidateGenerationInterval: FiniteDuration, useExternalMiner: Boolean, internalMinersCount: Int, internalMinerPollingInterval: FiniteDuration, @@ -77,6 +78,7 @@ trait NodeConfigurationReaders extends StateTypeReaders with CheckpointingSettin cfg.as[Boolean](s"$path.mining"), cfg.as[Int](s"$path.maxTransactionCost"), cfg.as[Int](s"$path.maxTransactionSize"), + cfg.as[FiniteDuration](s"$path.blockCandidateGenerationInterval"), cfg.as[Boolean](s"$path.useExternalMiner"), cfg.as[Int](s"$path.internalMinersCount"), cfg.as[FiniteDuration](s"$path.internalMinerPollingInterval"), diff --git a/src/test/scala/org/ergoplatform/mining/CandidateGeneratorSpec.scala b/src/test/scala/org/ergoplatform/mining/CandidateGeneratorSpec.scala index 7e3a383111..2a17bff09c 100644 --- a/src/test/scala/org/ergoplatform/mining/CandidateGeneratorSpec.scala +++ b/src/test/scala/org/ergoplatform/mining/CandidateGeneratorSpec.scala @@ -8,8 +8,9 @@ import org.bouncycastle.util.BigIntegers import org.ergoplatform.mining.CandidateGenerator.{Candidate, GenerateCandidate} import org.ergoplatform.modifiers.ErgoFullBlock import org.ergoplatform.modifiers.history.header.Header -import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnsignedErgoTransaction} +import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction, UnsignedErgoTransaction} import org.ergoplatform.network.ErgoNodeViewSynchronizerMessages.FullBlockApplied +import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.LocallyGeneratedTransaction import org.ergoplatform.nodeView.ErgoReadersHolder.{GetReaders, Readers} import org.ergoplatform.nodeView.history.ErgoHistoryReader import org.ergoplatform.nodeView.state.StateType @@ -148,9 +149,108 @@ class CandidateGeneratorSpec extends AnyFlatSpec with Matchers with ErgoTestHelp } candidateGenerator.tell(block.header.powSolution, testProbe.ref) - testProbe.expectMsg(blockValidationDelay, StatusReply.success(())) - // after applying solution - testProbe.expectMsgClass(newBlockDelay, newBlockSignal) + // we fish either for ack or SSM as the order is non-deterministic + testProbe.fishForMessage(blockValidationDelay) { + case StatusReply.Success(()) => + testProbe.expectMsgPF(candidateGenDelay) { + case FullBlockApplied(header) if header.id != block.header.parentId => + } + true + case FullBlockApplied(header) if header.id != block.header.parentId => + testProbe.expectMsg(StatusReply.Success(())) + true + } + + system.terminate() + } + + it should "regenerate candidate periodically" in new TestKit( + ActorSystem() + ) { + val testProbe = new TestProbe(system) + system.eventStream.subscribe(testProbe.ref, newBlockSignal) + + val settingsWithShortRegeneration: ErgoSettings = + ErgoSettingsReader.read() + .copy( + nodeSettings = defaultSettings.nodeSettings + .copy(blockCandidateGenerationInterval = 1.millis), + chainSettings = + ErgoSettingsReader.read().chainSettings.copy(blockInterval = 1.seconds) + ) + + val viewHolderRef: ActorRef = + ErgoNodeViewRef(settingsWithShortRegeneration) + val readersHolderRef: ActorRef = ErgoReadersHolderRef(viewHolderRef) + + val candidateGenerator: ActorRef = + CandidateGenerator( + defaultMinerSecret.publicImage, + readersHolderRef, + viewHolderRef, + settingsWithShortRegeneration + ) + + val readers: Readers = await((readersHolderRef ? GetReaders).mapTo[Readers]) + + // generate block to use reward as our tx input + candidateGenerator.tell(GenerateCandidate(Seq.empty, reply = true), testProbe.ref) + testProbe.expectMsgPF(candidateGenDelay) { + case StatusReply.Success(candidate: Candidate) => + val block = settingsWithShortRegeneration.chainSettings.powScheme + .proveCandidate(candidate.candidateBlock, defaultMinerSecret.w, 0, 1000) + .get + candidateGenerator.tell(block.header.powSolution, testProbe.ref) + // we fish either for ack or SSM as the order is non-deterministic + testProbe.fishForMessage(blockValidationDelay) { + case StatusReply.Success(()) => + testProbe.expectMsgPF(candidateGenDelay) { + case FullBlockApplied(header) if header.id != block.header.parentId => + } + true + case FullBlockApplied(header) if header.id != block.header.parentId => + testProbe.expectMsg(StatusReply.Success(())) + true + } + } + + // build new transaction that uses miner's reward as input + val prop: DLogProtocol.ProveDlog = + DLogProverInput(BigIntegers.fromUnsignedByteArray("test".getBytes())).publicImage + val newlyMinedBlock = readers.h.bestFullBlockOpt.get + val rewardBox: ErgoBox = newlyMinedBlock.transactions.last.outputs.last + rewardBox.propositionBytes shouldBe ErgoTreePredef + .rewardOutputScript(emission.settings.minerRewardDelay, defaultMinerPk) + .bytes + val input = Input(rewardBox.id, emptyProverResult) + + val outputs = IndexedSeq( + new ErgoBoxCandidate(rewardBox.value, prop, readers.s.stateContext.currentHeight) + ) + val unsignedTx = new UnsignedErgoTransaction(IndexedSeq(input), IndexedSeq(), outputs) + + val tx = ErgoTransaction( + defaultProver + .sign(unsignedTx, IndexedSeq(rewardBox), IndexedSeq(), readers.s.stateContext) + .get + ) + + // candidate should be regenerated immediately after a mempool change + candidateGenerator.tell(GenerateCandidate(Seq.empty, reply = true), testProbe.ref) + testProbe.expectMsgPF(candidateGenDelay) { + case StatusReply.Success(candidate: Candidate) => + // this triggers mempool change that triggers candidate regeneration + viewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(tx, None)) + expectNoMessage(candidateGenDelay) + candidateGenerator.tell(GenerateCandidate(Seq.empty, reply = true), testProbe.ref) + testProbe.expectMsgPF(candidateGenDelay) { + case StatusReply.Success(regeneratedCandidate: Candidate) => + // regeneratedCandidate now contains new transaction + regeneratedCandidate.candidateBlock shouldNot be( + candidate.candidateBlock + ) + } + } system.terminate() } diff --git a/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerTestActor.scala b/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerTestActor.scala index e7fd56f122..db988121eb 100644 --- a/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerTestActor.scala +++ b/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerTestActor.scala @@ -25,8 +25,8 @@ class ExtraIndexerTestActor(test: ExtraIndexerSpecification) extends ExtraIndexe val nodeSettings: NodeConfigurationSettings = NodeConfigurationSettings(StateType.Utxo, verifyTransactions = true, -1, UtxoSettings(utxoBootstrap = false, 0, 2), NipopowSettings(nipopowBootstrap = false, 1), mining = false, - ChainGenerator.txCostLimit, ChainGenerator.txSizeLimit, useExternalMiner = false, internalMinersCount = 1, - internalMinerPollingInterval = 1.second, miningPubKeyHex = None, offlineGeneration = false, + ChainGenerator.txCostLimit, ChainGenerator.txSizeLimit, blockCandidateGenerationInterval = 20.seconds, useExternalMiner = false, + internalMinersCount = 1, internalMinerPollingInterval = 1.second, miningPubKeyHex = None, offlineGeneration = false, 200, 5.minutes, 100000, 1.minute, mempoolSorting = SortingOption.FeePerByte, rebroadcastCount = 20, 1000000, 100, adProofsSuffixLength = 112 * 1024, extraIndex = false) diff --git a/src/test/scala/org/ergoplatform/settings/ErgoSettingsSpecification.scala b/src/test/scala/org/ergoplatform/settings/ErgoSettingsSpecification.scala index fc6adce7fd..ccee904dc2 100644 --- a/src/test/scala/org/ergoplatform/settings/ErgoSettingsSpecification.scala +++ b/src/test/scala/org/ergoplatform/settings/ErgoSettingsSpecification.scala @@ -31,6 +31,7 @@ class ErgoSettingsSpecification extends ErgoCorePropertyTest { txCostLimit, txSizeLimit, useExternalMiner = false, + blockCandidateGenerationInterval = 20.seconds, internalMinersCount = 1, internalMinerPollingInterval = 1.second, miningPubKeyHex = None, @@ -80,6 +81,7 @@ class ErgoSettingsSpecification extends ErgoCorePropertyTest { txCostLimit, txSizeLimit, useExternalMiner = false, + blockCandidateGenerationInterval = 20.seconds, internalMinersCount = 1, internalMinerPollingInterval = 1.second, miningPubKeyHex = None, @@ -122,6 +124,7 @@ class ErgoSettingsSpecification extends ErgoCorePropertyTest { txCostLimit, txSizeLimit, useExternalMiner = false, + blockCandidateGenerationInterval = 20.seconds, internalMinersCount = 1, internalMinerPollingInterval = 1.second, miningPubKeyHex = None, diff --git a/src/test/scala/org/ergoplatform/tools/ChainGenerator.scala b/src/test/scala/org/ergoplatform/tools/ChainGenerator.scala index 7b7175feec..93996ceb4f 100644 --- a/src/test/scala/org/ergoplatform/tools/ChainGenerator.scala +++ b/src/test/scala/org/ergoplatform/tools/ChainGenerator.scala @@ -63,8 +63,8 @@ object ChainGenerator extends App with ErgoTestHelpers with Matchers { val txCostLimit = initSettings.nodeSettings.maxTransactionCost val txSizeLimit = initSettings.nodeSettings.maxTransactionSize val nodeSettings: NodeConfigurationSettings = NodeConfigurationSettings(StateType.Utxo, verifyTransactions = true, - -1, UtxoSettings(false, 0, 2), NipopowSettings(false, 1), mining = false, txCostLimit, txSizeLimit, useExternalMiner = false, - internalMinersCount = 1, internalMinerPollingInterval = 1.second, miningPubKeyHex = None, offlineGeneration = false, + -1, UtxoSettings(false, 0, 2), NipopowSettings(false, 1), mining = false, txCostLimit, txSizeLimit, blockCandidateGenerationInterval = 20.seconds, + useExternalMiner = false, internalMinersCount = 1, internalMinerPollingInterval = 1.second, miningPubKeyHex = None, offlineGeneration = false, 200, 5.minutes, 100000, 1.minute, mempoolSorting = SortingOption.FeePerByte, rebroadcastCount = 20, 1000000, 100, adProofsSuffixLength = 112*1024, extraIndex = false) val ms = settings.chainSettings.monetary.copy( diff --git a/src/test/scala/org/ergoplatform/utils/HistoryTestHelpers.scala b/src/test/scala/org/ergoplatform/utils/HistoryTestHelpers.scala index e8e9756da4..fd3aebbfaf 100644 --- a/src/test/scala/org/ergoplatform/utils/HistoryTestHelpers.scala +++ b/src/test/scala/org/ergoplatform/utils/HistoryTestHelpers.scala @@ -50,8 +50,8 @@ object HistoryTestHelpers extends FileUtils { val txCostLimit = initSettings.nodeSettings.maxTransactionCost val txSizeLimit = initSettings.nodeSettings.maxTransactionSize val nodeSettings: NodeConfigurationSettings = NodeConfigurationSettings(stateType, verifyTransactions, blocksToKeep, - UtxoSettings(false, 0, 2), NipopowSettings(false, 1), mining = false, txCostLimit, txSizeLimit, useExternalMiner = false, - internalMinersCount = 1, internalMinerPollingInterval = 1.second, miningPubKeyHex = None, + UtxoSettings(false, 0, 2), NipopowSettings(false, 1), mining = false, txCostLimit, txSizeLimit, blockCandidateGenerationInterval = 20.seconds, + useExternalMiner = false, internalMinersCount = 1, internalMinerPollingInterval = 1.second, miningPubKeyHex = None, offlineGeneration = false, 200, 5.minutes, 100000, 1.minute, mempoolSorting = SortingOption.FeePerByte, rebroadcastCount = 200, 1000000, 100, adProofsSuffixLength = 112*1024, extraIndex = false ) diff --git a/src/test/scala/org/ergoplatform/utils/Stubs.scala b/src/test/scala/org/ergoplatform/utils/Stubs.scala index b63519caf1..6a7e816aff 100644 --- a/src/test/scala/org/ergoplatform/utils/Stubs.scala +++ b/src/test/scala/org/ergoplatform/utils/Stubs.scala @@ -375,8 +375,8 @@ trait Stubs extends ErgoTestHelpers with TestFileUtils { val txCostLimit = initSettings.nodeSettings.maxTransactionCost val txSizeLimit = initSettings.nodeSettings.maxTransactionSize val nodeSettings: NodeConfigurationSettings = NodeConfigurationSettings(stateType, verifyTransactions, blocksToKeep, - UtxoSettings(false, 0, 2), NipopowSettings(poPoWBootstrap, 1), mining = false, txCostLimit, txSizeLimit, useExternalMiner = false, - internalMinersCount = 1, internalMinerPollingInterval = 1.second,miningPubKeyHex = None, + UtxoSettings(false, 0, 2), NipopowSettings(poPoWBootstrap, 1), mining = false, txCostLimit, txSizeLimit, blockCandidateGenerationInterval = 20.seconds, + useExternalMiner = false, internalMinersCount = 1, internalMinerPollingInterval = 1.second,miningPubKeyHex = None, offlineGeneration = false, 200, 5.minutes, 100000, 1.minute, mempoolSorting = SortingOption.FeePerByte, rebroadcastCount = 200, 1000000, 100, adProofsSuffixLength = 112*1024, extraIndex = false )