From f754de2a811df0857084b5eceb0c035674bc1cca Mon Sep 17 00:00:00 2001 From: Andrea Vezzosi Date: Fri, 13 Dec 2024 14:45:10 +0100 Subject: [PATCH] Haskell Leios P2P (#95) --- simulation/ouroboros-leios-sim.cabal | 8 + simulation/src/ChanMux.hs | 5 + simulation/src/LeiosProtocol/Common.hs | 32 +- simulation/src/LeiosProtocol/Relay.hs | 19 +- simulation/src/LeiosProtocol/RelayBuffer.hs | 2 +- simulation/src/LeiosProtocol/Short.hs | 159 ++-- .../src/LeiosProtocol/Short/Generate.hs | 102 ++- simulation/src/LeiosProtocol/Short/Node.hs | 518 +++++++++---- simulation/src/LeiosProtocol/Short/Sim.hs | 152 ++++ simulation/src/LeiosProtocol/Short/SimP2P.hs | 193 +++++ simulation/src/LeiosProtocol/Short/VizSim.hs | 732 ++++++++++++++++++ .../src/LeiosProtocol/Short/VizSimP2P.hs | 714 +++++++++++++++++ simulation/src/LeiosProtocol/SimTestRelay.hs | 3 + simulation/src/Main.hs | 14 +- simulation/src/P2P.hs | 12 +- simulation/src/PraosProtocol/BlockFetch.hs | 85 +- simulation/src/PraosProtocol/ChainSync.hs | 21 +- simulation/src/PraosProtocol/Common.hs | 6 +- simulation/src/PraosProtocol/Common/Chain.hs | 9 +- .../src/PraosProtocol/ExamplesPraosP2P.hs | 22 +- simulation/src/PraosProtocol/PraosNode.hs | 6 +- simulation/src/PraosProtocol/SimBlockFetch.hs | 2 +- simulation/src/PraosProtocol/SimChainSync.hs | 5 +- .../src/PraosProtocol/VizSimBlockFetch.hs | 2 +- .../src/PraosProtocol/VizSimChainSync.hs | 2 +- simulation/src/PraosProtocol/VizSimPraos.hs | 16 +- simulation/src/RelayProtocol.hs | 2 +- simulation/src/SimTypes.hs | 5 + simulation/src/Topology.hs | 2 +- simulation/src/Viz.hs | 2 +- 30 files changed, 2541 insertions(+), 311 deletions(-) create mode 100644 simulation/src/LeiosProtocol/Short/Sim.hs create mode 100644 simulation/src/LeiosProtocol/Short/SimP2P.hs create mode 100644 simulation/src/LeiosProtocol/Short/VizSim.hs create mode 100644 simulation/src/LeiosProtocol/Short/VizSimP2P.hs diff --git a/simulation/ouroboros-leios-sim.cabal b/simulation/ouroboros-leios-sim.cabal index 94445e1b..25e1ddbf 100644 --- a/simulation/ouroboros-leios-sim.cabal +++ b/simulation/ouroboros-leios-sim.cabal @@ -52,6 +52,10 @@ library LeiosProtocol.Short LeiosProtocol.Short.Generate LeiosProtocol.Short.Node + LeiosProtocol.Short.Sim + LeiosProtocol.Short.SimP2P + LeiosProtocol.Short.VizSim + LeiosProtocol.Short.VizSimP2P LeiosProtocol.SimTestRelay LeiosProtocol.VizSimTestRelay ModelTCP @@ -106,6 +110,9 @@ library , containers , contra-tracer , deepseq + , diagrams-cairo + , diagrams-core + , diagrams-lib , fgl , filepath , fingertree @@ -116,6 +123,7 @@ library , io-classes , io-sim , kdt + , linear , mtl , nothunks , ouroboros-network-api diff --git a/simulation/src/ChanMux.hs b/simulation/src/ChanMux.hs index 071a20eb..347b0639 100644 --- a/simulation/src/ChanMux.hs +++ b/simulation/src/ChanMux.hs @@ -35,6 +35,7 @@ import Control.Tracer import Chan import ChanTCP +import qualified Control.Category as Cat import TimeCompat class MuxBundle bundle where @@ -62,6 +63,10 @@ data ToFromMuxMsg mm a , fromMuxMsg :: mm -> a } +instance Cat.Category ToFromMuxMsg where + id = ToFromMuxMsg id id + (.) (ToFromMuxMsg f f') (ToFromMuxMsg g g') = ToFromMuxMsg (g . f) (f' . g') + dynToFromMuxMsg :: Typeable a => ToFromMuxMsg Dynamic a dynToFromMuxMsg = ToFromMuxMsg toDyn (fromJust . fromDynamic) diff --git a/simulation/src/LeiosProtocol/Common.hs b/simulation/src/LeiosProtocol/Common.hs index a32893fc..476093f9 100644 --- a/simulation/src/LeiosProtocol/Common.hs +++ b/simulation/src/LeiosProtocol/Common.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DerivingStrategies #-} @@ -32,14 +33,18 @@ module LeiosProtocol.Common ( rankingBlockBodyInvariant, NodeId, SubSlotNo (..), + Word64, ) where import ChanTCP +import Control.Exception (assert) +import Control.Monad (guard) import Data.Hashable -import Data.Set (Set) -import Data.Word (Word8) +import Data.Map (Map) +import Data.Word (Word64, Word8) import GHC.Generics +import GHC.Records import Ouroboros.Network.Block as Block import PraosProtocol.Common ( ChainHash (..), @@ -88,6 +93,8 @@ data RankingBlockBody = RankingBlockBody , payload :: !Bytes -- ^ ranking blocks can also contain transactions directly, which we -- do not model directly, but contribute to size. + , nodeId :: !NodeId + -- ^ convenience to keep track of origin, does not contribute to size. , size :: !Bytes } deriving stock (Eq, Show, Generic) @@ -104,7 +111,8 @@ data InputBlockId = InputBlockId { node :: !NodeId , num :: !Int } - deriving stock (Eq, Ord, Show) + deriving stock (Eq, Ord, Show, Generic) + deriving anyclass (Hashable) newtype SubSlotNo = SubSlotNo Word8 deriving stock (Show) @@ -129,6 +137,8 @@ data InputBlockBody = InputBlockBody { id :: !InputBlockId , size :: !Bytes -- ^ transactions not modeled, only their total size. + , slot :: !SlotNo + -- ^ duplicated here for convenience of vizualization, does not contribute to size. } deriving stock (Eq, Show) @@ -141,6 +151,9 @@ data InputBlock = InputBlock inputBlockInvariant :: InputBlock -> Bool inputBlockInvariant ib = ib.header.id == ib.body.id +instance HasField "id" InputBlock InputBlockId where + getField = (.id) . (.header) + data EndorseBlockId = EndorseBlockId { node :: !NodeId , num :: !Int @@ -172,13 +185,14 @@ data VoteMsg = VoteMsg { id :: !VoteId , slot :: !SlotNo , producer :: !NodeId - , endorseBlock :: !EndorseBlockId + , votes :: Word64 + , endorseBlocks :: ![EndorseBlockId] , size :: !Bytes } deriving stock (Eq, Show) data Certificate = Certificate - { votes :: Set VoteId + { votes :: Map VoteId Word64 } deriving stock (Show, Eq, Generic) deriving anyclass (Hashable) @@ -187,11 +201,15 @@ data Certificate = Certificate ---- Common defs ------------------------------------------- -slice :: Int -> SlotNo -> Int -> (SlotNo, SlotNo) -slice l s x = (toEnum s', toEnum $ s' + l - 1) +slice :: Int -> SlotNo -> Int -> Maybe (SlotNo, SlotNo) +slice l s x = do + guard (s' >= 0) + return (a, b) where -- taken from formal spec s' = (fromEnum s `div` l - x) * l + a = assert (s' >= 0) $ toEnum s' + b = assert (s' + l - 1 >= 0) $ toEnum $ s' + l - 1 ------------------------------------------- ---- MessageSize instances diff --git a/simulation/src/LeiosProtocol/Relay.hs b/simulation/src/LeiosProtocol/Relay.hs index 37396b79..faebf83c 100644 --- a/simulation/src/LeiosProtocol/Relay.hs +++ b/simulation/src/LeiosProtocol/Relay.hs @@ -38,7 +38,7 @@ import qualified Data.List as List import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as NonEmpty import Data.Map (Map) -import qualified Data.Map as Map +import qualified Data.Map.Strict as Map import Data.Maybe (isJust, isNothing, mapMaybe) import Data.Monoid (Sum (..)) import Data.Sequence.Strict (StrictSeq) @@ -497,6 +497,8 @@ data SubmitPolicy = SubmitInOrder | SubmitAll data RelayConsumerConfig id header body m = RelayConsumerConfig { relay :: !RelayConfig + , headerValidationDelay :: header -> DiffTime + , threadDelayParallel :: [DiffTime] -> m () , headerId :: !(header -> id) , prioritize :: !(Map id header -> [header]) -- ^ returns a subset of headers, in order of what should be fetched first. @@ -510,8 +512,9 @@ data RelayConsumerConfig id header body m = RelayConsumerConfig -- ^ sends blocks to be validated/added to the buffer. Allowed to be -- blocking, but relayConsumer does not assume the blocks made it -- into the relayBuffer. Also takes a delivery time (relevant for - -- e.g. IB endorsement) and a callback that expects the subset of - -- validated blocks. + -- e.g. IB endorsement) and a callback that expects a subset of + -- validated blocks. Callback might be called more than once, with + -- different subsets. , submitPolicy :: !SubmitPolicy , maxHeadersToRequest :: !Word16 , maxBodiesToRequest :: !Word16 @@ -782,6 +785,8 @@ relayConsumerPipelined config sst = unless (Seq.length idsSeq <= fromIntegral windowExpand) $ throw IdsNotRequested + config.threadDelayParallel $ map config.headerValidationDelay headers + -- Upon receiving a batch of new headers we extend our available set, -- and extend the unacknowledged sequence. -- @@ -812,6 +817,10 @@ relayConsumerPipelined config sst = unless (idsReceived `Set.isSubsetOf` idsRequested) $ throw BodiesNotRequested + let notReceived = idsRequested `Set.difference` idsReceived + unless (Set.null notReceived) $ do + atomically $ modifyTVar' sst.inFlightVar (`Set.difference` notReceived) + -- We can match up all the txids we requested, with those we -- received. let idsRequestedWithBodiesReceived :: Map id (Maybe (header, body)) @@ -883,8 +892,8 @@ relayConsumerPipelined config sst = -- all blocks validated. modifyTVar' sst.relayBufferVar $ flip (Foldable.foldl' (\buf blk@(h, _) -> RB.snoc (config.headerId h) blk buf)) validated - -- TODO?: the ids we did not receive could be taken out earlier. - modifyTVar' sst.inFlightVar (`Set.difference` idsRequested) + -- TODO: won't remove from inFlight blocks not validated. + modifyTVar' sst.inFlightVar (`Set.difference` Set.fromList (map (config.headerId . fst) validated)) return $ idle diff --git a/simulation/src/LeiosProtocol/RelayBuffer.hs b/simulation/src/LeiosProtocol/RelayBuffer.hs index 75cb11ee..c748d0e6 100644 --- a/simulation/src/LeiosProtocol/RelayBuffer.hs +++ b/simulation/src/LeiosProtocol/RelayBuffer.hs @@ -10,7 +10,7 @@ import Data.FingerTree (FingerTree) import qualified Data.FingerTree as FingerTree import qualified Data.Foldable as Foldable import Data.Map (Map) -import qualified Data.Map as Map +import qualified Data.Map.Strict as Map import Data.Set (Set) import Data.Word (Word64) diff --git a/simulation/src/LeiosProtocol/Short.hs b/simulation/src/LeiosProtocol/Short.hs index 10010888..e7b4a741 100644 --- a/simulation/src/LeiosProtocol/Short.hs +++ b/simulation/src/LeiosProtocol/Short.hs @@ -5,6 +5,7 @@ {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeSynonymInstances #-} {-# LANGUAGE NoFieldSelectors #-} @@ -12,11 +13,11 @@ module LeiosProtocol.Short where import Control.Exception (assert) import Control.Monad (guard) -import Data.Set (Set) -import qualified Data.Set as Set +import Data.Map (Map) +import qualified Data.Map.Strict as Map +import Data.Maybe import LeiosProtocol.Common import ModelTCP -import Ouroboros.Network.AnchoredFragment (Anchor) import Prelude hiding (id) data SizesConfig = SizesConfig @@ -31,6 +32,17 @@ data SizesConfig = SizesConfig -- ^ certificate size might depend on number of votes. } +-- Note: ranking block validation delays are in the PraosConfig, covers certificate validation. +data LeiosDelays = LeiosDelays + { inputBlockHeaderValidation :: InputBlockHeader -> DiffTime + -- ^ vrf and signature + , inputBlockValidation :: InputBlock -> DiffTime + -- ^ hash matching and payload validation (incl. tx scripts) + , endorseBlockValidation :: EndorseBlock -> DiffTime + , voteMsgValidation :: VoteMsg -> DiffTime + , certificateCreation :: Certificate -> DiffTime + } + -- TODO: add feature flags to generalize from (Uniform) Short leios to other variants. -- Would need to rework def. of Stage to accomodate different pipeline shapes. data LeiosConfig = LeiosConfig @@ -46,7 +58,8 @@ data LeiosConfig = LeiosConfig , votingFrequencyPerStage :: Double , votesForCertificate :: Int , sizes :: SizesConfig - -- TODO: validation times and max sizes parameters. + , delays :: LeiosDelays + -- TODO?: max size parameters. } class FixSize a where @@ -95,7 +108,8 @@ instance FixSize VoteMsg where { size = cfg.sizes.producerId + messageSizeBytes v.slot - + cfg.sizes.reference {- EB ref -} + + 64 {- votes -} + + sum (map (const cfg.sizes.reference {- EB ref -}) endorseBlocks) + cfg.sizes.voteCrypto , .. } @@ -131,40 +145,47 @@ stageRange :: SlotNo -> -- | stage to compute the range for Stage -> - (SlotNo, SlotNo) + Maybe (SlotNo, SlotNo) stageRange cfg = stageRange' cfg.sliceLength -stageRange' :: Int -> Stage -> SlotNo -> Stage -> (SlotNo, SlotNo) +stageRange' :: Int -> Stage -> SlotNo -> Stage -> Maybe (SlotNo, SlotNo) stageRange' l s0 slot s = slice l slot (fromEnum s0 - fromEnum s) stageRange'_prop :: Int -> SlotNo -> Bool stageRange'_prop l slot = - and [slot `inRange` stageRange' l stage slot stage | stage <- stages] - && and [contiguous $ map (stageRange' l stage slot) stages | stage <- stages] + and [fromMaybe False $ (slot `inRange`) <$> stageRange' l stage slot stage | stage <- stages] + && and [contiguous $ mapMaybe (stageRange' l stage slot) stages | stage <- stages] where stages = [minBound .. maxBound] rightSize (a, b) = length [a .. b] == l contiguous (x : y : xs) = rightSize x && succ (snd x) == fst y && contiguous (y : xs) contiguous _ = True -stageEnd :: LeiosConfig -> Stage -> SlotNo -> Stage -> SlotNo -stageEnd l s0 slot s = snd $ stageRange l s0 slot s +stageEnd :: LeiosConfig -> Stage -> SlotNo -> Stage -> Maybe SlotNo +stageEnd l s0 slot s = snd <$> stageRange l s0 slot s -stageStart :: LeiosConfig -> Stage -> SlotNo -> Stage -> SlotNo -stageStart l s0 slot s = fst $ stageRange l s0 slot s +stageStart :: LeiosConfig -> Stage -> SlotNo -> Stage -> Maybe SlotNo +stageStart l s0 slot s = fst <$> stageRange l s0 slot s + +-- | Assumes pipelines start at slot 0 and keep going. +isStage :: LeiosConfig -> Stage -> SlotNo -> Bool +isStage cfg stage slot = fromEnum slot >= cfg.sliceLength * fromEnum stage ---------------------------------------------------------------------------------------------- ---- Smart constructors ---------------------------------------------------------------------------------------------- -mkRankingBlockBody :: LeiosConfig -> Maybe (EndorseBlockId, Certificate) -> Bytes -> RankingBlockBody -mkRankingBlockBody cfg ebs payload = - fixSize cfg $ - RankingBlockBody - { endorseBlocks = maybe [] (: []) ebs - , payload - , size = 0 - } +mkRankingBlockBody :: LeiosConfig -> NodeId -> Maybe (EndorseBlockId, Certificate) -> Bytes -> RankingBlockBody +mkRankingBlockBody cfg nodeId ebs payload = assert (isNothing ebs || messageSizeBytes rb >= segmentSize) $ rb + where + rb = + fixSize cfg $ + RankingBlockBody + { endorseBlocks = maybe [] (: []) ebs + , payload + , nodeId + , size = 0 + } mkInputBlockHeader :: LeiosConfig -> @@ -178,37 +199,40 @@ mkInputBlockHeader cfg id slot subSlot producer rankingBlock = fixSize cfg $ InputBlockHeader{size = 0, ..} mkInputBlock :: LeiosConfig -> InputBlockHeader -> Bytes -> InputBlock -mkInputBlock _cfg header bodySize = - InputBlock{header, body = InputBlockBody{id = header.id, size = bodySize}} +mkInputBlock _cfg header bodySize = assert (messageSizeBytes ib >= segmentSize) $ ib + where + ib = InputBlock{header, body = InputBlockBody{id = header.id, size = bodySize, slot = header.slot}} mkEndorseBlock :: LeiosConfig -> EndorseBlockId -> SlotNo -> NodeId -> [InputBlockId] -> EndorseBlock mkEndorseBlock cfg id slot producer inputBlocks = -- Endorse blocks are produced at the beginning of the stage. - assert (stageStart cfg Endorse slot Endorse == slot) $ + assert (stageStart cfg Endorse slot Endorse == Just slot) $ fixSize cfg $ EndorseBlock{endorseBlocksEarlierStage = [], endorseBlocksEarlierPipeline = [], size = 0, ..} -mkVoteMsg :: LeiosConfig -> VoteId -> SlotNo -> NodeId -> EndorseBlockId -> VoteMsg -mkVoteMsg cfg id slot producer endorseBlock = fixSize cfg $ VoteMsg{size = 0, ..} +mkVoteMsg :: LeiosConfig -> VoteId -> SlotNo -> NodeId -> Word64 -> [EndorseBlockId] -> VoteMsg +mkVoteMsg cfg id slot producer votes endorseBlocks = fixSize cfg $ VoteMsg{size = 0, ..} -mkCertificate :: LeiosConfig -> Set VoteId -> Certificate -mkCertificate cfg vs = assert (Set.size vs <= cfg.votesForCertificate) $ Certificate vs +mkCertificate :: LeiosConfig -> Map VoteId Word64 -> Certificate +mkCertificate cfg vs = + assert (fromIntegral cfg.votesForCertificate <= sum (Map.elems vs)) $ + Certificate vs --------------------------------------------------------------------------------------- ---- Selecting data to build blocks --------------------------------------------------------------------------------------- --- Buffers views, divided to avoid reading in unneeded buffers. +-- Buffers views, divided to avoid reading unneeded buffers. data NewRankingBlockData = NewRankingBlockData { freshestCertifiedEB :: Maybe (EndorseBlockId, Certificate) , txsPayload :: Bytes - , headAnchor :: Anchor RankingBlock } data NewInputBlockData = NewInputBlockData { referenceRankingBlock :: ChainHash RankingBlock + -- ^ points to prefix of current chain with ledger state computed. , txsPayload :: Bytes } @@ -232,12 +256,15 @@ inputBlocksToEndorse :: SlotNo -> InputBlocksSnapshot -> [InputBlockId] -inputBlocksToEndorse cfg current buffer = - buffer.validInputBlocks - InputBlocksQuery - { generatedBetween = stageRange cfg Endorse current Propose - , receivedBy = stageEnd cfg Endorse current Deliver2 - } +inputBlocksToEndorse cfg current buffer = fromMaybe [] $ do + generatedBetween <- stageRange cfg Endorse current Propose + receivedBy <- stageEnd cfg Endorse current Deliver2 + pure $ + buffer.validInputBlocks + InputBlocksQuery + { generatedBetween + , receivedBy + } shouldVoteOnEB :: LeiosConfig -> @@ -246,21 +273,22 @@ shouldVoteOnEB :: InputBlocksSnapshot -> EndorseBlock -> Bool +shouldVoteOnEB cfg slot _buffers | Nothing <- stageRange cfg Vote slot Propose = const False shouldVoteOnEB cfg slot buffers = cond where - generatedBetween = stageRange cfg Vote slot Propose + generatedBetween = fromMaybe (error "impossible") $ stageRange cfg Vote slot Propose receivedByEndorse = buffers.validInputBlocks InputBlocksQuery { generatedBetween - , receivedBy = stageEnd cfg Vote slot Endorse + , receivedBy = fromMaybe (error "impossible") $ stageEnd cfg Vote slot Endorse } receivedByDeliver1 = buffers.validInputBlocks q where q = InputBlocksQuery { generatedBetween - , receivedBy = stageEnd cfg Vote slot Deliver1 + , receivedBy = fromMaybe (error "impossible") $ stageEnd cfg Vote slot Deliver1 } -- TODO: use sets in EndorseBlock? subset xs ys = all (`elem` ys) xs @@ -271,7 +299,7 @@ shouldVoteOnEB cfg slot buffers = cond assumptions = null eb.endorseBlocksEarlierStage && null eb.endorseBlocksEarlierPipeline - && eb.slot `inRange` stageRange cfg Vote slot Endorse + && eb.slot `inRange` (fromMaybe (error "impossible") $ stageRange cfg Vote slot Endorse) -- A. all referenced IBs have been received by the end of the Endorse stage, -- C. all referenced IBs validate (wrt. script execution), and, -- D. only IBs from this pipeline’s Propose stage are referenced (and not from other pipelines). @@ -289,7 +317,7 @@ endorseBlocksToVoteFor :: endorseBlocksToVoteFor cfg slot ibs ebs = let cond = shouldVoteOnEB cfg slot ibs in map (.id) . filter cond $ - ebs.validEndorseBlocks (stageRange cfg Vote slot Endorse) + maybe [] ebs.validEndorseBlocks (stageRange cfg Vote slot Endorse) ----------------------------------------------------------------- ---- Expected generation rates in each slot. @@ -299,26 +327,37 @@ newtype NetworkRate = NetworkRate Double newtype NodeRate = NodeRate Double newtype StakeFraction = StakeFraction Double --- | Note: each SubSlot rate is `<= 1` by construction. -inputBlockRate :: LeiosConfig -> SlotNo -> [(SubSlotNo, NetworkRate)] -inputBlockRate LeiosConfig{inputBlockFrequencyPerSlot} _slot - | inputBlockFrequencyPerSlot <= 1 = [(0, NetworkRate inputBlockFrequencyPerSlot)] +splitIntoSubSlots :: NetworkRate -> [NetworkRate] +splitIntoSubSlots (NetworkRate r) + | r <= 1 = [NetworkRate r] | otherwise = - let q = ceiling inputBlockFrequencyPerSlot - fq = NetworkRate $ inputBlockFrequencyPerSlot / fromIntegral q - in map (,fq) [0 .. toEnum (q - 1)] - --- | Note: if the NodeRate ends up `>= 1`, you still only produce one block. -endorseBlockRate :: LeiosConfig -> SlotNo -> Maybe NetworkRate -endorseBlockRate cfg slot = do - guard $ stageStart cfg Endorse slot Endorse == slot - return $ NetworkRate cfg.endorseBlockFrequencyPerStage - --- | TODO: a little unclear what to do if the NodeRate is `>= 1`. -votingRate :: LeiosConfig -> SlotNo -> Maybe NetworkRate -votingRate cfg slot = do - guard $ slot `inRange` rangePrefix cfg.activeVotingStageLength (stageRange cfg Vote slot Vote) - return $ NetworkRate $ cfg.votingFrequencyPerStage / fromIntegral cfg.activeVotingStageLength + let + q = ceiling r + fq = NetworkRate $ r / fromIntegral q + in + replicate q fq + +inputBlockRate :: LeiosConfig -> SlotNo -> [NetworkRate] +inputBlockRate cfg@LeiosConfig{inputBlockFrequencyPerSlot} slot = + assert (isStage cfg Propose slot) $ + splitIntoSubSlots $ + NetworkRate inputBlockFrequencyPerSlot + +endorseBlockRate :: LeiosConfig -> SlotNo -> [NetworkRate] +endorseBlockRate cfg slot = fromMaybe [] $ do + guard $ isStage cfg Endorse slot + startEndorse <- stageStart cfg Endorse slot Endorse + guard $ startEndorse == slot + return $ splitIntoSubSlots $ NetworkRate cfg.endorseBlockFrequencyPerStage + +-- TODO: double check with technical report section on voting when ready. +votingRate :: LeiosConfig -> SlotNo -> [NetworkRate] +votingRate cfg slot = fromMaybe [] $ do + guard $ isStage cfg Vote slot + range <- stageRange cfg Vote slot Vote + guard $ slot `inRange` rangePrefix cfg.activeVotingStageLength range + let votingFrequencyPerSlot = cfg.votingFrequencyPerStage / fromIntegral cfg.activeVotingStageLength + return $ splitIntoSubSlots $ NetworkRate votingFrequencyPerSlot -- mostly here to showcase the types. nodeRate :: StakeFraction -> NetworkRate -> NodeRate diff --git a/simulation/src/LeiosProtocol/Short/Generate.hs b/simulation/src/LeiosProtocol/Short/Generate.hs index 5f14784d..4d8cce50 100644 --- a/simulation/src/LeiosProtocol/Short/Generate.hs +++ b/simulation/src/LeiosProtocol/Short/Generate.hs @@ -4,6 +4,7 @@ {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE NoFieldSelectors #-} @@ -15,11 +16,13 @@ import Control.Concurrent.Class.MonadSTM ( MonadSTM (..), ) import Control.Exception (assert) +import Control.Monad (forM) +import Data.Bifunctor import Data.Kind -import Data.Traversable (forM) +import Data.Maybe (fromMaybe) import LeiosProtocol.Common import LeiosProtocol.Short hiding (Stage (..)) -import PraosProtocol.Common (fixupBlock, mkPartialBlock) +import PraosProtocol.Common (CPUTask (CPUTask), mkPartialBlock) import System.Random data BuffersView m = BuffersView @@ -31,9 +34,9 @@ data BuffersView m = BuffersView data Role :: Type -> Type where Base :: Role RankingBlock - Propose :: SubSlotNo -> Role InputBlock + Propose :: Role [InputBlock] Endorse :: Role EndorseBlock - Vote :: Role [VoteMsg] + Vote :: Role VoteMsg data SomeRole :: Type where SomeRole :: Role a -> SomeRole @@ -41,29 +44,39 @@ data SomeRole :: Type where data SomeAction :: Type where SomeAction :: Role a -> a -> SomeAction -mkScheduler :: MonadSTM m => StdGen -> (SlotNo -> [(SomeRole, NodeRate)]) -> m (SlotNo -> m [SomeRole]) +mkScheduler :: MonadSTM m => StdGen -> (SlotNo -> [(SomeRole, [NodeRate])]) -> m (SlotNo -> m [(SomeRole, Word64)]) mkScheduler rng0 rates = do - let sampleRate (role, NodeRate lambda) = do + let sampleRate (NodeRate lambda) = do (sample, rng') <- gets $ uniformR (0, 1) put $! rng' -- TODO: check poisson dist. math. let prob = lambda * exp (-lambda) - pure [role | sample <= prob] - + pure $ sample <= prob + sampleRates (role, rs) = do + wins <- fromIntegral . length . filter id <$> mapM sampleRate rs + return [(role, wins) | wins >= 1] rngVar <- newTVarIO rng0 let sched slot = atomically $ do rng <- readTVar rngVar - let (acts, rng1) = flip runState rng . fmap concat . mapM sampleRate $ (rates slot) + let (acts, rng1) = flip runState rng . fmap concat . mapM sampleRates $ (rates slot) writeTVar rngVar rng1 return $ acts return sched -waitNextSlot :: (Monad m, MonadTime m, MonadDelay m) => LeiosConfig -> m SlotNo -waitNextSlot cfg = do +-- | @waitNextSlot cfg targetSlot@ waits until the beginning of +-- @targetSlot@ if that's now or in the future, otherwise the closest slot. +waitNextSlot :: (Monad m, MonadTime m, MonadDelay m) => LeiosConfig -> SlotNo -> m SlotNo +waitNextSlot cfg targetSlot = do now <- getCurrentTime - let slot = - assert (cfg.praos.slotConfig.duration == 1) $ - toEnum (ceiling $ now `diffUTCTime` cfg.praos.slotConfig.start) + let targetSlotTime = slotTime cfg.praos.slotConfig targetSlot + let slot + | now <= targetSlotTime = targetSlot + | otherwise = assert (nextSlotIndex >= 0) $ toEnum nextSlotIndex + where + nextSlotIndex = + assert (cfg.praos.slotConfig.duration == 1) $ + ceiling $ + now `diffUTCTime` cfg.praos.slotConfig.start let tgt = slotTime cfg.praos.slotConfig slot threadDelayNDT (tgt `diffUTCTime` now) return slot @@ -72,8 +85,8 @@ data BlockGeneratorConfig m = BlockGeneratorConfig { leios :: LeiosConfig , nodeId :: NodeId , buffers :: BuffersView m - , schedule :: SlotNo -> m [SomeRole] - , submit :: [SomeAction] -> m () + , schedule :: SlotNo -> m [(SomeRole, Word64)] + , submit :: [([CPUTask], SomeAction)] -> m () } blockGenerator :: @@ -81,38 +94,43 @@ blockGenerator :: (MonadSTM m, MonadDelay m, MonadTime m) => BlockGeneratorConfig m -> m () -blockGenerator BlockGeneratorConfig{..} = go 0 +blockGenerator BlockGeneratorConfig{..} = go (0, 0) where - go !blkId = do - slot <- waitNextSlot leios + go (!blkId, !tgtSlot) = do + slot <- waitNextSlot leios tgtSlot roles <- schedule slot (actions, blkId') <- runStateT (mapM (execute slot) roles) blkId submit actions - go blkId' - execute slot (SomeRole r) = SomeAction r <$> execute' slot r - execute' :: SlotNo -> Role a -> StateT Int m a - execute' slot Base = do + go (blkId', slot + 1) + execute slot (SomeRole r, wins) = assert (wins >= 1) $ second (SomeAction r) <$> execute' slot r wins + execute' :: SlotNo -> Role a -> Word64 -> StateT Int m ([CPUTask], a) + execute' slot Base _wins = do rbData <- lift $ atomically $ buffers.newRBData - let body = mkRankingBlockBody leios rbData.freshestCertifiedEB rbData.txsPayload - -- TODO: maybe submit should do the fixupBlock. - return $! fixupBlock @_ @RankingBlock rbData.headAnchor (mkPartialBlock slot body) - execute' slot (Propose sub) = do - i <- nextBlkId InputBlockId - ibData <- lift $ atomically $ buffers.newIBData - let header = mkInputBlockHeader leios i slot sub nodeId ibData.referenceRankingBlock - return $! mkInputBlock leios header ibData.txsPayload - execute' slot Endorse = do - i <- nextBlkId EndorseBlockId - ibs <- lift $ atomically $ buffers.ibs - return $! mkEndorseBlock leios i slot nodeId $ inputBlocksToEndorse leios slot ibs - execute' slot Vote = do - votingFor <- lift $ atomically $ do - ibs <- buffers.ibs - ebs <- buffers.ebs - pure $ endorseBlocksToVoteFor leios slot ibs ebs - forM votingFor $ \eb -> do + let meb = rbData.freshestCertifiedEB + let !task = CPUTask $ fromMaybe 0 $ leios.delays.certificateCreation . snd <$> meb + let body = mkRankingBlockBody leios nodeId meb rbData.txsPayload + let !rb = mkPartialBlock slot body + return ([task], rb) + execute' slot Propose wins = + ([],) <$> do + ibData <- lift $ atomically $ buffers.newIBData + forM [toEnum $ fromIntegral sub | sub <- [0 .. wins - 1]] $ \sub -> do + i <- nextBlkId InputBlockId + let header = mkInputBlockHeader leios i slot sub nodeId ibData.referenceRankingBlock + return $! mkInputBlock leios header ibData.txsPayload + execute' slot Endorse _wins = + ([],) <$> do + i <- nextBlkId EndorseBlockId + ibs <- lift $ atomically $ buffers.ibs + return $! mkEndorseBlock leios i slot nodeId $ inputBlocksToEndorse leios slot ibs + execute' slot Vote votes = + ([],) <$> do + votingFor <- lift $ atomically $ do + ibs <- buffers.ibs + ebs <- buffers.ebs + pure $ endorseBlocksToVoteFor leios slot ibs ebs i <- nextBlkId VoteId - return $! mkVoteMsg leios i slot nodeId eb + return $! mkVoteMsg leios i slot nodeId votes votingFor nextBlkId :: (NodeId -> Int -> a) -> StateT Int m a nextBlkId f = do i <- get diff --git a/simulation/src/LeiosProtocol/Short/Node.hs b/simulation/src/LeiosProtocol/Short/Node.hs index 4d34f3d4..2c248525 100644 --- a/simulation/src/LeiosProtocol/Short/Node.hs +++ b/simulation/src/LeiosProtocol/Short/Node.hs @@ -1,27 +1,33 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NondecreasingIndentation #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE ViewPatterns #-} module LeiosProtocol.Short.Node where import ChanMux +import Control.Category ((>>>)) import Control.Concurrent.Class.MonadMVar import Control.Concurrent.Class.MonadSTM -import Control.Monad (forever, guard, join) +import Control.Exception (assert) +import Control.Monad (forever, guard, when) import Control.Monad.Class.MonadAsync import Control.Monad.Class.MonadFork import Control.Tracer -import Data.Bifunctor (Bifunctor (first), second) +import Data.Bifunctor import Data.Coerce (coerce) import Data.Foldable (forM_) import Data.List (sort, sortOn) import Data.Map (Map) -import qualified Data.Map as Map +import qualified Data.Map.Strict as Map import Data.Maybe import Data.Ord +import Data.Set (Set) import qualified Data.Set as Set import LeiosProtocol.Common import LeiosProtocol.Relay @@ -31,10 +37,9 @@ import LeiosProtocol.Short.Generate import qualified LeiosProtocol.Short.Generate as Generate import ModelTCP import Numeric.Natural (Natural) -import Ouroboros.Network.Mock.Chain (headHash) -import PraosProtocol.BlockFetch (addProducedBlock) +import PraosProtocol.BlockFetch import PraosProtocol.Common -import PraosProtocol.Common.Chain (headAnchor) +import PraosProtocol.Common.Chain (dropUntil, headAnchor, headHash) import qualified PraosProtocol.PraosNode as PraosNode import System.Random @@ -47,15 +52,14 @@ data LeiosMessage = RelayIB {fromRelayIB :: RelayIBMessage} | RelayEB {fromRelayEB :: RelayEBMessage} | RelayVote {fromRelayVote :: RelayVoteMessage} - | -- | `BearerMsg` here is a bit ugly, but allows us to not have to split up PraosMessage in the Leios bundle. - PraosMsg {fromPraosMsg :: BearerMsg PraosMessage} + | PraosMsg {fromPraosMsg :: PraosMessage} + deriving (Show) data Leios f = Leios { protocolIB :: f RelayIBMessage , protocolEB :: f RelayEBMessage , protocolVote :: f RelayVoteMessage - , protocolPraos :: f (BearerMsg PraosMessage) - -- ^ use newChanMux on (Chan m PraosMessage) to get PraosNode.Praos bundle. + , protocolPraos :: PraosNode.Praos RankingBlockBody f } instance MessageSize LeiosMessage where @@ -72,19 +76,25 @@ instance MuxBundle Leios where { protocolIB = ToFromMuxMsg RelayIB fromRelayIB , protocolEB = ToFromMuxMsg RelayEB fromRelayEB , protocolVote = ToFromMuxMsg RelayVote fromRelayVote - , protocolPraos = ToFromMuxMsg PraosMsg fromPraosMsg + , protocolPraos = case toFromMuxMsgBundle @(PraosNode.Praos RankingBlockBody) of + PraosNode.Praos a b -> PraosNode.Praos (p >>> a) (p >>> b) } - traverseMuxBundle f (Leios a b c d) = Leios <$> f a <*> f b <*> f c <*> f d + where + p = ToFromMuxMsg PraosMsg fromPraosMsg + + traverseMuxBundle f (Leios a b c d) = Leios <$> f a <*> f b <*> f c <*> traverseMuxBundle f d type RelayIBState = RelayConsumerSharedState InputBlockId InputBlockHeader InputBlockBody type RelayEBState = RelayConsumerSharedState EndorseBlockId EndorseBlockId EndorseBlock type RelayVoteState = RelayConsumerSharedState VoteId VoteId VoteMsg data ValidationRequest m - = ValidateRB RankingBlock (m ()) - | ValidateIBS [(InputBlockHeader, InputBlockBody)] ([(InputBlockHeader, InputBlockBody)] -> STM m ()) - | ValidateEBS [EndorseBlock] ([EndorseBlock] -> STM m ()) - | ValidateVotes [VoteMsg] ([VoteMsg] -> STM m ()) + = ValidateRB !RankingBlock !(m ()) + | ValidateIBS ![(InputBlockHeader, InputBlockBody)] !UTCTime !([(InputBlockHeader, InputBlockBody)] -> STM m ()) + | ValidateEBS ![EndorseBlock] !([EndorseBlock] -> STM m ()) + | ValidateVotes ![VoteMsg] !([VoteMsg] -> STM m ()) + +data LedgerState = LedgerState data LeiosNodeState m = LeiosNodeState { praosState :: PraosNode.PraosNodeState RankingBlockBody m @@ -93,6 +103,12 @@ data LeiosNodeState m = LeiosNodeState , relayVoteState :: RelayVoteState m , ibDeliveryTimesVar :: TVar m (Map InputBlockId UTCTime) , validationQueue :: TBQueue m (ValidationRequest m) + , waitingForRBVar :: TVar m (Map (HeaderHash RankingBlock) [(DiffTime, m ())]) + -- ^ waiting for RB block itself to be validated. + , waitingForLedgerStateVar :: TVar m (Map (HeaderHash RankingBlock) [(DiffTime, m ())]) + -- ^ waiting for ledger state of RB block to be validated. + , ledgerStateVar :: TVar m (Map (HeaderHash RankingBlock) LedgerState) + , ibsNeededForEBVar :: TVar m (Map EndorseBlockId (Set InputBlockId)) } data LeiosNodeConfig = LeiosNodeConfig @@ -110,25 +126,132 @@ data LeiosNodeConfig = LeiosNodeConfig , processingQueueBound :: Natural } -data LeiosNodeEvent = PraosNodeEvent (PraosNode.PraosNodeEvent RankingBlockBody) - -- TODO: other events +data LeiosEventBlock + = EventIB InputBlock + | EventEB EndorseBlock + | EventVote VoteMsg + deriving (Show) + +data BlockEvent = Generate | Received | EnterState + deriving (Show) +data LeiosNodeEvent + = PraosNodeEvent (PraosNode.PraosNodeEvent RankingBlockBody) + | LeiosNodeEventCPU CPUTask + | LeiosNodeEvent BlockEvent LeiosEventBlock deriving (Show) +newRelayState :: + (Ord id, MonadSTM m) => + m (RelayConsumerSharedState id header body m) +newRelayState = do + relayBufferVar <- newTVarIO RB.empty + inFlightVar <- newTVarIO Set.empty + return $ RelayConsumerSharedState{relayBufferVar, inFlightVar} + setupRelay :: (Ord id, MonadAsync m, MonadSTM m, MonadDelay m, MonadTime m) => RelayConsumerConfig id header body m -> + RelayConsumerSharedState id header body m -> [Chan m (RelayMessage id header body)] -> [Chan m (RelayMessage id header body)] -> - m (RelayConsumerSharedState id header body m, [m ()]) -setupRelay cfg followers peers = do - relayBufferVar <- newTVarIO RB.empty - inFlightVar <- newTVarIO Set.empty - - let consumerSST = RelayConsumerSharedState{relayBufferVar, inFlightVar} + m [m ()] +setupRelay cfg consumerSST followers peers = do let producerSST = RelayProducerSharedState{relayBufferVar = asReadOnly consumerSST.relayBufferVar} let consumers = map (runRelayConsumer cfg consumerSST) peers let producers = map (runRelayProducer cfg.relay producerSST) followers - return $ (consumerSST, consumers ++ producers) + return $ consumers ++ producers + +type SubmitBlocks m header body = + [(header, body)] -> + UTCTime -> + ([(header, body)] -> STM m ()) -> + m () + +relayIBConfig :: + (MonadAsync m, MonadSTM m, MonadDelay m, MonadTime m) => + Tracer m LeiosNodeEvent -> + LeiosNodeConfig -> + SubmitBlocks m InputBlockHeader InputBlockBody -> + RelayConsumerConfig InputBlockId InputBlockHeader InputBlockBody m +relayIBConfig tracer cfg submitBlocks = + RelayConsumerConfig + { relay = RelayConfig{maxWindowSize = 100} + , headerId = (.id) + , headerValidationDelay = cfg.leios.delays.inputBlockHeaderValidation + , threadDelayParallel = threadDelayParallel tracer + , -- TODO: add prioritization policy to LeiosConfig + prioritize = sortOn (Down . (.slot)) . Map.elems + , submitPolicy = SubmitAll + , maxHeadersToRequest = 100 + , maxBodiesToRequest = 1 + , submitBlocks + } + +relayEBConfig :: + MonadDelay m => + Tracer m LeiosNodeEvent -> + LeiosNodeConfig -> + SubmitBlocks m EndorseBlockId EndorseBlock -> + RelayConsumerConfig EndorseBlockId EndorseBlockId EndorseBlock m +relayEBConfig tracer _cfg submitBlocks = + RelayConsumerConfig + { relay = RelayConfig{maxWindowSize = 100} + , headerId = id + , headerValidationDelay = const 0 + , threadDelayParallel = threadDelayParallel tracer + , -- TODO: add prioritization policy to LeiosConfig? + prioritize = sort . Map.elems + , submitPolicy = SubmitAll + , maxHeadersToRequest = 100 + , maxBodiesToRequest = 1 -- should we chunk bodies here? + , submitBlocks + } + +relayVoteConfig :: + MonadDelay m => + Tracer m LeiosNodeEvent -> + LeiosNodeConfig -> + SubmitBlocks m VoteId VoteMsg -> + RelayConsumerConfig VoteId VoteId VoteMsg m +relayVoteConfig tracer _cfg submitBlocks = + RelayConsumerConfig + { relay = RelayConfig{maxWindowSize = 100} + , headerId = id + , headerValidationDelay = const 0 + , threadDelayParallel = threadDelayParallel tracer + , -- TODO: add prioritization policy to LeiosConfig? + prioritize = sort . Map.elems + , submitPolicy = SubmitAll + , maxHeadersToRequest = 100 + , maxBodiesToRequest = 1 -- should we chunk bodies here? + , submitBlocks + } + +threadDelayParallel :: MonadDelay m => Tracer m LeiosNodeEvent -> [DiffTime] -> m () +threadDelayParallel _tracer [] = return () +threadDelayParallel tracer ds = do + forM_ ds (traceWith tracer . LeiosNodeEventCPU . CPUTask) + let d = maximum ds + when (d >= 0) $ threadDelaySI d + +newLeiosNodeState :: + forall m. + (MonadMVar m, MonadSTM m) => + LeiosNodeConfig -> + m (LeiosNodeState m) +newLeiosNodeState cfg = do + praosState <- PraosNode.newPraosNodeState cfg.baseChain + validationQueue <- newTBQueueIO cfg.processingQueueBound + relayIBState <- newRelayState + relayEBState <- newRelayState + relayVoteState <- newRelayState + ibDeliveryTimesVar <- newTVarIO Map.empty + ibsNeededForEBVar <- newTVarIO Map.empty + ledgerStateVar <- newTVarIO Map.empty + waitingForRBVar <- newTVarIO Map.empty + waitingForLedgerStateVar <- newTVarIO Map.empty + + return $ LeiosNodeState{..} leiosNode :: forall m. @@ -139,139 +262,260 @@ leiosNode :: [Leios (Chan m)] -> m ([m ()]) leiosNode tracer cfg followers peers = do - praosState <- PraosNode.newPraosNodeState cfg.baseChain - validationQueue <- newTBQueueIO cfg.processingQueueBound + leiosState@LeiosNodeState{..} <- newLeiosNodeState cfg + let + traceReceived :: [a] -> (a -> LeiosEventBlock) -> m () + traceReceived xs f = mapM_ (traceWith tracer . LeiosNodeEvent Received . f) xs + + -- tracing for RB already covered in blockFetchConsumer. let submitRB rb completion = atomically $ writeTBQueue validationQueue $! ValidateRB rb completion + let submitIB xs deliveryTime completion = do + traceReceived xs $ EventIB . uncurry InputBlock + atomically $ writeTBQueue validationQueue $! ValidateIBS xs deliveryTime $ completion + + let submitVote (map snd -> xs) _ completion = do + traceReceived xs EventVote + atomically $ writeTBQueue validationQueue $! ValidateVotes xs $ completion . map (\v -> (v.id, v)) + + let submitEB (map snd -> xs) _ completion = do + traceReceived xs EventEB + atomically $ writeTBQueue validationQueue $! ValidateEBS xs $ completion . map (\eb -> (eb.id, eb)) + praosThreads <- - join $ - PraosNode.setupPraosThreads' (contramap PraosNodeEvent tracer) cfg.leios.praos submitRB praosState - <$> (mapM (newMuxChan . protocolPraos) followers) - <*> (mapM (newMuxChan . protocolPraos) peers) - ibDeliveryTimesVar <- newTVarIO Map.empty - let relayIBConfig = - RelayConsumerConfig - { relay = RelayConfig{maxWindowSize = 100} - , headerId = (.id) - , -- TODO: add prioritization policy to LeiosConfig - prioritize = sortOn (Down . (.slot)) . Map.elems - , submitPolicy = SubmitAll - , maxHeadersToRequest = 100 - , maxBodiesToRequest = 1 - , submitBlocks = \xs deliveryTime completion -> atomically $ do - writeTBQueue validationQueue $! ValidateIBS xs $ \ys -> do - completion ys - modifyTVar' - ibDeliveryTimesVar - (Map.union $ Map.fromList [(h.id, deliveryTime) | (h, _) <- ys]) - } - let relayEBConfig = - RelayConsumerConfig - { relay = RelayConfig{maxWindowSize = 100} - , headerId = id - , -- TODO: add prioritization policy to LeiosConfig? - prioritize = sort . Map.elems - , submitPolicy = SubmitAll - , maxHeadersToRequest = 100 - , maxBodiesToRequest = 1 -- should we chunk bodies here? - , submitBlocks = \xs _ completion -> atomically $ do - writeTBQueue validationQueue $! - ValidateEBS (map snd xs) $ - completion . map (\eb -> (eb.id, eb)) - } - let relayVoteConfig = - RelayConsumerConfig - { relay = RelayConfig{maxWindowSize = 100} - , headerId = id - , -- TODO: add prioritization policy to LeiosConfig? - prioritize = sort . Map.elems - , submitPolicy = SubmitAll - , maxHeadersToRequest = 100 - , maxBodiesToRequest = 1 -- should we chunk bodies here? - , submitBlocks = \xs _ completion -> atomically $ do - writeTBQueue validationQueue $! - ValidateVotes (map snd xs) $ - completion . map (\v -> (v.id, v)) - } - (relayIBState, ibThreads) <- setupRelay relayIBConfig (map protocolIB followers) (map protocolIB peers) - (relayEBState, ebThreads) <- setupRelay relayEBConfig (map protocolEB followers) (map protocolEB peers) - (relayVoteState, voteThreads) <- setupRelay relayVoteConfig (map protocolVote followers) (map protocolVote peers) - let leiosState = LeiosNodeState{..} - let genThreads = [generate leiosState] - let processingThreads = [processing leiosState] - let pruningThreads = [] -- TODO: need EB/IBs to be around long enough to validate RBs + PraosNode.setupPraosThreads' + (contramap PraosNodeEvent tracer) + cfg.leios.praos + submitRB + praosState + (map protocolPraos followers) + (map protocolPraos peers) + + ibThreads <- + setupRelay + (relayIBConfig tracer cfg submitIB) + relayIBState + (map protocolIB followers) + (map protocolIB peers) + + ebThreads <- + setupRelay + (relayEBConfig tracer cfg submitEB) + relayEBState + (map protocolEB followers) + (map protocolEB peers) + + voteThreads <- + setupRelay + (relayVoteConfig tracer cfg submitVote) + relayVoteState + (map protocolVote followers) + (map protocolVote peers) + + let processWaitingForRB = + processWaiting + (contramap LeiosNodeEventCPU tracer) + Nothing -- unbounded parallelism + praosState.blockFetchControllerState.blocksVar + waitingForRBVar + + let processWaitingForLedgerState = + processWaiting + (contramap LeiosNodeEventCPU tracer) + Nothing -- unbounded parallelism + ledgerStateVar + waitingForLedgerStateVar + + let processingThreads = + [ validationDispatcher tracer cfg leiosState + , processWaitingForRB + , processWaitingForLedgerState + ] + + let blockGenerationThreads = [generator tracer cfg leiosState] + + let computeLedgerStateThreads = [computeLedgerStateThread tracer cfg leiosState] + + -- TODO: expiration times to be decided. At least need EB/IBs to be + -- around long enough to compute ledger state if they end in RB. + let pruningThreads = [] + return $ concat - [ genThreads - , processingThreads - , pruningThreads + [ coerce praosThreads , ibThreads , ebThreads , voteThreads - , coerce praosThreads + , processingThreads + , blockGenerationThreads + , pruningThreads + , computeLedgerStateThreads ] + +computeLedgerStateThread :: + forall m. + (MonadMVar m, MonadFork m, MonadAsync m, MonadSTM m, MonadTime m, MonadDelay m) => + Tracer m LeiosNodeEvent -> + LeiosNodeConfig -> + LeiosNodeState m -> + m () +computeLedgerStateThread _tracer _cfg st = forever $ do + _readyLedgerState <- atomically $ do + blocks <- readTVar st.praosState.blockFetchControllerState.blocksVar + when (Map.null blocks) retry + ledgerMissing <- Map.elems . (blocks Map.\\) <$> readTVar st.ledgerStateVar + when (null ledgerMissing) retry + ibsNeededForEB <- readTVar st.ibsNeededForEBVar + let readyLedgerState = + [ (blockHash rb, LedgerState) + | rb <- ledgerMissing + , flip all rb.blockBody.endorseBlocks $ \(ebId, _) -> + Map.lookup ebId ibsNeededForEB == Just Set.empty + ] + when (null readyLedgerState) retry + modifyTVar' st.ledgerStateVar (`Map.union` Map.fromList readyLedgerState) + return readyLedgerState + -- TODO? trace readyLedgerState + return () + +-- TODO: tracing events +validationDispatcher :: + forall m. + (MonadMVar m, MonadFork m, MonadAsync m, MonadSTM m, MonadTime m, MonadDelay m) => + Tracer m LeiosNodeEvent -> + LeiosNodeConfig -> + LeiosNodeState m -> + m () +validationDispatcher tracer cfg leiosState = forever $ do + -- NOTE: IOSim deschedules the thread after an `atomically`, we + -- might get more parallelism by reading the whole buffer at once, + -- collect all resulting delays and do a single + -- `threadDelayParallel` call. + req <- atomically $ readTBQueue leiosState.validationQueue + case req of + ValidateRB rb completion -> do + let !delay = cfg.leios.praos.blockValidationDelay rb + case blockPrevHash rb of + GenesisHash -> do + traceWith tracer . LeiosNodeEventCPU . CPUTask $ delay + threadDelaySI delay + completion + BlockHash prev -> atomically $ do + let var = + assert (rb.blockBody.payload >= 0) $ + if rb.blockBody.payload == 0 + then leiosState.waitingForRBVar + -- TODO: assumes payload can be validated without content of EB, check with spec. + else leiosState.waitingForLedgerStateVar + modifyTVar' var $ Map.insertWith (++) prev [(delay, completion)] + ValidateIBS ibs deliveryTime completion -> do + -- NOTE: IBs with an RB reference have to wait for ledger state of that RB. + let valIB x = + let + !delay = cfg.leios.delays.inputBlockValidation (uncurry InputBlock x) + task = atomically $ do + completion [x] + + -- NOTE: voting relies on delivery times for IBs + modifyTVar' + leiosState.ibDeliveryTimesVar + (Map.insertWith min (fst x).id deliveryTime) + + -- TODO: likely needs optimization + modifyTVar' leiosState.ibsNeededForEBVar (Map.map (Set.delete (fst x).id)) + in + (delay, task >> traceEnterState [uncurry InputBlock x] EventIB) + let waitingLedgerState = + Map.fromListWith + (++) + [ (rbHash, [valIB ib]) + | ib <- ibs + , BlockHash rbHash <- [(fst ib).rankingBlock] + ] + + atomically $ modifyTVar' leiosState.waitingForLedgerStateVar (`Map.union` waitingLedgerState) + + let (delays, ms) = unzip [valIB ib | ib@(h, _) <- ibs, GenesisHash <- [h.rankingBlock]] + threadDelayParallel tracer delays + sequence_ ms + ValidateEBS ebs completion -> do + -- NOTE: block references are only inspected during voting. + threadDelayParallel tracer $ map cfg.leios.delays.endorseBlockValidation ebs + atomically $ do + completion ebs + ibs <- RB.keySet <$> readTVar leiosState.relayIBState.relayBufferVar + let ibsNeeded = Map.fromList $ map (\eb -> (eb.id, Set.fromList eb.inputBlocks Set.\\ ibs)) ebs + modifyTVar' leiosState.ibsNeededForEBVar $ (`Map.union` ibsNeeded) + traceEnterState ebs EventEB + ValidateVotes vs completion -> do + threadDelayParallel tracer $ map cfg.leios.delays.voteMsgValidation vs + atomically $ completion vs + traceEnterState vs EventVote where - generate :: LeiosNodeState m -> m () - generate st = do - schedule <- mkSchedule cfg - let buffers = mkBuffersView cfg st - -- TODO: tracing events - let - submitOne :: SomeAction -> m () - submitOne x = atomically $ case x of - SomeAction Generate.Base rb -> - -- TODO: certificate construction delay? - addProducedBlock st.praosState.blockFetchControllerState rb - SomeAction (Generate.Propose _) ib -> - modifyTVar' st.relayIBState.relayBufferVar (RB.snoc ib.header.id (ib.header, ib.body)) - SomeAction Generate.Endorse eb -> - modifyTVar' st.relayEBState.relayBufferVar (RB.snoc eb.id (eb.id, eb)) - SomeAction Generate.Vote vs -> forM_ vs $ \v -> - modifyTVar' st.relayVoteState.relayBufferVar (RB.snoc v.id (v.id, v)) - let LeiosNodeConfig{..} = cfg - blockGenerator $ BlockGeneratorConfig{submit = mapM_ submitOne, ..} - - -- TODO: validation delays and tracing events. - processing :: LeiosNodeState m -> m () - processing leiosState = forever $ do - req <- atomically $ readTBQueue leiosState.validationQueue - case req of - ValidateRB _rb completion -> do - -- NOTE: in actual impl. have to wait for previous RB, but - -- not for its ledger state unless rb.payload > 0. - completion - ValidateIBS ibs completion -> atomically $ do - -- NOTE: in actual impl. have to wait for ledger state of referenced RBs. - completion ibs - ValidateEBS ebs completion -> do - -- TODO: only delay for crypto checks, contents used for voting. - atomically $ completion ebs - ValidateVotes vs completion -> do - -- TODO: only delay for crypto checks - atomically $ completion vs + traceEnterState :: [a] -> (a -> LeiosEventBlock) -> m () + traceEnterState xs f = forM_ xs $ traceWith tracer . LeiosNodeEvent EnterState . f + +generator :: + forall m. + (MonadMVar m, MonadFork m, MonadAsync m, MonadSTM m, MonadTime m, MonadDelay m) => + Tracer m LeiosNodeEvent -> + LeiosNodeConfig -> + LeiosNodeState m -> + m () +generator tracer cfg st = do + schedule <- mkSchedule cfg + let buffers = mkBuffersView cfg st + let + submitOne :: ([CPUTask], SomeAction) -> m () + submitOne (delays, x) = do + threadDelayParallel tracer (coerce delays) + case x of + SomeAction Generate.Base rb0 -> do + rb <- atomically $ do + ha <- headAnchor <$> PraosNode.preferredChain st.praosState + let rb = fixupBlock ha rb0 + addProducedBlock st.praosState.blockFetchControllerState rb + return rb + traceWith tracer (PraosNodeEvent (PraosNodeEventGenerate rb)) + SomeAction Generate.Propose ibs -> forM_ ibs $ \ib -> do + atomically $ modifyTVar' st.relayIBState.relayBufferVar (RB.snoc ib.header.id (ib.header, ib.body)) + traceWith tracer (LeiosNodeEvent Generate (EventIB ib)) + SomeAction Generate.Endorse eb -> do + atomically $ modifyTVar' st.relayEBState.relayBufferVar (RB.snoc eb.id (eb.id, eb)) + traceWith tracer (LeiosNodeEvent Generate (EventEB eb)) + SomeAction Generate.Vote v -> do + atomically $ modifyTVar' st.relayVoteState.relayBufferVar (RB.snoc v.id (v.id, v)) + traceWith tracer (LeiosNodeEvent Generate (EventVote v)) + let LeiosNodeConfig{..} = cfg + -- TODO: more parallelism `mapM_ submitOne` will make each later submission wait. + blockGenerator $ BlockGeneratorConfig{submit = mapM_ submitOne, ..} mkBuffersView :: forall m. MonadSTM m => LeiosNodeConfig -> LeiosNodeState m -> BuffersView m mkBuffersView cfg st = BuffersView{..} where newRBData = do - headAnchor' <- headAnchor <$> PraosNode.preferredChain st.praosState bufferEB <- map snd . RB.values <$> readTVar st.relayEBState.relayBufferVar bufferVotes <- map snd . RB.values <$> readTVar st.relayVoteState.relayBufferVar -- TODO: cache? - let votesForEB = Map.fromListWith Set.union [(v.endorseBlock, Set.singleton v.id) | v <- bufferVotes] + let votesForEB = Map.fromListWith Map.union [(eb, Map.singleton v.id v.votes) | v <- bufferVotes, eb <- v.endorseBlocks] + -- TODO: certificate construction delay? + let totalVotes = fromIntegral . sum . Map.elems let tryCertify eb = do votes <- Map.lookup eb.id votesForEB - guard (cfg.leios.votesForCertificate <= Set.size votes) - return (eb.id, mkCertificate cfg.leios votes) + guard (cfg.leios.votesForCertificate <= totalVotes votes) + return $! (eb.id,) $! mkCertificate cfg.leios votes + -- TODO: cache index of EBs ordered by .slot descending? let freshestCertifiedEB = listToMaybe . mapMaybe tryCertify . sortOn (Down . (.slot)) $ bufferEB return $ NewRankingBlockData { freshestCertifiedEB , txsPayload = cfg.rankingBlockPayload - , headAnchor = headAnchor' } newIBData = do - referenceRankingBlock <- headHash <$> PraosNode.preferredChain st.praosState + ledgerState <- readTVar st.ledgerStateVar + referenceRankingBlock <- + headHash . dropUntil (flip Map.member ledgerState . blockHash) + <$> PraosNode.preferredChain st.praosState let txsPayload = cfg.inputBlockPayload return $ NewInputBlockData{referenceRankingBlock, txsPayload} ibs = do @@ -297,13 +541,13 @@ mkBuffersView cfg st = BuffersView{..} return EndorseBlocksSnapshot{..} -mkSchedule :: MonadSTM m => LeiosNodeConfig -> m (SlotNo -> m [SomeRole]) +mkSchedule :: MonadSTM m => LeiosNodeConfig -> m (SlotNo -> m [(SomeRole, Word64)]) mkSchedule cfg = mkScheduler cfg.rng rates where rates slot = - (map . second) (nodeRate cfg.stake) . concat $ - [ (map . first) (SomeRole . Generate.Propose) $ inputBlockRate cfg.leios slot - , map (SomeRole Generate.Endorse,) . maybe [] (: []) $ endorseBlockRate cfg.leios slot - , map (SomeRole Generate.Vote,) . maybe [] (: []) $ votingRate cfg.leios slot - , [(SomeRole Generate.Base, NetworkRate cfg.rankingBlockFrequencyPerSlot)] + (map . second . map) (nodeRate cfg.stake) $ + [ (SomeRole Generate.Propose, inputBlockRate cfg.leios slot) + , (SomeRole Generate.Endorse, endorseBlockRate cfg.leios slot) + , (SomeRole Generate.Vote, votingRate cfg.leios slot) + , (SomeRole Generate.Base, [NetworkRate cfg.rankingBlockFrequencyPerSlot]) ] diff --git a/simulation/src/LeiosProtocol/Short/Sim.hs b/simulation/src/LeiosProtocol/Short/Sim.hs new file mode 100644 index 00000000..01da0113 --- /dev/null +++ b/simulation/src/LeiosProtocol/Short/Sim.hs @@ -0,0 +1,152 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module LeiosProtocol.Short.Sim where + +import ChanMux +import ChanTCP +import Control.Monad.IOSim as IOSim (IOSim, runSimTrace) +import Control.Tracer as Tracer ( + Contravariant (contramap), + Tracer, + traceWith, + ) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import Data.Set (Set) +import qualified Data.Set as Set + +-- import PraosProtocol.Common hiding (Point) + +import Control.Monad (forever) +import Control.Monad.Class.MonadFork (MonadFork (forkIO)) +import LeiosProtocol.Common hiding (Point) +import LeiosProtocol.Short +import LeiosProtocol.Short.Node +import PraosProtocol.Common (defaultPraosConfig) +import PraosProtocol.Common.Chain (Chain (..)) +import SimTCPLinks +import SimTypes +import System.Random (mkStdGen) + +type LeiosTrace = [(Time, LeiosEvent)] + +data LeiosEvent + = -- | Declare the nodes and links + LeiosEventSetup + !WorldShape + !(Map NodeId Point) -- nodes and locations + !(Set (NodeId, NodeId)) -- links between nodes + | -- | An event at a node + LeiosEventNode (LabelNode LeiosNodeEvent) + | -- | An event on a tcp link between two nodes + LeiosEventTcp (LabelLink (TcpEvent LeiosMessage)) + deriving (Show) + +messages :: [(a, LeiosEvent)] -> [(a, LabelLink LeiosMessage)] +messages trace = [(t, LabelLink x y msg) | (t, LeiosEventTcp (LabelLink x y (TcpSendMsg msg _ _))) <- trace] + +exampleTrace1 :: LeiosTrace +exampleTrace1 = traceRelayLink1 $ mkTcpConnProps 0.1 1000000 + +traceRelayLink1 :: + TcpConnProps -> + LeiosTrace +traceRelayLink1 tcpprops = + selectTimedEvents $ + runSimTrace $ do + traceWith tracer $ + LeiosEventSetup + WorldShape + { worldDimensions = (500, 500) + , worldIsCylinder = False + } + ( Map.fromList + [ (nodeA, Point 50 100) + , (nodeB, Point 450 100) + ] + ) + ( Set.fromList + [(nodeA, nodeB), (nodeB, nodeA)] + ) + praosConfig <- defaultPraosConfig + let leiosConfig = + LeiosConfig + { praos = praosConfig + , sliceLength = 5 -- matching the interval between RBs + , -- \^ measured in slots, also stage length in Short leios. + inputBlockFrequencyPerSlot = 5 + , -- \^ expected InputBlock generation rate per slot. + endorseBlockFrequencyPerStage = 4 + , -- \^ expected EndorseBlock generation rate per stage, at most one per _node_ in each (pipeline, stage). + activeVotingStageLength = 1 + , votingFrequencyPerStage = 4 + , votesForCertificate = 1 -- just two nodes available to vote! + , sizes -- TODO: realistic sizes + = + SizesConfig + { producerId = 4 + , vrfProof = 32 + , signature_ = 32 + , reference = 32 + , voteCrypto = 64 + , certificate = const (50 * 1024) + } + , delays = + LeiosDelays + { inputBlockHeaderValidation = const 0.005 + , -- \^ vrf and signature + inputBlockValidation = const 0.1 + , -- \^ hash matching and payload validation (incl. tx scripts) + endorseBlockValidation = const 0.005 + , voteMsgValidation = const 0.005 + , certificateCreation = const 0.050 + } + } + let leiosNodeConfig nodeId@(NodeId i) = + LeiosNodeConfig + { leios = leiosConfig + , rankingBlockFrequencyPerSlot = 1 / fromIntegral leiosConfig.sliceLength -- every 5 seconds + , stake = StakeFraction 0.5 -- just two nodes! + , rng = mkStdGen i + , -- \^ for block generation + baseChain = Genesis + , rankingBlockPayload = 0 + , -- \^ overall size of txs to include in RBs + inputBlockPayload = 96 * 1024 + , -- \^ overall size of txs to include in IBs + processingQueueBound = 100 + , .. + } + + (pA, cB) <- newConnectionBundleTCP (leiosTracer nodeA nodeB) tcpprops + (cA, pB) <- newConnectionBundleTCP (leiosTracer nodeA nodeB) tcpprops + threads <- + (++) + <$> (leiosNode (nodeTracer nodeA) (leiosNodeConfig nodeA) [pA] [cA]) + <*> (leiosNode (nodeTracer nodeB) (leiosNodeConfig nodeB) [pB] [cB]) + mapM_ forkIO threads + forever $ threadDelaySI 1000 + where + (nodeA, nodeB) = (NodeId 0, NodeId 1) + + tracer :: Tracer (IOSim s) LeiosEvent + tracer = simTracer + + nodeTracer :: NodeId -> Tracer (IOSim s) LeiosNodeEvent + nodeTracer n = + contramap (LeiosEventNode . LabelNode n) tracer + + leiosTracer :: + NodeId -> + NodeId -> + Tracer (IOSim s) (LabelTcpDir (TcpEvent LeiosMessage)) + leiosTracer nfrom nto = + contramap (LeiosEventTcp . labelDirToLabelLink nfrom nto) tracer diff --git a/simulation/src/LeiosProtocol/Short/SimP2P.hs b/simulation/src/LeiosProtocol/Short/SimP2P.hs new file mode 100644 index 00000000..ac2e0511 --- /dev/null +++ b/simulation/src/LeiosProtocol/Short/SimP2P.hs @@ -0,0 +1,193 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} + +module LeiosProtocol.Short.SimP2P where + +import Control.Monad.IOSim as IOSim (IOSim, runSimTrace) +import Control.Tracer as Tracer ( + Contravariant (contramap), + Tracer, + traceWith, + ) +import Data.List (unfoldr) +import qualified Data.Map.Strict as Map +import System.Random (StdGen, mkStdGen, split) + +import ChanMux (newConnectionBundleTCP) +import ChanTCP +import Control.Monad (forever) +import Control.Monad.Class.MonadFork (MonadFork (forkIO)) +import LeiosProtocol.Short +import LeiosProtocol.Short.Node +import LeiosProtocol.Short.Sim +import P2P (P2PTopography (..), P2PTopographyCharacteristics (..), genArbitraryP2PTopography) +import PraosProtocol.Common +import PraosProtocol.Common.Chain (Chain (..)) +import SimTCPLinks (labelDirToLabelLink, mkTcpConnProps, selectTimedEvents, simTracer) +import SimTypes + +traceLeiosP2P :: + StdGen -> + P2PTopography -> + (DiffTime -> TcpConnProps) -> + (SlotConfig -> NodeId -> StdGen -> LeiosNodeConfig) -> + LeiosTrace +traceLeiosP2P + rng0 + P2PTopography + { p2pNodes + , p2pLinks + , p2pWorldShape + } + tcpprops + leiosNodeConfig = + selectTimedEvents $ + runSimTrace $ do + slotConfig <- slotConfigFromNow + traceWith tracer $ + LeiosEventSetup + p2pWorldShape + p2pNodes + (Map.keysSet p2pLinks) + tcplinks <- + sequence + [ do + (inChan, outChan) <- + newConnectionBundleTCP @Leios + (linkTracer na nb) + (tcpprops (realToFrac latency)) + return ((na, nb), (inChan, outChan)) + | ((na, nb), latency) <- Map.toList p2pLinks + ] + let tcplinksInChan = + Map.fromListWith + (++) + [ (nfrom, [inChan]) + | ((nfrom, _nto), (inChan, _outChan)) <- tcplinks + ] + tcplinksOutChan = + Map.fromListWith + (++) + [ (nto, [outChan]) + | ((_nfrom, nto), (_inChan, outChan)) <- tcplinks + ] + -- Note that the incomming edges are the output ends of the + -- channels and vice versa. That's why it looks backwards. + + -- Nested children threads are slow with IOSim, this impl forks them all as direct children. + mapM_ + (\m -> mapM_ forkIO =<< m) + [ leiosNode + (nodeTracer nid) + (leiosNodeConfig slotConfig nid rng) + (Map.findWithDefault [] nid tcplinksInChan) + (Map.findWithDefault [] nid tcplinksOutChan) + | (nid, rng) <- + zip + (Map.keys p2pNodes) + (unfoldr (Just . split) rng0) + ] + forever $ threadDelaySI 1000 + where + tracer :: Tracer (IOSim s) LeiosEvent + tracer = simTracer + + nodeTracer :: NodeId -> Tracer (IOSim s) LeiosNodeEvent + nodeTracer n = + contramap (LeiosEventNode . LabelNode n) tracer + + linkTracer :: + NodeId -> + NodeId -> + Tracer + (IOSim s) + (LabelTcpDir (TcpEvent LeiosMessage)) + linkTracer nfrom nto = + contramap (LeiosEventTcp . labelDirToLabelLink nfrom nto) tracer + +exampleTrace2 :: LeiosTrace +exampleTrace2 = exampleTrace2' (mkStdGen 4) True + +exampleTrace2' :: StdGen -> Bool -> LeiosTrace +exampleTrace2' rng0 worldIsCylinder = + traceLeiosP2P + rng0 + p2pTopography + (\latency -> mkTcpConnProps latency (kilobytes 1000)) + leiosNodeConfig + where + leiosNodeConfig slotConfig nodeId rng = + LeiosNodeConfig + { stake = StakeFraction $ 1 / fromIntegral p2pNumNodes + , baseChain = Genesis + , leios + , rankingBlockFrequencyPerSlot = 1 / fromIntegral leios.sliceLength + , rankingBlockPayload = 0 + , inputBlockPayload = kilobytes 96 + , processingQueueBound = 100 + , nodeId + , rng + } + where + leios = exampleLeiosConfig slotConfig + p2pTopography = + genArbitraryP2PTopography p2pTopographyCharacteristics rng0 + + p2pNumNodes = 100 + p2pWorldShape = + WorldShape + { worldDimensions = (0.600, 0.300) + , worldIsCylinder + } + p2pTopographyCharacteristics = + P2PTopographyCharacteristics + { p2pWorldShape + , p2pNumNodes + , p2pNodeLinksClose = 5 + , p2pNodeLinksRandom = 5 + } + +exampleLeiosConfig :: SlotConfig -> LeiosConfig +exampleLeiosConfig slotConfig = leios + where + -- TODO: review voting numbers, these might not make sense. + leios = + LeiosConfig + { praos + , sliceLength = 5 -- matching the interval between RBs + , inputBlockFrequencyPerSlot = 5 + , endorseBlockFrequencyPerStage = 1.5 + , activeVotingStageLength = 1 + , votingFrequencyPerStage = 500 + , votesForCertificate = 150 + , sizes + , delays + } + -- TODO: realistic sizes + sizes = + SizesConfig + { producerId = 4 + , vrfProof = 32 + , signature_ = 32 + , reference = 32 + , voteCrypto = 64 + , certificate = const (50 * 1024) + } + delays = + LeiosDelays + { inputBlockHeaderValidation = const 0.005 + , inputBlockValidation = const 0.1 + , endorseBlockValidation = const 0.005 + , voteMsgValidation = const 0.005 + , certificateCreation = const 0.050 + } + + praos = + PraosConfig + { slotConfig + , blockValidationDelay = const 0.1 -- 100ms --TODO: should depend on certificate/payload + , headerValidationDelay = const 0.005 -- 5ms + } diff --git a/simulation/src/LeiosProtocol/Short/VizSim.hs b/simulation/src/LeiosProtocol/Short/VizSim.hs new file mode 100644 index 00000000..1efc1fa7 --- /dev/null +++ b/simulation/src/LeiosProtocol/Short/VizSim.hs @@ -0,0 +1,732 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE MonoLocalBinds #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} + +module LeiosProtocol.Short.VizSim where + +import ChanDriver +import Control.Exception (assert) +import Data.Coerce (coerce) +import Data.Hashable (hash) +import Data.IntMap (IntMap) +import qualified Data.IntMap.Strict as IMap +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import Data.Maybe (fromMaybe) +import Data.PQueue.Min (MinQueue) +import qualified Data.PQueue.Min as PQ +import Data.Set (Set) +import qualified Data.Set as Set +import GHC.Records +import qualified Graphics.Rendering.Cairo as Cairo +import LeiosProtocol.Common hiding (Point) +import LeiosProtocol.Relay (Message (MsgRespondBodies), RelayMessage, relayMessageLabel) +import LeiosProtocol.Short.Node (BlockEvent (..), LeiosEventBlock (..), LeiosMessage (..), LeiosNodeEvent (..), RelayEBMessage, RelayIBMessage, RelayVoteMessage) +import LeiosProtocol.Short.Sim (LeiosEvent (..), LeiosTrace, exampleTrace1) +import ModelTCP +import Network.TypedProtocol +import P2P (linkPathLatenciesSquared) +import PraosProtocol.Common hiding (Point) +import PraosProtocol.PraosNode (PraosMessage (..)) +import qualified PraosProtocol.VizSimPraos as VizSimPraos +import SimTypes +import Viz +import VizSim +import VizSimTCP ( + TcpSimVizConfig (..), + lineMessageInFlight, + renderMessagesInFlight, + ) +import VizUtils + +example1 :: Visualization +example1 = + slowmoVisualization 0.5 $ + Viz model $ + LayoutReqSize 500 650 $ + Layout $ + leiosSimVizRender examplesLeiosSimVizConfig + where + model = leiosSimVizModel trace + where + trace = exampleTrace1 + +examplesLeiosSimVizConfig :: LeiosVizConfig +examplesLeiosSimVizConfig = LeiosVizConfig{..} + where + VizSimPraos.PraosVizConfig{..} = VizSimPraos.examplesPraosSimVizConfig + praosMessageColor = either chainSyncMessageColor blockFetchMessageColor . coerce + praosMessageText = either chainSyncMessageText blockFetchMessageText . coerce + relayIBMessageColor :: RelayIBMessage -> (Double, Double, Double) + relayIBMessageColor = relayMessageColor $ \(InputBlockId x y) -> (x, y) + relayEBMessageColor :: RelayEBMessage -> (Double, Double, Double) + relayEBMessageColor = relayMessageColor $ \(EndorseBlockId x y) -> (x, y) + relayVoteMessageColor :: RelayVoteMessage -> (Double, Double, Double) + relayVoteMessageColor = relayMessageColor $ \(VoteId x y) -> (x, y) + relayIBMessageText :: RelayIBMessage -> Maybe String + relayIBMessageText = relayMessageText "IB:" + relayEBMessageText :: RelayEBMessage -> Maybe String + relayEBMessageText = relayMessageText "EB:" + relayVoteMessageText :: RelayVoteMessage -> Maybe String + relayVoteMessageText = relayMessageText "Vote:" + relayMessageText prefix (ProtocolMessage (SomeMessage msg)) = Just $ prefix ++ relayMessageLabel msg + relayMessageColor :: (id -> (NodeId, Int)) -> RelayMessage id header body -> (Double, Double, Double) + relayMessageColor f (ProtocolMessage (SomeMessage msg)) = case msg of + MsgRespondBodies bodies -> hashToColor . hash $ map (f . fst) bodies + _otherwise -> (1, 0, 0) + +------------------------------------------------------------------------------ +-- The vizualisation model +-- + +-- | The vizualisation data model for the relay simulation +type LeiosSimVizModel = + SimVizModel + LeiosEvent + LeiosSimVizState + +-- | The vizualisation state within the data model for the relay simulation +data LeiosSimVizState + = LeiosSimVizState + { vizWorldShape :: !WorldShape + , vizNodePos :: !(Map NodeId Point) + , vizNodeLinks :: !(Map (NodeId, NodeId) LinkPoints) + , vizMsgsInTransit :: + !( Map + (NodeId, NodeId) + [ ( LeiosMessage + , TcpMsgForecast + , [TcpMsgForecast] + ) + ] + ) + , vizNodeTip :: !(Map NodeId FullTip) + , -- the Buffer and Queue names are legacy from VizSimRelay. + -- In Praos we consider: + -- * Queue = seen by blockFetchConsumer and not yet in Buffer + -- * Buffer = added to blocksVar + vizMsgsAtNodeQueue :: !(Map NodeId [RankingBlockHeader]) + , vizMsgsAtNodeBuffer :: !(Map NodeId [RankingBlockHeader]) + , vizMsgsAtNodeRecentQueue :: !(Map NodeId RecentRate) + , vizMsgsAtNodeRecentBuffer :: !(Map NodeId RecentRate) + , vizMsgsAtNodeTotalQueue :: !(Map NodeId Int) + , vizMsgsAtNodeTotalBuffer :: !(Map NodeId Int) + , -- these are `Block`s generated (globally). + vizNumMsgsGenerated :: !Int + , vizMsgsDiffusionLatency :: !DiffusionLatencyMap + , ibMsgs :: !(LeiosSimVizMsgsState InputBlockId InputBlock) + , ebMsgs :: !(LeiosSimVizMsgsState EndorseBlockId EndorseBlock) + , voteMsgs :: !(LeiosSimVizMsgsState VoteId VoteMsg) + , ibsInRBs :: !IBsInRBsState + } + +data LeiosSimVizMsgsState id msg = LeiosSimVizMsgsState + { msgsAtNodeQueue :: !(Map NodeId [msg]) + , msgsAtNodeBuffer :: !(Map NodeId [msg]) + , msgsAtNodeRecentQueue :: !(Map NodeId RecentRate) + , msgsAtNodeRecentBuffer :: !(Map NodeId RecentRate) + , msgsAtNodeTotalQueue :: !(Map NodeId Int) + , msgsAtNodeTotalBuffer :: !(Map NodeId Int) + , -- these are `Block`s generated (globally). + numMsgsGenerated :: !Int + } + +data IBsInRBsState = IBsInRBsState + { ibsInEBs :: !(Map EndorseBlockId (Set InputBlockId)) + , ebsInRBs :: !(Map RankingBlockId (Set EndorseBlockId)) + } + +data IBsInRBsReport = IBsInRBsReport {ibsInRBsNum :: !Int, ibsInEBsNum :: !Int, ebsInRBsNum :: !Int} + +totalIBsInRBs :: IBsInRBsState -> IBsInRBsReport +totalIBsInRBs s = IBsInRBsReport{..} + where + elemsSet x = Set.unions . Map.elems $ x + ibsInRBsNum = Set.size $ elemsSet $ Map.restrictKeys s.ibsInEBs ebsInRBsSet + ebsInRBsSet = elemsSet s.ebsInRBs + ebsInRBsNum = Set.size ebsInRBsSet + ibsInEBsNum = Set.size $ elemsSet s.ibsInEBs + +-- | The end points where the each link, including the case where the link +-- wraps around on a cylindrical world. +data LinkPoints + = -- | The link goes directly from the node point to node point, without + -- wrapping around across the West\/East edge. + LinkPointsNoWrap {-# UNPACK #-} !Point {-# UNPACK #-} !Point + | -- The link does cross the West\/East edge. The points provided are the + -- starting node point, two virtual points outside the world rectangle + -- indicating where a node following a straight line would be, and the + -- final node point. So if one draws lines between the two pairs of + -- points (with appropriate clipping) then it will look like it wraps + -- around. + LinkPointsWrap + {-# UNPACK #-} !Point + {-# UNPACK #-} !Point + {-# UNPACK #-} !Point + {-# UNPACK #-} !Point + deriving (Show) + +type ChainsMap = IntMap (Chain (Block RankingBlockBody)) + +accumChains :: Time -> LeiosEvent -> ChainsMap -> ChainsMap +accumChains _ (LeiosEventNode (LabelNode nid (PraosNodeEvent (PraosNodeEventNewTip ch)))) = IMap.insert (coerce nid) ch +accumChains _ _ = id + +type DiffusionLatencyMap = Map (HeaderHash RankingBlockHeader) (RankingBlockHeader, NodeId, Time, [Time]) + +accumDiffusionLatency :: Time -> LeiosEvent -> DiffusionLatencyMap -> DiffusionLatencyMap +accumDiffusionLatency now (LeiosEventNode (LabelNode n (PraosNodeEvent e))) = accumDiffusionLatency' now (LabelNode n e) +accumDiffusionLatency _ _ = id +accumDiffusionLatency' :: Time -> LabelNode (PraosNodeEvent RankingBlockBody) -> DiffusionLatencyMap -> DiffusionLatencyMap +accumDiffusionLatency' now (LabelNode nid (PraosNodeEventGenerate blk)) vs = + assert (not (blockHash blk `Map.member` vs)) $ + Map.insert + (blockHash blk) + (blockHeader blk, nid, now, [now]) + vs +accumDiffusionLatency' now (LabelNode _nid (PraosNodeEventEnterState blk)) vs = + Map.adjust + ( \(hdr, nid', created, arrivals) -> + (hdr, nid', created, now : arrivals) + ) + (blockHash blk) + vs +accumDiffusionLatency' _ _ vs = vs + +-- | Make the vizualisation model for the relay simulation from a simulation +-- trace. +leiosSimVizModel :: + LeiosTrace -> + VizModel LeiosSimVizModel +leiosSimVizModel = + simVizModel + accumEventVizState + pruneVisState + initVizState + where + initVizState = + LeiosSimVizState + { vizWorldShape = WorldShape (0, 0) False + , vizNodePos = Map.empty + , vizNodeLinks = Map.empty + , vizMsgsInTransit = Map.empty + , vizNodeTip = Map.empty + , vizMsgsAtNodeQueue = Map.empty + , vizMsgsAtNodeBuffer = Map.empty + , vizMsgsAtNodeRecentQueue = Map.empty + , vizMsgsAtNodeRecentBuffer = Map.empty + , vizMsgsAtNodeTotalQueue = Map.empty + , vizMsgsAtNodeTotalBuffer = Map.empty + , vizNumMsgsGenerated = 0 + , vizMsgsDiffusionLatency = Map.empty + , ibMsgs = initMsgs + , ebMsgs = initMsgs + , voteMsgs = initMsgs + , ibsInRBs = IBsInRBsState Map.empty Map.empty + } + + accumEventVizState :: + Time -> + LeiosEvent -> + LeiosSimVizState -> + LeiosSimVizState + accumEventVizState _now (LeiosEventSetup shape nodes links) vs = + vs + { vizWorldShape = shape + , vizNodePos = nodes + , vizNodeLinks = + Map.fromSet + ( \(n1, n2) -> + linkPoints + shape + (nodes Map.! n1) + (nodes Map.! n2) + ) + links + } + accumEventVizState _now (LeiosEventNode (LabelNode nid (PraosNodeEvent (PraosNodeEventNewTip tip)))) vs = + vs{vizNodeTip = Map.insert nid (fullTip tip) (vizNodeTip vs)} + accumEventVizState now (LeiosEventNode (LabelNode nid (LeiosNodeEvent event blk))) vs = + case blk of + EventIB x -> vs{ibMsgs = accumLeiosMsgs now nid event x vs.ibMsgs} + EventEB x -> + vs + { ebMsgs = accumLeiosMsgs now nid event x vs.ebMsgs + , ibsInRBs = case event of + Generate -> accumIBsInRBs (Right x) vs.ibsInRBs + _ -> vs.ibsInRBs + } + EventVote x -> vs{voteMsgs = accumLeiosMsgs now nid event x vs.voteMsgs} + accumEventVizState now (LeiosEventNode (LabelNode nid (PraosNodeEvent (PraosNodeEventGenerate blk)))) vs = + vs + { vizMsgsAtNodeBuffer = + Map.insertWith (flip (++)) nid [blockHeader blk] (vizMsgsAtNodeBuffer vs) + , vizMsgsAtNodeRecentBuffer = + Map.alter + (Just . recentAdd now . fromMaybe recentEmpty) + nid + (vizMsgsAtNodeRecentBuffer vs) + , vizMsgsAtNodeTotalBuffer = + Map.insertWith (+) nid 1 (vizMsgsAtNodeTotalBuffer vs) + , vizNumMsgsGenerated = vizNumMsgsGenerated vs + 1 + , vizMsgsDiffusionLatency = + assert (not (blockHash blk `Map.member` vizMsgsDiffusionLatency vs)) $ + Map.insert + (blockHash blk) + (blockHeader blk, nid, now, [now]) + (vizMsgsDiffusionLatency vs) + , ibsInRBs = accumIBsInRBs (Left blk) vs.ibsInRBs + } + accumEventVizState now (LeiosEventNode (LabelNode nid (PraosNodeEvent (PraosNodeEventReceived blk)))) vs = + vs + { vizMsgsAtNodeQueue = + Map.insertWith (flip (++)) nid [blockHeader blk] (vizMsgsAtNodeQueue vs) + , vizMsgsAtNodeRecentQueue = + Map.alter + (Just . recentAdd now . fromMaybe recentEmpty) + nid + (vizMsgsAtNodeRecentQueue vs) + , vizMsgsAtNodeTotalQueue = + Map.insertWith (+) nid 1 (vizMsgsAtNodeTotalQueue vs) + } + accumEventVizState now (LeiosEventNode (LabelNode nid (PraosNodeEvent (PraosNodeEventEnterState blk)))) vs = + vs + { vizMsgsAtNodeBuffer = + Map.insertWith (flip (++)) nid [blockHeader blk] (vizMsgsAtNodeBuffer vs) + , vizMsgsAtNodeQueue = + Map.adjust + (filter (\blk' -> blockHash blk' /= blockHash blk)) + nid + (vizMsgsAtNodeQueue vs) + , vizMsgsAtNodeRecentBuffer = + Map.alter + (Just . recentAdd now . fromMaybe recentEmpty) + nid + (vizMsgsAtNodeRecentBuffer vs) + , vizMsgsAtNodeTotalBuffer = + Map.insertWith (+) nid 1 (vizMsgsAtNodeTotalBuffer vs) + , vizMsgsDiffusionLatency = + Map.adjust + ( \(hdr, nid', created, arrivals) -> + (hdr, nid', created, now : arrivals) + ) + (blockHash blk) + (vizMsgsDiffusionLatency vs) + } + accumEventVizState + _now + ( LeiosEventTcp + ( LabelLink + nfrom + nto + (TcpSendMsg msg msgforecast msgforecasts) + ) + ) + vs = + vs + { vizMsgsInTransit = + Map.insertWith + (flip (++)) + (nfrom, nto) + [(msg, msgforecast, msgforecasts)] + (vizMsgsInTransit vs) + } + accumEventVizState _now (LeiosEventNode (LabelNode _nodeId (LeiosNodeEventCPU _task))) vs = vs + accumEventVizState + _now + ( LeiosEventNode + (LabelNode _nodeId (PraosNodeEvent (PraosNodeEventCPU _task))) + ) + vs = vs + + pruneVisState :: + Time -> + LeiosSimVizState -> + LeiosSimVizState + pruneVisState now vs = + vs + { vizMsgsInTransit = + Map.map + ( filter + ( \(_, msgforecast, _) -> + now <= msgAcknowledgement msgforecast + ) + ) + (vizMsgsInTransit vs) + , vizMsgsAtNodeRecentQueue = + Map.map (recentPrune secondsAgo30) (vizMsgsAtNodeRecentQueue vs) + , vizMsgsAtNodeRecentBuffer = + Map.map (recentPrune secondsAgo30) (vizMsgsAtNodeRecentBuffer vs) + , vizMsgsDiffusionLatency = + Map.filter (\(_, _, t, _) -> t >= secondsAgo30) (vizMsgsDiffusionLatency vs) + , ibMsgs = pruneLeiosMsgsState now vs.ibMsgs + , ebMsgs = pruneLeiosMsgsState now vs.ebMsgs + , voteMsgs = pruneLeiosMsgsState now vs.voteMsgs + } + where + secondsAgo30 :: Time + secondsAgo30 = addTime (-30) now + +initMsgs :: LeiosSimVizMsgsState id msg +initMsgs = + LeiosSimVizMsgsState + { msgsAtNodeQueue = Map.empty + , msgsAtNodeBuffer = Map.empty + , msgsAtNodeRecentQueue = Map.empty + , msgsAtNodeRecentBuffer = Map.empty + , msgsAtNodeTotalQueue = Map.empty + , msgsAtNodeTotalBuffer = Map.empty + , numMsgsGenerated = 0 + } + +accumLeiosMsgs :: + (Eq id, HasField "id" msg id) => + Time -> + NodeId -> + BlockEvent -> + msg -> + LeiosSimVizMsgsState id msg -> + LeiosSimVizMsgsState id msg +accumLeiosMsgs now nid Generate blk vs = + vs + { msgsAtNodeBuffer = + Map.insertWith (flip (++)) nid [blk] (msgsAtNodeBuffer vs) + , msgsAtNodeRecentBuffer = + Map.alter + (Just . recentAdd now . fromMaybe recentEmpty) + nid + (msgsAtNodeRecentBuffer vs) + , msgsAtNodeTotalBuffer = + Map.insertWith (+) nid 1 (msgsAtNodeTotalBuffer vs) + , numMsgsGenerated = numMsgsGenerated vs + 1 + } +accumLeiosMsgs now nid Received blk vs = + vs + { msgsAtNodeQueue = + Map.insertWith (flip (++)) nid [blk] (msgsAtNodeQueue vs) + , msgsAtNodeRecentQueue = + Map.alter + (Just . recentAdd now . fromMaybe recentEmpty) + nid + (msgsAtNodeRecentQueue vs) + , msgsAtNodeTotalQueue = + Map.insertWith (+) nid 1 (msgsAtNodeTotalQueue vs) + } +accumLeiosMsgs now nid EnterState blk vs = + vs + { msgsAtNodeBuffer = + Map.insertWith (flip (++)) nid [blk] (msgsAtNodeBuffer vs) + , msgsAtNodeQueue = + Map.adjust + (filter (\blk' -> blk'.id /= blk.id)) + nid + (msgsAtNodeQueue vs) + , msgsAtNodeRecentBuffer = + Map.alter + (Just . recentAdd now . fromMaybe recentEmpty) + nid + (msgsAtNodeRecentBuffer vs) + , msgsAtNodeTotalBuffer = + Map.insertWith (+) nid 1 (msgsAtNodeTotalBuffer vs) + } + +pruneLeiosMsgsState :: + (Eq id, HasField "id" msg id) => + Time -> + LeiosSimVizMsgsState id msg -> + LeiosSimVizMsgsState id msg +pruneLeiosMsgsState now vs = + vs + { msgsAtNodeRecentQueue = + Map.map (recentPrune secondsAgo30) (msgsAtNodeRecentQueue vs) + , msgsAtNodeRecentBuffer = + Map.map (recentPrune secondsAgo30) (msgsAtNodeRecentBuffer vs) + } + where + secondsAgo30 :: Time + secondsAgo30 = addTime (-30) now + +accumIBsInRBs :: Either RankingBlock EndorseBlock -> IBsInRBsState -> IBsInRBsState +accumIBsInRBs (Left rb) s = s{ebsInRBs = Map.insertWith Set.union (blockHash rb) (Set.fromList $ map fst rb.blockBody.endorseBlocks) s.ebsInRBs} +accumIBsInRBs (Right eb) s = s{ibsInEBs = Map.insertWith Set.union eb.id (Set.fromList eb.inputBlocks) s.ibsInEBs} + +-- | The shortest distance between two points, given that the world may be +-- considered to be a cylinder. +-- +-- These points are computed in normalised (unit square) coordinates +linkPoints :: WorldShape -> Point -> Point -> LinkPoints +linkPoints + WorldShape{worldDimensions = (widthSeconds, _), worldIsCylinder} + p1@(Point x1 y1) + p2@(Point x2 y2) + | not worldIsCylinder || d2 < d2' = + LinkPointsNoWrap (Point x1 y1) (Point x2 y2) + | x1 <= x2 = + LinkPointsWrap + (Point x1 y1) + (Point (x2 - widthSeconds) y2) + (Point (x1 + widthSeconds) y1) + (Point x2 y2) + | otherwise = + LinkPointsWrap + (Point x1 y1) + (Point (x2 + widthSeconds) y2) + (Point (x1 - widthSeconds) y1) + (Point x2 y2) + where + (d2, d2') = linkPathLatenciesSquared widthSeconds p1 p2 + +newtype RecentRate = RecentRate (MinQueue Time) + +recentEmpty :: RecentRate +recentEmpty = RecentRate PQ.empty + +recentRate :: RecentRate -> Int +recentRate (RecentRate q) = PQ.size q + +recentAdd :: Time -> RecentRate -> RecentRate +recentAdd t (RecentRate pq) = RecentRate (PQ.insert t pq) + +recentPrune :: Time -> RecentRate -> RecentRate +recentPrune now (RecentRate pq) = + case PQ.minView pq of + Just (t, pq') + | t < now -> recentPrune now (RecentRate pq') + _ -> RecentRate pq + +------------------------------------------------------------------------------ +-- The vizualisation rendering +-- + +data LeiosVizConfig + = LeiosVizConfig + { praosMessageColor :: PraosMessage RankingBlockBody -> (Double, Double, Double) + , praosMessageText :: PraosMessage RankingBlockBody -> Maybe String + , relayIBMessageColor :: RelayIBMessage -> (Double, Double, Double) + , relayIBMessageText :: RelayIBMessage -> Maybe String + , relayEBMessageColor :: RelayEBMessage -> (Double, Double, Double) + , relayEBMessageText :: RelayEBMessage -> Maybe String + , relayVoteMessageColor :: RelayVoteMessage -> (Double, Double, Double) + , relayVoteMessageText :: RelayVoteMessage -> Maybe String + } + +leiosMessageColor :: LeiosVizConfig -> LeiosMessage -> (Double, Double, Double) +leiosMessageColor LeiosVizConfig{..} msg = + case msg of + RelayIB x -> relayIBMessageColor x + RelayEB x -> relayEBMessageColor x + RelayVote x -> relayVoteMessageColor x + PraosMsg x -> praosMessageColor x + +leiosMessageText :: LeiosVizConfig -> LeiosMessage -> Maybe String +leiosMessageText LeiosVizConfig{..} msg = + case msg of + RelayIB x -> relayIBMessageText x + RelayEB x -> relayEBMessageText x + RelayVote x -> relayVoteMessageText x + PraosMsg x -> praosMessageText x + +leiosSimVizRender :: + LeiosVizConfig -> + VizRender LeiosSimVizModel +leiosSimVizRender vizConfig = + VizRender + { renderReqSize = (500, 500) + , renderChanged = \_t _fn _m -> True + , renderModel = \t _fn m sz -> leiosSimVizRenderModel vizConfig t m sz + } + +leiosSimVizRenderModel :: + LeiosVizConfig -> + Time -> + SimVizModel event LeiosSimVizState -> + (Double, Double) -> + Cairo.Render () +leiosSimVizRenderModel + cfg + now + ( SimVizModel + _events + LeiosSimVizState + { vizWorldShape = WorldShape{worldDimensions} + , vizNodePos + , vizNodeTip + , vizNodeLinks + , vizMsgsInTransit + , ibMsgs + } + ) + screenSize = do + renderLinks + renderMessagesAtNodes ibMsgs + renderNodes + where + renderNodes = do + Cairo.save + Cairo.setLineWidth 3 + sequence_ + [ do + Cairo.arc x y 25 0 (pi * 2) + Cairo.setSourceRGB r b g + Cairo.fillPreserve + Cairo.setSourceRGB 0 0 0 + Cairo.stroke + | (node, pos) <- Map.toList vizNodePos + , let (Point x y) = simPointToPixel worldDimensions screenSize pos + , let (r, b, g) = case Map.lookup node vizNodeTip of + Just (FullTip hdr) -> blockHeaderColorAsBody hdr + _ -> (0.7, 0.7, 0.7) + ] + Cairo.restore + + renderLinks = do + Cairo.save + Cairo.setLineCap Cairo.LineCapButt + Cairo.setLineWidth 3 + sequence_ + [ do + Cairo.save + renderPathRoundedRect fromPos toPos 20 + Cairo.setSourceRGB 0.9 0.9 0.9 + Cairo.fillPreserve + Cairo.clip + Cairo.newPath + -- draw all the messages within the clipping region of the link + renderMessagesInFlight + ( TcpSimVizConfig $ leiosMessageColor cfg + ) + now + fromPos + toPos + msgs + Cairo.restore + -- the draw the link border on top (without clipping) + renderPathRoundedRect fromPos toPos 20 + Cairo.setSourceRGB 0 0 0 + Cairo.stroke + | (fromPos, toPos, msgs) <- linksAndMsgs + ] + -- draw the message labels on top of the links + sequence_ + [ renderMessageLabelsInFlight fromPos toPos msgs + | (fromPos, toPos, msgs) <- linksAndMsgs + ] + Cairo.restore + where + linksAndMsgs = + [ (fromPos, toPos, msgs) + | (fromNode, toNode) <- Map.keys vizNodeLinks + , let (fromPos, toPos) = + translateLineNormal + displace + ( simPointToPixel worldDimensions screenSize (vizNodePos Map.! fromNode) + , simPointToPixel worldDimensions screenSize (vizNodePos Map.! toNode) + ) + -- For links in both directions, we need to displace them + -- so they don't overlap each other, but for unidirectional + -- links we can draw it centrally. + displace + | Map.notMember (toNode, fromNode) vizNodeLinks = 0 + | otherwise = -10 + + msgs = + Map.findWithDefault + [] + (fromNode, toNode) + vizMsgsInTransit + ] + + renderMessageLabelsInFlight fromPos toPos msgs = do + Cairo.save + Cairo.setSourceRGB 0.5 0.5 0.5 + Cairo.setFontSize 10 + Cairo.setLineWidth 0.6 + -- draw lines from labels to messages + sequence_ + [ do + withPoint Cairo.moveTo (labelsOrigin `addP` Vector 0 (n * 10)) + withPoint Cairo.lineTo msgTrailingEdge + | ((_msgLabel, msgforecast), n) <- zip msgLabels [0 ..] + , let (msgTrailingEdge, _msgLeadingEdge) = + lineMessageInFlight now fromPos toPos msgforecast + ] + Cairo.stroke + -- draw the labels themselves + Cairo.setSourceRGB 0 0 0 + sequence_ + [ do + withPoint Cairo.moveTo (labelsOrigin `addP` Vector 0 (n * 10)) + Cairo.showText msgLabel + Cairo.newPath + | ((msgLabel, _), n) <- zip msgLabels [0 ..] + ] + Cairo.restore + where + labelsOrigin = midpointP fromPos toPos `addP` labelsOffset + labelsOffset = scaleV (-50) $ unitV $ normalV $ vector fromPos toPos + msgLabels = + [ (msgLabel, msgforecast) + | (msg, msgforecast, _) <- msgs + , now <= msgRecvTrailingEdge msgforecast + , Just msgLabel <- [leiosMessageText cfg msg] + ] + + renderMessagesAtNodes LeiosSimVizMsgsState{..} = do + Cairo.save + sequence_ + [ do + Cairo.setSourceRGB r g b + Cairo.arc (x - 10) y' 10 0 (2 * pi) + Cairo.fillPreserve + Cairo.setSourceRGB 0 0 0 + Cairo.setLineWidth 1 + Cairo.stroke + case nodeMessageText msg of + Nothing -> return () + Just txt -> do + Cairo.moveTo (x - 32) (y' + 5) + Cairo.showText txt + Cairo.newPath + | (node, msgs) <- Map.toList msgsAtNodeQueue + , (n, msg) <- zip [1 ..] msgs + , let (Point x y) = + simPointToPixel + worldDimensions + screenSize + (vizNodePos Map.! node) + y' = y + 16 + 20 * n + (r, g, b) = nodeMessageColor msg + ] + sequence_ + [ do + Cairo.setSourceRGB r g b + Cairo.arc (x + 10) y' 10 0 (2 * pi) + Cairo.fillPreserve + Cairo.setSourceRGB 0 0 0 + Cairo.setLineWidth 1 + Cairo.stroke + case nodeMessageText msg of + Nothing -> return () + Just txt -> do + Cairo.moveTo (x + 22) (y' + 5) + Cairo.showText txt + Cairo.newPath + | (node, msgs) <- Map.toList msgsAtNodeBuffer + , (n, msg) <- zip [1 ..] msgs + , let (Point x y) = + simPointToPixel + worldDimensions + screenSize + (vizNodePos Map.! node) + y' = y + 16 + 20 * n + (r, g, b) = nodeMessageColor msg + ] + Cairo.restore + where + nodeMessageText msg = Just $ show $ (\(InputBlockId (NodeId x) y) -> (x, y)) $ msg.id + nodeMessageColor msg = hashToColor $ hash msg.id diff --git a/simulation/src/LeiosProtocol/Short/VizSimP2P.hs b/simulation/src/LeiosProtocol/Short/VizSimP2P.hs new file mode 100644 index 00000000..c357ddc0 --- /dev/null +++ b/simulation/src/LeiosProtocol/Short/VizSimP2P.hs @@ -0,0 +1,714 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# HLINT ignore "Use const" #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE ViewPatterns #-} +{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-} + +module LeiosProtocol.Short.VizSimP2P where + +import Data.Array.Unboxed (Ix, UArray, accumArray, (!)) +import qualified Data.Colour.SRGB as Colour +import qualified Data.Map.Strict as Map +import Data.Maybe (catMaybes, maybeToList) +import qualified Diagrams.Backend.Cairo as Dia +import qualified Diagrams.Backend.Cairo.Internal as Dia +import qualified Diagrams.Core as Dia +import qualified Diagrams.TwoD as Dia +import qualified Graphics.Rendering.Cairo as Cairo +import qualified Graphics.Rendering.Chart.Easy as Chart + +import ChanDriver +import Data.Bifunctor (Bifunctor (bimap), second) +import Data.Hashable (hash) +import Data.List (foldl', intercalate, sortOn) +import Data.Monoid +import Diagrams ((#)) +import qualified Diagrams.Prelude as Dia +import qualified Diagrams.TwoD.Adjust as Dia +import LeiosProtocol.Common hiding (Point) +import LeiosProtocol.Relay +import LeiosProtocol.Short +import LeiosProtocol.Short.Node +import LeiosProtocol.Short.SimP2P (exampleTrace2) +import LeiosProtocol.Short.VizSim ( + IBsInRBsReport (..), + LeiosSimVizModel, + LeiosSimVizMsgsState (..), + LeiosSimVizState (..), + LeiosVizConfig (praosMessageColor), + LinkPoints (..), + examplesLeiosSimVizConfig, + leiosSimVizModel, + recentRate, + totalIBsInRBs, + ) +import Linear.V2 +import ModelTCP (TcpMsgForecast (..)) +import Network.TypedProtocol +import P2P +import PraosProtocol.BlockFetch (Message (..)) +import PraosProtocol.Common (BlockHeader, FullTip (FullTip), blockHeaderColorAsBody) +import PraosProtocol.PraosNode (PraosMessage (..)) +import SimTypes (Point (..), WorldShape (..)) +import System.Random (uniformR) +import System.Random.Stateful (mkStdGen) +import Text.Printf (printf) +import Viz +import VizChart +import VizSim +import VizSimTCP (lineMessageInFlight) +import VizUtils + +type CairoDiagram = Dia.QDiagram Dia.Cairo V2 Double Any +renderDiagramAt :: (Double, Double) -> (Double, Double) -> CairoDiagram -> Cairo.Render () +-- the reflection here is fishy, but otherwise text and shapes are upside down. +renderDiagramAt (w, h) pos (Dia.reflectY -> d0) = do + Cairo.save + let sizesp = Dia.mkSizeSpec2D (Just w) (Just h) + let opts = Dia.CairoOptions "" sizesp Dia.RenderOnly True + let space = Dia.lc Dia.blue $ Dia.rect w h :: CairoDiagram + let (opts', t, _) = Dia.adjustDia2D Dia.cairoSizeSpec Dia.Cairo opts space + let pos' = Dia.inv t `Dia.papply` Dia.p2 pos + let d = Dia.position [(pos', d0)] + snd (Dia.renderDia Dia.Cairo opts' $ d # Dia.transform t) + Cairo.restore + +messageDiagram :: (MsgTag, Dia.Colour Double) -> CairoDiagram +messageDiagram (tag, c) = Dia.fc c $ + Dia.lc c $ + case tag of + RB -> Dia.square 16 + IB -> sizedAs $ Dia.triangle 16 + EB -> sizedAs $ Dia.hexagon 16 + VT -> sizedAs $ Dia.strokePath $ Dia.star (Dia.StarSkip 2) (Dia.regPoly 5 16) + where + sizedAs d = Dia.sizedAs (Dia.square 18 :: CairoDiagram) d + +messageLegend :: CairoDiagram +messageLegend = + Dia.fontSizeO 20 $ + Dia.lc Dia.black $ + Dia.hcat + [Dia.hcat [messageDiagram (tag, Dia.black), textBox s] | (s, tag) <- [("RB", RB), ("IB", IB), ("EB", EB), ("Vote", VT)]] + where + textBox s = Dia.alignedText 0.7 0.5 s `Dia.atop` Dia.phantom (Dia.rect (fromIntegral $ length s * 20 + 10) 20 :: CairoDiagram) + +------------------------------------------------------------------------------ +-- The vizualisation rendering +-- +data MsgTag = RB | IB | EB | VT + +data LeiosP2PSimVizConfig + = LeiosP2PSimVizConfig + { nodeMessageColor :: BlockHeader -> (Double, Double, Double) + , ptclMessageColor :: LeiosMessage -> Maybe (MsgTag, Dia.Colour Double) + } + +leiosP2PSimVizRender :: + LeiosP2PSimVizConfig -> + VizRender LeiosSimVizModel +leiosP2PSimVizRender vizConfig = + VizRender + { renderReqSize = (500, 500) -- nominal, should be overridden + , renderChanged = \_t _fn _m -> True + , renderModel = \t _fn m sz -> + leiosP2PSimVizRenderModel vizConfig t m sz + } + +-- TODO: should be a table? +leiosGenCountRender :: VizRender LeiosSimVizModel +leiosGenCountRender = + VizRender + { renderReqSize = (400, 24) -- A little taller than font to avoid clipping + , renderChanged = \_t _fn _m -> True + , renderModel = \t _fn m sz -> + leiosP2PSimVizRenderGenCount t m sz + } + where + leiosP2PSimVizRenderGenCount :: + Time -> + SimVizModel event LeiosSimVizState -> + (Double, Double) -> + Cairo.Render () + leiosP2PSimVizRenderGenCount (Time t) (SimVizModel _events st) _screenSize = do + Cairo.moveTo 5 20 + Cairo.setFontSize 20 + Cairo.setSourceRGB 0 0 0 + let perSec n = fromIntegral (n :: Int) / realToFrac t :: Double + let rbs = st.vizNumMsgsGenerated + let ibs = st.ibMsgs.numMsgsGenerated + let ebs = st.ebMsgs.numMsgsGenerated + let votes = st.voteMsgs.numMsgsGenerated + let IBsInRBsReport{..} = totalIBsInRBs st.ibsInRBs + Cairo.showText $ + intercalate + "; " + [ "Blocks generated: " + ++ intercalate + ", " + [ printf "%s: %i (%.2f %s/s)" lbl n (perSec n) lbl + | (n, lbl) <- [(rbs, "RB"), (ibs, "IB"), (ebs, "EB"), (votes, "Vote")] + ] + , printf "IBs in RBs: %i (%i%%)" ibsInRBsNum ((ibsInRBsNum * 100) `div` ibs) + , printf "IBs in EBs: %i (%i%%)" ibsInEBsNum ((ibsInEBsNum * 100) `div` ibs) + , printf "EBs in RBs: %i (%i%%)" ebsInRBsNum ((ebsInRBsNum * 100) `div` ibs) + ] + +leiosP2PSimVizRenderModel :: + LeiosP2PSimVizConfig -> + Time -> + SimVizModel event LeiosSimVizState -> + (Double, Double) -> + Cairo.Render () +leiosP2PSimVizRenderModel + LeiosP2PSimVizConfig + { nodeMessageColor + , ptclMessageColor + } + now + ( SimVizModel + _events + LeiosSimVizState + { vizWorldShape = WorldShape{worldDimensions} + , vizNodePos + , vizNodeLinks + , vizNodeTip + , vizMsgsInTransit + } + ) + screenSize = do + renderDiagramAt screenSize (20, 22) messageLegend + renderLinks + renderNodes + where + renderNodes = do + Cairo.save + Cairo.setFontSize 10 + sequence_ + [ do + Cairo.arc x y 10 0 (pi * 2) + Cairo.setSourceRGB r g b + Cairo.fillPreserve + Cairo.setSourceRGB 0 0 0 + Cairo.stroke + {- + -- Label with message counts, processing and buffer + let qlabel = show nqmsgs ++ "," ++ show rqmsgs ++ "," ++ show tqmsgs + Cairo.moveTo (x-6) (y-5) + Cairo.setSourceRGB 1 1 1 -- white backdrop text for readabilty + Cairo.showText qlabel -- on dark backgrounds + Cairo.moveTo (x-7) (y-4) + Cairo.setSourceRGB 0 0 0 + Cairo.showText qlabel + + let blabel = show nbmsgs ++ "," ++ show rbmsgs ++ "," ++ show tbmsgs + Cairo.moveTo (x-6) (y+10) + Cairo.setSourceRGB 1 1 1 -- white backdrop text for readabilty + Cairo.showText blabel -- on dark backgrounds + Cairo.moveTo (x-7) (y+9) + Cairo.setSourceRGB 0 0 0 + Cairo.showText blabel + -} + Cairo.newPath + | (node, pos) <- Map.toList vizNodePos + , let Point x y = toScreenPoint pos + -- qmsgs = fromMaybe [] (Map.lookup node vizMsgsAtNodeQueue) + -- bmsgs = fromMaybe [] (Map.lookup node vizMsgsAtNodeBuffer) + -- nqmsgs = length qmsgs + -- nbmsgs = length bmsgs + (r, g, b) = case Map.lookup node vizNodeTip of + Just (FullTip hdr) -> nodeMessageColor hdr + _ -> (0.7, 0.7, 0.7) + -- rqmsgs = maybe 0 recentRate (Map.lookup node vizMsgsAtNodeRecentQueue) + -- rbmsgs = maybe 0 recentRate (Map.lookup node vizMsgsAtNodeRecentBuffer) + -- tqmsgs = fromMaybe 0 (Map.lookup node vizMsgsAtNodeTotalQueue) + -- tbmsgs = fromMaybe 0 (Map.lookup node vizMsgsAtNodeTotalBuffer) + ] + Cairo.restore + + renderLinks = do + Cairo.save + Cairo.setLineCap Cairo.LineCapButt + Cairo.setLineWidth 1 + Cairo.setSourceRGB 0.4 0.4 0.4 + -- draw all the lines + Cairo.save + sequence_ + [ case classifyInFlightMsgs msgs of + -- We don't even draw links that are inactive + MsgsInFlightNone -> return () + -- Similarly, all links will have boring control messages + -- it'd be too much details + MsgsInFlightControl -> return () + -- We draw with a dotted line + MsgsInFlightNonBallistic -> + case catMaybes [snd <$> ptclMessageColor msg | (msg, _, _) <- msgs] of + [] -> return () + ((toSRGB -> (r, g, b)) : _) -> do + Cairo.setSourceRGB r g b + Cairo.setLineWidth 1 + Cairo.setDash [10, 5] 0 + case vizNodeLinks Map.! (fromNode, toNode) of + LinkPointsNoWrap fromPos toPos -> do + withPoint Cairo.moveTo (toScreenPoint fromPos) + withPoint Cairo.lineTo (toScreenPoint toPos) + Cairo.stroke + LinkPointsWrap fromPos toPos fromPos' toPos' -> do + withPoint Cairo.moveTo (toScreenPoint fromPos) + withPoint Cairo.lineTo (toScreenPoint toPos) + Cairo.stroke + withPoint Cairo.moveTo (toScreenPoint fromPos') + withPoint Cairo.lineTo (toScreenPoint toPos') + Cairo.stroke + + -- We draw with a full line + MsgsInFlightBallistic -> + case catMaybes [snd <$> ptclMessageColor msg | (msg, _, _) <- msgs] of + [] -> return () + ((toSRGB -> (r, g, b)) : _) -> do + Cairo.setSourceRGB r g b + Cairo.setDash [] 0 + Cairo.setLineWidth 2 + case vizNodeLinks Map.! (fromNode, toNode) of + LinkPointsNoWrap fromPos toPos -> do + withPoint Cairo.moveTo (toScreenPoint fromPos) + withPoint Cairo.lineTo (toScreenPoint toPos) + Cairo.stroke + LinkPointsWrap fromPos toPos fromPos' toPos' -> do + withPoint Cairo.moveTo (toScreenPoint fromPos) + withPoint Cairo.lineTo (toScreenPoint toPos) + Cairo.stroke + withPoint Cairo.moveTo (toScreenPoint fromPos') + withPoint Cairo.lineTo (toScreenPoint toPos') + Cairo.stroke + | ((fromNode, toNode), msgs) <- Map.toList vizMsgsInTransit + ] + Cairo.restore + -- draw the messages in flight on top + sequence_ + [ case vizNodeLinks Map.! (fromNode, toNode) of + LinkPointsNoWrap fromPos toPos -> do + let (msgTrailingEdge, _msgLeadingEdge) = + lineMessageInFlight now fromPos toPos msgforecast + Point x y = toScreenPoint msgTrailingEdge + renderDiagramAt screenSize (x, y) $ messageDiagram msgViz + LinkPointsWrap fromPos toPos fromPos' toPos' -> do + let (msgTrailingEdge, _msgLeadingEdge) = + lineMessageInFlight now fromPos toPos msgforecast + Point x y = toScreenPoint msgTrailingEdge + renderDiagramAt screenSize (x, y) $ messageDiagram msgViz + let (msgTrailingEdge', _msgLeadingEdge) = + lineMessageInFlight now fromPos' toPos' msgforecast + Point x' y' = toScreenPoint msgTrailingEdge' + renderDiagramAt screenSize (x', y') $ messageDiagram msgViz + | ((fromNode, toNode), msgs) <- Map.toList vizMsgsInTransit + , (msg, msgforecast, _msgforecasts) <- msgs + , now >= msgSendTrailingEdge msgforecast + , now <= msgRecvTrailingEdge msgforecast + , msgViz <- maybeToList (ptclMessageColor msg) + ] + Cairo.restore + + toScreenPoint = simPointToPixel worldDimensions screenSize + +data MsgsInFlightClassification + = MsgsInFlightNone + | MsgsInFlightControl + | MsgsInFlightNonBallistic + | MsgsInFlightBallistic + deriving (Eq, Ord, Enum, Bounded, Ix) + +classifyInFlightMsgs :: + [(LeiosMessage, TcpMsgForecast, [TcpMsgForecast])] -> + MsgsInFlightClassification +classifyInFlightMsgs = classifyInFlightMsgs' isLeiosMessageControl +classifyInFlightMsgs' :: + (msg -> Bool) -> + [(msg, TcpMsgForecast, [TcpMsgForecast])] -> + MsgsInFlightClassification +classifyInFlightMsgs' _isControl [] = MsgsInFlightNone +classifyInFlightMsgs' isControl msgs + | all control msgs = MsgsInFlightControl + | all ballistic msgs = MsgsInFlightBallistic + | otherwise = MsgsInFlightNonBallistic + where + -- We rely on contiguous forecast fragments having been merged, + -- see mergeAdjacentForecasts + ballistic (_msg, _msgforecast, _msgforecasts@[_]) = True + ballistic _ = False + + -- In leios some msgs (empty rbs (before first pipeline ends), ebs, + -- and votes) might be small, but shouldn't be considered control. + -- We take an extra predicate. + control (msg, _msgforecast, _msgforecasts) = isControl msg + +------------------------------------------------------------------------------ +-- The charts +-- + +diffusionLatencyPerStakeFraction :: Int -> Time -> [Time] -> [(DiffTime, Double)] +diffusionLatencyPerStakeFraction nnodes created arrivals = + [ (latency, percent) + | (arrival, n) <- zip (reverse arrivals) [1 :: Int ..] + , let !latency = arrival `diffTime` created + !percent = + (fromIntegral n / fromIntegral nnodes) + ] + +chartDiffusionLatency :: + LeiosP2PSimVizConfig -> + VizRender LeiosSimVizModel +chartDiffusionLatency LeiosP2PSimVizConfig{nodeMessageColor} = + chartVizRender 25 $ + \_ + _ + ( SimVizModel + _ + LeiosSimVizState + { vizNodePos + , vizMsgsDiffusionLatency + } + ) -> + (Chart.def :: Chart.Layout DiffTime Chart.Percent) + { Chart._layout_title = "Diffusion latency" + , Chart._layout_title_style = Chart.def{Chart._font_size = 15} + , Chart._layout_y_axis = + (Chart.def :: Chart.LayoutAxis Chart.Percent) + { Chart._laxis_generate = + Chart.scaledAxis Chart.def{Chart._la_nLabels = 10} (0, 1) + , Chart._laxis_title = "Stake fraction reached" + } + , Chart._layout_x_axis = + Chart.def + { Chart._laxis_title = "Time (seconds)" + } + , Chart._layout_plots = + [ Chart.toPlot $ + Chart.def + { Chart._plot_lines_values = [timeseries] + , Chart._plot_lines_style = + let (r, g, b) = nodeMessageColor blk + in Chart.def + { Chart._line_color = Chart.opaque (Colour.sRGB r g b) + } + } + | let nnodes = Map.size vizNodePos + , (blk, _nid, created, arrivals) <- Map.elems vizMsgsDiffusionLatency + , let timeseries = + map (second Chart.Percent) $ + diffusionLatencyPerStakeFraction nnodes created arrivals + ] + } + +chartDiffusionImperfection :: + P2PTopography -> + DiffTime -> + DiffTime -> + LeiosP2PSimVizConfig -> + VizRender LeiosSimVizModel +chartDiffusionImperfection + p2ptopography + processingDelay + serialisationDelay + LeiosP2PSimVizConfig{nodeMessageColor} + | Map.size (p2pNodes p2ptopography) > 100 = + nullVizRender + | otherwise = + chartVizRender 25 $ + \_ + _ + (SimVizModel _ LeiosSimVizState{vizMsgsDiffusionLatency}) -> + (Chart.def :: Chart.Layout DiffTime DiffTime) + { Chart._layout_title = "Diffusion latency imperfection" + , Chart._layout_title_style = Chart.def{Chart._font_size = 15} + , Chart._layout_y_axis = + Chart.def + { Chart._laxis_title = "Time behind perfect (seconds)" + } + , Chart._layout_x_axis = + Chart.def + { Chart._laxis_title = "Time (seconds)" + } + , Chart._layout_plots = + [ Chart.toPlot $ + Chart.def + { Chart._plot_lines_values = [timeseries] + , Chart._plot_lines_style = + let (r, g, b) = nodeMessageColor blk + in Chart.def + { Chart._line_color = Chart.opaque (Colour.sRGB r g b) + } + } + | (blk, nid, created, arrivals) <- Map.elems vizMsgsDiffusionLatency + , let timeseries = + [ (latencyActual, imperfection) + | (arrivalActual, latencyIdeal) <- + zip + (reverse arrivals) + ( p2pGraphIdealDiffusionTimesFromNode + idealDiffusionTimes + nid + ) + , let !latencyActual = arrivalActual `diffTime` created + !imperfection = latencyActual - latencyIdeal + ] + ] + } + where + idealDiffusionTimes :: P2PIdealDiffusionTimes + idealDiffusionTimes = + p2pGraphIdealDiffusionTimes + p2ptopography + (\_ -> processingDelay) + (\_ _ linkLatency -> 3 * linkLatency + serialisationDelay) + +chartBandwidth :: VizRender LeiosSimVizModel +chartBandwidth = + chartVizRender 25 $ + \_ + _ + ( SimVizModel + _ + LeiosSimVizState + { vizMsgsAtNodeRecentQueue + , vizMsgsAtNodeRecentBuffer + } + ) -> + (Chart.def :: Chart.Layout Double Double) + { Chart._layout_title = "Distribution of block frequency" + , Chart._layout_title_style = Chart.def{Chart._font_size = 15} + , Chart._layout_x_axis = + Chart.def + { Chart._laxis_generate = + Chart.scaledAxis Chart.def{Chart._la_nLabels = maxX} (0, maxX) + , Chart._laxis_title = "Count of events within last 30 seconds" + } + , Chart._layout_y_axis = + Chart.def + { Chart._laxis_generate = + Chart.scaledAxis Chart.def{Chart._la_nLabels = 4} (0, 0.35) + , Chart._laxis_title = "Number of nodes in each bin (normalised)" + } + , Chart._layout_plots = + [ bandwidthHistPlot + "CPU (block validation completion)" + Chart.red + ( map + ((fromIntegral :: Int -> Double) . recentRate) + (Map.elems vizMsgsAtNodeRecentBuffer) + ) + | not (Map.null vizMsgsAtNodeRecentBuffer) + ] + ++ [ bandwidthHistPlot + "Network (block arrival)" + Chart.blue + ( map + ((fromIntegral :: Int -> Double) . recentRate) + (Map.elems vizMsgsAtNodeRecentQueue) + ) + | not (Map.null vizMsgsAtNodeRecentQueue) + ] + } + where + maxX :: Num a => a + maxX = 15 + + bandwidthHistPlot title color values = + Chart.histToPlot $ + Chart.defaultNormedPlotHist + { Chart._plot_hist_title = title + , Chart._plot_hist_values = values + , Chart._plot_hist_range = Just (0, maxX) + , Chart._plot_hist_bins = maxX + , Chart._plot_hist_fill_style = + Chart.def + { Chart._fill_color = + Chart.withOpacity color 0.4 + } + , Chart._plot_hist_line_style = + Chart.def + { Chart._line_color = + Chart.opaque color + } + } + +chartLinkUtilisation :: VizRender LeiosSimVizModel +chartLinkUtilisation = + chartVizRender 25 $ + \_ + _ + ( SimVizModel + _ + LeiosSimVizState + { vizMsgsInTransit + } + ) -> + let counts :: UArray MsgsInFlightClassification Int + counts = + accumArray + (\i () -> i + 1) + 0 + (minBound, maxBound) + $ [ (classifyInFlightMsgs msgs, ()) + | msgs <- Map.elems vizMsgsInTransit + ] + in (Chart.def :: Chart.PieLayout) + { Chart._pie_title = "TCP link usage" + , Chart._pie_title_style = Chart.def{Chart._font_size = 15} + , Chart._pie_plot = + Chart.def + { Chart._pie_data = + [ let v = counts ! MsgsInFlightNone + in Chart.def + { Chart._pitem_label = "Idle (" ++ show v ++ ")" + , Chart._pitem_value = fromIntegral v + } + , let v = counts ! MsgsInFlightControl + in Chart.def + { Chart._pitem_label = "Control (" ++ show v ++ ")" + , Chart._pitem_value = fromIntegral v + } + , let v = counts ! MsgsInFlightNonBallistic + in Chart.def + { Chart._pitem_label = "Non-ballistic (" ++ show v ++ ")" + , Chart._pitem_value = fromIntegral v + } + , let v = counts ! MsgsInFlightBallistic + in Chart.def + { Chart._pitem_label = "Ballistic (" ++ show v ++ ")" + , Chart._pitem_value = fromIntegral v + } + ] + , Chart._pie_colors = + [ lightBlueShade 0.9 + , lightBlueShade 0.7 + , lightBlueShade 0.3 + , lightBlueShade 0 + ] + } + } + where + lightBlueShade :: Double -> Chart.AlphaColour Double + lightBlueShade x = + Chart.withOpacity Chart.white x + `Chart.atop` Chart.opaque Chart.blue + +isLeiosMessageControl :: LeiosMessage -> Bool +isLeiosMessageControl msg0 = + case msg0 of + PraosMsg msg -> + case msg of + PraosMessage (Right (ProtocolMessage (SomeMessage MsgBlock{}))) -> False + _ -> True + RelayIB msg -> isRelayMessageControl msg + RelayEB msg -> isRelayMessageControl msg + RelayVote msg -> isRelayMessageControl msg + +isRelayMessageControl :: RelayMessage id header body -> Bool +isRelayMessageControl (ProtocolMessage (SomeMessage msg)) = case msg of + MsgRespondBodies _bodies -> False + _otherwise -> True + +-- | takes stage length, assumes pipelines start at Slot 0. +defaultVizConfig :: Int -> LeiosP2PSimVizConfig +defaultVizConfig stageLength = + LeiosP2PSimVizConfig + { nodeMessageColor = testNodeMessageColor + , ptclMessageColor = testPtclMessageColor + } + where + testPtclMessageColor :: + LeiosMessage -> + Maybe (MsgTag, Dia.Colour Double) + testPtclMessageColor msg0 = + case msg0 of + PraosMsg msg -> + (RB,) <$> case msg of + PraosMessage (Right (ProtocolMessage (SomeMessage MsgBlock{}))) -> do + let (r, g, b) = praosMessageColor examplesLeiosSimVizConfig msg + Just $ Dia.sRGB r g b + _ -> Nothing + RelayIB msg -> (IB,) <$> relayMessageColor (pipelineColor Propose . bimap hash (.slot)) msg + RelayEB msg -> (EB,) <$> relayMessageColor (pipelineColor Endorse . bimap hash (.slot)) msg + RelayVote msg -> (VT,) <$> relayMessageColor (pipelineColor Vote . bimap hash (.slot)) msg + relayMessageColor :: ((id, body) -> Dia.Colour Double) -> RelayMessage id header body -> Maybe (Dia.Colour Double) + relayMessageColor f (ProtocolMessage (SomeMessage msg)) = case msg of + MsgRespondBodies bodies -> Just $ blendColors $ map f bodies + _otherwise -> Nothing + testNodeMessageColor :: BlockHeader -> (Double, Double, Double) + testNodeMessageColor = blockHeaderColorAsBody + -- alternating cold and warm colours for visual contrast. + palettes = + map snd $ + sortOn fst $ + zip [0 :: Int, 2 ..] [Dia.orangered, Dia.red, Dia.magenta, Dia.plum] + ++ zip [1, 3 ..] [Dia.blue, Dia.cyan, Dia.lime, Dia.yellow] + palettes_num = length palettes + paletteColor p seed = Dia.blend f Dia.white c + where + -- TODO?: better palettes than gradients on a color + c = palettes !! p + f = fst $ uniformR (0, 0.5) seed + pipelineColor :: Stage -> (Int, SlotNo) -> Dia.Colour Double + pipelineColor slotStage (i, slot) = case stageRange' stageLength slotStage slot Propose of + Just (fromEnum -> startOfPipeline, _) -> + let + -- every `stageLength` a new pipeline begins + pipeline_idx = startOfPipeline `div` stageLength + -- There are at most |stages| active pipelines at once, + -- however we use a few more palettes to avoid reusing the + -- same color too soon in time. + palette_idx = pipeline_idx `mod` palettes_num + in + paletteColor palette_idx (mkStdGen i) + Nothing -> Dia.black + +-- might be ugly blending, but in practice it's going to be singleton lists? +blendColors :: [Dia.Colour Double] -> Dia.Colour Double +blendColors [x] = x +blendColors [] = Dia.black +blendColors (x : xs) = foldl' (Dia.blend 0.5) x xs + +toSRGB :: Dia.Colour Double -> (Double, Double, Double) +toSRGB (Dia.toSRGB -> Dia.RGB r g b) = (r, g, b) + +example2 :: Visualization +example2 = + slowmoVisualization 0.5 $ + Viz (leiosSimVizModel exampleTrace2) $ + LayoutAbove + [ LayoutBeside [layoutLabelTime, Layout leiosGenCountRender] + , LayoutBeside + [ LayoutReqSize 1200 1000 $ + Layout $ + leiosP2PSimVizRender config + , LayoutBeside + [ LayoutAbove + [ LayoutReqSize 350 300 $ + Layout $ + chartDiffusionLatency config + -- , LayoutReqSize 350 300 $ + -- Layout $ + -- chartDiffusionImperfection + -- p2pTopography + -- 0.1 + -- (96 / 1000) + -- config + ] + , LayoutAbove + [ LayoutReqSize 350 300 $ + Layout chartBandwidth + , LayoutReqSize 350 300 $ + Layout chartLinkUtilisation + ] + ] + ] + ] + where + config = defaultVizConfig 5 diff --git a/simulation/src/LeiosProtocol/SimTestRelay.hs b/simulation/src/LeiosProtocol/SimTestRelay.hs index d4f84a68..f3699e3a 100644 --- a/simulation/src/LeiosProtocol/SimTestRelay.hs +++ b/simulation/src/LeiosProtocol/SimTestRelay.hs @@ -64,6 +64,7 @@ import SimTypes import TimeCompat (threadDelayNDT, threadDelaySI) import ChanMux +import Control.Category ((>>>)) import Control.Exception (assert) import Data.Foldable (forM_) import Data.List (sortOn) @@ -163,6 +164,8 @@ relayNode let relayConsumerConfig = RelayConsumerConfig { relay = relayConfig + , headerValidationDelay = const 0.1 + , threadDelayParallel = sum >>> \d -> if d >= 0 then threadDelaySI d else return () , headerId = testHeaderId , prioritize = sortOn (Down . testHeaderExpiry) . Map.elems , submitPolicy = SubmitAll diff --git a/simulation/src/Main.hs b/simulation/src/Main.hs index a8272081..39e26e12 100644 --- a/simulation/src/Main.hs +++ b/simulation/src/Main.hs @@ -9,6 +9,8 @@ import Data.Maybe (fromMaybe) import qualified ExamplesRelay import qualified ExamplesRelayP2P import qualified ExamplesTCP +import qualified LeiosProtocol.Short.VizSim as VizShortLeios +import qualified LeiosProtocol.Short.VizSimP2P as VizShortLeiosP2P import qualified LeiosProtocol.VizSimTestRelay as VizSimTestRelay import Options.Applicative import qualified PraosProtocol.ExamplesPraosP2P as VizPraosP2P @@ -135,6 +137,8 @@ data VizSubCommand | VizRelayTest1 | VizRelayTest2 | VizRelayTest3 + | VizShortLeios1 + | VizShortLeiosP2P1 parserVizSubCommand :: Parser VizSubCommand parserVizSubCommand = @@ -176,6 +180,12 @@ parserVizSubCommand = progDesc "" , command "relay-test-3" . info (pure VizRelayTest3) $ progDesc "" + , command "short-leios-1" . info (pure VizShortLeios1) $ + progDesc + "A simulation of two nodes running Short Leios." + , command "short-leios-p2p-1" . info (pure VizShortLeiosP2P1) $ + progDesc + "A simulation of 100 nodes running Short Leios." ] parserPraosP2P1 :: Parser VizSubCommand @@ -224,6 +234,8 @@ vizOptionsToViz VizCommandWithOptions{..} = case vizSubCommand of VizRelayTest1 -> pure VizSimTestRelay.example1 VizRelayTest2 -> pure VizSimTestRelay.example2 VizRelayTest3 -> pure VizSimTestRelay.example3 + VizShortLeios1 -> pure VizShortLeios.example1 + VizShortLeiosP2P1 -> pure VizShortLeiosP2P.example2 type VizSize = (Int, Int) @@ -268,7 +280,7 @@ parserSimOptions = SimOptions <$> parserSimCommand <*> option - (Time <$> auto) + (Time . fromIntegral @Int <$> auto) ( long "output-seconds" <> metavar "SECONDS" <> help "Output N seconds of simulation." diff --git a/simulation/src/P2P.hs b/simulation/src/P2P.hs index 9d83f909..c9029b9a 100644 --- a/simulation/src/P2P.hs +++ b/simulation/src/P2P.hs @@ -5,6 +5,7 @@ module P2P where +import Control.Exception (assert) import Control.Monad (when) import Control.Monad.Class.MonadTime.SI (DiffTime) import Control.Monad.ST (ST) @@ -121,16 +122,17 @@ genArbitraryP2PTopography | (nid, rng) <- zip nodes (unfoldr (Just . Random.split) rngLinks) , let p = nodePositions Map.! nid , nid' <- - pickNodeLinksClose p + pickNodeLinksClose nid p ++ pickNodeLinksRandom nid rng , let p' = nodePositions Map.! nid' !latency = linkLatency p p' ] - pickNodeLinksClose :: Point -> [NodeId] - pickNodeLinksClose = - map snd - . KdMap.kNearest nodesKdMap p2pNodeLinksClose + pickNodeLinksClose :: NodeId -> Point -> [NodeId] + pickNodeLinksClose nid p = + case map snd $ KdMap.kNearest nodesKdMap (p2pNodeLinksClose + 1) p of + (nid' : nids) -> assert (nid == nid') $ nids + [] -> assert False [] -- For efficiency in finding the K nearest neighbours, we use a K-D map -- of all the nodes, and then do queries in that. diff --git a/simulation/src/PraosProtocol/BlockFetch.hs b/simulation/src/PraosProtocol/BlockFetch.hs index 1931544f..df32f4fe 100644 --- a/simulation/src/PraosProtocol/BlockFetch.hs +++ b/simulation/src/PraosProtocol/BlockFetch.hs @@ -42,8 +42,9 @@ import Control.Concurrent.Class.MonadSTM ( import Control.Concurrent.Class.MonadSTM.TBQueue import Control.Exception (assert) import Control.Monad (forM, forever, guard, join, unless, void, when, (<=<)) -import Control.Tracer (Tracer, traceWith) +import Control.Tracer (Contravariant (contramap), Tracer, traceWith) import Data.Bifunctor (second) +import Data.Foldable (forM_) import Data.Kind import qualified Data.List as List import Data.Map.Strict (Map) @@ -603,31 +604,73 @@ initBlockFetchConsumerStateForPeerId tracer peerId blockFetchControllerState sub setupValidatorThreads :: (MonadSTM m, MonadDelay m) => + Tracer m (PraosNodeEvent BlockBody) -> PraosConfig BlockBody -> BlockFetchControllerState BlockBody m -> -- | bound on queue length. Natural -> m ([m ()], Block BlockBody -> m () -> m ()) -setupValidatorThreads cfg st n = do +setupValidatorThreads tracer cfg st n = do queue <- newTBQueueIO n - waitingVar <- newTVarIO Map.empty - let validate (block, completion) = threadDelaySI (cfg.blockValidationDelay block) >> completion - let fetch = forever $ do - req@(block, _) <- atomically $ readTBQueue queue + (waitingVar, processWaitingThread) <- setupProcessWaitingThread (contramap PraosNodeEventCPU tracer) (Just 1) st.blocksVar + let doTask (delay, m) = do + traceWith tracer . PraosNodeEventCPU . CPUTask $ delay + threadDelaySI delay + m + + -- if we have the previous block, we process the task sequentially to provide back pressure on the queue. + let waitForPrev block task = case blockPrevHash block of + GenesisHash -> doTask task + BlockHash prev -> do + havePrev <- Map.member prev <$> readTVarIO st.blocksVar + -- Note: for pure praos this also means we have the ledger state. + if havePrev + then doTask task + else atomically $ modifyTVar' waitingVar (Map.insertWith (++) prev [task]) + fetch = forever $ do + (block, completion) <- atomically $ readTBQueue queue assert (blockInvariant block) $ do - case blockPrevHash block of - GenesisHash -> validate req - BlockHash prev -> do - havePrev <- Map.member prev <$> readTVarIO st.blocksVar - -- Note: for pure praos this also means we have the ledger state. - if havePrev - then validate req - else atomically $ modifyTVar' waitingVar (Map.insertWith (++) prev [req]) + waitForPrev block $ + let !delay = cfg.blockValidationDelay block + in (delay, completion) add block completion = atomically $ writeTBQueue queue (block, completion) - processWaiting = forever $ join $ atomically $ do - blocks <- readTVar st.blocksVar - waiting <- readTVar waitingVar - let toValidate = Map.intersection waiting blocks - when (Map.null toValidate) retry - return $ mapM_ validate $ concat $ Map.elems $ toValidate - return ([fetch, processWaiting], add) + return ([fetch, processWaitingThread], add) + +setupProcessWaitingThread :: + forall m a b. + (MonadSTM m, MonadDelay m) => + Tracer m CPUTask -> + -- | how many waiting to process in parallel + Maybe Int -> + TVar m (Map ConcreteHeaderHash a) -> + m (TVar m (Map ConcreteHeaderHash [(DiffTime, m b)]), m ()) +setupProcessWaitingThread tracer npar blocksVar = do + waitingVar <- newTVarIO Map.empty + return $ (waitingVar, processWaiting tracer npar blocksVar waitingVar) + +processWaiting :: + forall m a b. + (MonadSTM m, MonadDelay m) => + Tracer m CPUTask -> + -- | how many waiting to process in parallel + Maybe Int -> + TVar m (Map ConcreteHeaderHash a) -> + TVar m (Map ConcreteHeaderHash [(DiffTime, m b)]) -> + m () +processWaiting tracer npar blocksVar waitingVar = go + where + parallelDelay xs = do + let !d = maximum $ map fst xs + forM_ xs $ traceWith tracer . CPUTask . fst + threadDelaySI d + mapM_ snd xs + go = forever $ join $ atomically $ do + waiting <- readTVar waitingVar + when (Map.null waiting) retry + blocks <- readTVar blocksVar + let toValidate = Map.intersection waiting blocks + when (Map.null toValidate) retry + writeTVar waitingVar $! waiting Map.\\ toValidate + let chunks Nothing xs = [xs] + chunks (Just m) xs = map (take m) . takeWhile (not . null) . iterate (drop m) $ xs + return $ mapM_ parallelDelay $ chunks npar $ concat $ Map.elems $ toValidate diff --git a/simulation/src/PraosProtocol/ChainSync.hs b/simulation/src/PraosProtocol/ChainSync.hs index 9d15a8a7..fb816d3e 100644 --- a/simulation/src/PraosProtocol/ChainSync.hs +++ b/simulation/src/PraosProtocol/ChainSync.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DerivingVia #-} @@ -20,6 +21,7 @@ import Control.Concurrent.Class.MonadSTM ( ) import Control.Exception (assert) import Control.Monad (void) +import Control.Tracer import Data.Maybe (fromMaybe) import Data.Type.Equality ((:~:) (Refl)) import Network.TypedProtocol ( @@ -153,22 +155,24 @@ newtype ChainConsumerState m = ChainConsumerState runChainConsumer :: (MonadSTM m, MonadDelay m) => + Tracer m (PraosNodeEvent body) -> PraosConfig body -> Chan m ChainSyncMessage -> ChainConsumerState m -> m () -runChainConsumer cfg chan st = - void $ runPeerWithDriver (chanDriver decideChainSyncState chan) (chainConsumer cfg st) +runChainConsumer tracer cfg chan st = + void $ runPeerWithDriver (chanDriver decideChainSyncState chan) (chainConsumer tracer cfg st) type ChainConsumer st m a = TC.Client ChainSyncState 'NonPipelined st m a chainConsumer :: forall m body. (MonadSTM m, MonadDelay m) => + Tracer m (PraosNodeEvent body) -> PraosConfig body -> ChainConsumerState m -> ChainConsumer 'StIdle m () -chainConsumer cfg (ChainConsumerState hchainVar) = idle True +chainConsumer tracer cfg (ChainConsumerState hchainVar) = idle True where -- NOTE: The specification says to do an initial intersection with -- exponentially spaced points, and perform binary search to @@ -202,7 +206,9 @@ chainConsumer cfg (ChainConsumerState hchainVar) = idle True rollForward :: BlockHeader -> ChainConsumer 'StIdle m () rollForward header = TC.Effect $ do - threadDelaySI (cfg.headerValidationDelay header) + let !delay = cfg.headerValidationDelay header + traceWith tracer $ PraosNodeEventCPU (CPUTask delay) + threadDelaySI delay atomically $ do modifyTVar' hchainVar $ Chain.addBlock header return $ idle False @@ -219,7 +225,12 @@ chainConsumer cfg (ChainConsumerState hchainVar) = idle True ---- ChainSync Producer -------------------------------- -runChainProducer :: (IsBody body, MonadSTM m) => Chan m ChainSyncMessage -> FollowerId -> TVar m (ChainProducerState (Block body)) -> m () +runChainProducer :: + (IsBody body, MonadSTM m) => + Chan m ChainSyncMessage -> + FollowerId -> + TVar m (ChainProducerState (Block body)) -> + m () runChainProducer chan followerId stVar = void $ runPeerWithDriver (chanDriver decideChainSyncState chan) (chainProducer followerId stVar) diff --git a/simulation/src/PraosProtocol/Common.hs b/simulation/src/PraosProtocol/Common.hs index 00487d55..074375ef 100644 --- a/simulation/src/PraosProtocol/Common.hs +++ b/simulation/src/PraosProtocol/Common.hs @@ -37,6 +37,8 @@ module PraosProtocol.Common ( kilobytes, module TimeCompat, defaultPraosConfig, + CPUTask (..), + hashToColor, ) where import Control.Concurrent.Class.MonadSTM ( @@ -55,6 +57,7 @@ import ChanTCP (MessageSize (..)) import Data.Coerce (coerce) import Data.Word (Word8) import SimTCPLinks (kilobytes) +import SimTypes (CPUTask (..)) import System.Random (mkStdGen, uniform) import TimeCompat @@ -120,7 +123,7 @@ slotConfigFromNow = do start <- getCurrentTime return $ SlotConfig{start, duration = 1} -blockBodyColor :: BlockBody -> (Double, Double, Double) +blockBodyColor :: IsBody body => body -> (Double, Double, Double) blockBodyColor = hashToColor . coerce . hashBody blockHeaderColor :: BlockHeader -> (Double, Double, Double) @@ -140,6 +143,7 @@ data PraosNodeEvent body | PraosNodeEventReceived (Block body) | PraosNodeEventEnterState (Block body) | PraosNodeEventNewTip (Chain (Block body)) + | PraosNodeEventCPU CPUTask deriving (Show) data PraosConfig body = PraosConfig diff --git a/simulation/src/PraosProtocol/Common/Chain.hs b/simulation/src/PraosProtocol/Common/Chain.hs index 6c11ec8b..fbf0d3f2 100644 --- a/simulation/src/PraosProtocol/Common/Chain.hs +++ b/simulation/src/PraosProtocol/Common/Chain.hs @@ -1,4 +1,4 @@ -module PraosProtocol.Common.Chain (module Chain) where +module PraosProtocol.Common.Chain (module Chain, dropUntil) where import Ouroboros.Network.Mock.Chain as Chain ( Chain (..), @@ -45,3 +45,10 @@ import Ouroboros.Network.Mock.Chain as Chain ( valid, validExtension, ) + +-- | Returns prefix where the head block satisfies the predicate. +dropUntil :: (blk -> Bool) -> Chain blk -> Chain blk +dropUntil _ Genesis = Genesis +dropUntil p c0@(c :> blk) + | p blk = c0 + | otherwise = dropUntil p c diff --git a/simulation/src/PraosProtocol/ExamplesPraosP2P.hs b/simulation/src/PraosProtocol/ExamplesPraosP2P.hs index 4a174f01..f18119f6 100644 --- a/simulation/src/PraosProtocol/ExamplesPraosP2P.hs +++ b/simulation/src/PraosProtocol/ExamplesPraosP2P.hs @@ -32,7 +32,7 @@ import qualified PraosProtocol.Common.Chain as Chain import PraosProtocol.PraosNode import PraosProtocol.SimPraos import PraosProtocol.SimPraosP2P -import PraosProtocol.VizSimPraos (ChainsMap, DiffusionLatencyMap, PraosVizConfig (..), accumChains, accumDiffusionLatency, examplesPraosSimVizConfig, praosSimVizModel) +import PraosProtocol.VizSimPraos (ChainsMap, DiffusionLatencyMap, PraosVizConfig' (blockFetchMessageColor), accumChains, accumDiffusionLatency, examplesPraosSimVizConfig, praosSimVizModel) import PraosProtocol.VizSimPraosP2P import Sample import SimTCPLinks (mkTcpConnProps) @@ -104,9 +104,11 @@ data LatencyPerStake = LatencyPerStake data DiffusionData = DiffusionData { topography :: P2PTopographyCharacteristics + , topography_details :: P2PTopography , entries :: [DiffusionEntry] , latency_per_stake :: [LatencyPerStake] , stable_chain_hashes :: [Int] + , cpuTasks :: Map.Map NodeId [(DiffTime, CPUTask)] } deriving (Generic, ToJSON, FromJSON) @@ -117,22 +119,24 @@ diffusionEntryToLatencyPerStake nnodes DiffusionEntry{..} = , latencies = bin $ diffusionLatencyPerStakeFraction nnodes (Time created) (map Time arrivals) } where - bins = [0.5, 0.8, 0.9, 0.92, 0.94, 0.96, 0.98, 1] + bins = [0.05, 0.10, 0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.8, 0.9, 0.92, 0.94, 0.96, 0.98, 1] bin xs = map (\b -> (,b) $ fst <$> listToMaybe (dropWhile (\(_, x) -> x < b) xs)) $ bins data DiffusionLatencyState = DiffusionLatencyState { chains :: !ChainsMap , diffusions :: !DiffusionLatencyMap + , cpuTasks :: !(Map.Map NodeId [(DiffTime, CPUTask)]) } -diffusionSampleModel :: P2PTopographyCharacteristics -> FilePath -> SampleModel PraosEvent DiffusionLatencyState -diffusionSampleModel p2pTopographyCharacteristics fp = SampleModel initState accum render +diffusionSampleModel :: P2PTopographyCharacteristics -> P2PTopography -> FilePath -> SampleModel PraosEvent DiffusionLatencyState +diffusionSampleModel p2pTopographyCharacteristics p2pTopography fp = SampleModel initState accum render where - initState = DiffusionLatencyState IMap.empty Map.empty + initState = DiffusionLatencyState IMap.empty Map.empty Map.empty accum t e DiffusionLatencyState{..} = DiffusionLatencyState { chains = accumChains t e chains , diffusions = accumDiffusionLatency t e diffusions + , cpuTasks = accumCPUTasks t e cpuTasks } nnodes = p2pNumNodes p2pTopographyCharacteristics render DiffusionLatencyState{..} = do @@ -155,9 +159,11 @@ diffusionSampleModel p2pTopographyCharacteristics fp = SampleModel initState acc let diffusionData = DiffusionData { topography = p2pTopographyCharacteristics + , topography_details = p2pTopography , entries , latency_per_stake = map (diffusionEntryToLatencyPerStake nnodes) entries , stable_chain_hashes + , cpuTasks } encodeFile fp diffusionData @@ -169,6 +175,10 @@ diffusionSampleModel p2pTopographyCharacteristics fp = SampleModel initState acc putStrLn $ "with a maximum diffusion latency: " ++ show (maximum $ snd arrived98) putStrLn $ "Blocks in longest common prefix that did not reach 98% stake: " ++ show missing +accumCPUTasks :: Time -> PraosEvent -> Map.Map NodeId [(DiffTime, CPUTask)] -> Map.Map NodeId [(DiffTime, CPUTask)] +accumCPUTasks (Time t) (PraosEventNode (LabelNode nId (PraosNodeEventCPU task))) = Map.insertWith (++) nId [(t, task)] +accumCPUTasks _ _ = id + -- | Diffusion example with 1000 nodes. example1000Diffusion :: -- | number of close links @@ -181,7 +191,7 @@ example1000Diffusion :: FilePath -> IO () example1000Diffusion clinks rlinks stop fp = - runSampleModel (diffusionSampleModel p2pTopographyCharacteristics fp) stop $ + runSampleModel (diffusionSampleModel p2pTopographyCharacteristics p2pTopography fp) stop $ example1Trace rng 20 p2pTopography where rng = mkStdGen 42 diff --git a/simulation/src/PraosProtocol/PraosNode.hs b/simulation/src/PraosProtocol/PraosNode.hs index c4fb8b6c..b790d3ca 100644 --- a/simulation/src/PraosProtocol/PraosNode.hs +++ b/simulation/src/PraosProtocol/PraosNode.hs @@ -16,7 +16,7 @@ import Data.ByteString (ByteString) import Data.Coerce (coerce) import Data.Either (fromLeft, fromRight) import Data.Map (Map) -import qualified Data.Map as Map +import qualified Data.Map.Strict as Map import PraosProtocol.BlockFetch (BlockFetchControllerState, BlockFetchMessage, BlockFetchProducerState (..), PeerId, blockFetchController, initBlockFetchConsumerStateForPeerId, newBlockFetchControllerState, runBlockFetchConsumer, runBlockFetchProducer) import qualified PraosProtocol.BlockFetch as BlockFetch import PraosProtocol.BlockGeneration @@ -77,7 +77,7 @@ runPeer :: runPeer tracer cfg f st peerId chan = do let chainConsumerState = st.chainSyncConsumerStates Map.! peerId let blockFetchConsumerState = initBlockFetchConsumerStateForPeerId tracer peerId st.blockFetchControllerState f - [ Concurrently $ runChainConsumer cfg (protocolChainSync chan) chainConsumerState + [ Concurrently $ runChainConsumer tracer cfg (protocolChainSync chan) chainConsumerState , Concurrently $ runBlockFetchConsumer tracer cfg (protocolBlockFetch chan) blockFetchConsumerState ] @@ -137,7 +137,7 @@ setupPraosThreads :: [Praos BlockBody (Chan m)] -> m [Concurrently m ()] setupPraosThreads tracer cfg st0 followers peers = do - (ts, f) <- BlockFetch.setupValidatorThreads cfg st0.blockFetchControllerState 1 -- TODO: parameter + (ts, f) <- BlockFetch.setupValidatorThreads tracer cfg st0.blockFetchControllerState 1 -- TODO: parameter (map Concurrently ts ++) <$> setupPraosThreads' tracer cfg f st0 followers peers setupPraosThreads' :: diff --git a/simulation/src/PraosProtocol/SimBlockFetch.hs b/simulation/src/PraosProtocol/SimBlockFetch.hs index ab68f22d..b6621b60 100644 --- a/simulation/src/PraosProtocol/SimBlockFetch.hs +++ b/simulation/src/PraosProtocol/SimBlockFetch.hs @@ -84,7 +84,7 @@ traceRelayLink1 tcpprops = nodeA praosConfig chan = do peerChainVar <- newTVarIO (blockHeader <$> bchain) (st, peerId) <- newBlockFetchControllerState Genesis >>= addPeer (asReadOnly peerChainVar) - (ts, submitFetchedBlock) <- setupValidatorThreads praosConfig st 1 + (ts, submitFetchedBlock) <- setupValidatorThreads nullTracer praosConfig st 1 concurrently_ (mapConcurrently_ id ts) $ concurrently_ ( blockFetchController nullTracer st diff --git a/simulation/src/PraosProtocol/SimChainSync.hs b/simulation/src/PraosProtocol/SimChainSync.hs index d20c6a54..fb17e96f 100644 --- a/simulation/src/PraosProtocol/SimChainSync.hs +++ b/simulation/src/PraosProtocol/SimChainSync.hs @@ -16,7 +16,7 @@ import Control.Monad.Class.MonadAsync ( import Control.Monad.IOSim as IOSim (IOSim, runSimTrace) import Control.Tracer as Tracer ( Contravariant (contramap), - Tracer, + Tracer (Tracer), traceWith, ) import qualified Data.ByteString as BS @@ -89,7 +89,8 @@ traceRelayLink1 tcpprops = where consumerNode cfg chan = do st <- ChainConsumerState <$> newTVarIO Chain.Genesis - runChainConsumer cfg chan st + let nullTracer = Tracer $ const $ return () + runChainConsumer nullTracer cfg chan st producerNode chan = do let chain = mkChainSimple $ replicate 10 (BlockBody $ BS.replicate 100 0) let (cps, fId) = initFollower GenesisPoint $ initChainProducerState chain diff --git a/simulation/src/PraosProtocol/VizSimBlockFetch.hs b/simulation/src/PraosProtocol/VizSimBlockFetch.hs index 7efaf55e..edbd0364 100644 --- a/simulation/src/PraosProtocol/VizSimBlockFetch.hs +++ b/simulation/src/PraosProtocol/VizSimBlockFetch.hs @@ -1,7 +1,6 @@ {-# LANGUAGE MonoLocalBinds #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# OPTIONS_GHC -Wno-incomplete-patterns #-} module PraosProtocol.VizSimBlockFetch where @@ -185,6 +184,7 @@ relaySimVizModel = [(msg, msgforecast, msgforecasts)] (vizMsgsInTransit vs) } + accumEventVizState _ (BlockFetchEventNode _) vs = vs pruneVisState :: Time -> diff --git a/simulation/src/PraosProtocol/VizSimChainSync.hs b/simulation/src/PraosProtocol/VizSimChainSync.hs index d37a7e4b..098e7b68 100644 --- a/simulation/src/PraosProtocol/VizSimChainSync.hs +++ b/simulation/src/PraosProtocol/VizSimChainSync.hs @@ -1,7 +1,6 @@ {-# LANGUAGE MonoLocalBinds #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# OPTIONS_GHC -Wno-incomplete-patterns #-} module PraosProtocol.VizSimChainSync where @@ -191,6 +190,7 @@ relaySimVizModel = [(msg, msgforecast, msgforecasts)] (vizMsgsInTransit vs) } + accumEventVizState _ (ChainSyncEventNode _) vs = vs -- accumEventVizState now (ChainSyncEventNode (LabelNode nid (RelayNodeEventEnterQueue msg))) vs = -- vs -- { vizMsgsAtNodeQueue = diff --git a/simulation/src/PraosProtocol/VizSimPraos.hs b/simulation/src/PraosProtocol/VizSimPraos.hs index 30390a7e..41afca52 100644 --- a/simulation/src/PraosProtocol/VizSimPraos.hs +++ b/simulation/src/PraosProtocol/VizSimPraos.hs @@ -3,7 +3,6 @@ {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# OPTIONS_GHC -Wno-incomplete-patterns #-} module PraosProtocol.VizSimPraos where @@ -48,7 +47,7 @@ example1 = where trace = exampleTrace1 -examplesPraosSimVizConfig :: PraosVizConfig +examplesPraosSimVizConfig :: forall body. IsBody body => PraosVizConfig' body examplesPraosSimVizConfig = PraosVizConfig{..} where chainSyncMessageColor :: @@ -64,14 +63,14 @@ examplesPraosSimVizConfig = PraosVizConfig{..} chainSyncMessageText (ProtocolMessage (SomeMessage msg)) = Just $ chainSyncMessageLabel msg blockFetchMessageColor :: - BlockFetchMessage BlockBody -> + BlockFetchMessage body -> (Double, Double, Double) blockFetchMessageColor (ProtocolMessage (SomeMessage msg)) = case msg of MsgBlock blk -> blockBodyColor blk _otherwise -> (1, 0, 0) blockFetchMessageText :: - BlockFetchMessage BlockBody -> + BlockFetchMessage body -> Maybe String blockFetchMessageText (ProtocolMessage (SomeMessage msg)) = Just $ blockFetchMessageLabel msg @@ -284,6 +283,7 @@ praosSimVizModel = [(msg, msgforecast, msgforecasts)] (vizMsgsInTransit vs) } + accumEventVizState _ (PraosEventNode (LabelNode _ (PraosNodeEventCPU _))) vs = vs pruneVisState :: Time -> @@ -357,13 +357,13 @@ recentPrune now (RecentRate pq) = ------------------------------------------------------------------------------ -- The vizualisation rendering -- - -data PraosVizConfig +type PraosVizConfig = PraosVizConfig' BlockBody +data PraosVizConfig' body = PraosVizConfig { chainSyncMessageColor :: ChainSyncMessage -> (Double, Double, Double) , chainSyncMessageText :: ChainSyncMessage -> Maybe String - , blockFetchMessageColor :: BlockFetchMessage BlockBody -> (Double, Double, Double) - , blockFetchMessageText :: BlockFetchMessage BlockBody -> Maybe String + , blockFetchMessageColor :: BlockFetchMessage body -> (Double, Double, Double) + , blockFetchMessageText :: BlockFetchMessage body -> Maybe String } praosSimVizRender :: diff --git a/simulation/src/RelayProtocol.hs b/simulation/src/RelayProtocol.hs index 4d8ab8a3..20219457 100644 --- a/simulation/src/RelayProtocol.hs +++ b/simulation/src/RelayProtocol.hs @@ -47,7 +47,7 @@ import qualified Data.Foldable as Foldable import Data.Function (on) import Data.List (sortBy) import Data.Map (Map) -import qualified Data.Map as Map +import qualified Data.Map.Strict as Map import Data.Set (Set) import qualified Data.Set as Set import Data.Word (Word64) diff --git a/simulation/src/SimTypes.hs b/simulation/src/SimTypes.hs index bda0dae2..3e26c5ec 100644 --- a/simulation/src/SimTypes.hs +++ b/simulation/src/SimTypes.hs @@ -8,6 +8,11 @@ import Data.Aeson.Types (FromJSON, FromJSONKey, ToJSON (..), ToJSONKey, defaultO import Data.Hashable import Data.Ix (Ix) import GHC.Generics (Generic) +import TimeCompat (DiffTime) + +newtype CPUTask = CPUTask {cpuTaskDuration :: DiffTime} + deriving (Eq, Ord, Show, Generic) + deriving newtype (ToJSON, FromJSON) newtype NodeId = NodeId Int deriving (Eq, Ord, Ix, Show) diff --git a/simulation/src/Topology.hs b/simulation/src/Topology.hs index 0f28ed5d..32359c36 100644 --- a/simulation/src/Topology.hs +++ b/simulation/src/Topology.hs @@ -35,7 +35,7 @@ import qualified Data.GraphViz.Types as GVT (PrintDot) import qualified Data.GraphViz.Types.Generalised as GVTG import Data.IORef (atomicModifyIORef', newIORef, readIORef) import Data.Map (Map) -import qualified Data.Map as M +import qualified Data.Map.Strict as M import Data.Maybe (maybeToList) import qualified Data.Sequence as Seq import qualified Data.Set as S diff --git a/simulation/src/Viz.hs b/simulation/src/Viz.hs index eb75bff6..97c871d0 100644 --- a/simulation/src/Viz.hs +++ b/simulation/src/Viz.hs @@ -700,7 +700,7 @@ layoutLabelTime = LayoutFixed $ Layout VizRender - { renderReqSize = (400, 20) + { renderReqSize = (200, 20) , renderChanged = \_t _fn _ -> True , renderModel }