Skip to content

Commit b554d3d

Browse files
Merge pull request #1389 from input-output-hk/lc/distinguish-inbound-outbound
Allow network functions to distinguish between inbound and outbound m…
2 parents d944019 + b3b1e07 commit b554d3d

File tree

6 files changed

+61
-35
lines changed

6 files changed

+61
-35
lines changed

hydra-node/src/Hydra/Network/Authenticate.hs

+5-4
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,19 @@ instance FromCBOR msg => FromCBOR (Signed msg) where
5151
-- Only verified messages are pushed downstream to the internal network for the
5252
-- node to consume and process. Non-verified messages get discarded.
5353
withAuthentication ::
54-
( SignableRepresentation msg
55-
, ToJSON msg
54+
( SignableRepresentation inbound
55+
, ToJSON inbound
56+
, SignableRepresentation outbound
5657
) =>
5758
Tracer m AuthLog ->
5859
-- The party signing key
5960
SigningKey HydraKey ->
6061
-- Other party members
6162
[Party] ->
6263
-- The underlying raw network.
63-
NetworkComponent m (Signed msg) (Signed msg) a ->
64+
NetworkComponent m (Signed inbound) (Signed outbound) a ->
6465
-- The node internal authenticated network.
65-
NetworkComponent m (Authenticated msg) msg a
66+
NetworkComponent m (Authenticated inbound) outbound a
6667
withAuthentication tracer signingKey parties withRawNetwork callback action = do
6768
withRawNetwork checkSignature authenticate
6869
where

hydra-node/src/Hydra/Network/Heartbeat.hs

+11-11
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ withHeartbeat ::
9292
-- | Callback listening to peers' status change as computed by the `withIncomingHeartbeat` layer.
9393
ConnectionMessages m ->
9494
-- | Underlying `NetworkComponent` for sending and consuming `Heartbeat` messages.
95-
NetworkComponent m (Heartbeat msg1) (Heartbeat msg) a ->
95+
NetworkComponent m (Heartbeat inbound) (Heartbeat outbound) a ->
9696
-- | Returns a network component that can be used to send and consume arbitrary messages.
9797
-- This layer will take care of peeling out/wrapping messages into `Heartbeat`s.
98-
NetworkComponent m msg1 msg a
98+
NetworkComponent m inbound outbound a
9999
withHeartbeat nodeId connectionMessages withNetwork =
100100
withIncomingHeartbeat connectionMessages $
101101
withOutgoingHeartbeat nodeId withNetwork
@@ -107,8 +107,8 @@ withIncomingHeartbeat ::
107107
ConnectionMessages m ->
108108
-- | Underlying `NetworkComponent`.
109109
-- We only care about the fact it notifies us with `Heartbeat` messages.
110-
NetworkComponent m (Heartbeat msg1) msg a ->
111-
NetworkComponent m msg1 msg a
110+
NetworkComponent m (Heartbeat inbound) outbound a ->
111+
NetworkComponent m inbound outbound a
112112
withIncomingHeartbeat connectionMessages withNetwork callback action = do
113113
heartbeat <- newTVarIO initialHeartbeatState
114114
withNetwork (updateStateFromIncomingMessages heartbeat connectionMessages callback) $ \network ->
@@ -119,8 +119,8 @@ updateStateFromIncomingMessages ::
119119
(MonadSTM m, MonadMonotonicTime m) =>
120120
TVar m HeartbeatState ->
121121
ConnectionMessages m ->
122-
NetworkCallback msg m ->
123-
NetworkCallback (Heartbeat msg) m
122+
NetworkCallback inbound m ->
123+
NetworkCallback (Heartbeat inbound) m
124124
updateStateFromIncomingMessages heartbeatState connectionMessages callback = \case
125125
Data nodeId msg -> notifyAlive nodeId >> callback msg
126126
Ping nodeId -> notifyAlive nodeId
@@ -143,8 +143,8 @@ withOutgoingHeartbeat ::
143143
NodeId ->
144144
-- | Underlying `NetworkComponent`.
145145
-- We only care about the fact it allows us to broadcast `Heartbeat` messages.
146-
NetworkComponent m msg1 (Heartbeat msg) a ->
147-
NetworkComponent m msg1 msg a
146+
NetworkComponent m inbound (Heartbeat outbound) a ->
147+
NetworkComponent m inbound outbound a
148148
withOutgoingHeartbeat nodeId withNetwork callback action = do
149149
lastSent <- newTVarIO Nothing
150150
withNetwork callback $ \network ->
@@ -155,8 +155,8 @@ updateStateFromOutgoingMessages ::
155155
(MonadSTM m, MonadMonotonicTime m) =>
156156
NodeId ->
157157
TVar m (Maybe Time) ->
158-
Network m (Heartbeat msg) ->
159-
Network m msg
158+
Network m (Heartbeat outbound) ->
159+
Network m outbound
160160
updateStateFromOutgoingMessages nodeId lastSent Network{broadcast} =
161161
Network $ \msg -> do
162162
now <- getMonotonicTime
@@ -172,7 +172,7 @@ checkHeartbeatState ::
172172
) =>
173173
NodeId ->
174174
TVar m (Maybe Time) ->
175-
Network m (Heartbeat msg) ->
175+
Network m (Heartbeat outbound) ->
176176
m ()
177177
checkHeartbeatState nodeId lastSent Network{broadcast} =
178178
forever $ do

hydra-node/src/Hydra/Network/Ouroboros.hs

+9-8
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,13 @@ import Ouroboros.Network.Subscription.Ip (SubscriptionParams (..), WithIPList (W
118118
import Ouroboros.Network.Subscription.Worker (LocalAddresses (LocalAddresses))
119119

120120
withOuroborosNetwork ::
121-
forall msg.
122-
(ToCBOR msg, FromCBOR msg) =>
123-
Tracer IO (WithHost (TraceOuroborosNetwork msg)) ->
121+
forall inbound outbound.
122+
(ToCBOR outbound, FromCBOR outbound) =>
123+
(ToCBOR inbound, FromCBOR inbound) =>
124+
Tracer IO (WithHost (TraceOuroborosNetwork outbound)) ->
124125
Host ->
125126
[Host] ->
126-
NetworkComponent IO msg msg ()
127+
NetworkComponent IO inbound outbound ()
127128
withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
128129
bchan <- newBroadcastTChanIO
129130
let newBroadcastChannel = atomically $ dupTChan bchan
@@ -225,7 +226,7 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
225226
}
226227

227228
hydraClient ::
228-
TChan msg ->
229+
TChan outbound ->
229230
OuroborosApplicationWithMinimalCtx 'InitiatorMode addr LByteString IO () Void
230231
hydraClient chan =
231232
OuroborosApplication
@@ -264,14 +265,14 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
264265
MiniProtocolLimits{maximumIngressQueue = maxBound}
265266

266267
client ::
267-
TChan msg ->
268-
FireForgetClient msg IO ()
268+
TChan outbound ->
269+
FireForgetClient outbound IO ()
269270
client chan =
270271
Idle $ do
271272
atomically (readTChan chan) <&> \msg ->
272273
SendMsg msg (pure $ client chan)
273274

274-
server :: FireForgetServer msg IO ()
275+
server :: FireForgetServer inbound IO ()
275276
server =
276277
FireForgetServer
277278
{ recvMsg = \msg -> networkCallback msg $> server

hydra-node/src/Hydra/Network/Reliability.hs

+3-3
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,14 @@ withReliability ::
217217
-- | Tracer for logging messages.
218218
Tracer m ReliabilityLog ->
219219
-- | Our persistence handle
220-
MessagePersistence m msg ->
220+
MessagePersistence m outbound ->
221221
-- | Our own party identifier.
222222
Party ->
223223
-- | Other parties' identifiers.
224224
[Party] ->
225225
-- | Underlying network component providing consuming and sending channels.
226-
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat msg))) (ReliableMsg (Heartbeat msg)) a ->
227-
NetworkComponent m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
226+
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a ->
227+
NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
228228
withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do
229229
acksCache <- loadAcks >>= newTVarIO
230230
sentMessages <- loadMessages >>= newTVarIO . Seq.fromList

hydra-node/src/Hydra/Node/Network.hs

+5-3
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ data NetworkConfiguration m = NetworkConfiguration
110110

111111
-- | Starts the network layer of a node, passing configured `Network` to its continuation.
112112
withNetwork ::
113+
forall msg tx.
113114
(ToCBOR msg, ToJSON msg, FromJSON msg, FromCBOR msg) =>
114115
-- | Tracer to use for logging messages.
115116
Tracer IO (LogEntry tx msg) ->
@@ -125,7 +126,8 @@ withNetwork tracer connectionMessages configuration callback action = do
125126
numberOfParties = length $ me : otherParties
126127
messagePersistence <- configureMessagePersistence (contramap Node tracer) persistenceDir numberOfParties
127128

128-
let reliability =
129+
let reliability :: NetworkComponent IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
130+
reliability =
129131
withFlipHeartbeats $
130132
withReliability (contramap Reliability tracer) messagePersistence me otherParties $
131133
withAuthentication (contramap Authentication tracer) signingKey otherParties $
@@ -161,8 +163,8 @@ configureMessagePersistence tracer persistenceDir numberOfParties = do
161163
pure $ mkMessagePersistence numberOfParties msgPersistence ackPersistence'
162164

163165
withFlipHeartbeats ::
164-
NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a ->
165-
NetworkComponent m (Heartbeat (Authenticated msg)) msg1 a
166+
NetworkComponent m (Authenticated (Heartbeat inbound)) outbound a ->
167+
NetworkComponent m (Heartbeat (Authenticated inbound)) outbound a
166168
withFlipHeartbeats withBaseNetwork callback =
167169
withBaseNetwork unwrapHeartbeats
168170
where

hydra-node/test/Hydra/Network/AuthenticateSpec.hs

+28-6
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ spec = parallel $ do
3333
receivedMessages <- newTVarIO []
3434

3535
withAuthentication
36+
@(Message SimpleTx)
37+
@(Message SimpleTx)
3638
nullTracer
3739
aliceSk
3840
[bob]
@@ -53,6 +55,8 @@ spec = parallel $ do
5355
receivedMessages <- newTVarIO []
5456

5557
withAuthentication
58+
@(Message SimpleTx)
59+
@(Message SimpleTx)
5660
nullTracer
5761
aliceSk
5862
[bob]
@@ -73,6 +77,8 @@ spec = parallel $ do
7377
receivedMessages <- newTVarIO []
7478

7579
withAuthentication
80+
@(Message SimpleTx)
81+
@(Message SimpleTx)
7682
nullTracer
7783
aliceSk
7884
[bob, carol]
@@ -92,10 +98,18 @@ spec = parallel $ do
9298
sentMsgs = runSimOrThrow $ do
9399
sentMessages <- newTVarIO []
94100

95-
withAuthentication nullTracer bobSk [] (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do
96-
threadDelay 0.6
97-
broadcast someMessage
98-
threadDelay 1
101+
withAuthentication
102+
@(Message SimpleTx)
103+
@(Message SimpleTx)
104+
nullTracer
105+
bobSk
106+
[]
107+
(captureOutgoing sentMessages)
108+
noop
109+
$ \Network{broadcast} -> do
110+
threadDelay 0.6
111+
broadcast someMessage
112+
threadDelay 1
99113

100114
readTVarIO sentMessages
101115

@@ -108,8 +122,16 @@ spec = parallel $ do
108122
traces <- newTVarIO []
109123

110124
let tracer = traceInTVar traces "AuthenticateSpec"
111-
withAuthentication tracer aliceSk [bob, carol] (\incoming _ -> incoming signedMsg) noop $ \_ ->
112-
threadDelay 1
125+
withAuthentication
126+
@(Message SimpleTx)
127+
@(Message SimpleTx)
128+
tracer
129+
aliceSk
130+
[bob, carol]
131+
(\incoming _ -> incoming signedMsg)
132+
noop
133+
$ \_ ->
134+
threadDelay 1
113135

114136
readTVarIO traces
115137

0 commit comments

Comments
 (0)