Skip to content

Commit

Permalink
Add the notion of node bandwidth
Browse files Browse the repository at this point in the history
This has influence on the how long it takes for a node to receive a message.
  • Loading branch information
dnadales committed Jun 19, 2024
1 parent b7a0783 commit 156e2e9
Showing 1 changed file with 72 additions and 11 deletions.
83 changes: 72 additions & 11 deletions leios-sim/src/Leios/Model.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@
-- ===========
--
-- \* ✅ Implement FFD policy.
-- \* Add the notion of ~~capacity~~ bandwidth.
-- \* Add the notion of ~~capacity~~ bandwidth.
-- \* Check that after adding capacity when blocks are queued we still deliver the freshest first.
-- \* Add EB production only with IBs so that λ parameter becomes relevant.
-- \* ⭐ Connect with the simulation front end and run.
-- \* Add other plots: eg latency distribution.
-- \* ...
-- \* Implement other roles/phase.
--
-- TODOs:
-- =====
--
-- \* Make the IB size configurable.
module Leios.Model where

import Prelude hiding (init)
Expand All @@ -45,6 +50,18 @@ import GHC.Generics (Generic)
import Leios.Trace (mkTracer)
import Text.Pretty.Simple (pPrint)

--------------------------------------------------------------------------------
-- FIXME: constants that should be configurable
--------------------------------------------------------------------------------

gNodeBandwidth = BitsPerSecond 100

gIBSize = NumberOfBits 300

--------------------------------------------------------------------------------
-- END FIXME: constants that should be configurable
--------------------------------------------------------------------------------

data RoleType = IBRole | EBRole | Vote1Role | Vote2Role
deriving (Show, Eq, Generic)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)
Expand All @@ -59,16 +76,20 @@ newtype Slot = Slot Word
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)
deriving newtype (Enum)

tickSlot :: Slot -> Slot
tickSlot = succ

data IB
= IB
{ nodeId :: NodeId
, slot :: Slot
, ib_size :: NumberOfBits
}
deriving (Show, Eq, Generic)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

tickSlot :: Slot -> Slot
tickSlot = succ
instance HasSizeInBits IB where
size = ib_size

run ::
forall m.
Expand All @@ -82,8 +103,9 @@ run ::
Tracer m LeiosEvent ->
m ()
run tracer = do
world <- init
raceAll [register i world >> node i tracer schedule world | i <- [0 .. 2]]
owparamsTV <- newTVarIO $ OWParams $ gNodeBandwidth
world <- init owparamsTV
raceAll [register i world >> node i tracer schedule world | i <- [0 .. 1]]
where
-- TODO: we need to find a more general and realistic way to model the schedule.
--
Expand Down Expand Up @@ -134,7 +156,7 @@ node nodeId tracer schedule world = do
hasIBRole = schedule IBRole nodeId

produceIB slot = do
let newIB = IB nodeId slot
let newIB = IB nodeId slot gIBSize
traceWith tracer (ProducedIB newIB)
MsgIB newIB `sendTo` world

Expand Down Expand Up @@ -206,17 +228,29 @@ data Clock m
--------------------------------------------------------------------------------

data OutsideWorld m
= OutsideWorld {pqsTVar :: TVar m (Map NodeId (PQueueTVar m))}
= OutsideWorld
{ pqsTVar :: TVar m (Map NodeId (PQueueTVar m))
, paramsTV :: TVar m OWParams
}

data OWParams = OWParams {nodeBandwidth :: BitsPerSecond}

newtype BitsPerSecond = BitsPerSecond Word
deriving (Eq, Show, Generic, Ord)

data Msg = MsgIB {msgIB :: IB}

instance HasSizeInBits Msg where
size (MsgIB ib) = size ib

init ::
forall m.
(Monad m, MonadSTM m, Applicative m) =>
TVar m OWParams ->
m (OutsideWorld m)
init = do
init paramsTV = do
pqsTVar <- newTVarIO Map.empty
pure $ OutsideWorld{pqsTVar = pqsTVar}
pure $ OutsideWorld{pqsTVar = pqsTVar, paramsTV = paramsTV}

-- | .
--
Expand Down Expand Up @@ -252,13 +286,17 @@ sendTo msg world = atomically $ do
-- | ...
--
-- PRECONDITION: The node ID must exist.
receiveFrom :: forall m. MonadSTM m => NodeId -> OutsideWorld m -> m Msg
receiveFrom :: forall m. (MonadSTM m, MonadDelay m) => NodeId -> OutsideWorld m -> m Msg
receiveFrom nodeId world = do
m_pqTVar <- fmap (Map.lookup nodeId) $ atomically $ readTVar (pqsTVar world)
case m_pqTVar of
Nothing -> error $ "Node " <> show nodeId <> " does not exist."
Just pqTVar -> do
pop pqTVar
msg <- pop pqTVar
owNodeBandwidth <- nodeBandwidth <$> (atomically $ readTVar (paramsTV world))
let msToInt (Microseconds ms) = fromIntegral ms
threadDelay $ msToInt $ transmissionTime (size msg) owNodeBandwidth
pure msg

--------------------------------------------------------------------------------
-- A priority queue inside a transactional var
Expand Down Expand Up @@ -292,3 +330,26 @@ pop pqTVar = atomically $ do
Just (PMsg msg) -> do
writeTVar (getTQueueTVar pqTVar) (PQueue.deleteMax queue)
pure msg

--------------------------------------------------------------------------------
-- Byte Size Calculations
--------------------------------------------------------------------------------

class HasSizeInBits a where
size :: a -> NumberOfBits

newtype NumberOfBits = NumberOfBits Word
deriving (Generic, Show, Eq, Ord)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

newtype Microseconds = Microseconds Word
deriving (Generic, Show, Eq, Ord)

-- | .
--
-- PRECONDITION: rate =/ 0.
transmissionTime :: NumberOfBits -> BitsPerSecond -> Microseconds
transmissionTime (NumberOfBits nr_bits) (BitsPerSecond rate) = Microseconds $ ceiling $ seconds * 1_000_000
where
seconds :: Double
seconds = (fromIntegral nr_bits) / (fromIntegral rate)

0 comments on commit 156e2e9

Please sign in to comment.