Skip to content

Commit

Permalink
Add EB production logic
Browse files Browse the repository at this point in the history
This commit incorporates parameters `L`, λ.
  • Loading branch information
dnadales committed Jun 23, 2024
1 parent 156e2e9 commit 39ad0a9
Showing 1 changed file with 207 additions and 25 deletions.
232 changes: 207 additions & 25 deletions leios-sim/src/Leios/Model.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand All @@ -15,8 +16,8 @@
--
-- \* ✅ Implement FFD policy.
-- \* ✅ Add the notion of ~~capacity~~ bandwidth.
-- \* Check that after adding capacity when blocks are queued we still deliver the freshest first.
-- \* Add EB production only with IBs so that λ parameter becomes relevant.
-- \* Check that after adding capacity when blocks are queued we still deliver the freshest first.
-- \* Add EB production only with IBs so that λ parameter becomes relevant.
-- \* ⭐ Connect with the simulation front end and run.
-- \* Add other plots: eg latency distribution.
-- \* ...
Expand All @@ -26,14 +27,18 @@
-- =====
--
-- \* Make the IB size configurable.
-- \* Define a better/more-realistic schedule.
-- \* Minor: I need to use '_' for certain variable names so that they match the paper. Should I simply use snake case overall?
-- \* Address all the FIXMEs.
--
module Leios.Model where

import Prelude hiding (init)

import Control.Applicative (asum)
import Control.Concurrent.Class.MonadSTM.TChan (TChan, newTChanIO, readTChan, writeTChan)
import Control.Concurrent.Class.MonadSTM.TQueue (TQueue, newTQueueIO, readTQueue)
import Control.Concurrent.Class.MonadSTM.TVar (TVar, check, modifyTVar', newTVarIO, readTVar, writeTVar)
import Control.Concurrent.Class.MonadSTM.TVar (TVar, check, modifyTVar', newTVarIO, readTVar, readTVarIO, writeTVar)
import Control.Monad (forever)
import Control.Monad.Class.MonadAsync (Async, Concurrently (Concurrently, runConcurrently), MonadAsync, async, race_)
import Control.Monad.Class.MonadSTM (MonadSTM, STM, atomically, retry)
Expand All @@ -54,12 +59,56 @@ import Text.Pretty.Simple (pPrint)
-- FIXME: constants that should be configurable
--------------------------------------------------------------------------------

g_L = NumberOfSlots 4

g_λ = NumberOfSlices 3

gNodeBandwidth = BitsPerSecond 100

gIBSize = NumberOfBits 300

g_f_I = BlocksPerSecond 1

g_f_E = BlocksPerSecond 4

--------------------------------------------------------------------------------
-- END FIXME: constants that should be configurable
-- Model parameters
--------------------------------------------------------------------------------

-- FIXME: we should add a parameter to determine the number of slots per second (or slot duration).
data Parameters = Parameters
{ _L :: NumberOfSlots
-- ^ Slice length.
, λ :: NumberOfSlices
-- ^ Number of slices (of size '_L') the diffusion period takes.
, nodeBandwidth :: BitsPerSecond
, ibSize :: NumberOfBits
-- ^ Size of the diffusion block.
, f_I :: BlocksPerSecond
-- ^ Frequency of IBs per-node. FIXME: I think this will be determined by the leader schedule (VRF lottery).
, f_E :: BlocksPerSecond
-- ^ Frequency of EBs per-node. FIXME: I think this will be determined by the leader schedule (VRF lottery).
}
deriving (Show, Generic)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

newtype NumberOfSlots = NumberOfSlots Word
deriving stock (Generic)
deriving newtype (Show, Eq, Ord, Num)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

newtype NumberOfSlices = NumberOfSlices Word
deriving stock (Generic)
deriving newtype (Show, Eq, Ord, Num)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

newtype BlocksPerSecond = BlocksPerSecond Word
deriving stock (Generic)
deriving newtype (Show, Eq, Ord)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

--------------------------------------------------------------------------------
-- Model types
--------------------------------------------------------------------------------

data RoleType = IBRole | EBRole | Vote1Role | Vote2Role
Expand All @@ -79,10 +128,14 @@ newtype Slot = Slot Word
tickSlot :: Slot -> Slot
tickSlot = succ

--------------------------------------------------------------------------------
-- Input Blocks
--------------------------------------------------------------------------------

data IB
= IB
{ nodeId :: NodeId
, slot :: Slot
{ ib_slot :: Slot
, ib_producer :: NodeId
, ib_size :: NumberOfBits
}
deriving (Show, Eq, Generic)
Expand All @@ -91,6 +144,32 @@ data IB
instance HasSizeInBits IB where
size = ib_size

-- TOOD: for now the IB ref is just the IB itself. Eventually this should become a hash.
newtype IB_Ref = IB_Ref IB
deriving (Show, Eq, Generic)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

ib_ref :: IB -> IB_Ref
ib_ref = IB_Ref -- TODO: eventually this will compute the IB hash.

--------------------------------------------------------------------------------
-- Endorsement Blocks
--------------------------------------------------------------------------------

data EB
= EB
{ eb_slot :: Slot
, eb_producer :: NodeId
, eb_linked_IBs :: [IB_Ref]
}
deriving (Show, Eq, Generic)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

--------------------------------------------------------------------------------
-- Model
--------------------------------------------------------------------------------

-- TODO: when we connect the model with the server this function should be renamed to something like 'runStandalone'.
run ::
forall m.
( Monad m
Expand All @@ -103,17 +182,36 @@ run ::
Tracer m LeiosEvent ->
m ()
run tracer = do
owparamsTV <- newTVarIO $ OWParams $ gNodeBandwidth
world <- init owparamsTV
raceAll [register i world >> node i tracer schedule world | i <- [0 .. 1]]
let defaultParams =
Parameters
{ _L = g_L
, λ = g_λ
, nodeBandwidth = gNodeBandwidth
, ibSize = gIBSize
, f_I = g_f_I
, f_E = g_f_E
}
paramsTVar <- newTVarIO defaultParams
world <- init paramsTVar
let totalNodes = 1
raceAll
[ register (NodeId i) world
>> node (NodeId i) tracer (schedule totalNodes) world
| i <- [0 .. totalNodes]
]
where
-- TODO: we need to find a more general and realistic way to model the schedule.
--
-- In particular we should add the ledger state needed to determine leadership (eg stake distribution).
--
-- Also we would like more than one node to be elected to issue blocks or votes (specially the latter!).
schedule :: RoleType -> NodeId -> Slot -> m Bool
schedule _ (NodeId nid) (Slot slot) = pure $ slot `mod` (nid + 1) == 0
schedule :: Word -> RoleType -> NodeId -> Slot -> m Bool
schedule _ IBRole (NodeId nid) (Slot slot) = pure $ slot `mod` (nid + 1) == 0
schedule totalNodes EBRole _ (Slot slot) = pure $ slot `mod` totalNodes == 0

-- FIXME: for EBs we might want to define this as
--
-- > slot `mod` totalNumberOfNodes == 0

type Schedule m = RoleType -> NodeId -> Slot -> m Bool

Expand Down Expand Up @@ -143,23 +241,71 @@ node nodeId tracer schedule world = do
clock <- runClock
forever $ do
slot <- nextSlot clock
whenM (hasIBRole slot) $ produceIB slot
whenM (hasIBRole slot) $ produceIB slot -- TODO: do we want to produce IBs at a higher rate than 1 per-slot?
whenM (hasEBRole slot) $ produceEB slot
-- traceWith tracer (NextSlot nodeId slot)
pure ()

consumer = forever $ do
msg <- nodeId `receiveFrom` world
case msg of
MsgIB ib -> do
traceWith tracer (ReceivedIB ib nodeId)
-- traceWith tracer (ReceivedIB ib nodeId)
storeIB nodeId ib world
MsgEB eb -> do
traceWith tracer (ReceivedEB eb nodeId)

hasIBRole = schedule IBRole nodeId

produceIB slot = do
let newIB = IB nodeId slot gIBSize
let newIB = IB{ib_slot = slot, ib_producer = nodeId, ib_size = gIBSize}
traceWith tracer (ProducedIB newIB)
MsgIB newIB `sendTo` world

hasEBRole = schedule EBRole nodeId

produceEB slot = do
Parameters{_L, λ} <- getParams world
l_I <- fmap (fmap ib_ref) $ storedIBsBy nodeId world (slice _L slot (λ + 1))
let newEB = EB{eb_slot = slot, eb_producer = nodeId, eb_linked_IBs = l_I}
traceWith tracer (ProducedEB newEB)
MsgEB newEB `sendTo` world

-- | @slice _L s x@ returns the slice @y - x@, where @y@ is the slice that contains slot @s@.
--
-- We assume the time to be divided in slots, and the slots to be
-- grouped into slices of length @_L@. The following diagram
-- illustrates the fact that @slice 5 18 2@ should return @1@, because
-- slice @1@ is @2@ slices before slice @3@, which contains slot @18@.
--
-- >
-- > 0 1 2 3
-- > |-----|-----|-----|-----|-----|
-- > ↳ s = 18 (slot s is in slice 3)
--
-- In the figure above, @slice _L s x@ will return the interval of slots corresponding to slice 3 [15, 20).
slice :: NumberOfSlots -> Slot -> NumberOfSlices -> Slice
slice (NumberOfSlots _L) (Slot s) (NumberOfSlices x) =
Slice
{ lb_inclusive = Slot $ (sliceOf s - x) * _L
, ub_exclusive = Slot $ (sliceOf s - x + 1) * _L
}
where
sliceOf = (`div` _L)

-- | A slice is defined by its lower slot and its (exclusive) upper bound slot.
--
-- See 'isWithin'.
data Slice = Slice
{ lb_inclusive :: Slot
, ub_exclusive :: Slot
}
deriving (Show, Generic)

isWithin :: Slot -> Slice -> Bool
isWithin slot Slice{lb_inclusive, ub_exclusive} =
lb_inclusive <= slot && slot < ub_exclusive

--------------------------------------------------------------------------------
-- Events
--------------------------------------------------------------------------------
Expand All @@ -169,6 +315,8 @@ data LeiosEvent
= NextSlot {nsNodeId :: NodeId, nsSlot :: Slot} -- FIXME: temporary, just for testing purposes.
| ProducedIB {producedIB :: IB}
| ReceivedIB {receivedIB :: IB, receivedBy :: NodeId}
| ProducedEB {producedEB :: EB}
| ReceivedEB {receivedEB :: EB, receivedBy :: NodeId}
deriving (Show, Generic)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

Expand All @@ -190,6 +338,9 @@ runIO = do
-- Blockchain Clock
--------------------------------------------------------------------------------

-- TODO: we might want to add some mechanism to cancel the async tick
-- thread when the thread that has a reference to the returned clock
-- is canceled.
runClock :: (Monad m, MonadSTM m, MonadAsync m, MonadDelay m) => m (Clock m)
runClock = do
-- FIXME: maybe the slot needs to be determined from the chain start time.
Expand Down Expand Up @@ -230,27 +381,35 @@ data Clock m
data OutsideWorld m
= OutsideWorld
{ pqsTVar :: TVar m (Map NodeId (PQueueTVar m))
, paramsTV :: TVar m OWParams
, paramsTVar :: TVar m Parameters
, storedIBsTVar :: TVar m (Map NodeId [IB])
-- ^ Downloaded blocks per node
}

data OWParams = OWParams {nodeBandwidth :: BitsPerSecond}

newtype BitsPerSecond = BitsPerSecond Word
deriving (Eq, Show, Generic, Ord)
deriving anyclass (Aeson.ToJSON, Aeson.FromJSON)

data Msg = MsgIB {msgIB :: IB}
data Msg
= MsgIB {msgIB :: IB}
| MsgEB {msgEB :: EB}

instance HasSizeInBits Msg where
size (MsgIB ib) = size ib
size (MsgEB eb) = NumberOfBits 1 -- FIXME: for now we assume EB sizes are negligible.

init ::
forall m.
(Monad m, MonadSTM m, Applicative m) =>
TVar m OWParams ->
TVar m Parameters ->
m (OutsideWorld m)
init paramsTV = do
pqsTVar <- newTVarIO Map.empty
pure $ OutsideWorld{pqsTVar = pqsTVar, paramsTV = paramsTV}
init paramsTVar = do
pqsTVar <- newTVarIO mempty
storedIBsTVar <- newTVarIO mempty
pure $ OutsideWorld{pqsTVar = pqsTVar, paramsTVar = paramsTVar, storedIBsTVar = storedIBsTVar}

getParams :: MonadSTM m => OutsideWorld m -> m Parameters
getParams = readTVarIO . paramsTVar

-- | .
--
Expand Down Expand Up @@ -293,11 +452,28 @@ receiveFrom nodeId world = do
Nothing -> error $ "Node " <> show nodeId <> " does not exist."
Just pqTVar -> do
msg <- pop pqTVar
owNodeBandwidth <- nodeBandwidth <$> (atomically $ readTVar (paramsTV world))
owNodeBandwidth <- nodeBandwidth <$> (atomically $ readTVar (paramsTVar world))
let msToInt (Microseconds ms) = fromIntegral ms
threadDelay $ msToInt $ transmissionTime (size msg) owNodeBandwidth
pure msg

--------------------------------------------------------------------------------
-- Outside World Storage
--------------------------------------------------------------------------------

storeIB :: MonadSTM m => NodeId -> IB -> OutsideWorld m -> m ()
storeIB nodeId ib OutsideWorld{storedIBsTVar} =
atomically $
modifyTVar' storedIBsTVar (Map.alter (Just . maybe [ib] (ib :)) nodeId)

-- | Retrieve the downloaded IBs by the given node, which correspond to the given slice.
storedIBsBy :: MonadSTM m => NodeId -> OutsideWorld m -> Slice -> m [IB]
storedIBsBy nodeId OutsideWorld{storedIBsTVar} slice = do
mStoredIBs <- atomically (readTVar storedIBsTVar)
pure $
maybe [] (filter ((`isWithin` slice) . ib_slot)) $
Map.lookup nodeId mStoredIBs

--------------------------------------------------------------------------------
-- A priority queue inside a transactional var
--------------------------------------------------------------------------------
Expand All @@ -308,10 +484,16 @@ data PQueueTVar m = PQueueTVar {getTQueueTVar :: TVar m (MaxQueue PMsg)}
newtype PMsg = PMsg Msg

instance Eq PMsg where
PMsg (MsgIB ib_x) == PMsg (MsgIB ib_y) = slot ib_x == slot ib_y
PMsg (MsgIB x) == PMsg (MsgIB y) = ib_slot x == ib_slot y
PMsg (MsgEB _) == PMsg (MsgEB _) = True -- FIXME: For now EB messages have the same priority
PMsg (MsgIB _) == PMsg (MsgEB _) = False
PMsg (MsgEB _) == PMsg (MsgIB _) = False

instance Ord PMsg where
PMsg (MsgIB ib_x) <= PMsg (MsgIB ib_y) = slot ib_x <= slot ib_y
PMsg (MsgIB x) <= PMsg (MsgIB y) = ib_slot x <= ib_slot y
PMsg (MsgEB _) <= PMsg (MsgEB _) = True -- FIXME: For now EB messages have the same priority.
PMsg (MsgIB _) <= PMsg (MsgEB _) = True -- FIXME: For now EB messages take precedence over IB messages.
PMsg (MsgEB _) <= PMsg (MsgIB _) = False -- FIXME: For now EB messages take precedence over IB messages.

-- TODO: the 'queue' part can be dropped if these functions are put in a separate module.
newPQueue :: (Functor m, MonadSTM m) => m (PQueueTVar m)
Expand Down

0 comments on commit 39ad0a9

Please sign in to comment.