Skip to content

Commit

Permalink
Haskell Leios P2P (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
Saizan authored Dec 13, 2024
1 parent 5334156 commit f754de2
Show file tree
Hide file tree
Showing 30 changed files with 2,541 additions and 311 deletions.
8 changes: 8 additions & 0 deletions simulation/ouroboros-leios-sim.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ library
LeiosProtocol.Short
LeiosProtocol.Short.Generate
LeiosProtocol.Short.Node
LeiosProtocol.Short.Sim
LeiosProtocol.Short.SimP2P
LeiosProtocol.Short.VizSim
LeiosProtocol.Short.VizSimP2P
LeiosProtocol.SimTestRelay
LeiosProtocol.VizSimTestRelay
ModelTCP
Expand Down Expand Up @@ -106,6 +110,9 @@ library
, containers
, contra-tracer
, deepseq
, diagrams-cairo
, diagrams-core
, diagrams-lib
, fgl
, filepath
, fingertree
Expand All @@ -116,6 +123,7 @@ library
, io-classes
, io-sim
, kdt
, linear
, mtl
, nothunks
, ouroboros-network-api
Expand Down
5 changes: 5 additions & 0 deletions simulation/src/ChanMux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Control.Tracer

import Chan
import ChanTCP
import qualified Control.Category as Cat
import TimeCompat

class MuxBundle bundle where
Expand Down Expand Up @@ -62,6 +63,10 @@ data ToFromMuxMsg mm a
, fromMuxMsg :: mm -> a
}

instance Cat.Category ToFromMuxMsg where
id = ToFromMuxMsg id id
(.) (ToFromMuxMsg f f') (ToFromMuxMsg g g') = ToFromMuxMsg (g . f) (f' . g')

dynToFromMuxMsg :: Typeable a => ToFromMuxMsg Dynamic a
dynToFromMuxMsg = ToFromMuxMsg toDyn (fromJust . fromDynamic)

Expand Down
32 changes: 25 additions & 7 deletions simulation/src/LeiosProtocol/Common.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
Expand Down Expand Up @@ -32,14 +33,18 @@ module LeiosProtocol.Common (
rankingBlockBodyInvariant,
NodeId,
SubSlotNo (..),
Word64,
)
where

import ChanTCP
import Control.Exception (assert)
import Control.Monad (guard)
import Data.Hashable
import Data.Set (Set)
import Data.Word (Word8)
import Data.Map (Map)
import Data.Word (Word64, Word8)
import GHC.Generics
import GHC.Records
import Ouroboros.Network.Block as Block
import PraosProtocol.Common (
ChainHash (..),
Expand Down Expand Up @@ -88,6 +93,8 @@ data RankingBlockBody = RankingBlockBody
, payload :: !Bytes
-- ^ ranking blocks can also contain transactions directly, which we
-- do not model directly, but contribute to size.
, nodeId :: !NodeId
-- ^ convenience to keep track of origin, does not contribute to size.
, size :: !Bytes
}
deriving stock (Eq, Show, Generic)
Expand All @@ -104,7 +111,8 @@ data InputBlockId = InputBlockId
{ node :: !NodeId
, num :: !Int
}
deriving stock (Eq, Ord, Show)
deriving stock (Eq, Ord, Show, Generic)
deriving anyclass (Hashable)

newtype SubSlotNo = SubSlotNo Word8
deriving stock (Show)
Expand All @@ -129,6 +137,8 @@ data InputBlockBody = InputBlockBody
{ id :: !InputBlockId
, size :: !Bytes
-- ^ transactions not modeled, only their total size.
, slot :: !SlotNo
-- ^ duplicated here for convenience of vizualization, does not contribute to size.
}
deriving stock (Eq, Show)

Expand All @@ -141,6 +151,9 @@ data InputBlock = InputBlock
inputBlockInvariant :: InputBlock -> Bool
inputBlockInvariant ib = ib.header.id == ib.body.id

instance HasField "id" InputBlock InputBlockId where
getField = (.id) . (.header)

data EndorseBlockId = EndorseBlockId
{ node :: !NodeId
, num :: !Int
Expand Down Expand Up @@ -172,13 +185,14 @@ data VoteMsg = VoteMsg
{ id :: !VoteId
, slot :: !SlotNo
, producer :: !NodeId
, endorseBlock :: !EndorseBlockId
, votes :: Word64
, endorseBlocks :: ![EndorseBlockId]
, size :: !Bytes
}
deriving stock (Eq, Show)

data Certificate = Certificate
{ votes :: Set VoteId
{ votes :: Map VoteId Word64
}
deriving stock (Show, Eq, Generic)
deriving anyclass (Hashable)
Expand All @@ -187,11 +201,15 @@ data Certificate = Certificate
---- Common defs
-------------------------------------------

slice :: Int -> SlotNo -> Int -> (SlotNo, SlotNo)
slice l s x = (toEnum s', toEnum $ s' + l - 1)
slice :: Int -> SlotNo -> Int -> Maybe (SlotNo, SlotNo)
slice l s x = do
guard (s' >= 0)
return (a, b)
where
-- taken from formal spec
s' = (fromEnum s `div` l - x) * l
a = assert (s' >= 0) $ toEnum s'
b = assert (s' + l - 1 >= 0) $ toEnum $ s' + l - 1

-------------------------------------------
---- MessageSize instances
Expand Down
19 changes: 14 additions & 5 deletions simulation/src/LeiosProtocol/Relay.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import qualified Data.List as List
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Map (Map)
import qualified Data.Map as Map
import qualified Data.Map.Strict as Map
import Data.Maybe (isJust, isNothing, mapMaybe)
import Data.Monoid (Sum (..))
import Data.Sequence.Strict (StrictSeq)
Expand Down Expand Up @@ -497,6 +497,8 @@ data SubmitPolicy = SubmitInOrder | SubmitAll

data RelayConsumerConfig id header body m = RelayConsumerConfig
{ relay :: !RelayConfig
, headerValidationDelay :: header -> DiffTime
, threadDelayParallel :: [DiffTime] -> m ()
, headerId :: !(header -> id)
, prioritize :: !(Map id header -> [header])
-- ^ returns a subset of headers, in order of what should be fetched first.
Expand All @@ -510,8 +512,9 @@ data RelayConsumerConfig id header body m = RelayConsumerConfig
-- ^ sends blocks to be validated/added to the buffer. Allowed to be
-- blocking, but relayConsumer does not assume the blocks made it
-- into the relayBuffer. Also takes a delivery time (relevant for
-- e.g. IB endorsement) and a callback that expects the subset of
-- validated blocks.
-- e.g. IB endorsement) and a callback that expects a subset of
-- validated blocks. Callback might be called more than once, with
-- different subsets.
, submitPolicy :: !SubmitPolicy
, maxHeadersToRequest :: !Word16
, maxBodiesToRequest :: !Word16
Expand Down Expand Up @@ -782,6 +785,8 @@ relayConsumerPipelined config sst =
unless (Seq.length idsSeq <= fromIntegral windowExpand) $
throw IdsNotRequested

config.threadDelayParallel $ map config.headerValidationDelay headers

-- Upon receiving a batch of new headers we extend our available set,
-- and extend the unacknowledged sequence.
--
Expand Down Expand Up @@ -812,6 +817,10 @@ relayConsumerPipelined config sst =
unless (idsReceived `Set.isSubsetOf` idsRequested) $
throw BodiesNotRequested

let notReceived = idsRequested `Set.difference` idsReceived
unless (Set.null notReceived) $ do
atomically $ modifyTVar' sst.inFlightVar (`Set.difference` notReceived)

-- We can match up all the txids we requested, with those we
-- received.
let idsRequestedWithBodiesReceived :: Map id (Maybe (header, body))
Expand Down Expand Up @@ -883,8 +892,8 @@ relayConsumerPipelined config sst =
-- all blocks validated.
modifyTVar' sst.relayBufferVar $
flip (Foldable.foldl' (\buf blk@(h, _) -> RB.snoc (config.headerId h) blk buf)) validated
-- TODO?: the ids we did not receive could be taken out earlier.
modifyTVar' sst.inFlightVar (`Set.difference` idsRequested)
-- TODO: won't remove from inFlight blocks not validated.
modifyTVar' sst.inFlightVar (`Set.difference` Set.fromList (map (config.headerId . fst) validated))

return $
idle
Expand Down
2 changes: 1 addition & 1 deletion simulation/src/LeiosProtocol/RelayBuffer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Data.FingerTree (FingerTree)
import qualified Data.FingerTree as FingerTree
import qualified Data.Foldable as Foldable
import Data.Map (Map)
import qualified Data.Map as Map
import qualified Data.Map.Strict as Map
import Data.Set (Set)
import Data.Word (Word64)

Expand Down
Loading

0 comments on commit f754de2

Please sign in to comment.