Skip to content

Commit

Permalink
simulation: TaskMultiQueue: labeled task/job queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Saizan committed Dec 18, 2024
1 parent a51544e commit 453bab2
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 0 deletions.
1 change: 1 addition & 0 deletions simulation/ouroboros-leios-sim.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ library
LeiosProtocol.Short.VizSim
LeiosProtocol.Short.VizSimP2P
LeiosProtocol.SimTestRelay
LeiosProtocol.TaskMultiQueue
LeiosProtocol.VizSimTestRelay
ModelTCP
P2P
Expand Down
58 changes: 58 additions & 0 deletions simulation/src/LeiosProtocol/TaskMultiQueue.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TupleSections #-}

module LeiosProtocol.TaskMultiQueue where

import Control.Monad (forM, forM_, when)
import Control.Tracer
import Data.Array
import qualified Data.Map.Strict as Map
import GHC.Natural
import LeiosProtocol.Common
import STMCompat

type IsLabel lbl = (Ix lbl, Bounded lbl)

newtype TaskMultiQueue lbl m = TaskMultiQueue (Array lbl (TBQueue m (CPUTask, m ())))

newTaskMultiQueue' :: (MonadSTM m, Ix l) => (l, l) -> Natural -> STM m (TaskMultiQueue l m)
newTaskMultiQueue' (a, b) n =
TaskMultiQueue . listArray (a, b) <$> mapM (const $ newTBQueue n) (range (a, b))

newTaskMultiQueue :: (MonadSTM m, IsLabel l) => Natural -> STM m (TaskMultiQueue l m)
newTaskMultiQueue = newTaskMultiQueue' (minBound, maxBound)

writeTMQueue :: (MonadSTM m, IsLabel l) => TaskMultiQueue l m -> l -> (CPUTask, m ()) -> STM m ()
writeTMQueue (TaskMultiQueue mq) lbl task = writeTBQueue (mq ! lbl) task

readTMQueue :: forall m l. (MonadSTM m, IsLabel l) => TaskMultiQueue l m -> l -> STM m (CPUTask, m ())
readTMQueue (TaskMultiQueue mq) lbl = readTBQueue (mq ! lbl)

flushTMQueue :: forall m l. (MonadSTM m, IsLabel l) => TaskMultiQueue l m -> STM m [(l, [(CPUTask, m ())])]
flushTMQueue (TaskMultiQueue mq) = forM (assocs mq) (\(l, q) -> (l,) <$> flushTBQueue q)

runInfParallelBlocking ::
forall m l.
(MonadSTM m, MonadDelay m, IsLabel l, MonadMonotonicTime m) =>
Tracer m CPUTask ->
TaskMultiQueue l m ->
m ()
runInfParallelBlocking tracer mq = do
xs <- atomically $ do
xs <- concat . map snd <$> flushTMQueue mq
when (null xs) retry
return xs
mapM_ (traceWith tracer . fst) xs
now <- getMonotonicTime

let tasksByEnd = Map.fromListWith (<>) [(addTime cpuTaskDuration now, [m]) | (CPUTask{..}, m) <- xs]

forM_ (Map.toAscList tasksByEnd) $ \(end, ms) -> do
waitUntil end
sequence_ ms
8 changes: 8 additions & 0 deletions simulation/src/TimeCompat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ module TimeCompat (
diffUTCTime,
threadDelaySI,
threadDelayNDT,
waitUntil,
)
where

import Control.Monad (when)
import Control.Monad.Class.MonadTime.SI (
DiffTime,
MonadMonotonicTime (getMonotonicTime),
Expand All @@ -39,3 +41,9 @@ threadDelaySI = threadDelay . round . (* 1e6)

threadDelayNDT :: MonadDelay m => NominalDiffTime -> m ()
threadDelayNDT = threadDelay . round . (* 1e6)

waitUntil :: (MonadMonotonicTime m, MonadDelay m) => Time -> m ()
waitUntil endtime = do
now <- getMonotonicTime
let delay = endtime `diffTime` now
when (delay > 0) (threadDelaySI delay)

0 comments on commit 453bab2

Please sign in to comment.