Skip to content

Commit

Permalink
Sampling block diffusion from 1000 nodes praos sim (#58)
Browse files Browse the repository at this point in the history
Co-authored-by: Wen Kokke <[email protected]>
  • Loading branch information
Saizan and wenkokke authored Oct 31, 2024
1 parent efa02c0 commit 163f7ed
Show file tree
Hide file tree
Showing 20 changed files with 705 additions and 251 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ jobs:
- name: Run tests
run: cabal test all

fourmolu-check:
name: Check Haskell sources with fourmolu
runs-on: ubuntu-22.04
steps:
# Note that you must checkout your code before running haskell-actions/run-fourmolu
- uses: actions/checkout@v4
- uses: haskell-actions/run-fourmolu@v11
with:
version: "0.15.0.0"

build-docusaurus:
runs-on: ubuntu-22.04
steps:
Expand Down
7 changes: 7 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,10 @@ source-repository-package
ntp-client
cardano-client

source-repository-package
type: git
location: https://github.com/Saizan/io-sim.git
tag: 2ea49cd65ae82ec11826e2c966d77ffb877bca8c
subdir:
io-sim
io-classes
34 changes: 34 additions & 0 deletions hooks/pre-commit
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/sh

# To install as a Git pre-commit hook, run:
#
# > ln hooks/pre-commit .git/hooks/pre-commit
#

# Check for fourmolu
fourmolu_required_version="0.15.0.0"
fourmolu="$(which fourmolu)"
if [ "${fourmolu}" = "" ]; then
echo "pre-commit: Requires fourmolu version ${fourmolu_required_version}; no version found"
exit 1
fi
fourmolu_installed_version="$($fourmolu --version | head -n 1 | cut -d' ' -f2)"
if [ ! "${fourmolu_installed_version}" = "${fourmolu_required_version}" ]; then
echo "pre-commit: Requires fourmolu version ${fourmolu_required_version}; found version ${fourmolu_installed_version}"
exit 1
fi

# Check for unstaged Haskell files
unstaged_haskell_files="$(git ls-files --exclude-standard --no-deleted --deduplicate --modified '*.hs')"
if [ ! "${unstaged_haskell_files}" = "" ]; then
echo "pre-commit: Found unstaged Haskell files"
echo "${unstaged_haskell_files}"
exit 1
fi

# Check Haskell files with fourmolu
echo "Formatting Haskell source files with fourmolu version ${fourmolu_required_version}"
if ! git ls-files --exclude-standard --no-deleted --deduplicate '*.hs' | xargs -L 1 fourmolu --mode=check --quiet; then
git ls-files --exclude-standard --no-deleted --deduplicate '*.hs' | xargs -L 1 fourmolu --mode=inplace --quiet
exit 1
fi
15 changes: 14 additions & 1 deletion simulation/ouroboros-leios-sim.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,18 @@ library
PraosProtocol.Common
PraosProtocol.Common.AnchoredFragment
PraosProtocol.Common.Chain
PraosProtocol.ExamplesPraosP2P
PraosProtocol.PraosNode
PraosProtocol.SimBlockFetch
PraosProtocol.SimChainSync
PraosProtocol.SimPraos
PraosProtocol.SimPraosP2P
PraosProtocol.ExamplesPraosP2P
PraosProtocol.VizSimBlockFetch
PraosProtocol.VizSimChainSync
PraosProtocol.VizSimPraos
PraosProtocol.VizSimPraosP2P
RelayProtocol
Sample
SimRelay
SimRelayP2P
SimTCPLinks
Expand All @@ -66,6 +67,7 @@ library

-- other-extensions:
build-depends:
, aeson
, array
, base
, bytestring
Expand Down Expand Up @@ -106,3 +108,14 @@ executable viz

default-language: Haskell2010
ghc-options: -Wall

executable sample
main-is: src/SampleMain.hs
build-depends:
, base
, filepath
, optparse-applicative
, ouroboros-leios-sim

default-language: Haskell2010
ghc-options: -Wall
1 change: 1 addition & 0 deletions simulation/src/ChanDriver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeAbstractions #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
Expand Down
86 changes: 51 additions & 35 deletions simulation/src/PraosProtocol/BlockFetch.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingVia #-}
Expand All @@ -13,6 +14,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
Expand All @@ -36,6 +38,7 @@ import Control.Concurrent.Class.MonadSTM (
)
import Control.Exception (assert)
import Control.Monad (forM, forever, guard, unless, void, when, (<=<))
import Control.Tracer (Tracer, traceWith)
import Data.Bifunctor (second)
import qualified Data.List as List
import Data.Map.Strict (Map)
Expand Down Expand Up @@ -239,16 +242,24 @@ data BlockFetchConsumerState m = BlockFetchConsumerState
, removeInFlight :: [Point Block] -> m ()
}

runBlockFetchConsumer :: MonadSTM m => Chan m BlockFetchMessage -> BlockFetchConsumerState m -> m ()
runBlockFetchConsumer chan blockFetchConsumerState =
void $ runPeerWithDriver (chanDriver decideBlockFetchState chan) (blockFetchConsumer blockFetchConsumerState)
runBlockFetchConsumer ::
(MonadSTM m, MonadDelay m) =>
Tracer m PraosNodeEvent ->
PraosConfig ->
Chan m BlockFetchMessage ->
BlockFetchConsumerState m ->
m ()
runBlockFetchConsumer tracer cfg chan blockFetchConsumerState =
void $ runPeerWithDriver (chanDriver decideBlockFetchState chan) (blockFetchConsumer tracer cfg blockFetchConsumerState)

blockFetchConsumer ::
forall m.
MonadSTM m =>
(MonadSTM m, MonadDelay m) =>
Tracer m PraosNodeEvent ->
PraosConfig ->
BlockFetchConsumerState m ->
TC.Client BlockFetchState NonPipelined StIdle m ()
blockFetchConsumer st = idle
blockFetchConsumer tracer cfg st = idle
where
-- does not support preemption of in-flight requests.
blockRequest :: STM m (AnchoredFragment BlockHeader)
Expand Down Expand Up @@ -278,22 +289,18 @@ blockFetchConsumer st = idle
streaming range headers = TC.Await $ \msg ->
case (msg, headers) of
(MsgBatchDone, []) -> idle
(MsgBlock block, header : headers') -> TC.Effect $ do
ifValidBlockBody
header
block
( do
st.addFetchedBlock (Block header block)
return (streaming range headers')
)
(error $ "blockFetchConsumer: invalid block\n" ++ show (Block header block)) -- TODO
(MsgBlock body, header : headers') -> TC.Effect $ do
let block = Block header body
traceWith tracer $ PraosNodeEventReceived block
threadDelaySI (cfg.blockValidationDelay block)
if blockInvariant block
then do
st.addFetchedBlock block
traceWith tracer (PraosNodeEventEnterState block)
return (streaming range headers')
else error $ "blockFetchConsumer: invalid block\n" ++ show block -- TODO
(MsgBatchDone, _ : _) -> TC.Effect $ error "TooFewBlocks" -- TODO?
(MsgBlock _, []) -> TC.Effect $ error "TooManyBlocks" -- TODO?
ifValidBlockBody hdr bdy t f = do
-- TODO: threadDelay
if blockInvariant $ Block hdr bdy
then t
else f

--------------------------------------------
---- BlockFetch controller
Expand Down Expand Up @@ -377,11 +384,11 @@ newBlockFetchControllerState chain = atomically $ do
cpsVar <- newTVar $ initChainProducerState chain
return BlockFetchControllerState{..}

blockFetchController :: forall m. MonadSTM m => BlockFetchControllerState m -> m ()
blockFetchController st@BlockFetchControllerState{..} = forever (atomically makeRequests)
blockFetchController :: forall m. MonadSTM m => Tracer m PraosNodeEvent -> BlockFetchControllerState m -> m ()
blockFetchController tracer st@BlockFetchControllerState{..} = forever makeRequests
where
makeRequests :: STM m ()
makeRequests = do
makeRequests :: m ()
makeRequests = (traceNewTip tracer =<<) . atomically $ do
let peerChainVars = (map . second) (.peerChainVar) $ Map.toList peers
mchainSwitch <- longestChainSelection peerChainVars (asReadOnly cpsVar) blockHeader
case mchainSwitch of
Expand All @@ -390,13 +397,14 @@ blockFetchController st@BlockFetchControllerState{..} = forever (atomically make
blocks <- readTVar blocksVar
chain <- chainState <$> readTVar cpsVar
let chainUpdate = initMissingBlocksChain blocks chain fragment
useful <- updateChains st chainUpdate
(useful, mtip) <- updateChains st chainUpdate
whenMissing chainUpdate $ \_missingChain -> do
-- TODO: filterFetched could be reusing the missingChain suffix.
br <- filterInFlight <=< filterFetched $ fragment
if null br.blockRequestFragments
then unless useful retry
else addRequest peerId br
return mtip

filterFetched :: AnchoredFragment BlockHeader -> STM m BlockRequest
filterFetched fr = do
Expand Down Expand Up @@ -504,24 +512,26 @@ updateChains ::
MonadSTM m =>
BlockFetchControllerState m ->
ChainsUpdate ->
STM m Bool
STM m (Bool, Maybe FullTip)
updateChains BlockFetchControllerState{..} e =
case e of
FullChain fullChain -> do
writeTVar targetChainVar Nothing
let !newTip = fullTip fullChain
modifyTVar' cpsVar (switchFork fullChain)
return True
return (True, Just newTip)
ImprovedPrefix missingChain -> do
writeTVar targetChainVar (Just missingChain)
let improvedChain = fromMaybe (error "prefix not from Genesis") $ Chain.fromAnchoredFragment missingChain.prefix
!newTip = fullTip improvedChain
modifyTVar' cpsVar (switchFork improvedChain)
return True
return (True, Just $ newTip)
SamePrefix missingChain -> do
target <- readTVar targetChainVar
let useful = Just (headPointMChain missingChain) /= fmap headPointMChain target
when useful $ do
writeTVar targetChainVar (Just missingChain)
return useful
return (useful, Nothing)

-----------------------------------------------------------
---- Methods for blockFetchConsumer and blockFetchProducer
Expand All @@ -536,16 +546,22 @@ removeInFlight BlockFetchControllerState{..} pId points = do
-- * removes block from PeerId's in-flight set
-- * adds block to blocksVar
-- * @fillInBlocks@ on @selectedChain@, and @updateChains@
addFetchedBlock :: MonadSTM m => BlockFetchControllerState m -> PeerId -> Block -> STM m ()
addFetchedBlock st pId blk = do
addFetchedBlock :: MonadSTM m => Tracer m PraosNodeEvent -> BlockFetchControllerState m -> PeerId -> Block -> m ()
addFetchedBlock tracer st pId blk = (traceNewTip tracer =<<) . atomically $ do
removeInFlight st pId [blockPoint blk]
modifyTVar' st.blocksVar (Map.insert (blockHash blk) blk)

selected <- readTVar st.targetChainVar
case selected of
Nothing -> return () -- I suppose we do not need this block anymore.
Nothing -> return Nothing -- I suppose we do not need this block anymore.
Just missingChain -> do
void $ updateChains st =<< fillInBlocks <$> readTVar st.blocksVar <*> pure missingChain
fmap snd $ updateChains st =<< fillInBlocks <$> readTVar st.blocksVar <*> pure missingChain

traceNewTip :: Monad m => Tracer m PraosNodeEvent -> Maybe FullTip -> m ()
traceNewTip tracer x =
case x of
Nothing -> return ()
(Just tip) -> traceWith tracer (PraosNodeEventNewTip tip)

addProducedBlock :: MonadSTM m => BlockFetchControllerState m -> Block -> STM m ()
addProducedBlock BlockFetchControllerState{..} blk = do
Expand All @@ -565,9 +581,9 @@ blockRequestVarForPeerId peerId blockFetchControllerState =
Nothing -> error $ "blockRequestVarForPeerId: no peer with id " <> show peerId
Just peerStatus -> peerStatus.blockRequestVar

initBlockFetchConsumerStateForPeerId :: MonadSTM m => PeerId -> BlockFetchControllerState m -> BlockFetchConsumerState m
initBlockFetchConsumerStateForPeerId peerId blockFetchControllerState =
initBlockFetchConsumerStateForPeerId :: MonadSTM m => Tracer m PraosNodeEvent -> PeerId -> BlockFetchControllerState m -> BlockFetchConsumerState m
initBlockFetchConsumerStateForPeerId tracer peerId blockFetchControllerState =
BlockFetchConsumerState
(blockRequestVarForPeerId peerId blockFetchControllerState)
(atomically . addFetchedBlock blockFetchControllerState peerId)
(addFetchedBlock tracer blockFetchControllerState peerId)
(atomically . removeInFlight blockFetchControllerState peerId)
42 changes: 22 additions & 20 deletions simulation/src/PraosProtocol/BlockGeneration.hs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE ScopedTypeVariables #-}

module PraosProtocol.BlockGeneration where

import Cardano.Slotting.Slot (WithOrigin (..))
import ChanTCP (Bytes)
import Control.Concurrent.Class.MonadSTM (
MonadSTM (..),
)
import Control.Monad (forever, when)
import Control.Monad.Class.MonadTimer.SI (MonadDelay)
import Control.Monad (forever)
import Control.Tracer
import Data.ByteString as BS
import Data.ByteString.Char8 as BS8
import System.Random (StdGen, uniformR)

import Cardano.Slotting.Slot (WithOrigin (..))

import ChanTCP (Bytes)
import Data.Word (Word64)

import PraosProtocol.Common
import qualified PraosProtocol.Common.Chain as Chain
import System.Random (StdGen, uniformR)

-- | Returns a block that can extend the chain.
-- PRECONDITION: the SlotNo is ahead of the chain tip.
Expand Down Expand Up @@ -73,27 +71,31 @@ mkNextBlock (PoissonGenerationPattern sz rng0 lambda) prefix = do

blockGenerator ::
(MonadSTM m, MonadDelay m, MonadTime m) =>
SlotConfig ->
Tracer m PraosNodeEvent ->
PraosConfig ->
TVar m (ChainProducerState Block) ->
(Block -> STM m ()) ->
Maybe (m (SlotNo, BlockBody)) ->
m ()
blockGenerator _slotConfig _cpsVar _addBlockSt Nothing = return ()
blockGenerator slotConfig cpsVar addBlockSt (Just nextBlock) = forever $ go
blockGenerator _tracer _praosConfig _cpsVar _addBlockSt Nothing = return ()
blockGenerator tracer praosConfig cpsVar addBlockSt (Just nextBlock) = forever $ go
where
go = do
(sl, body) <- nextBlock
waitForSlot sl
atomically $ do
mblk <- atomically $ do
chain <- chainState <$> readTVar cpsVar
when (Chain.headSlot chain <= At sl) $
addBlockSt (mkBlock chain sl body)
let block = mkBlock chain sl body
if (Chain.headSlot chain <= At sl)
then
addBlockSt block >> return (Just block)
else return Nothing
case mblk of
Nothing -> return ()
Just blk -> do
traceWith tracer (PraosNodeEventGenerate blk)
traceWith tracer (PraosNodeEventNewTip (FullTip (blockHeader blk)))
waitForSlot sl = do
let tgt = slotTime slotConfig sl
let tgt = slotTime praosConfig.slotConfig sl
now <- getCurrentTime
threadDelayNDT (tgt `diffUTCTime` now)

slotConfigFromNow :: MonadTime m => m SlotConfig
slotConfigFromNow = do
start <- getCurrentTime
return $ SlotConfig{start, duration = 1}
Loading

0 comments on commit 163f7ed

Please sign in to comment.