Skip to content

Commit

Permalink
Bounded parallelism and CPU Cores usage chart (#117)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Duncan Coutts <[email protected]>
  • Loading branch information
Saizan and dcoutts authored Dec 20, 2024
1 parent 827cfc7 commit 6da0918
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 47 deletions.
1 change: 1 addition & 0 deletions simulation/ouroboros-leios-sim.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ library
VizSimRelayP2P
VizSimTCP
VizUtils
WorkerPool

-- other-extensions:
build-depends:
Expand Down
28 changes: 24 additions & 4 deletions simulation/src/LeiosProtocol/Short/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import Control.Exception (assert)
import Control.Monad (forever, guard, when)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Tracer
import Data.Bifunctor
import Data.Coerce (coerce)
import Data.Foldable (forM_)
import Data.Ix (Ix)
import Data.Ix (Ix, range)
import Data.List (sort, sortOn)
import Data.Map (Map)
import qualified Data.Map.Strict as Map
Expand All @@ -48,6 +49,7 @@ import qualified PraosProtocol.Common.Chain as Chain
import qualified PraosProtocol.PraosNode as PraosNode
import STMCompat
import System.Random
import WorkerPool

--------------------------------------------------------------
---- Events
Expand Down Expand Up @@ -297,7 +299,15 @@ newLeiosNodeState cfg = do

leiosNode ::
forall m.
(MonadMVar m, MonadFork m, MonadAsync m, MonadSTM m, MonadTime m, MonadDelay m, MonadMonotonicTimeNSec m) =>
( MonadMVar m
, MonadFork m
, MonadAsync m
, MonadSTM m
, MonadTime m
, MonadDelay m
, MonadMonotonicTimeNSec m
, MonadCatch m
) =>
Tracer m LeiosNodeEvent ->
LeiosNodeConfig ->
[Leios (Chan m)] ->
Expand Down Expand Up @@ -395,13 +405,23 @@ leiosNode tracer cfg followers peers = do
]

processCPUTasks ::
(MonadSTM m, MonadDelay m, MonadMonotonicTimeNSec m) =>
(MonadSTM m, MonadDelay m, MonadMonotonicTimeNSec m, MonadFork m, MonadAsync m, MonadCatch m) =>
NumCores ->
Tracer m CPUTask ->
TaskMultiQueue LeiosNodeTask m ->
m ()
processCPUTasks Infinite tracer queue = forever $ runInfParallelBlocking tracer queue
processCPUTasks (Finite _) _ _ = error "TBD"
processCPUTasks (Finite n) tracer queue = newBoundedWorkerPool n [taskSource l | l <- range (minBound, maxBound)]
where
taskSource l = do
(cpu, m) <- readTMQueue queue l
var <- newEmptyTMVar
let action = do
traceWith tracer cpu
threadDelay (cpuTaskDuration cpu)
m
-- TODO: read from var and log exception.
return $ Task action var

computeLedgerStateThread ::
forall m.
Expand Down
6 changes: 3 additions & 3 deletions simulation/src/LeiosProtocol/Short/SimP2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ traceLeiosP2P
linkTracer nfrom nto =
contramap (LeiosEventTcp . labelDirToLabelLink nfrom nto) tracer

exampleTrace2 :: StdGen -> Int -> P2PTopography -> LeiosTrace
exampleTrace2 rng0 sliceLength p2pTopography@P2PTopography{..} =
exampleTrace2 :: StdGen -> Int -> P2PTopography -> NumCores -> LeiosTrace
exampleTrace2 rng0 sliceLength p2pTopography@P2PTopography{..} processingCores =
traceLeiosP2P
rng0
p2pTopography
Expand All @@ -125,7 +125,7 @@ exampleTrace2 rng0 sliceLength p2pTopography@P2PTopography{..} =
, rankingBlockPayload = 0
, inputBlockPayload = kilobytes 96
, processingQueueBound = 100
, processingCores = Infinite
, processingCores
, nodeId
, rng
}
Expand Down
9 changes: 5 additions & 4 deletions simulation/src/LeiosProtocol/Short/VizSim.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import GHC.Records
import qualified Graphics.Rendering.Cairo as Cairo
import LeiosProtocol.Common hiding (Point)
import LeiosProtocol.Relay (Message (MsgRespondBodies), RelayMessage, relayMessageLabel)
import LeiosProtocol.Short.Node (BlockEvent (..), LeiosEventBlock (..), LeiosMessage (..), LeiosNodeEvent (..), RelayEBMessage, RelayIBMessage, RelayVoteMessage)
import LeiosProtocol.Short.Node (BlockEvent (..), LeiosEventBlock (..), LeiosMessage (..), LeiosNodeEvent (..), NumCores (Infinite), RelayEBMessage, RelayIBMessage, RelayVoteMessage)
import LeiosProtocol.Short.Sim (LeiosEvent (..), LeiosTrace, exampleTrace1)
import ModelTCP
import Network.TypedProtocol
Expand All @@ -52,7 +52,7 @@ example1 =
Layout $
leiosSimVizRender examplesLeiosSimVizConfig
where
model = leiosSimVizModel (LeiosModelConfig 5) trace
model = leiosSimVizModel (LeiosModelConfig 5 Infinite) trace
where
trace = exampleTrace1

Expand Down Expand Up @@ -229,9 +229,10 @@ accumDiffusionLatency' now _nid EnterState msgid _msg vs =
vs
accumDiffusionLatency' _now _nid _event _id _msg vs = vs

newtype LeiosModelConfig = LeiosModelConfig
{ recentSpan :: DiffTime
data LeiosModelConfig = LeiosModelConfig
{ recentSpan :: !DiffTime
-- ^ length of time the Recent* maps should cover
, numCores :: !NumCores
}

-- | Make the vizualisation model for the relay simulation from a simulation
Expand Down
102 changes: 76 additions & 26 deletions simulation/src/LeiosProtocol/Short/VizSimP2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import Data.Array.Unboxed (Ix, UArray, accumArray, (!))
import Data.Bifunctor (second)
import qualified Data.Colour.SRGB as Colour
import Data.Hashable (hash)
import qualified Data.IntervalMap.Strict as ILMap
import Data.List (foldl', intercalate, sortOn)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes, fromMaybe, maybeToList)
Expand Down Expand Up @@ -51,7 +52,7 @@ import Network.TypedProtocol
import P2P
import PraosProtocol.BlockFetch (Message (..))
import PraosProtocol.PraosNode (PraosMessage (..))
import SimTypes (Point (..), WorldShape (..))
import SimTypes (NodeId (..), Point (..), WorldShape (..))
import System.Random (uniformR)
import qualified System.Random as Random
import System.Random.Stateful (mkStdGen)
Expand Down Expand Up @@ -507,6 +508,8 @@ chartBandwidth LeiosModelConfig{recentSpan} =
where
recentPlot lbl color maps =
[ bandwidthHistPlot
maxX
(0, maxX)
lbl
color
( map
Expand Down Expand Up @@ -540,24 +543,68 @@ chartBandwidth LeiosModelConfig{recentSpan} =
]
maxX :: Num a => a
maxX = 150
bandwidthHistPlot title color values =
Chart.histToPlot $
Chart.defaultNormedPlotHist
{ Chart._plot_hist_title = title
, Chart._plot_hist_values = values
, Chart._plot_hist_range = Just (0, maxX)
, Chart._plot_hist_bins = maxX
, Chart._plot_hist_fill_style =
Chart.def
{ Chart._fill_color =
Chart.withOpacity color 0.4
}
, Chart._plot_hist_line_style =
Chart.def
{ Chart._line_color =
Chart.opaque color
}
}

bandwidthHistPlot :: RealFrac x => Int -> (x, x) -> String -> Chart.Colour Double -> [x] -> Chart.Plot x Double
bandwidthHistPlot maxX range title color values =
Chart.histToPlot $
Chart.defaultNormedPlotHist
{ Chart._plot_hist_title = title
, Chart._plot_hist_values = values
, Chart._plot_hist_range = Just range
, Chart._plot_hist_bins = maxX
, Chart._plot_hist_fill_style =
Chart.def
{ Chart._fill_color =
Chart.withOpacity color 0.4
}
, Chart._plot_hist_line_style =
Chart.def
{ Chart._line_color =
Chart.opaque color
}
}

chartCPUUsage :: LeiosModelConfig -> VizRender LeiosSimVizModel
chartCPUUsage LeiosModelConfig{numCores} =
chartVizRender 25 $
\(Time now)
_
( SimVizModel
_
vs
) ->
let
numNodes = Map.size vs.vizNodePos
maxCPUs = case numCores of
Infinite -> 20
Finite n -> n
values =
[ (nid, [(n, if n <= 5 then "" else show n)])
| (NodeId nid, m) <- Map.toList vs.nodeCpuUsage
, let n = sum . ILMap.elems . flip ILMap.containing now $ m
]
in
(Chart.def :: Chart.Layout Double Double)
{ Chart._layout_title = "Instantaneous CPU Usage per Node"
, Chart._layout_title_style = Chart.def{Chart._font_size = 15}
, Chart._layout_x_axis =
Chart.def
{ Chart._laxis_generate =
Chart.scaledIntAxis
Chart.defaultIntAxis{Chart._la_nLabels = 10}
(0, numNodes - 1)
, Chart._laxis_title = "Node #"
}
, Chart._layout_y_axis =
Chart.def
{ Chart._laxis_generate =
Chart.scaledIntAxis Chart.defaultIntAxis{Chart._la_nLabels = 5} (0, maxCPUs)
, Chart._laxis_title = "CPUs in use"
}
, Chart._layout_plots =
[ Chart.plotBars (Chart.def{Chart._plot_bars_values_with_labels = values})
]
}

chartLinkUtilisation :: VizRender LeiosSimVizModel
chartLinkUtilisation =
Expand Down Expand Up @@ -637,15 +684,15 @@ isRelayMessageControl (ProtocolMessage (SomeMessage msg)) = case msg of
_otherwise -> True

-- | takes stage length, assumes pipelines start at Slot 0.
defaultVizConfig :: Int -> LeiosP2PSimVizConfig
defaultVizConfig stageLength =
defaultVizConfig :: Int -> NumCores -> LeiosP2PSimVizConfig
defaultVizConfig stageLength numCores =
LeiosP2PSimVizConfig
{ nodeMessageColor = testNodeMessageColor
, ptclMessageColor = testPtclMessageColor
, voteColor = toSRGB . voteColor
, ebColor = toSRGB . ebColor
, ibColor = toSRGB . pipelineColor Propose . (hash . (.id) &&& (.slot))
, model = LeiosModelConfig{recentSpan = fromIntegral stageLength}
, model = LeiosModelConfig{recentSpan = fromIntegral stageLength, numCores}
}
where
testPtclMessageColor ::
Expand Down Expand Up @@ -706,8 +753,8 @@ blendColors (x : xs) = foldl' (Dia.blend 0.5) x xs
toSRGB :: Dia.Colour Double -> (Double, Double, Double)
toSRGB (Dia.toSRGB -> Dia.RGB r g b) = (r, g, b)

example2 :: Int -> Int -> Maybe P2PTopography -> Visualization
example2 seed sliceLength maybeP2PTopography =
example2 :: Int -> Int -> Maybe P2PTopography -> NumCores -> Visualization
example2 seed sliceLength maybeP2PTopography processingCores =
slowmoVisualization 0.5 $
Viz model $
LayoutAbove
Expand All @@ -734,14 +781,17 @@ example2 seed sliceLength maybeP2PTopography =
[ LayoutReqSize 350 300 $
Layout $
chartBandwidth modelConfig
, LayoutReqSize 350 300 $
Layout $
chartCPUUsage modelConfig
, LayoutReqSize 350 300 $
Layout chartLinkUtilisation
]
]
]
]
where
config = defaultVizConfig 5
config = defaultVizConfig 5 processingCores
modelConfig = config.model
rng0 = mkStdGen seed
(rng1, rng2) = Random.split rng0
Expand All @@ -758,4 +808,4 @@ example2 seed sliceLength maybeP2PTopography =
, p2pNodeLinksClose = 5
, p2pNodeLinksRandom = 5
}
model = leiosSimVizModel modelConfig (exampleTrace2 rng2 sliceLength p2pTopography)
model = leiosSimVizModel modelConfig (exampleTrace2 rng2 sliceLength p2pTopography processingCores)
21 changes: 13 additions & 8 deletions simulation/src/LeiosProtocol/TaskMultiQueue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

module LeiosProtocol.TaskMultiQueue where

import Control.Monad (forM, forM_, when)
import Control.Monad
import Control.Monad.Class.MonadFork (MonadFork (forkIO))
import Control.Tracer
import Data.Array
import qualified Data.Map.Strict as Map
Expand Down Expand Up @@ -36,7 +37,7 @@ flushTMQueue (TaskMultiQueue mq) = forM (assocs mq) (\(l, q) -> (l,) <$> flushTB

runInfParallelBlocking ::
forall m l.
(MonadSTM m, MonadDelay m, IsLabel l, MonadMonotonicTimeNSec m) =>
(MonadSTM m, MonadDelay m, IsLabel l, MonadMonotonicTimeNSec m, MonadFork m) =>
Tracer m CPUTask ->
TaskMultiQueue l m ->
m ()
Expand All @@ -47,9 +48,13 @@ runInfParallelBlocking tracer mq = do
return xs
mapM_ (traceWith tracer . fst) xs
now <- getMonotonicTime

let tasksByEnd = Map.fromListWith (<>) [(addTime cpuTaskDuration now, [m]) | (CPUTask{..}, m) <- xs]

forM_ (Map.toAscList tasksByEnd) $ \(end, ms) -> do
waitUntil end
sequence_ ms
-- forking to do the waiting so we can go back to fetch more tasks.
-- on the worst case this forks for each task, which might degrade sim performance.
-- Andrea: a small experiment with short-leios-p2p-1 shows up to 16 tasks at once.
-- OTOH only 14% of the time we had more than 1 task.
void $ forkIO $ do
let tasksByEnd = Map.fromListWith (<>) [(addTime cpuTaskDuration now, [m]) | (CPUTask{..}, m) <- xs]

forM_ (Map.toAscList tasksByEnd) $ \(end, ms) -> do
waitUntil end
sequence_ ms
22 changes: 20 additions & 2 deletions simulation/src/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Data.Maybe (fromMaybe)
import qualified ExamplesRelay
import qualified ExamplesRelayP2P
import qualified ExamplesTCP
import LeiosProtocol.Short.Node (NumCores (..))
import qualified LeiosProtocol.Short.VizSim as VizShortLeios
import qualified LeiosProtocol.Short.VizSimP2P as VizShortLeiosP2P
import qualified LeiosProtocol.VizSimTestRelay as VizSimTestRelay
Expand All @@ -32,6 +33,7 @@ import Options.Applicative (
optional,
prefs,
progDesc,
readerError,
short,
showHelpOnEmpty,
str,
Expand Down Expand Up @@ -176,7 +178,7 @@ data VizSubCommand
| VizRelayTest2
| VizRelayTest3
| VizShortLeios1
| VizShortLeiosP2P1 {seed :: Int, sliceLength :: Int, maybeTopologyFile :: Maybe FilePath}
| VizShortLeiosP2P1 {seed :: Int, sliceLength :: Int, maybeTopologyFile :: Maybe FilePath, numCores :: NumCores}

parserVizSubCommand :: Parser VizSubCommand
parserVizSubCommand =
Expand Down Expand Up @@ -277,6 +279,22 @@ parserShortLeiosP2P1 =
<> help "The file describing the network topology."
)
)
<*> option
readCores
( short 'N'
<> metavar "NUMBER"
<> value Infinite
<> help "number of simulated cores for node parallesim, or 'unbounded' (the default)."
)
where
readCores = unbounded <|> finite
where
unbounded = do
s <- str
if s == "unbounded" then pure Infinite else readerError "unrecognized"
finite = do
n <- auto
if n > 0 then pure (Finite n) else readerError "number of cores should be greater than 0"

vizOptionsToViz :: VizCommand -> IO Visualization
vizOptionsToViz VizCommandWithOptions{..} = case vizSubCommand of
Expand All @@ -302,7 +320,7 @@ vizOptionsToViz VizCommandWithOptions{..} = case vizSubCommand of
VizShortLeiosP2P1{..} -> do
let worldShape = WorldShape (1200, 1000) True
maybeP2PTopography <- traverse (readP2PTopography defaultParams worldShape) maybeTopologyFile
pure $ VizShortLeiosP2P.example2 seed sliceLength maybeP2PTopography
pure $ VizShortLeiosP2P.example2 seed sliceLength maybeP2PTopography numCores

type VizSize = (Int, Int)

Expand Down
Loading

0 comments on commit 6da0918

Please sign in to comment.