diff --git a/README.md b/README.md index 519a17f97a..26e2f9d742 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Ergo Platform website: [https://ergoplatform.org/](https://ergoplatform.org/) * Memory-hard Proof-of-Work function [Autolykos2](https://docs.ergoplatform.com/ErgoPow.pdf) * Support for stateless clients (asymmetric, based on [https://eprint.iacr.org/2016/994](https://eprint.iacr.org/2016/994)), [NiPoPoWs](https://eprint.iacr.org/2017/963.pdf), hybrid modes -* [Alternative transactional language](https://github.com/ScorexFoundation/sigmastate-interpreter), which is more powerful that Bitcoin Script but also safe against +* [Alternative transactional language](https://github.com/ScorexFoundation/sigmastate-interpreter), which is more powerful than Bitcoin Script but also safe against heavy validation attacks * Alternative fee model with [mandatory storage-rent component](https://fc18.ifca.ai/bitcoin/papers/bitcoin18-final18.pdf ) diff --git a/benchmarks/src/test/scala/org/ergoplatform/bench/CrawlerActor.scala b/benchmarks/src/test/scala/org/ergoplatform/bench/CrawlerActor.scala deleted file mode 100644 index b6df6f3141..0000000000 --- a/benchmarks/src/test/scala/org/ergoplatform/bench/CrawlerActor.scala +++ /dev/null @@ -1,34 +0,0 @@ -package org.ergoplatform.bench - -import akka.actor.{Actor, ActorRef, ActorSystem, Props} -import org.ergoplatform.bench.misc.CrawlerConfig -import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.FullBlockApplied -import scorex.util.ScorexLogging - -class CrawlerActor(c: CrawlerConfig) extends Actor with ScorexLogging { - - override def preStart(): Unit = { - context.system.eventStream.subscribe(self, classOf[FullBlockApplied]) - () - } - - override def receive: Receive = { - case FullBlockApplied(header) => - val height = header.height - if (height % 100 == 0) logger.info(s"Got $height modifiers") - if (header.height >= c.threshold) { - log.error("Got enough modifiers.") - log.warn("Exiting benchmark..") - System.exit(0) - } - } - -} - -object CrawlerActor { - - def apply(cc: CrawlerConfig)(implicit ac: ActorSystem): ActorRef = { - ac.actorOf(Props.apply(classOf[CrawlerActor], cc)) - } - -} diff --git a/benchmarks/src/test/scala/org/ergoplatform/bench/CrawlerRunner.scala b/benchmarks/src/test/scala/org/ergoplatform/bench/CrawlerRunner.scala deleted file mode 100644 index 1be231ebd1..0000000000 --- a/benchmarks/src/test/scala/org/ergoplatform/bench/CrawlerRunner.scala +++ /dev/null @@ -1,76 +0,0 @@ -package org.ergoplatform.bench - -import java.io.File - -import akka.actor.ActorRef -import org.ergoplatform.bench.misc.{CrawlerConfig, TempDir} -import org.ergoplatform.http.api.{BlocksApiRoute, ErgoUtilsApiRoute, InfoApiRoute, TransactionsApiRoute} -import org.ergoplatform.local.ErgoStatsCollectorRef -import org.ergoplatform.mining.ErgoMiner -import org.ergoplatform.mining.emission.EmissionRules -import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker} -import org.ergoplatform.nodeView.history.ErgoSyncInfoMessageSpec -import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef} -import org.ergoplatform.settings.{Args, ErgoSettings} -import scorex.core.api.http.ApiRoute -import scorex.core.app.Application -import scorex.core.network.{DeliveryTracker, PeerFeature} -import scorex.core.network.message.MessageSpec -import scorex.core.settings.ScorexSettings - -import scala.concurrent.ExecutionContextExecutor -import scala.io.Source - -class CrawlerRunner(args: Array[String]) extends Application { - - lazy val fileToSave: String = args.headOption.getOrElse("/") - lazy val threshold: Int = args.lift(1).getOrElse("15000").toInt - lazy val cfgPath: Option[String] = args.lift(2) - lazy val benchConfig: CrawlerConfig = CrawlerConfig(fileToSave, threshold) - lazy val tempDir: File = TempDir.createTempDir - - log.info(s"Temp dir is ${tempDir.getAbsolutePath}") - log.info(s"Config is $benchConfig") - - override protected lazy val features: Seq[PeerFeature] = Seq() - - implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher - - override val ergoSettings: ErgoSettings = ErgoSettings.read(Args(cfgPath, None)) - - lazy val emission = new EmissionRules(ergoSettings.chainSettings.monetary) - - override implicit lazy val scorexSettings: ScorexSettings = ergoSettings.scorexSettings - - override protected lazy val additionalMessageSpecs: Seq[MessageSpec[_]] = Seq(ErgoSyncInfoMessageSpec) - override val nodeViewHolderRef: ActorRef = ErgoNodeViewRef(ergoSettings, timeProvider) - - val readersHolderRef: ActorRef = ErgoReadersHolderRef(nodeViewHolderRef) - - val minerRef: ActorRef = ErgoMiner(ergoSettings, nodeViewHolderRef, readersHolderRef, timeProvider) - - private val syncTracker = ErgoSyncTracker(scorexSettings.network, timeProvider) - - val statsCollectorRef: ActorRef = - ErgoStatsCollectorRef(nodeViewHolderRef, networkControllerRef, syncTracker, ergoSettings, timeProvider) - - - override val apiRoutes: Seq[ApiRoute] = Seq( - ErgoUtilsApiRoute(ergoSettings), - InfoApiRoute(statsCollectorRef, scorexSettings.restApi, timeProvider), - BlocksApiRoute(nodeViewHolderRef, readersHolderRef, ergoSettings), - TransactionsApiRoute(readersHolderRef, nodeViewHolderRef, ergoSettings)) - - override val swaggerConfig: String = Source.fromResource("api/openapi.yaml").getLines.mkString("\n") - - private val deliveryTracker: DeliveryTracker = DeliveryTracker.empty(ergoSettings) - - override val nodeViewSynchronizer: ActorRef = - ErgoNodeViewSynchronizer(networkControllerRef, nodeViewHolderRef, ErgoSyncInfoMessageSpec, - ergoSettings, timeProvider, syncTracker, deliveryTracker) - -} - -object CrawlerRunner { - def main(args: Array[String]): Unit = new CrawlerRunner(args).run() -} diff --git a/src/main/resources/api/openapi.yaml b/src/main/resources/api/openapi.yaml index d5f8822eb0..c5a13018e1 100644 --- a/src/main/resources/api/openapi.yaml +++ b/src/main/resources/api/openapi.yaml @@ -1,7 +1,7 @@ openapi: "3.0.2" info: - version: "5.0.3" + version: "5.0.4" title: Ergo Node API description: API docs for Ergo Node. Models are shared between all Ergo products contact: diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 1a9fc953d0..58d20ccb78 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -408,7 +408,7 @@ scorex { nodeName = "ergo-node" # Network protocol version to be sent in handshakes - appVersion = 5.0.3 + appVersion = 5.0.4 # Network agent name. May contain information about client code # stack, starting from core code-base up to the end graphical interface. diff --git a/src/main/resources/mainnet.conf b/src/main/resources/mainnet.conf index ea3c7a51ce..9d62d3aca0 100644 --- a/src/main/resources/mainnet.conf +++ b/src/main/resources/mainnet.conf @@ -80,7 +80,7 @@ scorex { network { magicBytes = [1, 0, 2, 4] bindAddress = "0.0.0.0:9030" - nodeName = "ergo-mainnet-5.0.3" + nodeName = "ergo-mainnet-5.0.4" nodeName = ${?NODENAME} knownPeers = [ "213.239.193.208:9030", diff --git a/src/main/resources/testnet.conf b/src/main/resources/testnet.conf index 98448953af..d9f7a4e840 100644 --- a/src/main/resources/testnet.conf +++ b/src/main/resources/testnet.conf @@ -95,7 +95,7 @@ scorex { network { magicBytes = [2, 0, 2, 3] bindAddress = "0.0.0.0:9022" - nodeName = "ergo-testnet-5.0" + nodeName = "ergo-testnet-5.0.4" nodeName = ${?NODENAME} knownPeers = [ "213.239.193.208:9022", diff --git a/src/main/scala/org/ergoplatform/ErgoApp.scala b/src/main/scala/org/ergoplatform/ErgoApp.scala index 8f8eaa23f0..8065c11c99 100644 --- a/src/main/scala/org/ergoplatform/ErgoApp.scala +++ b/src/main/scala/org/ergoplatform/ErgoApp.scala @@ -20,6 +20,7 @@ import scorex.core.api.http._ import scorex.core.app.ScorexContext import scorex.core.network.NetworkController.ReceivableMessages.ShutdownNetwork import scorex.core.network._ +import scorex.core.network.message.Message.MessageCode import scorex.core.network.message._ import scorex.core.network.peer.PeerManagerRef import scorex.core.settings.ScorexSettings @@ -36,7 +37,10 @@ class ErgoApp(args: Args) extends ScorexLogging { private val ergoSettings: ErgoSettings = ErgoSettings.read(args) - require(ergoSettings.scorexSettings.restApi.apiKeyHash.isDefined, "API key hash must be set") + require( + ergoSettings.scorexSettings.restApi.apiKeyHash.isDefined, + "API key hash must be set" + ) log.info(s"Working directory: ${ergoSettings.directory}") log.info(s"Secret directory: ${ergoSettings.walletSettings.secretStorage.secretDir}") @@ -45,20 +49,25 @@ class ErgoApp(args: Args) extends ScorexLogging { overrideLogLevel() - implicit private val actorSystem: ActorSystem = ActorSystem(scorexSettings.network.agentName) + implicit private val actorSystem: ActorSystem = ActorSystem( + scorexSettings.network.agentName + ) implicit private val executionContext: ExecutionContext = actorSystem.dispatcher private val timeProvider = new NetworkTimeProvider(scorexSettings.ntp) private val upnpGateway: Option[UPnPGateway] = - if (scorexSettings.network.upnpEnabled) UPnP.getValidGateway(scorexSettings.network) else None + if (scorexSettings.network.upnpEnabled) UPnP.getValidGateway(scorexSettings.network) + else None upnpGateway.foreach(_.addPort(scorexSettings.network.bindAddress.getPort)) //an address to send to peers private val externalSocketAddress: Option[InetSocketAddress] = - scorexSettings.network.declaredAddress orElse { - upnpGateway.map(u => new InetSocketAddress(u.externalAddress, scorexSettings.network.bindAddress.getPort)) - } + scorexSettings.network.declaredAddress orElse { + upnpGateway.map(u => + new InetSocketAddress(u.externalAddress, scorexSettings.network.bindAddress.getPort) + ) + } private val basicSpecs = { Seq( @@ -73,17 +82,14 @@ class ErgoApp(args: Args) extends ScorexLogging { private val additionalMessageSpecs: Seq[MessageSpec[_]] = Seq(ErgoSyncInfoMessageSpec) private val scorexContext = ScorexContext( - messageSpecs = basicSpecs ++ additionalMessageSpecs, - upnpGateway = upnpGateway, - timeProvider = timeProvider, + messageSpecs = basicSpecs ++ additionalMessageSpecs, + upnpGateway = upnpGateway, + timeProvider = timeProvider, externalNodeAddress = externalSocketAddress ) private val peerManagerRef = PeerManagerRef(ergoSettings, scorexContext) - private val networkControllerRef: ActorRef = NetworkControllerRef( - "networkController", ergoSettings, peerManagerRef, scorexContext) - private val nodeViewHolderRef: ActorRef = ErgoNodeViewRef(ergoSettings, timeProvider) private val readersHolderRef: ActorRef = ErgoReadersHolderRef(nodeViewHolderRef) @@ -96,14 +102,12 @@ class ErgoApp(args: Args) extends ScorexLogging { None } - private val syncTracker = ErgoSyncTracker(scorexSettings.network, timeProvider) - private val statsCollectorRef: ActorRef = ErgoStatsCollectorRef(readersHolderRef, networkControllerRef, syncTracker, ergoSettings, timeProvider) + private val deliveryTracker: DeliveryTracker = DeliveryTracker.empty(ergoSettings) // touch it to run preStart method of the actor which is in turn running schedulers - ErgoNodeViewSynchronizer( - networkControllerRef, + private val ergoNodeViewSynchronizerRefPartial = ErgoNodeViewSynchronizer.make( nodeViewHolderRef, ErgoSyncInfoMessageSpec, ergoSettings, @@ -112,29 +116,73 @@ class ErgoApp(args: Args) extends ScorexLogging { deliveryTracker ) - if (ergoSettings.scorexSettings.network.peerDiscovery) { - // Launching PeerSynchronizer actor which is then registering itself at network controller - PeerSynchronizerRef("PeerSynchronizer", networkControllerRef, peerManagerRef, scorexSettings.network) - } + private val messageHandlers: ActorRef => Map[MessageCode, ActorRef] = + networkControllerRef => { + val ergoNodeViewSynchronizerRef = ergoNodeViewSynchronizerRefPartial( + networkControllerRef + ) + var map: Map[MessageCode, ActorRef] = Map( + InvSpec.messageCode -> ergoNodeViewSynchronizerRef, + RequestModifierSpec.messageCode -> ergoNodeViewSynchronizerRef, + ModifiersSpec.messageCode -> ergoNodeViewSynchronizerRef, + ErgoSyncInfoMessageSpec.messageCode -> ergoNodeViewSynchronizerRef + ) + // Launching PeerSynchronizer actor which is then registering itself at network controller + if (ergoSettings.scorexSettings.network.peerDiscovery) { + val psr = PeerSynchronizerRef( + "PeerSynchronizer", + networkControllerRef, + peerManagerRef, + scorexSettings.network + ) + map ++= Map( + PeersSpec.messageCode -> psr, + GetPeersSpec.messageCode -> psr + ) + } + map + } - private val apiRoutes: Seq[ApiRoute] = Seq( - EmissionApiRoute(ergoSettings), - ErgoUtilsApiRoute(ergoSettings), - ErgoPeersApiRoute(peerManagerRef, networkControllerRef, syncTracker, deliveryTracker, scorexSettings.restApi), - InfoApiRoute(statsCollectorRef, scorexSettings.restApi, timeProvider), - BlocksApiRoute(nodeViewHolderRef, readersHolderRef, ergoSettings), - NipopowApiRoute(nodeViewHolderRef, readersHolderRef, ergoSettings), - TransactionsApiRoute(readersHolderRef, nodeViewHolderRef, ergoSettings), - WalletApiRoute(readersHolderRef, nodeViewHolderRef, ergoSettings), - UtxoApiRoute(readersHolderRef, scorexSettings.restApi), - ScriptApiRoute(readersHolderRef, ergoSettings), - ScanApiRoute(readersHolderRef, ergoSettings), - NodeApiRoute(ergoSettings) - ) ++ minerRefOpt.map(minerRef => MiningApiRoute(minerRef, ergoSettings)).toSeq + private val networkControllerRef: ActorRef = + NetworkControllerRef( + "networkController", + ergoSettings, + peerManagerRef, + scorexContext, + messageHandlers + ) + + private val statsCollectorRef: ActorRef = ErgoStatsCollectorRef( + readersHolderRef, + networkControllerRef, + syncTracker, + ergoSettings, + timeProvider + ) + private val apiRoutes: Seq[ApiRoute] = Seq( + EmissionApiRoute(ergoSettings), + ErgoUtilsApiRoute(ergoSettings), + ErgoPeersApiRoute( + peerManagerRef, + networkControllerRef, + syncTracker, + deliveryTracker, + scorexSettings.restApi + ), + InfoApiRoute(statsCollectorRef, scorexSettings.restApi, timeProvider), + BlocksApiRoute(nodeViewHolderRef, readersHolderRef, ergoSettings), + NipopowApiRoute(nodeViewHolderRef, readersHolderRef, ergoSettings), + TransactionsApiRoute(readersHolderRef, nodeViewHolderRef, ergoSettings), + WalletApiRoute(readersHolderRef, nodeViewHolderRef, ergoSettings), + UtxoApiRoute(readersHolderRef, scorexSettings.restApi), + ScriptApiRoute(readersHolderRef, ergoSettings), + ScanApiRoute(readersHolderRef, ergoSettings), + NodeApiRoute(ergoSettings) + ) ++ minerRefOpt.map(minerRef => MiningApiRoute(minerRef, ergoSettings)).toSeq private val swaggerRoute = SwaggerRoute(scorexSettings.restApi, swaggerConfig) - private val panelRoute = NodePanelRoute() + private val panelRoute = NodePanelRoute() private val httpService = ErgoHttpService(apiRoutes, swaggerRoute, panelRoute) @@ -154,8 +202,12 @@ class ErgoApp(args: Args) extends ScorexLogging { Some(ShutdownNetwork) ) - coordinatedShutdown.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "stop-upnpGateway") { () => - Future(upnpGateway.foreach(_.deletePort(scorexSettings.network.bindAddress.getPort))).map(_ => Done) + coordinatedShutdown.addTask( + CoordinatedShutdown.PhaseBeforeServiceUnbind, + "stop-upnpGateway" + ) { () => + Future(upnpGateway.foreach(_.deletePort(scorexSettings.network.bindAddress.getPort))) + .map(_ => Done) } if (!ergoSettings.nodeSettings.stateType.requireProofs) { @@ -167,12 +219,13 @@ class ErgoApp(args: Args) extends ScorexLogging { */ private def overrideLogLevel() { val loggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] - val root = loggerContext.getLogger(Logger.ROOT_LOGGER_NAME) + val root = loggerContext.getLogger(Logger.ROOT_LOGGER_NAME) root.setLevel(Level.toLevel(ergoSettings.scorexSettings.logging.level)) } - private def swaggerConfig: String = Source.fromResource("api/openapi.yaml").getLines.mkString("\n") + private def swaggerConfig: String = + Source.fromResource("api/openapi.yaml").getLines.mkString("\n") private def run(): Future[ServerBinding] = { require(scorexSettings.network.agentName.length <= ErgoApp.ApplicationNameLimit) @@ -183,36 +236,41 @@ class ErgoApp(args: Args) extends ScorexLogging { if (ergoSettings.chainSettings.reemission.checkReemissionRules) { log.info("Checking re-emission rules enabled") - log.info(s"EIP27 activation height: " + ergoSettings.chainSettings.reemission.activationHeight) + log.info( + s"EIP27 activation height: " + ergoSettings.chainSettings.reemission.activationHeight + ) } val bindAddress = scorexSettings.restApi.bindAddress - Http().newServerAt(bindAddress.getAddress.getHostAddress, bindAddress.getPort).bindFlow(httpService.compositeRoute) + Http() + .newServerAt(bindAddress.getAddress.getHostAddress, bindAddress.getPort) + .bindFlow(httpService.compositeRoute) } } object ErgoApp extends ScorexLogging { val ApplicationNameLimit: Int = 50 + val argParser = new scopt.OptionParser[Args]("ergo") { - opt[String]("config") - .abbr("c") - .action((x, c) => c.copy(userConfigPathOpt = Some(x))) - .text("location of ergo node configuration") - .optional() - opt[Unit]("devnet") - .action((_, c) => c.copy(networkTypeOpt = Some(NetworkType.DevNet))) - .text("set network to devnet") - .optional() - opt[Unit]("testnet") - .action((_, c) => c.copy(networkTypeOpt = Some(NetworkType.TestNet))) - .text("set network to testnet") - .optional() - opt[Unit]("mainnet") - .action((_, c) => c.copy(networkTypeOpt = Some(NetworkType.MainNet))) - .text("set network to mainnet") - .optional() - help("help").text("prints this usage text") + opt[String]("config") + .abbr("c") + .action((x, c) => c.copy(userConfigPathOpt = Some(x))) + .text("location of ergo node configuration") + .optional() + opt[Unit]("devnet") + .action((_, c) => c.copy(networkTypeOpt = Some(NetworkType.DevNet))) + .text("set network to devnet") + .optional() + opt[Unit]("testnet") + .action((_, c) => c.copy(networkTypeOpt = Some(NetworkType.TestNet))) + .text("set network to testnet") + .optional() + opt[Unit]("mainnet") + .action((_, c) => c.copy(networkTypeOpt = Some(NetworkType.MainNet))) + .text("set network to mainnet") + .optional() + help("help").text("prints this usage text") } /** Internal failure causing shutdown */ @@ -228,8 +286,9 @@ object ErgoApp extends ScorexLogging { def forceStopApplication(code: Int = 1): Nothing = sys.exit(code) /** The only proper way of application shutdown after actor system initialization */ - def shutdownSystem(reason: CoordinatedShutdown.Reason = InternalShutdown) - (implicit system: ActorSystem): Future[Done] = + def shutdownSystem( + reason: CoordinatedShutdown.Reason = InternalShutdown + )(implicit system: ActorSystem): Future[Done] = CoordinatedShutdown(system).run(reason) def main(args: Array[String]): Unit = { diff --git a/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala b/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala index 747dab25cf..61ab61ec0b 100644 --- a/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala +++ b/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala @@ -17,7 +17,7 @@ import org.ergoplatform.nodeView.ErgoNodeViewHolder._ import scorex.core.consensus.{Equal, Fork, Nonsense, Older, Unknown, Younger} import scorex.core.network.ModifiersStatus.Requested import scorex.core.{ModifierTypeId, NodeViewModifier, idsToString} -import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} +import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, SendToNetwork} import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages._ import org.ergoplatform.nodeView.state.{ErgoStateReader, UtxoStateReader} import org.ergoplatform.nodeView.wallet.ErgoWalletReader @@ -75,6 +75,8 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, private var syncInfoV2CacheByHeadersHeight: Option[(Int, ErgoSyncInfoV2)] = Option.empty + private var syncInfoV3CacheByHeadersHeight: Option[(Int, ErgoSyncInfoV3)] = Option.empty + private val networkSettings: NetworkSettings = settings.scorexSettings.network protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout @@ -203,10 +205,6 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, val toDownloadCheckInterval = networkSettings.syncInterval - // register as a handler for synchronization-specific types of messages - val messageSpecs: Seq[MessageSpec[_]] = Seq(InvSpec, RequestModifierSpec, ModifiersSpec, syncInfoSpec) - networkControllerRef ! RegisterMessageSpecs(messageSpecs, self) - // register as a listener for peers got connected (handshaked) or disconnected context.system.eventStream.subscribe(self, classOf[HandshakedPeer]) context.system.eventStream.subscribe(self, classOf[DisconnectedPeer]) @@ -271,11 +269,24 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, } } + private def getV3SyncInfo(history: ErgoHistory, full: Boolean): ErgoSyncInfoV3 = { + val headersHeight = history.headersHeight + syncInfoV3CacheByHeadersHeight + .collect { case (height, syncInfo) if height == headersHeight => syncInfo } + .getOrElse { + val v3SyncInfo = history.syncInfoV3(full) + syncInfoV3CacheByHeadersHeight = Some(headersHeight -> v3SyncInfo) + v3SyncInfo + } + } + /** * Whether neighbour peer `remote` supports sync protocol V2. */ def syncV2Supported(remote: ConnectedPeer): Boolean = SyncV2Filter.condition(remote) + def syncV3Supported(remote: ConnectedPeer): Boolean = SyncV3Filter.condition(remote) + /** * Send synchronization statuses to neighbour peers * @@ -288,7 +299,20 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, */ protected def sendSync(history: ErgoHistory): Unit = { val peers = syncTracker.peersToSyncWith() - val (peersV2, peersV1) = peers.partition(p => syncV2Supported(p)) + val peersV1 = mutable.Buffer[ConnectedPeer]() + val peersV2 = mutable.Buffer[ConnectedPeer]() + val peersV3 = mutable.Buffer[ConnectedPeer]() + + peers.foreach{peer => + if(syncV3Supported(peer)) { + peersV3 += peer + } else if(syncV2Supported(peer)) { + peersV2 += peer + } else { + peersV1 += peer + } + } + log.debug(s"Syncing with ${peersV1.size} peers via sync v1, ${peersV2.size} peers via sync v2") if (peersV1.nonEmpty) { val msg = Message(syncInfoSpec, Right(getV1SyncInfo(history)), None) @@ -299,6 +323,11 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, val v2SyncInfo = getV2SyncInfo(history, full = true) networkControllerRef ! SendToNetwork(Message(syncInfoSpec, Right(v2SyncInfo), None), SendToPeers(peersV2)) } + if (peersV3.nonEmpty) { + //todo: send only last header to peers which are equal or younger + val v3SyncInfo = getV3SyncInfo(history, full = true) + networkControllerRef ! SendToNetwork(Message(syncInfoSpec, Right(v3SyncInfo), None), SendToPeers(peersV3)) + } } /** @@ -330,7 +359,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, log.debug(s"Processing sync from $remote") syncInfo match { case syncV1: ErgoSyncInfoV1 => processSyncV1(hr, syncV1, remote) - case syncV2: ErgoSyncInfoV2 => processSyncV2(hr, syncV2, remote) + case otherVersion: HeadersBasedSyncInfo => processSyncV2(hr, otherVersion, remote) } } else { log.debug(s"Spammy sync detected from $remote") @@ -391,7 +420,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, /** * Processing sync V2 message `syncInfo` got from neighbour peer `remote` (supporting sync v2) */ - protected def processSyncV2(hr: ErgoHistory, syncInfo: ErgoSyncInfoV2, remote: ConnectedPeer): Unit = { + protected def processSyncV2(hr: ErgoHistory, syncInfo: HeadersBasedSyncInfo, remote: ConnectedPeer): Unit = { val (status, syncSendNeeded) = syncTracker.updateStatus(remote, syncInfo, hr) status match { @@ -441,7 +470,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, * * @param syncInfo other's node sync info */ - private def applyValidContinuationHeaderV2(syncInfo: ErgoSyncInfoV2, + private def applyValidContinuationHeaderV2(syncInfo: HeadersBasedSyncInfo, history: ErgoHistory, peer: ConnectedPeer): Unit = { history.continuationHeaderV2(syncInfo).foreach { continuationHeader => @@ -950,7 +979,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, protected def peerManagerEvents: Receive = { case HandshakedPeer(remote) => - syncTracker.updateStatus(remote, status = Unknown, height = None) + syncTracker.updateStatus(remote, status = Unknown, height = None, peerHeaders = Seq(0 -> 0), peerFullblocks = Seq(0 -> 0)) case DisconnectedPeer(connectedPeer) => syncTracker.clearStatus(connectedPeer) @@ -994,8 +1023,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, minModifiersPerBucket, maxModifiersPerBucket )(getPeersForDownloadingBlocks) { howManyPerType => - val tip = historyReader.estimatedTip() - historyReader.nextModifiersToDownload(howManyPerType, tip, downloadRequired(historyReader)) + historyReader.nextModifiersToDownload(howManyPerType, downloadRequired(historyReader)) } } @@ -1157,15 +1185,14 @@ object ErgoNodeViewSynchronizer { Props(new ErgoNodeViewSynchronizer(networkControllerRef, viewHolderRef, syncInfoSpec, settings, timeProvider, syncTracker, deliveryTracker)) - def apply(networkControllerRef: ActorRef, - viewHolderRef: ActorRef, + def make(viewHolderRef: ActorRef, syncInfoSpec: ErgoSyncInfoMessageSpec.type, settings: ErgoSettings, timeProvider: NetworkTimeProvider, syncTracker: ErgoSyncTracker, deliveryTracker: DeliveryTracker) - (implicit context: ActorRefFactory, ex: ExecutionContext): ActorRef = - context.actorOf(props(networkControllerRef, viewHolderRef, syncInfoSpec, settings, timeProvider, syncTracker, deliveryTracker)) + (implicit context: ActorRefFactory, ex: ExecutionContext): ActorRef => ActorRef = + networkControllerRef => context.actorOf(props(networkControllerRef, viewHolderRef, syncInfoSpec, settings, timeProvider, syncTracker, deliveryTracker)) /** * Container for aggregated costs of accepted, declined or invalidated transactions. Can be used to track global diff --git a/src/main/scala/org/ergoplatform/network/ErgoPeerStatus.scala b/src/main/scala/org/ergoplatform/network/ErgoPeerStatus.scala index cef1acd5a8..99dee29ca7 100644 --- a/src/main/scala/org/ergoplatform/network/ErgoPeerStatus.scala +++ b/src/main/scala/org/ergoplatform/network/ErgoPeerStatus.scala @@ -12,13 +12,17 @@ import scorex.core.utils.TimeProvider.Time * * @param peer - peer information (public address, exposed info on operating mode etc) * @param status - peer's blockchain status (is it ahead or behind our, or on fork) - * @param height - peer's height + * @param headersHeight - peer's height + * @param storedHeaders + * @param storedFullblocks * @param lastSyncSentTime - last time peer was asked to sync, None if never * @param lastSyncGetTime - last time peer received sync, None if never */ case class ErgoPeerStatus(peer: ConnectedPeer, status: PeerChainStatus, - height: Height, + headersHeight: Height, + storedHeaders: Seq[(Height, Height)], + storedFullblocks: Seq[(Height, Height)], lastSyncSentTime: Option[Time], lastSyncGetTime: Option[Time]) { val mode: Option[ModePeerFeature] = ErgoPeerStatus.mode(peer) @@ -45,7 +49,7 @@ object ErgoPeerStatus { "version" -> status.version.map(_.toString).getOrElse("N/A").asJson, "mode" -> status.mode.asJson, "status" -> status.status.toString.asJson, - "height" -> status.height.asJson + "height" -> status.headersHeight.asJson ) } diff --git a/src/main/scala/org/ergoplatform/network/ErgoSyncTracker.scala b/src/main/scala/org/ergoplatform/network/ErgoSyncTracker.scala index 3c0fcec684..7cfd2b26a2 100644 --- a/src/main/scala/org/ergoplatform/network/ErgoSyncTracker.scala +++ b/src/main/scala/org/ergoplatform/network/ErgoSyncTracker.scala @@ -1,9 +1,9 @@ package org.ergoplatform.network -import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoHistoryReader, ErgoSyncInfo, ErgoSyncInfoV1, ErgoSyncInfoV2} +import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoHistoryReader, ErgoSyncInfo, ErgoSyncInfoV1, ErgoSyncInfoV3, HeadersBasedSyncInfo} import org.ergoplatform.nodeView.history.ErgoHistory.Height -import scorex.core.consensus.{Fork, PeerChainStatus, Older, Unknown} +import scorex.core.consensus.{Fork, Older, PeerChainStatus, Unknown} import scorex.core.network.ConnectedPeer import scorex.core.settings.NetworkSettings import scorex.core.utils.TimeProvider @@ -13,7 +13,8 @@ import scala.collection.mutable import scala.concurrent.duration._ import scorex.core.utils.MapPimp -final case class ErgoSyncTracker(networkSettings: NetworkSettings, timeProvider: TimeProvider) extends ScorexLogging { +final case class ErgoSyncTracker(networkSettings: NetworkSettings, + timeProvider: TimeProvider) extends ScorexLogging { private val MinSyncInterval: FiniteDuration = 20.seconds private val SyncThreshold: FiniteDuration = 1.minute @@ -60,9 +61,17 @@ final case class ErgoSyncTracker(networkSettings: NetworkSettings, timeProvider: val height = syncInfo match { case _: ErgoSyncInfoV1 => None - case sv2: ErgoSyncInfoV2 => sv2.height + case otherVersion: HeadersBasedSyncInfo => otherVersion.height } - updateStatus(peer, status, height) + val peerHeaders = syncInfo match { + case v3: ErgoSyncInfoV3 => v3.headersRanges + case _ => Seq.empty + } + val peerFullBlocks = syncInfo match { + case v3: ErgoSyncInfoV3 => v3.fullBlocksRanges + case _ => Seq.empty + } + updateStatus(peer, status, height, peerHeaders, peerFullBlocks) val syncSendNeeded = (oldStatus != status) || notSyncedOrOutdated(peer) || status == Older || status == Fork @@ -71,13 +80,20 @@ final case class ErgoSyncTracker(networkSettings: NetworkSettings, timeProvider: def updateStatus(peer: ConnectedPeer, status: PeerChainStatus, - height: Option[Height]): Unit = { + height: Option[Height], + peerHeaders: Seq[(Height, Height)], + peerFullblocks: Seq[(Height, Height)]): Unit = { val seniorsBefore = numOfSeniors() statuses.adjust(peer){ case None => - ErgoPeerStatus(peer, status, height.getOrElse(ErgoHistory.EmptyHistoryHeight), None, None) + ErgoPeerStatus(peer, status, height.getOrElse(ErgoHistory.EmptyHistoryHeight), peerHeaders, peerFullblocks, None, None) case Some(existingPeer) => - existingPeer.copy(status = status, height = height.getOrElse(existingPeer.height)) + existingPeer.copy( + status = status, + headersHeight = height.getOrElse(existingPeer.headersHeight), + storedHeaders = peerHeaders, + storedFullblocks = peerFullblocks + ) } val seniorsAfter = numOfSeniors() @@ -125,7 +141,7 @@ final case class ErgoSyncTracker(networkSettings: NetworkSettings, timeProvider: if (peersToClear.nonEmpty) { log.debug(s"Clearing stalled statuses for $peersToClear") // we set status to `Unknown` and reset peer's height - peersToClear.foreach(p => updateStatus(p, Unknown, None)) + peersToClear.foreach(p => updateStatus(p, Unknown, None, Seq(0 -> 0), Seq(0 -> 0))) } } @@ -149,7 +165,7 @@ final case class ErgoSyncTracker(networkSettings: NetworkSettings, timeProvider: def maxHeight(): Option[Int] = { if (statuses.nonEmpty) { - Some(statuses.maxBy(_._2.height)._2.height) + Some(statuses.maxBy(_._2.headersHeight)._2.headersHeight) } else { None } @@ -194,7 +210,7 @@ final case class ErgoSyncTracker(networkSettings: NetworkSettings, timeProvider: case (peer, status) => (peer.connectionId.remoteAddress, statuses.get(peer), status.lastSyncSentTime.map(now - _)) }.map { case (address, status, millisSinceLastSync) => - s"$address, height: ${status.map(_.height)}, status: ${status.map(_.status)}, lastSync: $millisSinceLastSync ms ago" + s"$address, height: ${status.map(_.headersHeight)}, status: ${status.map(_.status)}, lastSync: $millisSinceLastSync ms ago" }.mkString("\n") } diff --git a/src/main/scala/org/ergoplatform/network/PeerFilteringRule.scala b/src/main/scala/org/ergoplatform/network/PeerFilteringRule.scala index 09af67319d..f71adaa0f9 100644 --- a/src/main/scala/org/ergoplatform/network/PeerFilteringRule.scala +++ b/src/main/scala/org/ergoplatform/network/PeerFilteringRule.scala @@ -78,10 +78,22 @@ final case class BlockSectionsDownloadFilter(stateType: StateType) extends PeerF * If peer's version is >= 4.0.16, the peer is supporting sync V2 */ object SyncV2Filter extends PeerFilteringRule { + private val syncV2Version = Version(4, 0, 16) override def condition(version: Version): Boolean = { - val syncV2Version = Version(4, 0, 16) version.compare(syncV2Version) >= 0 } } + +/** + * If peer's version is >= 4.0.16, the peer is supporting sync V2 + */ +object SyncV3Filter extends PeerFilteringRule { + private val syncV3Version = Version(5, 0, 10) // todo: set before release + + override def condition(version: Version): Boolean = { + version.compare(syncV3Version) >= 0 + } + +} diff --git a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistoryReader.scala b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistoryReader.scala index 0637f04cdd..b1b8d5893c 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistoryReader.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistoryReader.scala @@ -134,8 +134,8 @@ trait ErgoHistoryReader info match { case syncV1: ErgoSyncInfoV1 => compareV1(syncV1) - case syncV2: ErgoSyncInfoV2 => - compareV2(syncV2) + case otherVersion: HeadersBasedSyncInfo => + compareV2(otherVersion) } } @@ -148,7 +148,7 @@ trait ErgoHistoryReader * Older if the neighbour is ahead, * Fork if the neighbour is on a fork */ - def compareV2(info: ErgoSyncInfoV2): PeerChainStatus = { + def compareV2(info: HeadersBasedSyncInfo): PeerChainStatus = { bestHeaderOpt.map { myLastHeader => if (info.lastHeaders.isEmpty) { Younger @@ -260,7 +260,7 @@ trait ErgoHistoryReader * Calculating continuation from common header which will be sent to another node * if comparison status is YOUNGER of FORK, for sync message V2. */ - def continuationIdsV2(syncV2: ErgoSyncInfoV2, size: Int): ModifierIds = { + def continuationIdsV2(syncV2: HeadersBasedSyncInfo, size: Int): ModifierIds = { if (syncV2.lastHeaders.isEmpty) { // if other node has no headers yet, send up to `size` headers from genesis val heightTo = Math.min(headersHeight, size + ErgoHistory.EmptyHistoryHeight) @@ -285,7 +285,7 @@ trait ErgoHistoryReader * @param syncInfo other's node sync info * @return maybe continuation header */ - def continuationHeaderV2(syncInfo: ErgoSyncInfoV2): Option[Header] = { + def continuationHeaderV2(syncInfo: HeadersBasedSyncInfo): Option[Header] = { if (syncInfo.lastHeaders.isEmpty) { Option.empty } else { @@ -310,7 +310,7 @@ trait ErgoHistoryReader def continuationIds(syncInfo: ErgoSyncInfo, size: Int): ModifierIds = { syncInfo match { case syncV1: ErgoSyncInfoV1 => continuationIdsV1(syncV1, size) - case syncV2: ErgoSyncInfoV2 => continuationIdsV2(syncV2, size) + case otherVersion: HeadersBasedSyncInfo => continuationIdsV2(otherVersion, size) } } @@ -373,15 +373,9 @@ trait ErgoHistoryReader } } - - /** - * @return sync info for neigbour peers, V2 message - * @param full - if false, only last header to be sent, otherwise, multiple headers - * full info is needed when - */ - def syncInfoV2(full: Boolean): ErgoSyncInfoV2 = { + private def headersForSyncInfo(full: Boolean): Array[Header] = { if (isEmpty) { - ErgoSyncInfoV2(Nil) + Array.empty } else { val h = headersHeight @@ -391,12 +385,24 @@ trait ErgoHistoryReader ErgoHistoryReader.ReducedV2SyncOffsets } - val headers = offsets.flatMap(offset => bestHeaderAtHeight(h - offset)) - - ErgoSyncInfoV2(headers) + offsets.flatMap(offset => bestHeaderAtHeight(h - offset)) } } + /** + * @return sync info for neigbour peers, V2 message + * @param full - if false, only last header to be sent, otherwise, multiple headers + * full info is needed when + */ + def syncInfoV2(full: Boolean): ErgoSyncInfoV2 = { + ErgoSyncInfoV2(headersForSyncInfo(full)) + } + + def syncInfoV3(full: Boolean): ErgoSyncInfoV3 = { + val headers = headersForSyncInfo(full) + ErgoSyncInfoV3(headers, Seq(1 -> headersHeight), Seq(minFullBlockAvailable -> fullBlockHeight)) + } + /** * Return last count headers from best headers chain if exist or chain up to genesis otherwise */ diff --git a/src/main/scala/org/ergoplatform/nodeView/history/ErgoSyncInfo.scala b/src/main/scala/org/ergoplatform/nodeView/history/ErgoSyncInfo.scala index cb627e3a4b..7302fdba0c 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/ErgoSyncInfo.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/ErgoSyncInfo.scala @@ -1,6 +1,7 @@ package org.ergoplatform.nodeView.history import org.ergoplatform.modifiers.history.header.{Header, HeaderSerializer} +import org.ergoplatform.nodeView.history.ErgoHistory.Height import scorex.core.NodeViewModifier import scorex.core.consensus.SyncInfo import scorex.core.network.message.SyncInfoMessageSpec @@ -30,10 +31,9 @@ case class ErgoSyncInfoV1(lastHeaderIds: Seq[ModifierId]) extends ErgoSyncInfo { override val nonEmpty: Boolean = lastHeaderIds.nonEmpty } -/** - * @param lastHeaders - some recent headers (including last one) known to a peer - */ -case class ErgoSyncInfoV2(lastHeaders: Seq[Header]) extends ErgoSyncInfo { +trait HeadersBasedSyncInfo extends ErgoSyncInfo { + val lastHeaders: Seq[Header] + /** * Height of a chain reported by a peer (so most recent header it shows) */ @@ -42,6 +42,15 @@ case class ErgoSyncInfoV2(lastHeaders: Seq[Header]) extends ErgoSyncInfo { override val nonEmpty: Boolean = lastHeaders.nonEmpty } +/** + * @param lastHeaders - some recent headers (including last one) known to a peer + */ +case class ErgoSyncInfoV2(lastHeaders: Seq[Header]) extends ErgoSyncInfo with HeadersBasedSyncInfo + +case class ErgoSyncInfoV3(lastHeaders: Seq[Header], + headersRanges: Seq[(Height, Height)], + fullBlocksRanges: Seq[(Height, Height)]) extends ErgoSyncInfo with HeadersBasedSyncInfo + object ErgoSyncInfo { val MaxBlockIds = 1000 } @@ -50,6 +59,8 @@ object ErgoSyncInfoSerializer extends ScorexSerializer[ErgoSyncInfo] with Scorex val v2HeaderMode: Byte = -1 // used to mark sync v2 messages + val v3HeaderMode: Byte = -2 // used to mark sync v2 messages + val MaxHeadersAllowed = 50 // in sync v2 message, no more than 50 headers allowed val MaxHeaderSize = 1000 // currently header is about 200+ bytes, but new fields can be added via a SF, @@ -70,7 +81,33 @@ object ErgoSyncInfoSerializer extends ScorexSerializer[ErgoSyncInfo] with Scorex w.putUShort(headerBytes.length) w.putBytes(headerBytes) } + case v3: ErgoSyncInfoV3 => + w.putUShort(0) // to stop sync v1 parser + w.put(v3HeaderMode) // signal that v2 message started + w.putUByte(v3.lastHeaders.length) // number of headers peer is announcing + // write last headers + v3.lastHeaders.foreach { h => + val headerBytes = h.bytes + w.putUShort(headerBytes.length) + w.putBytes(headerBytes) + } + // write headers available + // todo: limit max number of records, add checks + val headerRangesCount = v3.headersRanges.length.toByte + w.put(headerRangesCount) + v3.headersRanges.foreach { case (start, end) => + w.putUInt(start) + w.putUInt(end) + } + // write full-blocks available + // todo: limit max number of records, add checks + val fullblocksRangesCount = v3.fullBlocksRanges.length.toByte + w.put(fullblocksRangesCount) + v3.fullBlocksRanges.foreach { case (start, end) => + w.putUInt(start) + w.putUInt(end) + } case _ => log.error(s"Wrong SyncInfo version: $obj") } @@ -93,8 +130,11 @@ object ErgoSyncInfoSerializer extends ScorexSerializer[ErgoSyncInfo] with Scorex HeaderSerializer.parseBytes(headerBytes) } ErgoSyncInfoV2(headers) + } else if (mode == v3HeaderMode) { + //todo: do sync v3 reader + ??? } else { - throw new Exception(s"Wrong SyncInfo version: $r") + throw new Exception(s"Wrong SyncInfo version encoded with $mode") } } else { // parse v1 sync message require(length <= ErgoSyncInfo.MaxBlockIds + 1, "Too many block ids in sync info") diff --git a/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/ToDownloadProcessor.scala b/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/ToDownloadProcessor.scala index bb814acc00..ef1e49c0c4 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/ToDownloadProcessor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/ToDownloadProcessor.scala @@ -3,6 +3,7 @@ package org.ergoplatform.nodeView.history.storage.modifierprocessors import org.ergoplatform.modifiers.ErgoFullBlock import org.ergoplatform.modifiers.history._ import org.ergoplatform.modifiers.history.header.Header +import org.ergoplatform.nodeView.history.ErgoHistory.Height import org.ergoplatform.settings.{ChainSettings, ErgoSettings, NodeConfigurationSettings} import scorex.core.ModifierTypeId import scorex.core.utils.NetworkTimeProvider @@ -35,10 +36,18 @@ trait ToDownloadProcessor extends BasicReaders with ScorexLogging { def isInBestChain(id: ModifierId): Boolean + def estimatedTip(): Option[Height] + /** Returns true if we estimate that our chain is synced with the network. Start downloading full blocks after that */ def isHeadersChainSynced: Boolean = pruningProcessor.isHeadersChainSynced + /** + * @return min full block height the node has + * // todo: improve scaladoc + */ + def minFullBlockAvailable: Int = pruningProcessor.minimalFullBlockHeight + /** * Get modifier ids to download to synchronize full blocks * @param howManyPerType how many ModifierIds per ModifierTypeId to fetch @@ -46,18 +55,16 @@ trait ToDownloadProcessor extends BasicReaders with ScorexLogging { * @return next max howManyPerType ModifierIds by ModifierTypeId to download filtered by condition */ def nextModifiersToDownload(howManyPerType: Int, - estimatedTip: Option[Int], condition: (ModifierTypeId, ModifierId) => Boolean): Map[ModifierTypeId, Seq[ModifierId]] = { val FullBlocksToDownloadAhead = 192 // how many full blocks to download forwards during active sync - - def farAwayFromBeingSynced(fb: ErgoFullBlock) = fb.height < (estimatedTip.getOrElse(0) - 128) + def farAwayFromBeingSynced(fb: ErgoFullBlock) = fb.height < (estimatedTip().getOrElse(0) - 128) @tailrec def continuation(height: Int, acc: Map[ModifierTypeId, Vector[ModifierId]], - maxHeight: Int = Int.MaxValue): Map[ModifierTypeId, Vector[ModifierId]] = { + maxHeight: Int): Map[ModifierTypeId, Vector[ModifierId]] = { if (height > maxHeight) { acc } else { @@ -90,10 +97,10 @@ trait ToDownloadProcessor extends BasicReaders with ScorexLogging { // when blockchain is about to be synced, // download children blocks of last 100 full blocks applied to the best chain, to get block sections from forks val minHeight = Math.max(1, fb.header.height - 100) - continuation(minHeight, Map.empty) + continuation(minHeight, Map.empty, maxHeight = Int.MaxValue) case _ => // if headers-chain is synced and no full blocks applied yet, find full block height to go from - continuation(pruningProcessor.minimalFullBlockHeight, Map.empty) + continuation(pruningProcessor.minimalFullBlockHeight, Map.empty, maxHeight = Int.MaxValue) } } diff --git a/src/main/scala/scorex/core/app/Application.scala b/src/main/scala/scorex/core/app/Application.scala index 9df3b390e4..a7224b5791 100644 --- a/src/main/scala/scorex/core/app/Application.scala +++ b/src/main/scala/scorex/core/app/Application.scala @@ -4,9 +4,16 @@ import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.Http import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route} import org.ergoplatform.ErgoApp +import org.ergoplatform.nodeView.history.ErgoSyncInfoMessageSpec import org.ergoplatform.settings.ErgoSettings -import scorex.core.api.http.{ApiErrorHandler, ApiRejectionHandler, ApiRoute, CompositeHttpService} +import scorex.core.api.http.{ + ApiErrorHandler, + ApiRejectionHandler, + ApiRoute, + CompositeHttpService +} import scorex.core.network._ +import scorex.core.network.message.Message.MessageCode import scorex.core.network.message._ import scorex.core.network.peer.PeerManagerRef import scorex.core.settings.ScorexSettings @@ -28,14 +35,20 @@ trait Application extends ScorexLogging { implicit def exceptionHandler: ExceptionHandler = ApiErrorHandler.exceptionHandler implicit def rejectionHandler: RejectionHandler = ApiRejectionHandler.rejectionHandler - protected implicit lazy val actorSystem: ActorSystem = ActorSystem(scorexSettings.network.agentName) - implicit val executionContext: ExecutionContext = actorSystem.dispatchers.lookup("scorex.executionContext") + implicit protected lazy val actorSystem: ActorSystem = ActorSystem( + scorexSettings.network.agentName + ) + + implicit val executionContext: ExecutionContext = + actorSystem.dispatchers.lookup("scorex.executionContext") protected val features: Seq[PeerFeature] protected val additionalMessageSpecs: Seq[MessageSpec[_]] //p2p - private val upnpGateway: Option[UPnPGateway] = if (scorexSettings.network.upnpEnabled) UPnP.getValidGateway(scorexSettings.network) else None + private val upnpGateway: Option[UPnPGateway] = + if (scorexSettings.network.upnpEnabled) UPnP.getValidGateway(scorexSettings.network) + else None // TODO use available port on gateway instead settings.network.bindAddress.getPort upnpGateway.foreach(_.addPort(scorexSettings.network.bindAddress.getPort)) @@ -61,26 +74,63 @@ trait Application extends ScorexLogging { lazy val externalSocketAddress: Option[InetSocketAddress] = { scorexSettings.network.declaredAddress orElse { // TODO use available port on gateway instead settings.bindAddress.getPort - upnpGateway.map(u => new InetSocketAddress(u.externalAddress, scorexSettings.network.bindAddress.getPort)) + upnpGateway.map(u => + new InetSocketAddress( + u.externalAddress, + scorexSettings.network.bindAddress.getPort + ) + ) } } val scorexContext = ScorexContext( - messageSpecs = basicSpecs ++ additionalMessageSpecs, - upnpGateway = upnpGateway, - timeProvider = timeProvider, + messageSpecs = basicSpecs ++ additionalMessageSpecs, + upnpGateway = upnpGateway, + timeProvider = timeProvider, externalNodeAddress = externalSocketAddress ) val peerManagerRef = PeerManagerRef(ergoSettings, scorexContext) - val networkControllerRef: ActorRef = NetworkControllerRef( - "networkController", ergoSettings, peerManagerRef, scorexContext) + private val messageHandlers: ActorRef => Map[MessageCode, ActorRef] = + networkControllerRef => { + Map( + InvSpec.messageCode -> nodeViewSynchronizer, + RequestModifierSpec.messageCode -> nodeViewSynchronizer, + ModifiersSpec.messageCode -> nodeViewSynchronizer, + ErgoSyncInfoMessageSpec.messageCode -> nodeViewSynchronizer, + PeersSpec.messageCode -> PeerSynchronizerRef( + "PeerSynchronizer", + networkControllerRef, + peerManagerRef, + scorexSettings.network + ) + ) + } + + val networkControllerRef: ActorRef = + NetworkControllerRef( + "networkController", + ergoSettings, + peerManagerRef, + scorexContext, + messageHandlers + ) val peerSynchronizer: ActorRef = - PeerSynchronizerRef("PeerSynchronizer", networkControllerRef, peerManagerRef, scorexSettings.network) + PeerSynchronizerRef( + "PeerSynchronizer", + networkControllerRef, + peerManagerRef, + scorexSettings.network + ) - lazy val combinedRoute: Route = CompositeHttpService(actorSystem, apiRoutes, scorexSettings.restApi, swaggerConfig).compositeRoute + lazy val combinedRoute: Route = CompositeHttpService( + actorSystem, + apiRoutes, + scorexSettings.restApi, + swaggerConfig + ).compositeRoute def run(): Unit = { val applicationNameLimit: Int = 50 @@ -92,7 +142,9 @@ trait Application extends ScorexLogging { val bindAddress = scorexSettings.restApi.bindAddress - Http().newServerAt(bindAddress.getAddress.getHostAddress, bindAddress.getPort).bindFlow(combinedRoute) + Http() + .newServerAt(bindAddress.getAddress.getHostAddress, bindAddress.getPort) + .bindFlow(combinedRoute) //on unexpected shutdown Runtime.getRuntime.addShutdownHook(new Thread() { diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index ced4175fd7..18bf3f886b 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -1,7 +1,7 @@ package scorex.core.network import java.net._ -import akka.actor._ +import akka.actor.{ActorRef, _} import akka.io.Tcp._ import akka.io.{IO, Tcp} import akka.pattern.ask @@ -11,7 +11,7 @@ import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.{Dis import org.ergoplatform.network.ModePeerFeature import org.ergoplatform.settings.ErgoSettings import scorex.core.network.message.Message.MessageCode -import scorex.core.network.message.{Message, MessageSpec} +import scorex.core.network.message.Message import scorex.core.network.peer.PeerManager.ReceivableMessages._ import scorex.core.network.peer.{LocalAddressPeerFeature, PeerInfo, PeerManager, PeersStatus, PenaltyType, RestApiUrlPeerFeature, SessionIdPeerFeature} import scorex.core.utils.TimeProvider.Time @@ -29,7 +29,8 @@ import scala.util.{Random, Try} class NetworkController(ergoSettings: ErgoSettings, peerManagerRef: ActorRef, scorexContext: ScorexContext, - tcpManager: ActorRef + tcpManager: ActorRef, + messageHandlersPartial: ActorRef => Map[MessageCode, ActorRef] )(implicit ec: ExecutionContext) extends Actor with ScorexLogging { import NetworkController.ReceivableMessages._ @@ -54,11 +55,9 @@ class NetworkController(ergoSettings: ErgoSettings, // capabilities of our node private val modePeerFeature = ModePeerFeature(ergoSettings.nodeSettings) + private val messageHandlers = messageHandlersPartial(self) private implicit val timeout: Timeout = Timeout(networkSettings.controllerTimeout.getOrElse(5.seconds)) - - private var messageHandlers = Map.empty[MessageCode, ActorRef] - private lazy val bindAddress = networkSettings.bindAddress private var connections = Map.empty[InetSocketAddress, ConnectedPeer] @@ -136,10 +135,6 @@ class NetworkController(ergoSettings: ErgoSettings, filterConnections(sendingStrategy, message.spec.protocolVersion).foreach { connectedPeer => connectedPeer.handlerRef ! message } - - case RegisterMessageSpecs(specs, handler) => - log.info(s"Registering handlers for ${specs.map(s => s.messageCode -> s.messageName)}") - messageHandlers ++= specs.map(_.messageCode -> handler) } private def peerCommands: Receive = { @@ -537,7 +532,7 @@ object NetworkController { case class Handshaked(peer: PeerInfo) - case class RegisterMessageSpecs(specs: Seq[MessageSpec[_]], handler: ActorRef) + //case class RegisterMessageSpecs(specs: Seq[MessageSpec[_]], handler: ActorRef) case class SendToNetwork(message: Message[_], sendingStrategy: SendingStrategy) @@ -561,22 +556,24 @@ object NetworkController { } object NetworkControllerRef { + def props(settings: ErgoSettings, peerManagerRef: ActorRef, scorexContext: ScorexContext, - tcpManager: ActorRef)(implicit ec: ExecutionContext): Props = { - Props(new NetworkController(settings, peerManagerRef, scorexContext, tcpManager)) + tcpManager: ActorRef, + messageHandlers: ActorRef => Map[MessageCode, ActorRef] + )(implicit ec: ExecutionContext): Props = { + Props(new NetworkController(settings, peerManagerRef, scorexContext, tcpManager, messageHandlers) + ) } def apply(name: String, settings: ErgoSettings, peerManagerRef: ActorRef, - scorexContext: ScorexContext) + scorexContext: ScorexContext, + messageHandlers: ActorRef => Map[MessageCode, ActorRef]) (implicit system: ActorSystem, ec: ExecutionContext): ActorRef = { - system.actorOf( - props(settings, peerManagerRef, scorexContext, IO(Tcp)), - name) + system.actorOf(props(settings, peerManagerRef, scorexContext, IO(Tcp), messageHandlers), name) } - } diff --git a/src/main/scala/scorex/core/network/PeerSynchronizer.scala b/src/main/scala/scorex/core/network/PeerSynchronizer.scala index 33c61877f7..27ebb84a95 100644 --- a/src/main/scala/scorex/core/network/PeerSynchronizer.scala +++ b/src/main/scala/scorex/core/network/PeerSynchronizer.scala @@ -4,7 +4,7 @@ import akka.actor.SupervisorStrategy.{Restart, Stop} import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorRef, ActorSystem, DeathPactException, OneForOneStrategy, Props} import akka.pattern.ask import akka.util.Timeout -import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} +import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, SendToNetwork} import scorex.core.network.message.{GetPeersSpec, Message, MessageSpec, PeersSpec} import scorex.core.network.peer.{PeerInfo, PenaltyType} import scorex.core.network.peer.PeerManager.ReceivableMessages.{AddPeerIfEmpty, SeenPeers} @@ -50,8 +50,6 @@ class PeerSynchronizer(val networkControllerRef: ActorRef, override def preStart: Unit = { super.preStart() - networkControllerRef ! RegisterMessageSpecs(Seq(GetPeersSpec, peersSpec), self) - val msg = Message[Unit](GetPeersSpec, Right(Unit), None) val stn = SendToNetwork(msg, SendToRandom) context.system.scheduler.scheduleWithFixedDelay(2.seconds, settings.getPeersInterval, networkControllerRef, stn) diff --git a/src/test/scala/org/ergoplatform/network/ErgoSyncTrackerSpecification.scala b/src/test/scala/org/ergoplatform/network/ErgoSyncTrackerSpecification.scala index 02872c3449..f12aff4e62 100644 --- a/src/test/scala/org/ergoplatform/network/ErgoSyncTrackerSpecification.scala +++ b/src/test/scala/org/ergoplatform/network/ErgoSyncTrackerSpecification.scala @@ -15,14 +15,14 @@ class ErgoSyncTrackerSpecification extends ErgoPropertyTest { val height = 1000 // add peer to sync - syncTracker.updateStatus(connectedPeer, Younger, Some(height)) + syncTracker.updateStatus(connectedPeer, Younger, Some(height), Seq(0 -> height), Seq(0 -> height)) syncTracker.maxHeight() shouldBe Some(height) - syncTracker.statuses(connectedPeer) shouldBe ErgoPeerStatus(connectedPeer, Younger, height, None, None) + syncTracker.statuses(connectedPeer) shouldBe ErgoPeerStatus(connectedPeer, Younger, height, Seq(0 -> height), Seq(0 -> height), None, None) // updating status should change status and height of existing peer - syncTracker.updateStatus(connectedPeer, Older, Some(height+1)) + syncTracker.updateStatus(connectedPeer, Older, Some(height+1), Seq(0 -> (height + 1)), Seq(0 -> (height + 1))) syncTracker.maxHeight() shouldBe Some(height + 1) syncTracker.getStatus(connectedPeer) shouldBe Some(Older) - syncTracker.fullInfo().head.height shouldBe height+1 + syncTracker.fullInfo().head.headersHeight shouldBe height+1 syncTracker.peersByStatus.apply(Older).head shouldBe connectedPeer // peer should not be synced yet diff --git a/src/test/scala/org/ergoplatform/nodeView/history/VerifyNonADHistorySpecification.scala b/src/test/scala/org/ergoplatform/nodeView/history/VerifyNonADHistorySpecification.scala index 4ef3e68b54..ce63ea22c5 100644 --- a/src/test/scala/org/ergoplatform/nodeView/history/VerifyNonADHistorySpecification.scala +++ b/src/test/scala/org/ergoplatform/nodeView/history/VerifyNonADHistorySpecification.scala @@ -122,13 +122,13 @@ class VerifyNonADHistorySpecification extends HistoryTestHelpers { newAcc.adjust(mType)(_.fold(Seq(mId))(_ :+ mId)) } - history.nextModifiersToDownload(1, None, (_, id) => !history.contains(id)) + history.nextModifiersToDownload(1, (_, id) => !history.contains(id)) .map(id => (id._1, id._2.map(Algos.encode))) shouldEqual missedBS.mapValues(_.take(1)).view.force - history.nextModifiersToDownload(2 * (BlocksToKeep - 1), None, (_, id) => !history.contains(id)) + history.nextModifiersToDownload(2 * (BlocksToKeep - 1), (_, id) => !history.contains(id)) .map(id => (id._1, id._2.map(Algos.encode))) shouldEqual missedBS - history.nextModifiersToDownload(2, None, (_, id) => !history.contains(id) && (id != missedChain.head.blockTransactions.id)) + history.nextModifiersToDownload(2, (_, id) => !history.contains(id) && (id != missedChain.head.blockTransactions.id)) .map(id => (id._1, id._2.map(Algos.encode))) shouldEqual missedBS.mapValues(_.take(2).filter( _ != missedChain.head.blockTransactions.id)).view.force }