diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/NodeServer.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/NodeServer.java new file mode 100644 index 00000000..1a3bc9ac --- /dev/null +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/NodeServer.java @@ -0,0 +1,76 @@ +package com.bloxbean.cardano.yaci.core.network.server; + +import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.VersionTable; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +public class NodeServer { + private final int port; + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + private Channel serverChannel; + private VersionTable versionTable; + private final static Map sessions = new ConcurrentHashMap<>(); + + public NodeServer(int port, VersionTable versionTable) { + this.port = port; + this.bossGroup = new NioEventLoopGroup(1); + this.workerGroup = new NioEventLoopGroup(); + this.versionTable = versionTable; + } + + public void start() { + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + NodeServerSession session = new NodeServerSession(ch, versionTable); + sessions.put(ch, session); + } + }) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.TCP_NODELAY, true); + + ChannelFuture future = bootstrap.bind(port).sync(); + serverChannel = future.channel(); + log.info("NodeServer started on port {}", port); + + serverChannel.closeFuture().sync(); + } catch (InterruptedException e) { + log.error("NodeServer interrupted", e); + } finally { + shutdown(); + } + } + + public void shutdown() { + log.info("Shutting down NodeServer..."); + for (NodeServerSession session : sessions.values()) { + session.close(); + } + if (serverChannel != null) { + serverChannel.close(); + } + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + + public static void removeSession(Channel channel) { + NodeServerSession session = sessions.remove(channel); + if (session != null) { + session.close(); + } + } + +} diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/NodeServerSession.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/NodeServerSession.java new file mode 100644 index 00000000..5095b320 --- /dev/null +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/NodeServerSession.java @@ -0,0 +1,59 @@ +package com.bloxbean.cardano.yaci.core.network.server; + +import com.bloxbean.cardano.yaci.core.network.handlers.MiniProtoRequestDataEncoder; +import com.bloxbean.cardano.yaci.core.network.handlers.MiniProtoStreamingByteToMessageDecoder; +import com.bloxbean.cardano.yaci.core.network.server.handlers.MiniProtoServerInboundHandler; +import com.bloxbean.cardano.yaci.core.protocol.Agent; +import com.bloxbean.cardano.yaci.core.protocol.handshake.HandshakeAgent; +import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.VersionTable; +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class NodeServerSession { + private final Channel clientChannel; + private final HandshakeAgent handshakeAgent; + private final Agent[] agents; + + public NodeServerSession(Channel clientChannel, VersionTable versionTable) { + this.clientChannel = clientChannel; + this.handshakeAgent = new HandshakeAgent(versionTable, false); + this.agents = createAgents(); + + setupPipeline(); + } + + private void setupPipeline() { + ChannelPipeline pipeline = clientChannel.pipeline(); + pipeline.addLast(new MiniProtoRequestDataEncoder()); + pipeline.addLast(new MiniProtoStreamingByteToMessageDecoder(agents)); + pipeline.addLast(new MiniProtoServerInboundHandler(this)); + } + + public HandshakeAgent getHandshakeAgent() { + return handshakeAgent; + } + + public Agent[] getAgents() { + return agents; + } + + public Channel getClientChannel() { + return clientChannel; + } + + public void close() { + log.info("Closing session for client: {}", clientChannel.remoteAddress()); + clientChannel.close(); + } + + private Agent[] createAgents() { + // Initialize specific mini-protocol handlers (ChainSync, BlockFetch, etc.) + return new Agent[]{ + + }; + } +} + + diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/SessionManager.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/SessionManager.java new file mode 100644 index 00000000..fcd944dd --- /dev/null +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/SessionManager.java @@ -0,0 +1,27 @@ +package com.bloxbean.cardano.yaci.core.network.server; + +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +public class SessionManager { + private static final Set activeClients = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + public static void addClient(Channel channel) { + activeClients.add(channel); + log.info("Client connected: {} (Total: {})", channel.remoteAddress(), activeClients.size()); + } + + public static void removeClient(Channel channel) { + activeClients.remove(channel); + log.info("Client disconnected: {} (Remaining: {})", channel.remoteAddress(), activeClients.size()); + } + + public static int getActiveClientCount() { + return activeClients.size(); + } +} diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/handlers/MiniProtoServerInboundHandler.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/handlers/MiniProtoServerInboundHandler.java new file mode 100644 index 00000000..fa45f2ee --- /dev/null +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/network/server/handlers/MiniProtoServerInboundHandler.java @@ -0,0 +1,74 @@ +package com.bloxbean.cardano.yaci.core.network.server.handlers; + +import com.bloxbean.cardano.yaci.core.network.server.NodeServer; +import com.bloxbean.cardano.yaci.core.network.server.NodeServerSession; +import com.bloxbean.cardano.yaci.core.protocol.Agent; +import com.bloxbean.cardano.yaci.core.protocol.Message; +import com.bloxbean.cardano.yaci.core.protocol.Segment; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.ReferenceCountUtil; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MiniProtoServerInboundHandler extends SimpleChannelInboundHandler { + private final NodeServerSession session; + + public MiniProtoServerInboundHandler(NodeServerSession session) { + this.session = session; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + log.info("Client connected: {}", ctx.channel().remoteAddress()); + session.getHandshakeAgent().setChannel(ctx.channel()); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Segment segment) { + try { + log.debug("Received segment from client: Protocol {} - Length {}", segment.getProtocol(), segment.getPayload().length); + + if (segment.getProtocol() == session.getHandshakeAgent().getProtocolId()) { + Message message = session.getHandshakeAgent().deserializeResponse(segment.getPayload()); + session.getHandshakeAgent().receiveResponse(message); + + if (!session.getHandshakeAgent().isDone() && session.getHandshakeAgent().hasAgency()) + session.getHandshakeAgent().sendNextMessage(); + } else { + for (Agent agent : session.getAgents()) { + if (!agent.isDone() && agent.getProtocolId() == segment.getProtocol()) { + Message message = agent.deserializeResponse(segment.getPayload()); + agent.receiveResponse(message); + + if (agent.hasAgency()) + agent.sendNextMessage(); + break; + } + } + } + + ctx.flush(); + } finally { + ReferenceCountUtil.release(segment); + } + } + + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + log.error("Error processing client request", cause); + ctx.close(); + session.close(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + log.info("Client disconnected: {}", ctx.channel().remoteAddress()); + + // Remove session from NodeServer when client disconnects + session.close(); + NodeServer.removeSession(ctx.channel()); + } + +} diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/Agent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/Agent.java index e6e732f5..c83b82aa 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/Agent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/Agent.java @@ -17,6 +17,12 @@ public abstract class Agent { private final List agentListeners = new ArrayList<>(); private AcceptVersion acceptVersion; + private final boolean isClient; + + public Agent(boolean isClient) { + this.isClient = isClient; + } + public void setChannel(Channel channel) { if (this.channel != null && this.channel.isActive()) log.warn("An active channel is already attached to this agent"); @@ -25,7 +31,7 @@ public void setChannel(Channel channel) { } public void sendRequest(Message message) { - if (currenState.hasAgency()) { + if (currenState.hasAgency(isClient)) { currenState = currenState.nextState(message); } else { //TODO @@ -69,7 +75,7 @@ public final void sendNextMessage() { } public final boolean hasAgency() { - return currenState.hasAgency(); + return currenState.hasAgency(isClient); } public final synchronized void addListener(T agentListener) { diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/State.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/State.java index ec357d84..ec6d0565 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/State.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/State.java @@ -6,7 +6,7 @@ public interface State { State nextState(Message message); - boolean hasAgency(); + boolean hasAgency(boolean isClient); default Message handleInbound(byte[] bytes) { return null; diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/blockfetch/BlockfetchAgent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/blockfetch/BlockfetchAgent.java index 6e8eafaf..fd8b4ef5 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/blockfetch/BlockfetchAgent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/blockfetch/BlockfetchAgent.java @@ -33,6 +33,10 @@ public class BlockfetchAgent extends Agent { private long errorBlks; public BlockfetchAgent() { + this(true); + } + public BlockfetchAgent(boolean isClient) { + super(isClient); this.currenState = Idle; this.startTime = System.currentTimeMillis(); diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/blockfetch/BlockfetchState.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/blockfetch/BlockfetchState.java index da60a2d0..e5a1d693 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/blockfetch/BlockfetchState.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/blockfetch/BlockfetchState.java @@ -17,8 +17,8 @@ else if (message instanceof ClientDone) } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, Busy { @@ -33,8 +33,8 @@ else if (message instanceof StartBatch) } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, Streaming { @@ -49,8 +49,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, Done { @@ -60,7 +60,7 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { + public boolean hasAgency(boolean isClient) { return false; } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2c/LocalChainSyncAgent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2c/LocalChainSyncAgent.java index eb9ac4ac..f85b3350 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2c/LocalChainSyncAgent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2c/LocalChainSyncAgent.java @@ -21,6 +21,10 @@ public class LocalChainSyncAgent extends Agent { private int counter = 0; public LocalChainSyncAgent(Point[] knownPoints) { + this(knownPoints, true); + } + public LocalChainSyncAgent(Point[] knownPoints, boolean isClient) { + super(isClient); this.currenState = Idle; this.knownPoints = knownPoints; @@ -29,6 +33,10 @@ public LocalChainSyncAgent(Point[] knownPoints) { } public LocalChainSyncAgent(Point[] knownPoints, long stopSlotNo, int agentNo) { + this(knownPoints,stopSlotNo, agentNo, true); + } + public LocalChainSyncAgent(Point[] knownPoints, long stopSlotNo, int agentNo, boolean isClient) { + super(isClient); this.currenState = Idle; this.knownPoints = knownPoints; this.stopAt = stopSlotNo; diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2c/LocalChainSyncState.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2c/LocalChainSyncState.java index 114b9f17..125e3e6c 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2c/LocalChainSyncState.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2c/LocalChainSyncState.java @@ -21,8 +21,8 @@ else if (message instanceof ChainSyncMsgDone) } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, CanAwait { @@ -39,8 +39,8 @@ else if (message instanceof Rollbackward) } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, MustReply { @@ -55,8 +55,8 @@ else if (message instanceof Rollbackward) } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, Intersect { @@ -71,8 +71,8 @@ else if (message instanceof IntersectNotFound) } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, Done { @@ -82,7 +82,7 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { + public boolean hasAgency(boolean isClient) { return false; } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2n/ChainSyncState.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2n/ChainSyncState.java index 150f22bf..88ab67d3 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2n/ChainSyncState.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2n/ChainSyncState.java @@ -21,8 +21,8 @@ else if (message instanceof ChainSyncMsgDone) } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, CanAwait { @@ -39,8 +39,8 @@ else if (message instanceof Rollbackward) } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, MustReply { @@ -55,8 +55,8 @@ else if (message instanceof Rollbackward) } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, Intersect { @@ -71,8 +71,8 @@ else if (message instanceof IntersectNotFound) } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, Done { @@ -82,7 +82,7 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { + public boolean hasAgency(boolean isClient) { return false; } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2n/ChainsyncAgent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2n/ChainsyncAgent.java index c8e910fb..b654c035 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2n/ChainsyncAgent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/n2n/ChainsyncAgent.java @@ -23,6 +23,10 @@ public class ChainsyncAgent extends Agent { private long startTime; public ChainsyncAgent(Point[] knownPoints) { + this(knownPoints, true); + } + public ChainsyncAgent(Point[] knownPoints, boolean isClient) { + super(isClient); this.currenState = Idle; this.knownPoints = knownPoints; @@ -31,6 +35,10 @@ public ChainsyncAgent(Point[] knownPoints) { } public ChainsyncAgent(Point[] knownPoints, long stopSlotNo, int agentNo) { + this(knownPoints, stopSlotNo, agentNo, true); + } + public ChainsyncAgent(Point[] knownPoints, long stopSlotNo, int agentNo, boolean isClient) { + super(isClient); this.currenState = Idle; this.knownPoints = knownPoints; this.stopAt = stopSlotNo; diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/FindIntersectSerializer.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/FindIntersectSerializer.java index f8c03df4..cfda672a 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/FindIntersectSerializer.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/FindIntersectSerializer.java @@ -1,6 +1,7 @@ package com.bloxbean.cardano.yaci.core.protocol.chainsync.serializers; import co.nstant.in.cbor.model.Array; +import co.nstant.in.cbor.model.DataItem; import co.nstant.in.cbor.model.UnsignedInteger; import com.bloxbean.cardano.yaci.core.protocol.Serializer; import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.FindIntersect; @@ -26,4 +27,18 @@ public byte[] serialize(FindIntersect intersect) { return bytes; } + @Override + public FindIntersect deserializeDI(DataItem di) { + Array array = (Array) di; + DataItem pointsDataItem = array.getDataItems().get(1); + Array pointsArray = (Array) pointsDataItem; + var pointsDIList = pointsArray.getDataItems(); + + Point[] points = new Point[pointsDIList.size()]; + for (int i=0; i < points.length; i++) { + points[i] = PointSerializer.INSTANCE.deserializeDI(pointsDIList.get(i)); + } + return new FindIntersect(points); + + } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/IntersectFoundSerializer.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/IntersectFoundSerializer.java index e9837e48..ca74dbd7 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/IntersectFoundSerializer.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/IntersectFoundSerializer.java @@ -26,4 +26,13 @@ public IntersectFound deserialize(byte[] bytes) { return intersectFound; } + + @Override + public DataItem serializeDI(IntersectFound inFnd) { + Array array = new Array(); + array.add(new UnsignedInteger(5)); + array.add(PointSerializer.INSTANCE.serializeDI(inFnd.getPoint())); + array.add(TipSerializer.INSTANCE.serializeDI(inFnd.getTip())); + return array; + } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/IntersectNotFoundSerializer.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/IntersectNotFoundSerializer.java index 5e12ce4c..9e3ba2cb 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/IntersectNotFoundSerializer.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/IntersectNotFoundSerializer.java @@ -10,11 +10,6 @@ public enum IntersectNotFoundSerializer implements Serializer { INSTANCE(); - @Override - public byte[] serialize(IntersectNotFound object) { - return new byte[0]; - } - public IntersectNotFound deserialize(byte[] bytes) { DataItem di = CborSerializationUtil.deserializeOne(bytes); Array array = (Array) di; @@ -26,4 +21,13 @@ public IntersectNotFound deserialize(byte[] bytes) { = new IntersectNotFound(TipSerializer.INSTANCE.deserializeDI(array.getDataItems().get(1))); return intersectNotFound; } + + @Override + public DataItem serializeDI(IntersectNotFound obj) { + Array array = new Array(); + array.add(new UnsignedInteger(6)); + array.add(TipSerializer.INSTANCE.serializeDI(obj.getTip())); + return array; + + } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/LocalRollForwardSerializer.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/LocalRollForwardSerializer.java index 5ab76712..b86beb6c 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/LocalRollForwardSerializer.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/LocalRollForwardSerializer.java @@ -53,4 +53,36 @@ public LocalRollForward deserialize(byte[] bytes) { return new LocalRollForward(byronEbBlock, byronMainBlock, block, tip); } + //TODO -- Need tests + @Override + public DataItem serializeDI(LocalRollForward object) { + Array contentArr = new Array(); + + // Add the rollForwardType for serialization + if (object.getByronEbBlock() != null) { + contentArr.add(new UnsignedInteger(0)); // Era 0 for ByronEbBlock + } else if (object.getByronBlock() != null) { + contentArr.add(new UnsignedInteger(1)); // ByronMainBlock era value TODO ?? + } else if (object.getBlock() != null) { + contentArr.add(new UnsignedInteger(object.getBlock().getEra().getValue())); // Other block's era value + } else { + throw new IllegalArgumentException("Incomplete LocalRollForward object."); + } + + // Add the serialized block + byte[] blockBytes; + if (object.getByronEbBlock() != null) { + blockBytes = ByronEbBlockSerializer.INSTANCE.serialize(object.getByronEbBlock()); + } else if (object.getByronBlock() != null) { + blockBytes = ByronBlockSerializer.INSTANCE.serialize(object.getByronBlock()); + } else { + blockBytes = BlockSerializer.INSTANCE.serialize(object.getBlock()); + } + contentArr.add(new ByteString(blockBytes)); + + // Add the serialized Tip + contentArr.add(TipSerializer.INSTANCE.serializeDI(object.getTip())); + + return contentArr; + } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/RequestNextSerializer.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/RequestNextSerializer.java index 0c712889..9aac3bd1 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/RequestNextSerializer.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/RequestNextSerializer.java @@ -1,6 +1,7 @@ package com.bloxbean.cardano.yaci.core.protocol.chainsync.serializers; import co.nstant.in.cbor.model.Array; +import co.nstant.in.cbor.model.DataItem; import co.nstant.in.cbor.model.UnsignedInteger; import com.bloxbean.cardano.yaci.core.protocol.Serializer; import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.RequestNext; @@ -14,4 +15,16 @@ public byte[] serialize(RequestNext requestNext) { array.add(new UnsignedInteger(0)); return CborSerializationUtil.serialize(array, false); } + + @Override + public RequestNext deserializeDI(DataItem di) { + Array array = (Array) di; + + UnsignedInteger idx = (UnsignedInteger) array.getDataItems().get(0); + if (idx.getValue().intValue() == 0) { + return new RequestNext(); + } + + throw new IllegalArgumentException("Invalid DataItem for RequestNext deserialization"); + } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/RollbackwardSerializer.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/RollbackwardSerializer.java index 1c15b8d3..40f3444b 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/RollbackwardSerializer.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/RollbackwardSerializer.java @@ -1,6 +1,8 @@ package com.bloxbean.cardano.yaci.core.protocol.chainsync.serializers; import co.nstant.in.cbor.model.Array; +import co.nstant.in.cbor.model.DataItem; +import co.nstant.in.cbor.model.UnsignedInteger; import com.bloxbean.cardano.yaci.core.protocol.Serializer; import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Point; import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Rollbackward; @@ -19,4 +21,13 @@ public Rollbackward deserialize(byte[] bytes) { return new Rollbackward(point, tip); } + + @Override + public DataItem serializeDI(Rollbackward object) { + Array array = new Array(); + array.add(new UnsignedInteger(3)); + array.add(PointSerializer.INSTANCE.serializeDI(object.getPoint())); + array.add(TipSerializer.INSTANCE.serializeDI(object.getTip())); + return array; + } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/TipSerializer.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/TipSerializer.java index 1ca50140..66ab9434 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/TipSerializer.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/chainsync/serializers/TipSerializer.java @@ -19,4 +19,12 @@ public Tip deserializeDI(DataItem di) { return new Tip(tipPoint, block); } + + @Override + public DataItem serializeDI(Tip tip) { + Array array = new Array(); + array.add(PointSerializer.INSTANCE.serializeDI(tip.getPoint())); + array.add(new UnsignedInteger(tip.getBlock())); + return array; + } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/HandshakeAgent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/HandshakeAgent.java index 5686c201..4e032a2c 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/HandshakeAgent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/HandshakeAgent.java @@ -2,20 +2,28 @@ import com.bloxbean.cardano.yaci.core.protocol.Agent; import com.bloxbean.cardano.yaci.core.protocol.Message; -import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.AcceptVersion; -import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.ProposedVersions; -import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.Reason; -import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.VersionTable; +import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.*; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + import static com.bloxbean.cardano.yaci.core.protocol.handshake.HandshkeState.Propose; @Slf4j public class HandshakeAgent extends Agent { + private final VersionTable versionTable; private boolean suppressConnectionInfoLog = false; + private ProposedVersions proposedVersions; + public HandshakeAgent(VersionTable versionTable) { + this(versionTable,true); + } + public HandshakeAgent(VersionTable versionTable, boolean isClient) { + super(isClient); this.versionTable = versionTable; this.currenState = Propose; } @@ -30,15 +38,53 @@ public Message buildNextMessage() { switch ((HandshkeState)currenState) { case Propose: return new ProposedVersions(versionTable); //TODO + case Confirm: + return prepareConfirmMessage(); default: return null; } } + private Message prepareConfirmMessage() { + var versionDataMap = proposedVersions.getVersionTable().getVersionDataMap(); + + List supportedProtocolVersions = new ArrayList<>(); + for (var entry : versionDataMap.entrySet()) { + long proposedVersion = entry.getKey(); + if (versionTable.getVersionDataMap().containsKey(proposedVersion) && + versionTable.getVersionDataMap().get(proposedVersion).getNetworkMagic() == + entry.getValue().getNetworkMagic()) { + supportedProtocolVersions.add(proposedVersion); + } + } + + if (supportedProtocolVersions.size() > 0) { + supportedProtocolVersions.sort(Long::compareTo); + long highestVersion = supportedProtocolVersions.get(supportedProtocolVersions.size() - 1); + + var acceptedVersionData = versionDataMap.get(highestVersion); + // Accept the highest version + var acceptVersion = new AcceptVersion(highestVersion, acceptedVersionData); + System.out.println("Accept Version constructed !!!"); + return acceptVersion; + } else { + var versions = proposedVersions.getVersionTable().getVersionDataMap().keySet() + .stream().collect(Collectors.toList()); + + Reason reason = new ReasonVersionMismatch(versions); + Refuse refuse = new Refuse(reason); + + return refuse; + } + + } + @Override public void processResponse(Message message) { if (message == null) return; - if (message instanceof AcceptVersion) { + if (message instanceof ProposedVersions) { + this.proposedVersions = (ProposedVersions) message; + } else if (message instanceof AcceptVersion) { if (log.isDebugEnabled() || !suppressConnectionInfoLog) log.info("Handshake Ok!!! {}", message); setProtocolVersion((AcceptVersion)message); diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/HandshkeState.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/HandshkeState.java index ac333737..9eb721de 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/HandshkeState.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/HandshkeState.java @@ -10,8 +10,8 @@ public HandshkeState nextState(Message message) { } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, Confirm { @@ -21,8 +21,8 @@ public HandshkeState nextState(Message message) { } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, Done { @@ -32,7 +32,7 @@ public HandshkeState nextState(Message message) { } @Override - public boolean hasAgency() { + public boolean hasAgency(boolean isClient) { return false; } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/messages/AcceptVersion.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/messages/AcceptVersion.java index 4536d3f2..94c4d91e 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/messages/AcceptVersion.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/messages/AcceptVersion.java @@ -1,6 +1,7 @@ package com.bloxbean.cardano.yaci.core.protocol.handshake.messages; import com.bloxbean.cardano.yaci.core.protocol.Message; +import com.bloxbean.cardano.yaci.core.protocol.handshake.serializers.HandshakeSerializers; import lombok.*; @Getter @@ -11,4 +12,9 @@ public class AcceptVersion implements Message { private long versionNumber; private VersionData versionData; + + @Override + public byte[] serialize() { + return HandshakeSerializers.AcceptVersionSerializer.INSTANCE.serialize(this); + } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/serializers/HandshakeSerializers.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/serializers/HandshakeSerializers.java index c5484ffb..0db1562e 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/serializers/HandshakeSerializers.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/handshake/serializers/HandshakeSerializers.java @@ -9,6 +9,7 @@ import com.bloxbean.cardano.yaci.core.util.HexUtil; import lombok.extern.slf4j.Slf4j; +import java.math.BigInteger; import java.util.*; import java.util.Map; @@ -29,6 +30,20 @@ public byte[] serialize(ProposedVersions proposedVersions) { log.debug(HexUtil.encodeHexString(CborSerializationUtil.serialize(array))); return CborSerializationUtil.serialize(array); } + + @Override + public ProposedVersions deserializeDI(DataItem di) { + Array array = (Array) di; + var dataItems = array.getDataItems(); + var label = ((UnsignedInteger)dataItems.get(0)).getValue(); + if (label != BigInteger.ZERO) + throw new RuntimeException("Expected label : " + 0 + ", found : " + label); //TODO should we throw here or return null + + var versionTableDI = dataItems.get(1); + var versionTable = VersionTableSerializer.INSTANCE.deserializeDI(versionTableDI); + + return new ProposedVersions(versionTable); + } } public enum VersionTableSerializer implements Serializer { @@ -141,6 +156,33 @@ public AcceptVersion deserializeDI(DataItem di) { } else throw new CborRuntimeException("Parsing error. Invalid dataitem type : " + versionDataDI); } + + @Override + public DataItem serializeDI(AcceptVersion acceptVersion) { + Array array = new Array(); + + //label + array.add(new UnsignedInteger(1)); + + //version number + array.add(new UnsignedInteger(acceptVersion.getVersionNumber())); + + Array versionData = new Array(); + versionData.add(new UnsignedInteger(acceptVersion.getVersionData().getNetworkMagic())); + + //N2N version data + if (acceptVersion.getVersionData() instanceof N2NVersionData) { + var n2nVersionData = (N2NVersionData) acceptVersion.getVersionData(); + versionData.add(n2nVersionData.getInitiatorOnlyDiffusionMode()? SimpleValue.TRUE: SimpleValue.FALSE); + versionData.add(new UnsignedInteger(n2nVersionData.getPeerSharing())); + versionData.add(n2nVersionData.getQuery() ? SimpleValue.TRUE: SimpleValue.FALSE); + } + //TODO N2C + + array.add(versionData); + + return array; + } } public enum ReasonVersionMismatchSerializer implements Serializer { @@ -165,7 +207,24 @@ public ReasonVersionMismatch deserializeDI(DataItem di) { return new ReasonVersionMismatch(versionNumbers); } - //TODO -- deserialize not used + @Override + public DataItem serializeDI(ReasonVersionMismatch reasonVersionMismatch) { + Array array = new Array(); + array.add(new UnsignedInteger(0)); + + if (reasonVersionMismatch.getVersionNumbers() == null) { + array.add(new Array()); + } else { + Array versionArr = new Array(); + for (Long version: reasonVersionMismatch.getVersionNumbers()) { + versionArr.add(new UnsignedInteger(version)); + } + + array.add(versionArr); + } + + return array; + } } public enum ReasonHandshakeDecodeErrorSerializer implements Serializer { @@ -186,7 +245,16 @@ public ReasonHandshakeDecodeError deserializeDI(DataItem di) { return new ReasonHandshakeDecodeError(versionNumber, str); } - //TODO -- deserialize not used + @Override + public DataItem serializeDI(ReasonHandshakeDecodeError reasonHandshakeDecodeError) { + Array array = new Array(); + array.add(new UnsignedInteger(1)); + + array.add(new UnsignedInteger(reasonHandshakeDecodeError.getVersionNumber())); + array.add(new UnicodeString(reasonHandshakeDecodeError.getStr())); + + return array; + } } public enum ReasonRefusedSerializer implements Serializer { @@ -207,7 +275,16 @@ public ReasonRefused deserializeDI(DataItem di) { return new ReasonRefused(versionNumber, str); } - //TODO -- deserialize not used + @Override + public DataItem serializeDI(ReasonRefused reasonRefused) { + Array array = new Array(); + array.add(new UnsignedInteger(2)); + + array.add(new UnsignedInteger(reasonRefused.getVersionNumber())); + array.add(new UnicodeString(reasonRefused.getStr())); + + return array; + } } public enum QueryReplySerializer implements Serializer { diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/keepalive/KeepAliveAgent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/keepalive/KeepAliveAgent.java index 7e008b04..ad8f669d 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/keepalive/KeepAliveAgent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/keepalive/KeepAliveAgent.java @@ -21,6 +21,10 @@ public class KeepAliveAgent extends Agent { private Queue reqQueue; public KeepAliveAgent() { + this(true); + } + public KeepAliveAgent(boolean isClient) { + super(isClient); this.currenState = Client; this.reqQueue = new ConcurrentLinkedQueue<>(); } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/keepalive/KeepAliveState.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/keepalive/KeepAliveState.java index 9cd81d30..29c30ad0 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/keepalive/KeepAliveState.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/keepalive/KeepAliveState.java @@ -17,8 +17,8 @@ else if (message instanceof MsgDone) } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, Server { @@ -28,8 +28,8 @@ public KeepAliveState nextState(Message message) { } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, Done { @@ -39,8 +39,8 @@ public KeepAliveState nextState(Message message) { } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localstate/LocalStateQueryAgent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localstate/LocalStateQueryAgent.java index 10bcbfb7..fe79f57f 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localstate/LocalStateQueryAgent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localstate/LocalStateQueryAgent.java @@ -24,6 +24,10 @@ public class LocalStateQueryAgent extends Agent { private Queue pendingQueryCommands; public LocalStateQueryAgent() { + this(true); + } + public LocalStateQueryAgent(boolean isClient) { + super(isClient); this.currenState = Idle; acquiredCommands = new ConcurrentLinkedQueue<>(); diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localstate/LocalStateQueryState.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localstate/LocalStateQueryState.java index 7ad8b2c0..f70fb7ec 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localstate/LocalStateQueryState.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localstate/LocalStateQueryState.java @@ -20,8 +20,8 @@ else if (message instanceof MsgDone) } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } List allowedMsgTypes = List.of(MsgAcquire.class, MsgDone.class); @@ -44,8 +44,8 @@ else if (message instanceof MsgFailure) } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } List allowedMsgTypes = List.of(MsgAcquired.class, MsgFailure.class); @@ -70,8 +70,8 @@ else if (message instanceof MsgRelease) } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } List allowedMsgTypes = List.of(MsgQuery.class, MsgReAcquire.class, MsgRelease.class); @@ -92,8 +92,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } List allowedMsgTypes = List.of(MsgResult.class); @@ -111,7 +111,7 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { + public boolean hasAgency(boolean isClient) { return false; } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtx/LocalTxSubmissionAgent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtx/LocalTxSubmissionAgent.java index a8f32397..18931927 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtx/LocalTxSubmissionAgent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtx/LocalTxSubmissionAgent.java @@ -19,6 +19,10 @@ public class LocalTxSubmissionAgent extends Agent { private Queue pendingQueue; public LocalTxSubmissionAgent() { + this(true); + } + public LocalTxSubmissionAgent(boolean isClient) { + super(isClient); txnQueue = new ConcurrentLinkedQueue<>(); pendingQueue = new ConcurrentLinkedQueue<>(); this.currenState = LocalTxSubmissionState.Idle; diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtx/LocalTxSubmissionState.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtx/LocalTxSubmissionState.java index ccb25035..e66803a4 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtx/LocalTxSubmissionState.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtx/LocalTxSubmissionState.java @@ -20,8 +20,8 @@ else if (message instanceof MsgDone) } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, Busy { @@ -34,8 +34,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, Done { @@ -45,7 +45,7 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { + public boolean hasAgency(boolean isClient) { return false; } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtxmonitor/LocalTxMonitorAgent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtxmonitor/LocalTxMonitorAgent.java index 33d1b61b..a47ac672 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtxmonitor/LocalTxMonitorAgent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtxmonitor/LocalTxMonitorAgent.java @@ -19,6 +19,10 @@ public class LocalTxMonitorAgent extends Agent { private Queue pendingQueryQueue; public LocalTxMonitorAgent() { + this(true); + } + public LocalTxMonitorAgent(boolean isClient) { + super(isClient); this.currenState = Idle; this.acquiredCommands = new ConcurrentLinkedQueue<>(); this.pendingQueryQueue = new ConcurrentLinkedQueue<>(); diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtxmonitor/LocalTxMonitorState.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtxmonitor/LocalTxMonitorState.java index cc37ce62..2af3e22c 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtxmonitor/LocalTxMonitorState.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/localtxmonitor/LocalTxMonitorState.java @@ -19,8 +19,8 @@ else if (message instanceof MsgDone) } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } List allowedMsgTypes = List.of(MsgAwaitAcquire.class, MsgAcquire.class, MsgDone.class); @@ -40,8 +40,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } List allowedMsgTypes = List.of(MsgAcquired.class); @@ -65,8 +65,8 @@ else if (message instanceof MsgHasTx || message instanceof MsgNextTx || message } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } List allowedMsgTypes = List.of(MsgAwaitAcquire.class, MsgRelease.class, MsgHasTx.class, MsgNextTx.class, MsgGetSizes.class); @@ -86,8 +86,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } List allowedMsgTypes = List.of(MsgReplyHasTx.class, MsgReplyNextTx.class, MsgReplyGetSizes.class); @@ -104,7 +104,7 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { + public boolean hasAgency(boolean isClient) { return false; } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/TxSubmissionAgent.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/TxSubmissionAgent.java index 0f84ccac..966820ce 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/TxSubmissionAgent.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/TxSubmissionAgent.java @@ -24,6 +24,10 @@ public class TxSubmissionAgent extends Agent { private final ConcurrentLinkedQueue requestedTxIds; public TxSubmissionAgent() { + this(true); + } + public TxSubmissionAgent(boolean isClient) { + super(isClient); this.currenState = TxSubmissionState.Init; this.txs = new ConcurrentLinkedQueue<>(); this.pendingTxIds = new ConcurrentLinkedQueue<>(); @@ -36,7 +40,7 @@ public int getProtocolId() { } @Override - public Message buildNextMessage() { +public Message buildNextMessage() { switch ((TxSubmissionState) currenState) { case Init: return new Init(); diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/TxSubmissionState.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/TxSubmissionState.java index 93a62cc5..fab5bd1a 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/TxSubmissionState.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/TxSubmissionState.java @@ -18,8 +18,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, Idle { @@ -35,8 +35,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return false; + public boolean hasAgency(boolean isClient) { + return !isClient; } }, TxIdsBlocking { @@ -49,8 +49,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, TxIdsNonBlocking { @@ -63,8 +63,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, Txs { @@ -77,8 +77,8 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { - return true; + public boolean hasAgency(boolean isClient) { + return isClient; } }, Done { @@ -88,7 +88,7 @@ public State nextState(Message message) { } @Override - public boolean hasAgency() { + public boolean hasAgency(boolean isClient) { return false; } } diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/messges/ReplyTxIds.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/messges/ReplyTxIds.java index c8cfac34..dbae4d43 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/messges/ReplyTxIds.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/messges/ReplyTxIds.java @@ -21,6 +21,11 @@ public ReplyTxIds() { this(Era.Babbage); } + public ReplyTxIds(Era era, Map txIdAndSizeMap) { + this.era = era; + this.txIdAndSizeMap = txIdAndSizeMap; + } + public void addTxId(String id, int size) { if (txIdAndSizeMap == null) txIdAndSizeMap = new HashMap<>(); diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/messges/ReplyTxs.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/messges/ReplyTxs.java index 1fa5161e..33095fb6 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/messges/ReplyTxs.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/messges/ReplyTxs.java @@ -18,7 +18,7 @@ public class ReplyTxs implements Message { private List txns; public ReplyTxs() { - this(Era.Babbage); + this(Era.Conway); } public void addTx(byte[] tx) { diff --git a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/serializers/TxSubmissionMessagesSerializers.java b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/serializers/TxSubmissionMessagesSerializers.java index d5162d22..171b3389 100644 --- a/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/serializers/TxSubmissionMessagesSerializers.java +++ b/core/src/main/java/com/bloxbean/cardano/yaci/core/protocol/txsubmission/serializers/TxSubmissionMessagesSerializers.java @@ -3,6 +3,7 @@ import co.nstant.in.cbor.model.*; import com.bloxbean.cardano.client.exception.CborRuntimeException; import com.bloxbean.cardano.yaci.core.protocol.Serializer; +import com.bloxbean.cardano.yaci.core.protocol.localstate.api.Era; import com.bloxbean.cardano.yaci.core.protocol.txsubmission.messges.*; import com.bloxbean.cardano.yaci.core.util.CborSerializationUtil; import com.bloxbean.cardano.yaci.core.util.HexUtil; @@ -99,7 +100,37 @@ public byte[] serialize(ReplyTxIds replyTxIds) { return CborSerializationUtil.serialize(array); } - //TODO deserializeDI() -- Not used + @Override + public ReplyTxIds deserializeDI(DataItem di) { + Array array = (Array) di; + List dataItemList = array.getDataItems(); + + int label = ((UnsignedInteger) dataItemList.get(0)).getValue().intValue(); + if (label != 1) + throw new CborRuntimeException("Parsing error. Invalid label: " + di); + + Array pairsArray = (Array) dataItemList.get(1); + List pairs = pairsArray.getDataItems(); + var txIdAndSizeMap = new java.util.HashMap(); + + for (DataItem pairDI : pairs) { + if (pairDI instanceof Special) { + break; // Handle the BREAK special case + } + + Array pair = (Array) pairDI; + Array eraAndIdArray = (Array) pair.getDataItems().get(0); + + int eraValue = ((UnsignedInteger) eraAndIdArray.getDataItems().get(0)).getValue().intValue(); // Era + String txId = HexUtil.encodeHexString(((ByteString) eraAndIdArray.getDataItems().get(1)).getBytes()); + int size = ((UnsignedInteger) pair.getDataItems().get(1)).getValue().intValue(); + + txIdAndSizeMap.put(txId, size); + } + + //TODO -- Change era + return new ReplyTxIds(Era.Conway, txIdAndSizeMap); + } } public enum RequestTxsSerializer implements Serializer { diff --git a/core/src/test/java/com/bloxbean/cardano/yaci/core/network/server/NodeServerTest.java b/core/src/test/java/com/bloxbean/cardano/yaci/core/network/server/NodeServerTest.java new file mode 100644 index 00000000..b5d72917 --- /dev/null +++ b/core/src/test/java/com/bloxbean/cardano/yaci/core/network/server/NodeServerTest.java @@ -0,0 +1,57 @@ +package com.bloxbean.cardano.yaci.core.network.server; + +import com.bloxbean.cardano.yaci.core.network.TCPNodeClient; +import com.bloxbean.cardano.yaci.core.protocol.handshake.HandshakeAgent; +import com.bloxbean.cardano.yaci.core.protocol.handshake.HandshakeAgentListener; +import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.Reason; +import com.bloxbean.cardano.yaci.core.protocol.handshake.util.N2NVersionTableConstant; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + +@Slf4j +public class NodeServerTest { + + @Test + void startNodeServer_successfulHandshake() throws InterruptedException { + NodeServer server = new NodeServer(3333, N2NVersionTableConstant.v11AndAbove(1)); + Thread t = new Thread() { + @Override + public void run() { + server.start(); + } + }; + + t.start(); + + HandshakeAgent handshakeAgent = new HandshakeAgent(N2NVersionTableConstant.v11AndAbove(1), true); + TCPNodeClient tcpNodeClient = new TCPNodeClient("localhost", 3333, handshakeAgent); + + AtomicBoolean success = new AtomicBoolean(); + CountDownLatch countDownLatch = new CountDownLatch(1); + handshakeAgent.addListener(new HandshakeAgentListener() { + @Override + public void handshakeOk() { + log.info("HANDSHAKE Successful"); + countDownLatch.countDown(); + success.set(true); + } + + @Override + public void handshakeError(Reason reason) { + log.info("ERROR {}", reason); + } + }); + + tcpNodeClient.start(); + + countDownLatch.await(10, TimeUnit.SECONDS); + t.stop(); + assertThat(success).isTrue(); + } +} diff --git a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/TxSubmissionClient.java b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/TxSubmissionClient.java index a2fee4ba..9b6759a8 100644 --- a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/TxSubmissionClient.java +++ b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/TxSubmissionClient.java @@ -13,7 +13,7 @@ import com.bloxbean.cardano.yaci.core.protocol.txsubmission.messges.RequestTxs; import lombok.extern.slf4j.Slf4j; -import static com.bloxbean.cardano.yaci.core.common.TxBodyType.BABBAGE; +import static com.bloxbean.cardano.yaci.core.common.TxBodyType.CONWAY; /** * This helper is still under development. @@ -105,7 +105,7 @@ public boolean isRunning() { public void submitTxBytes(byte[] txBytes) { var txHash = TransactionUtil.getTxHash(txBytes); - this.submitTxBytes(txHash, txBytes, BABBAGE); + this.submitTxBytes(txHash, txBytes, CONWAY); } public void submitTxBytes(String txHash, byte[] txBytes, TxBodyType txBodyType) {