diff --git a/simulation/src/LeiosProtocol/Relay.hs b/simulation/src/LeiosProtocol/Relay.hs index 2f71918..5adbd36 100644 --- a/simulation/src/LeiosProtocol/Relay.hs +++ b/simulation/src/LeiosProtocol/Relay.hs @@ -497,8 +497,7 @@ data SubmitPolicy = SubmitInOrder | SubmitAll data RelayConsumerConfig id header body m = RelayConsumerConfig { relay :: !RelayConfig - , headerValidationDelay :: header -> DiffTime - , threadDelayParallel :: [DiffTime] -> m () + , validateHeaders :: [header] -> m () , headerId :: !(header -> id) , prioritize :: !(Map id header -> [header]) -- ^ returns a subset of headers, in order of what should be fetched first. @@ -785,7 +784,7 @@ relayConsumerPipelined config sst = unless (Seq.length idsSeq <= fromIntegral windowExpand) $ throw IdsNotRequested - config.threadDelayParallel $ map config.headerValidationDelay headers + config.validateHeaders headers -- Upon receiving a batch of new headers we extend our available set, -- and extend the unacknowledged sequence. diff --git a/simulation/src/LeiosProtocol/Short/Generate.hs b/simulation/src/LeiosProtocol/Short/Generate.hs index c680447..2522d38 100644 --- a/simulation/src/LeiosProtocol/Short/Generate.hs +++ b/simulation/src/LeiosProtocol/Short/Generate.hs @@ -92,7 +92,7 @@ data BlockGeneratorConfig m = BlockGeneratorConfig , nodeId :: NodeId , buffers :: BuffersView m , schedule :: SlotNo -> m [(SomeRole, Word64)] - , submit :: [([CPUTask], SomeAction)] -> m () + , submit :: [(Maybe CPUTask, SomeAction)] -> m () } blockGenerator :: @@ -109,28 +109,28 @@ blockGenerator BlockGeneratorConfig{..} = go (0, 0) submit actions go (blkId', slot + 1) execute slot (SomeRole r, wins) = assert (wins >= 1) $ second (SomeAction r) <$> execute' slot r wins - execute' :: SlotNo -> Role a -> Word64 -> StateT Int m ([CPUTask], a) + execute' :: SlotNo -> Role a -> Word64 -> StateT Int m (Maybe CPUTask, a) execute' slot Base _wins = do rbData <- lift $ atomically buffers.newRBData let meb = rbData.freshestCertifiedEB let !task = CPUTask $ maybe 0 (leios.delays.certificateCreation . snd) meb let body = mkRankingBlockBody leios nodeId meb rbData.txsPayload let !rb = mkPartialBlock slot body - return ([task], rb) + return (Just task, rb) execute' slot Propose wins = - ([],) <$> do + (Nothing,) <$> do ibData <- lift $ atomically buffers.newIBData forM [toEnum $ fromIntegral sub | sub <- [0 .. wins - 1]] $ \sub -> do i <- nextBlkId InputBlockId let header = mkInputBlockHeader leios i slot sub nodeId ibData.referenceRankingBlock return $! mkInputBlock leios header ibData.txsPayload execute' slot Endorse _wins = - ([],) <$> do + (Nothing,) <$> do i <- nextBlkId EndorseBlockId ibs <- lift $ atomically buffers.ibs return $! mkEndorseBlock leios i slot nodeId $ inputBlocksToEndorse leios slot ibs execute' slot Vote votes = - ([],) <$> do + (Nothing,) <$> do votingFor <- lift $ atomically $ do ibs <- buffers.ibs ebs <- buffers.ebs diff --git a/simulation/src/LeiosProtocol/Short/Node.hs b/simulation/src/LeiosProtocol/Short/Node.hs index 5a6f6bc..523d1dd 100644 --- a/simulation/src/LeiosProtocol/Short/Node.hs +++ b/simulation/src/LeiosProtocol/Short/Node.hs @@ -13,6 +13,7 @@ module LeiosProtocol.Short.Node where import ChanMux import Control.Category ((>>>)) import Control.Concurrent.Class.MonadMVar +import Control.Concurrent.Class.MonadSTM.TSem import Control.Exception (assert) import Control.Monad (forever, guard, when) import Control.Monad.Class.MonadAsync @@ -21,6 +22,7 @@ import Control.Tracer import Data.Bifunctor import Data.Coerce (coerce) import Data.Foldable (forM_) +import Data.Ix (Ix) import Data.List (sort, sortOn) import Data.Map (Map) import qualified Data.Map.Strict as Map @@ -34,12 +36,13 @@ import qualified LeiosProtocol.RelayBuffer as RB import LeiosProtocol.Short import LeiosProtocol.Short.Generate import qualified LeiosProtocol.Short.Generate as Generate +import LeiosProtocol.TaskMultiQueue import ModelTCP import Numeric.Natural (Natural) import PraosProtocol.BlockFetch ( BlockFetchControllerState (blocksVar), addProducedBlock, - processWaiting, + processWaiting', ) import qualified PraosProtocol.Common.Chain as Chain import qualified PraosProtocol.PraosNode as PraosNode @@ -82,8 +85,11 @@ data LeiosNodeConfig = LeiosNodeConfig , inputBlockPayload :: !Bytes -- ^ overall size of txs to include in IBs , processingQueueBound :: !Natural + , processingCores :: NumCores } +data NumCores = Infinite | Finite Int + -------------------------------------------------------------- ---- Node State -------------------------------------------------------------- @@ -94,15 +100,28 @@ data LeiosNodeState m = LeiosNodeState , relayEBState :: !(RelayEBState m) , relayVoteState :: !(RelayVoteState m) , ibDeliveryTimesVar :: !(TVar m (Map InputBlockId UTCTime)) - , validationQueue :: !(TBQueue m (ValidationRequest m)) - , waitingForRBVar :: !(TVar m (Map (HeaderHash RankingBlock) [(DiffTime, m ())])) + , taskQueue :: !(TaskMultiQueue LeiosNodeTask m) + , waitingForRBVar :: !(TVar m (Map (HeaderHash RankingBlock) [m ()])) -- ^ waiting for RB block itself to be validated. - , waitingForLedgerStateVar :: !(TVar m (Map (HeaderHash RankingBlock) [(DiffTime, m ())])) + , waitingForLedgerStateVar :: !(TVar m (Map (HeaderHash RankingBlock) [m ()])) -- ^ waiting for ledger state of RB block to be validated. , ledgerStateVar :: !(TVar m (Map (HeaderHash RankingBlock) LedgerState)) , ibsNeededForEBVar :: !(TVar m (Map EndorseBlockId (Set InputBlockId))) } +data LeiosNodeTask + = ValIB + | ValEB + | ValVote + | ValRB + | ValIH + | ValRH + | GenIB + | GenEB + | GenVote + | GenRB + deriving (Eq, Ord, Ix, Bounded) + type RelayIBState = RelayConsumerSharedState InputBlockId InputBlockHeader InputBlockBody type RelayEBState = RelayConsumerSharedState EndorseBlockId EndorseBlockId EndorseBlock type RelayVoteState = RelayConsumerSharedState VoteId VoteId VoteMsg @@ -193,14 +212,14 @@ relayIBConfig :: (MonadAsync m, MonadSTM m, MonadDelay m, MonadTime m) => Tracer m LeiosNodeEvent -> LeiosNodeConfig -> + ([InputBlockHeader] -> m ()) -> SubmitBlocks m InputBlockHeader InputBlockBody -> RelayConsumerConfig InputBlockId InputBlockHeader InputBlockBody m -relayIBConfig tracer cfg submitBlocks = +relayIBConfig _tracer _cfg validateHeaders submitBlocks = RelayConsumerConfig { relay = RelayConfig{maxWindowSize = 100} , headerId = (.id) - , headerValidationDelay = cfg.leios.delays.inputBlockHeaderValidation - , threadDelayParallel = threadDelayParallel tracer + , validateHeaders , -- TODO: add prioritization policy to LeiosConfig prioritize = sortOn (Down . (.slot)) . Map.elems , submitPolicy = SubmitAll @@ -215,12 +234,11 @@ relayEBConfig :: LeiosNodeConfig -> SubmitBlocks m EndorseBlockId EndorseBlock -> RelayConsumerConfig EndorseBlockId EndorseBlockId EndorseBlock m -relayEBConfig tracer _cfg submitBlocks = +relayEBConfig _tracer _cfg submitBlocks = RelayConsumerConfig { relay = RelayConfig{maxWindowSize = 100} , headerId = id - , headerValidationDelay = const 0 - , threadDelayParallel = threadDelayParallel tracer + , validateHeaders = const $ return () , -- TODO: add prioritization policy to LeiosConfig? prioritize = sort . Map.elems , submitPolicy = SubmitAll @@ -235,12 +253,11 @@ relayVoteConfig :: LeiosNodeConfig -> SubmitBlocks m VoteId VoteMsg -> RelayConsumerConfig VoteId VoteId VoteMsg m -relayVoteConfig tracer _cfg submitBlocks = +relayVoteConfig _tracer _cfg submitBlocks = RelayConsumerConfig { relay = RelayConfig{maxWindowSize = 100} , headerId = id - , headerValidationDelay = const 0 - , threadDelayParallel = threadDelayParallel tracer + , validateHeaders = const $ return () , -- TODO: add prioritization policy to LeiosConfig? prioritize = sort . Map.elems , submitPolicy = SubmitAll @@ -249,12 +266,16 @@ relayVoteConfig tracer _cfg submitBlocks = , submitBlocks } -threadDelayParallel :: MonadDelay m => Tracer m LeiosNodeEvent -> [DiffTime] -> m () -threadDelayParallel _tracer [] = return () -threadDelayParallel tracer ds = do - forM_ ds (traceWith tracer . LeiosNodeEventCPU . CPUTask) - let d = maximum ds - when (d >= 0) $ threadDelaySI d +queueAndWait :: (MonadSTM m, MonadDelay m) => LeiosNodeState m -> LeiosNodeTask -> [CPUTask] -> m () +queueAndWait _st _lbl [] = return () +queueAndWait st lbl ds = do + let l = fromIntegral $ length ds + sem <- atomically $ do + sem <- newTSem (1 - l) + forM_ ds $ \task -> do + writeTMQueue st.taskQueue lbl (task, atomically $ signalTSem sem) + return sem + atomically $ waitTSem sem newLeiosNodeState :: forall m. @@ -263,7 +284,6 @@ newLeiosNodeState :: m (LeiosNodeState m) newLeiosNodeState cfg = do praosState <- PraosNode.newPraosNodeState cfg.baseChain - validationQueue <- newTBQueueIO cfg.processingQueueBound relayIBState <- newRelayState relayEBState <- newRelayState relayVoteState <- newRelayState @@ -272,11 +292,12 @@ newLeiosNodeState cfg = do ledgerStateVar <- newTVarIO Map.empty waitingForRBVar <- newTVarIO Map.empty waitingForLedgerStateVar <- newTVarIO Map.empty + taskQueue <- atomically $ newTaskMultiQueue cfg.processingQueueBound return $ LeiosNodeState{..} leiosNode :: forall m. - (MonadMVar m, MonadFork m, MonadAsync m, MonadSTM m, MonadTime m, MonadDelay m) => + (MonadMVar m, MonadFork m, MonadAsync m, MonadSTM m, MonadTime m, MonadDelay m, MonadMonotonicTime m) => Tracer m LeiosNodeEvent -> LeiosNodeConfig -> [Leios (Chan m)] -> @@ -288,22 +309,29 @@ leiosNode tracer cfg followers peers = do traceReceived :: [a] -> (a -> LeiosEventBlock) -> m () traceReceived xs f = mapM_ (traceWith tracer . LeiosNodeEvent Received . f) xs + let dispatch = dispatchValidation tracer cfg leiosState -- tracing for RB already covered in blockFetchConsumer. - let submitRB rb completion = atomically $ writeTBQueue validationQueue $! ValidateRB rb completion + let submitRB rb completion = dispatch $! ValidateRB rb completion let submitIB xs deliveryTime completion = do traceReceived xs $ EventIB . uncurry InputBlock - atomically $ writeTBQueue validationQueue $! ValidateIBS xs deliveryTime completion + dispatch $! ValidateIBS xs deliveryTime completion let submitVote (map snd -> xs) _ completion = do traceReceived xs EventVote - atomically $ writeTBQueue validationQueue $! ValidateVotes xs $ completion . map (\v -> (v.id, v)) + dispatch $! ValidateVotes xs $ completion . map (\v -> (v.id, v)) let submitEB (map snd -> xs) _ completion = do traceReceived xs EventEB - atomically $ writeTBQueue validationQueue $! ValidateEBS xs $ completion . map (\eb -> (eb.id, eb)) + dispatch $! ValidateEBS xs $ completion . map (\eb -> (eb.id, eb)) + let valHeaderIB = + queueAndWait leiosState ValIH . map (CPUTask . cfg.leios.delays.inputBlockHeaderValidation) + let valHeaderRB h = do + let !delay = cfg.leios.praos.headerValidationDelay h + queueAndWait leiosState ValRH [CPUTask delay] praosThreads <- PraosNode.setupPraosThreads' (contramap PraosNodeEvent tracer) cfg.leios.praos + valHeaderRB submitRB praosState (map protocolPraos followers) @@ -311,7 +339,7 @@ leiosNode tracer cfg followers peers = do ibThreads <- setupRelay - (relayIBConfig tracer cfg submitIB) + (relayIBConfig tracer cfg valHeaderIB submitIB) relayIBState (map protocolIB followers) (map protocolIB peers) @@ -331,21 +359,17 @@ leiosNode tracer cfg followers peers = do (map protocolVote peers) let processWaitingForRB = - processWaiting - (contramap LeiosNodeEventCPU tracer) - Nothing -- unbounded parallelism + processWaiting' praosState.blockFetchControllerState.blocksVar waitingForRBVar let processWaitingForLedgerState = - processWaiting - (contramap LeiosNodeEventCPU tracer) - Nothing -- unbounded parallelism + processWaiting' ledgerStateVar waitingForLedgerStateVar let processingThreads = - [ validationDispatcher tracer cfg leiosState + [ processCPUTasks cfg.processingCores (contramap LeiosNodeEventCPU tracer) leiosState.taskQueue , processWaitingForRB , processWaitingForLedgerState ] @@ -360,16 +384,25 @@ leiosNode tracer cfg followers peers = do return $ concat - [ coerce praosThreads + [ processingThreads + , blockGenerationThreads , ibThreads , ebThreads , voteThreads - , processingThreads - , blockGenerationThreads - , pruningThreads + , coerce praosThreads , computeLedgerStateThreads + , pruningThreads ] +processCPUTasks :: + (MonadSTM m, MonadDelay m, MonadMonotonicTime m) => + NumCores -> + Tracer m CPUTask -> + TaskMultiQueue LeiosNodeTask m -> + m () +processCPUTasks Infinite tracer queue = forever $ runInfParallelBlocking tracer queue +processCPUTasks (Finite _) _ _ = error "TBD" + computeLedgerStateThread :: forall m. (MonadMVar m, MonadFork m, MonadAsync m, MonadSTM m, MonadTime m, MonadDelay m) => @@ -396,79 +429,81 @@ computeLedgerStateThread _tracer _cfg st = forever $ do -- TODO? trace readyLedgerState return () -validationDispatcher :: +dispatchValidation :: forall m. (MonadMVar m, MonadFork m, MonadAsync m, MonadSTM m, MonadTime m, MonadDelay m) => Tracer m LeiosNodeEvent -> LeiosNodeConfig -> LeiosNodeState m -> + ValidationRequest m -> m () -validationDispatcher tracer cfg leiosState = forever $ do - -- NOTE: IOSim deschedules the thread after an `atomically`, we - -- might get more parallelism by reading the whole buffer at once, - -- collect all resulting delays and do a single - -- `threadDelayParallel` call. - req <- atomically $ readTBQueue leiosState.validationQueue - case req of +dispatchValidation tracer cfg leiosState req = + atomically $ mapM_ (uncurry $ writeTMQueue leiosState.taskQueue) =<< go req + where + queue = atomically . mapM_ (uncurry $ writeTMQueue leiosState.taskQueue) + valRB rb m = do + let !delay = cfg.leios.praos.blockValidationDelay rb + (ValRB, (CPUTask delay, m)) + valIB x deliveryTime completion = + let + !delay = CPUTask $ cfg.leios.delays.inputBlockValidation (uncurry InputBlock x) + task = atomically $ do + completion [x] + + -- NOTE: voting relies on delivery times for IBs + modifyTVar' + leiosState.ibDeliveryTimesVar + (Map.insertWith min (fst x).id deliveryTime) + + -- TODO: likely needs optimization + modifyTVar' leiosState.ibsNeededForEBVar (Map.map (Set.delete (fst x).id)) + in + (ValIB, (delay, task >> traceEnterState [uncurry InputBlock x] EventIB)) + valEB eb completion = (ValEB,) . (CPUTask $ cfg.leios.delays.endorseBlockValidation eb,) $ do + atomically $ do + completion [eb] + ibs <- RB.keySet <$> readTVar leiosState.relayIBState.relayBufferVar + let ibsNeeded = Map.fromList $ [(eb.id, Set.fromList eb.inputBlocks Set.\\ ibs)] + modifyTVar' leiosState.ibsNeededForEBVar (`Map.union` ibsNeeded) + traceEnterState [eb] EventEB + valVote v completion = (ValVote,) . (CPUTask $ cfg.leios.delays.voteMsgValidation v,) $ do + atomically $ completion [v] + traceEnterState [v] EventVote + + go :: ValidationRequest m -> STM m [(LeiosNodeTask, (CPUTask, m ()))] + go x = case x of ValidateRB rb completion -> do - let !delay = cfg.leios.praos.blockValidationDelay rb + let task = valRB rb completion case blockPrevHash rb of GenesisHash -> do - traceWith tracer . LeiosNodeEventCPU . CPUTask $ delay - threadDelaySI delay - completion - BlockHash prev -> atomically $ do + return [task] + BlockHash prev -> do let var = assert (rb.blockBody.payload >= 0) $ if rb.blockBody.payload == 0 then leiosState.waitingForRBVar -- TODO: assumes payload can be validated without content of EB, check with spec. else leiosState.waitingForLedgerStateVar - modifyTVar' var $ Map.insertWith (++) prev [(delay, completion)] + modifyTVar' var $ Map.insertWith (++) prev [queue [task]] + return [] ValidateIBS ibs deliveryTime completion -> do -- NOTE: IBs with an RB reference have to wait for ledger state of that RB. - let valIB x = - let - !delay = cfg.leios.delays.inputBlockValidation (uncurry InputBlock x) - task = atomically $ do - completion [x] - - -- NOTE: voting relies on delivery times for IBs - modifyTVar' - leiosState.ibDeliveryTimesVar - (Map.insertWith min (fst x).id deliveryTime) - - -- TODO: likely needs optimization - modifyTVar' leiosState.ibsNeededForEBVar (Map.map (Set.delete (fst x).id)) - in - (delay, task >> traceEnterState [uncurry InputBlock x] EventIB) let waitingLedgerState = Map.fromListWith (++) - [ (rbHash, [valIB ib]) + [ (rbHash, [queue [valIB ib deliveryTime completion]]) | ib <- ibs , BlockHash rbHash <- [(fst ib).rankingBlock] ] - atomically $ modifyTVar' leiosState.waitingForLedgerStateVar (`Map.union` waitingLedgerState) + modifyTVar' leiosState.waitingForLedgerStateVar (`Map.union` waitingLedgerState) - let (delays, ms) = unzip [valIB ib | ib@(h, _) <- ibs, GenesisHash <- [h.rankingBlock]] - threadDelayParallel tracer delays - sequence_ ms + return [valIB ib deliveryTime completion | ib@(h, _) <- ibs, GenesisHash <- [h.rankingBlock]] ValidateEBS ebs completion -> do -- NOTE: block references are only inspected during voting. - threadDelayParallel tracer $ map cfg.leios.delays.endorseBlockValidation ebs - atomically $ do - completion ebs - ibs <- RB.keySet <$> readTVar leiosState.relayIBState.relayBufferVar - let ibsNeeded = Map.fromList $ map (\eb -> (eb.id, Set.fromList eb.inputBlocks Set.\\ ibs)) ebs - modifyTVar' leiosState.ibsNeededForEBVar (`Map.union` ibsNeeded) - traceEnterState ebs EventEB + return [valEB eb completion | eb <- ebs] ValidateVotes vs completion -> do - threadDelayParallel tracer $ map cfg.leios.delays.voteMsgValidation vs - atomically $ completion vs - traceEnterState vs EventVote - where + return [valVote v completion | v <- vs] traceEnterState :: [a] -> (a -> LeiosEventBlock) -> m () traceEnterState xs f = forM_ xs $ traceWith tracer . LeiosNodeEvent EnterState . f @@ -483,28 +518,29 @@ generator tracer cfg st = do schedule <- mkSchedule cfg let buffers = mkBuffersView cfg st let - submitOne :: ([CPUTask], SomeAction) -> m () - submitOne (delays, x) = do - threadDelayParallel tracer (coerce delays) + withDelay Nothing (_lbl, m) = m + withDelay (Just d) (lbl, m) = atomically $ writeTMQueue st.taskQueue lbl (d, m) + let + submitOne :: (Maybe CPUTask, SomeAction) -> m () + submitOne (delay, x) = withDelay delay $ case x of - SomeAction Generate.Base rb0 -> do + SomeAction Generate.Base rb0 -> (GenRB,) $ do rb <- atomically $ do ha <- Chain.headAnchor <$> PraosNode.preferredChain st.praosState let rb = fixupBlock ha rb0 addProducedBlock st.praosState.blockFetchControllerState rb return rb traceWith tracer (PraosNodeEvent (PraosNodeEventGenerate rb)) - SomeAction Generate.Propose ibs -> forM_ ibs $ \ib -> do + SomeAction Generate.Propose ibs -> (GenIB,) $ forM_ ibs $ \ib -> do atomically $ modifyTVar' st.relayIBState.relayBufferVar (RB.snoc ib.header.id (ib.header, ib.body)) traceWith tracer (LeiosNodeEvent Generate (EventIB ib)) - SomeAction Generate.Endorse eb -> do + SomeAction Generate.Endorse eb -> (GenEB,) $ do atomically $ modifyTVar' st.relayEBState.relayBufferVar (RB.snoc eb.id (eb.id, eb)) traceWith tracer (LeiosNodeEvent Generate (EventEB eb)) - SomeAction Generate.Vote v -> do + SomeAction Generate.Vote v -> (GenVote,) $ do atomically $ modifyTVar' st.relayVoteState.relayBufferVar (RB.snoc v.id (v.id, v)) traceWith tracer (LeiosNodeEvent Generate (EventVote v)) let LeiosNodeConfig{..} = cfg - -- TODO: more parallelism `mapM_ submitOne` will make each later submission wait. blockGenerator $ BlockGeneratorConfig{submit = mapM_ submitOne, ..} mkBuffersView :: forall m. MonadSTM m => LeiosNodeConfig -> LeiosNodeState m -> BuffersView m diff --git a/simulation/src/LeiosProtocol/Short/Sim.hs b/simulation/src/LeiosProtocol/Short/Sim.hs index a9274c0..9badbb4 100644 --- a/simulation/src/LeiosProtocol/Short/Sim.hs +++ b/simulation/src/LeiosProtocol/Short/Sim.hs @@ -117,6 +117,7 @@ traceRelayLink1 tcpprops = inputBlockPayload = 96 * 1024 , -- \^ overall size of txs to include in IBs processingQueueBound = 100 + , processingCores = Infinite , .. } diff --git a/simulation/src/LeiosProtocol/Short/SimP2P.hs b/simulation/src/LeiosProtocol/Short/SimP2P.hs index c8ae41f..e1e73d0 100644 --- a/simulation/src/LeiosProtocol/Short/SimP2P.hs +++ b/simulation/src/LeiosProtocol/Short/SimP2P.hs @@ -125,6 +125,7 @@ exampleTrace2 rng0 sliceLength p2pTopography@P2PTopography{..} = , rankingBlockPayload = 0 , inputBlockPayload = kilobytes 96 , processingQueueBound = 100 + , processingCores = Infinite , nodeId , rng } diff --git a/simulation/src/LeiosProtocol/Short/VizSim.hs b/simulation/src/LeiosProtocol/Short/VizSim.hs index 1dcf2b9..b04c64e 100644 --- a/simulation/src/LeiosProtocol/Short/VizSim.hs +++ b/simulation/src/LeiosProtocol/Short/VizSim.hs @@ -176,17 +176,20 @@ data LinkPoints {-# UNPACK #-} !Point deriving (Show) -accumNodeCpuUsage :: Time -> LeiosEvent -> Map NodeId (IntervalMap DiffTime Int) -> Map NodeId (IntervalMap DiffTime Int) -accumNodeCpuUsage (Time now) (LeiosEventNode (LabelNode nid (PraosNodeEvent (PraosNodeEventCPU task)))) = - Map.insertWith ILMap.union nid (ILMap.singleton (ClosedInterval now (now + cpuTaskDuration task)) 1) -accumNodeCpuUsage (Time now) (LeiosEventNode (LabelNode nid (LeiosNodeEventCPU task))) = +accumNodeCpuUsage :: + Time -> + NodeId -> + CPUTask -> + Map NodeId (IntervalMap DiffTime Int) -> + Map NodeId (IntervalMap DiffTime Int) +accumNodeCpuUsage (Time now) nid task = Map.insertWith ILMap.union nid (ILMap.singleton (ClosedInterval now (now + cpuTaskDuration task)) 1) -accumNodeCpuUsage _ _ = id type ChainsMap = IntMap (Chain RankingBlock) accumChains :: Time -> LeiosEvent -> ChainsMap -> ChainsMap -accumChains _ (LeiosEventNode (LabelNode nid (PraosNodeEvent (PraosNodeEventNewTip ch)))) = IMap.insert (coerce nid) ch +accumChains _ (LeiosEventNode (LabelNode nid (PraosNodeEvent (PraosNodeEventNewTip ch)))) = + IMap.insert (coerce nid) ch accumChains _ _ = id type DiffusionLatencyMap = DiffusionLatencyMap' (HeaderHash RankingBlockHeader) RankingBlockHeader @@ -383,14 +386,14 @@ leiosSimVizModel LeiosModelConfig{recentSpan} = [(msg, msgforecast, msgforecasts)] (vizMsgsInTransit vs) } - accumEventVizState now e@(LeiosEventNode (LabelNode _nodeId (LeiosNodeEventCPU _task))) vs = - vs{nodeCpuUsage = accumNodeCpuUsage now e (nodeCpuUsage vs)} + accumEventVizState now (LeiosEventNode (LabelNode nid (LeiosNodeEventCPU task))) vs = + vs{nodeCpuUsage = accumNodeCpuUsage now nid task (nodeCpuUsage vs)} accumEventVizState - now - e@( LeiosEventNode - (LabelNode _nodeId (PraosNodeEvent (PraosNodeEventCPU _task))) - ) - vs = vs{nodeCpuUsage = accumNodeCpuUsage now e (nodeCpuUsage vs)} + _now + ( LeiosEventNode + (LabelNode _nodeId (PraosNodeEvent (PraosNodeEventCPU _task))) + ) + _vs = error "PraosNodeEventCPU should not be generated by leios nodes" pruneVisState :: Time -> diff --git a/simulation/src/LeiosProtocol/SimTestRelay.hs b/simulation/src/LeiosProtocol/SimTestRelay.hs index 7b92bb6..9040667 100644 --- a/simulation/src/LeiosProtocol/SimTestRelay.hs +++ b/simulation/src/LeiosProtocol/SimTestRelay.hs @@ -138,8 +138,8 @@ relayNode let relayConsumerConfig = RelayConsumerConfig { relay = relayConfig - , headerValidationDelay = const 0.1 - , threadDelayParallel = sum >>> \d -> when (d >= 0) $ threadDelaySI d + , -- sequential validation of headers + validateHeaders = map (const 0.1) >>> sum >>> \d -> when (d >= 0) $ threadDelaySI d , headerId = testHeaderId , prioritize = sortOn (Down . testHeaderExpiry) . Map.elems , submitPolicy = SubmitAll diff --git a/simulation/src/PraosProtocol/BlockFetch.hs b/simulation/src/PraosProtocol/BlockFetch.hs index 603b14b..577affe 100644 --- a/simulation/src/PraosProtocol/BlockFetch.hs +++ b/simulation/src/PraosProtocol/BlockFetch.hs @@ -659,3 +659,20 @@ processWaiting tracer npar blocksVar waitingVar = go let chunks Nothing xs = [xs] chunks (Just m) xs = map (take m) . takeWhile (not . null) . iterate (drop m) $ xs return . mapM_ parallelDelay . chunks npar . concat . Map.elems $ toValidate + +processWaiting' :: + forall m a b. + (MonadSTM m, MonadDelay m) => + TVar m (Map ConcreteHeaderHash a) -> + TVar m (Map ConcreteHeaderHash [m b]) -> + m () +processWaiting' blocksVar waitingVar = go + where + go = forever $ join $ atomically $ do + waiting <- readTVar waitingVar + when (Map.null waiting) retry + blocks <- readTVar blocksVar + let toValidate = Map.intersection waiting blocks + when (Map.null toValidate) retry + writeTVar waitingVar $! waiting Map.\\ toValidate + return . sequence_ . concat . Map.elems $ toValidate diff --git a/simulation/src/PraosProtocol/ChainSync.hs b/simulation/src/PraosProtocol/ChainSync.hs index 614a8ca..fa1608e 100644 --- a/simulation/src/PraosProtocol/ChainSync.hs +++ b/simulation/src/PraosProtocol/ChainSync.hs @@ -7,6 +7,7 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeFamilies #-} @@ -18,7 +19,7 @@ import Chan (Chan) import ChanDriver (ProtocolMessage, chanDriver) import Control.Exception (assert) import Control.Monad (void) -import Control.Tracer (Tracer, traceWith) +import Control.Tracer (Tracer) import Data.Maybe (fromMaybe) import Data.Type.Equality ((:~:) (Refl)) import Network.TypedProtocol ( @@ -147,8 +148,9 @@ type ChainSyncMessage = ProtocolMessage ChainSyncState ---- ChainSync Consumer -------------------------------- -newtype ChainConsumerState m = ChainConsumerState - { chainVar :: TVar m (Chain BlockHeader) +data ChainConsumerState m = ChainConsumerState + { chainVar :: !(TVar m (Chain BlockHeader)) + , validateHeader :: !(BlockHeader -> m ()) } runChainConsumer :: @@ -170,7 +172,7 @@ chainConsumer :: PraosConfig body -> ChainConsumerState m -> ChainConsumer 'StIdle m () -chainConsumer tracer cfg (ChainConsumerState hchainVar) = idle True +chainConsumer _tracer _cfg (ChainConsumerState{chainVar = hchainVar, ..}) = idle True where -- NOTE: The specification says to do an initial intersection with -- exponentially spaced points, and perform binary search to @@ -204,9 +206,7 @@ chainConsumer tracer cfg (ChainConsumerState hchainVar) = idle True rollForward :: BlockHeader -> ChainConsumer 'StIdle m () rollForward header = TC.Effect $ do - let !delay = cfg.headerValidationDelay header - traceWith tracer $ PraosNodeEventCPU (CPUTask delay) - threadDelaySI delay + validateHeader header atomically $ do modifyTVar' hchainVar $ Chain.addBlock header return $ idle False diff --git a/simulation/src/PraosProtocol/PraosNode.hs b/simulation/src/PraosProtocol/PraosNode.hs index a7f994e..8e67b66 100644 --- a/simulation/src/PraosProtocol/PraosNode.hs +++ b/simulation/src/PraosProtocol/PraosNode.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeFamilies #-} @@ -10,7 +11,7 @@ where import ChanMux import Control.Monad.Class.MonadAsync (Concurrently (..), MonadAsync (..)) -import Control.Tracer (Tracer) +import Control.Tracer (Tracer, traceWith) import Data.ByteString (ByteString) import Data.Coerce (coerce) import Data.Either (fromLeft, fromRight) @@ -57,12 +58,13 @@ preferredChain st = do -- Peer requires ChainSyncConsumer and BlockFetchConsumer addPeer :: (IsBody body, MonadSTM m, MonadDelay m) => + (BlockHeader -> m ()) -> PraosNodeState body m -> m (PraosNodeState body m, PeerId) -addPeer st = do +addPeer f st = do chainVar <- newTVarIO Chain.Genesis (blockFetchControllerState, peerId) <- BlockFetch.addPeer (asReadOnly chainVar) st.blockFetchControllerState - let chainSyncConsumerStates = Map.insert peerId (ChainConsumerState chainVar) st.chainSyncConsumerStates + let chainSyncConsumerStates = Map.insert peerId (ChainConsumerState chainVar f) st.chainSyncConsumerStates return (PraosNodeState{..}, peerId) runPeer :: @@ -138,20 +140,25 @@ setupPraosThreads :: m [Concurrently m ()] setupPraosThreads tracer cfg st0 followers peers = do (ts, f) <- BlockFetch.setupValidatorThreads tracer cfg st0.blockFetchControllerState 1 -- TODO: parameter - (map Concurrently ts ++) <$> setupPraosThreads' tracer cfg f st0 followers peers + let valHeader h = do + let !delay = cfg.headerValidationDelay h + traceWith tracer (PraosNodeEventCPU (CPUTask delay)) + threadDelaySI delay + (map Concurrently ts ++) <$> setupPraosThreads' tracer cfg valHeader f st0 followers peers setupPraosThreads' :: (IsBody body, Show body, MonadAsync m, MonadSTM m, MonadDelay m) => Tracer m (PraosNodeEvent body) -> PraosConfig body -> + (BlockHeader -> m ()) -> (Block body -> m () -> m ()) -> PraosNodeState body m -> [Praos body (Chan m)] -> [Praos body (Chan m)] -> m [Concurrently m ()] -setupPraosThreads' tracer cfg submitFetchedBlock st0 followers peers = do +setupPraosThreads' tracer cfg valHeader submitFetchedBlock st0 followers peers = do (st1, followerIds) <- repeatM addFollower (length followers) st0 - (st2, peerIds) <- repeatM addPeer (length peers) st1 + (st2, peerIds) <- repeatM (addPeer valHeader) (length peers) st1 let controllerThread = Concurrently $ blockFetchController tracer st2.blockFetchControllerState let followerThreads = zipWith (runFollower st2) followerIds followers let peerThreads = zipWith (runPeer tracer cfg submitFetchedBlock st2) peerIds peers diff --git a/simulation/src/PraosProtocol/SimChainSync.hs b/simulation/src/PraosProtocol/SimChainSync.hs index 5945d1a..1599580 100644 --- a/simulation/src/PraosProtocol/SimChainSync.hs +++ b/simulation/src/PraosProtocol/SimChainSync.hs @@ -88,7 +88,8 @@ traceRelayLink1 tcpprops = return () where consumerNode cfg chan = do - st <- ChainConsumerState <$> newTVarIO Chain.Genesis + let valHeader = threadDelaySI . cfg.headerValidationDelay + st <- ChainConsumerState <$> newTVarIO Chain.Genesis <*> pure valHeader let nullTracer = Tracer $ const $ return () runChainConsumer nullTracer cfg chan st producerNode chan = do