Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.bloxbean.cardano.yaci.core.network.server;

import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.VersionTable;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class NodeServer {
private final int port;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private Channel serverChannel;
private VersionTable versionTable;
private final static Map<Channel, NodeServerSession> sessions = new ConcurrentHashMap<>();

public NodeServer(int port, VersionTable versionTable) {
this.port = port;
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup();
this.versionTable = versionTable;
}

public void start() {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
NodeServerSession session = new NodeServerSession(ch, versionTable);
sessions.put(ch, session);
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);

ChannelFuture future = bootstrap.bind(port).sync();
serverChannel = future.channel();
log.info("NodeServer started on port {}", port);

serverChannel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("NodeServer interrupted", e);
} finally {
shutdown();
}
}

public void shutdown() {
log.info("Shutting down NodeServer...");
for (NodeServerSession session : sessions.values()) {
session.close();
}
if (serverChannel != null) {
serverChannel.close();
}
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

public static void removeSession(Channel channel) {
NodeServerSession session = sessions.remove(channel);
if (session != null) {
session.close();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.bloxbean.cardano.yaci.core.network.server;

import com.bloxbean.cardano.yaci.core.network.handlers.MiniProtoRequestDataEncoder;
import com.bloxbean.cardano.yaci.core.network.handlers.MiniProtoStreamingByteToMessageDecoder;
import com.bloxbean.cardano.yaci.core.network.server.handlers.MiniProtoServerInboundHandler;
import com.bloxbean.cardano.yaci.core.protocol.Agent;
import com.bloxbean.cardano.yaci.core.protocol.handshake.HandshakeAgent;
import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.VersionTable;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NodeServerSession {
private final Channel clientChannel;
private final HandshakeAgent handshakeAgent;
private final Agent[] agents;

public NodeServerSession(Channel clientChannel, VersionTable versionTable) {
this.clientChannel = clientChannel;
this.handshakeAgent = new HandshakeAgent(versionTable, false);
this.agents = createAgents();

setupPipeline();
}

private void setupPipeline() {
ChannelPipeline pipeline = clientChannel.pipeline();
pipeline.addLast(new MiniProtoRequestDataEncoder());
pipeline.addLast(new MiniProtoStreamingByteToMessageDecoder(agents));
pipeline.addLast(new MiniProtoServerInboundHandler(this));
}

public HandshakeAgent getHandshakeAgent() {
return handshakeAgent;
}

public Agent[] getAgents() {
return agents;
}

public Channel getClientChannel() {
return clientChannel;
}

public void close() {
log.info("Closing session for client: {}", clientChannel.remoteAddress());
clientChannel.close();
}

private Agent[] createAgents() {
// Initialize specific mini-protocol handlers (ChainSync, BlockFetch, etc.)
return new Agent[]{

};
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.bloxbean.cardano.yaci.core.network.server;

import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class SessionManager {
private static final Set<Channel> activeClients = Collections.newSetFromMap(new ConcurrentHashMap<>());

public static void addClient(Channel channel) {
activeClients.add(channel);
log.info("Client connected: {} (Total: {})", channel.remoteAddress(), activeClients.size());
}

public static void removeClient(Channel channel) {
activeClients.remove(channel);
log.info("Client disconnected: {} (Remaining: {})", channel.remoteAddress(), activeClients.size());
}

public static int getActiveClientCount() {
return activeClients.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.bloxbean.cardano.yaci.core.network.server.handlers;

import com.bloxbean.cardano.yaci.core.network.server.NodeServer;
import com.bloxbean.cardano.yaci.core.network.server.NodeServerSession;
import com.bloxbean.cardano.yaci.core.protocol.Agent;
import com.bloxbean.cardano.yaci.core.protocol.Message;
import com.bloxbean.cardano.yaci.core.protocol.Segment;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MiniProtoServerInboundHandler extends SimpleChannelInboundHandler<Segment> {
private final NodeServerSession session;

public MiniProtoServerInboundHandler(NodeServerSession session) {
this.session = session;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("Client connected: {}", ctx.channel().remoteAddress());
session.getHandshakeAgent().setChannel(ctx.channel());
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Segment segment) {
try {
log.debug("Received segment from client: Protocol {} - Length {}", segment.getProtocol(), segment.getPayload().length);

if (segment.getProtocol() == session.getHandshakeAgent().getProtocolId()) {
Message message = session.getHandshakeAgent().deserializeResponse(segment.getPayload());
session.getHandshakeAgent().receiveResponse(message);

if (!session.getHandshakeAgent().isDone() && session.getHandshakeAgent().hasAgency())
session.getHandshakeAgent().sendNextMessage();
} else {
for (Agent agent : session.getAgents()) {
if (!agent.isDone() && agent.getProtocolId() == segment.getProtocol()) {
Message message = agent.deserializeResponse(segment.getPayload());
agent.receiveResponse(message);

if (agent.hasAgency())
agent.sendNextMessage();
break;
}
}
}

ctx.flush();
} finally {
ReferenceCountUtil.release(segment);
}
}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("Error processing client request", cause);
ctx.close();
session.close();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("Client disconnected: {}", ctx.channel().remoteAddress());

// Remove session from NodeServer when client disconnects
session.close();
NodeServer.removeSession(ctx.channel());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ public abstract class Agent<T extends AgentListener> {
private final List<T> agentListeners = new ArrayList<>();
private AcceptVersion acceptVersion;

private final boolean isClient;

public Agent(boolean isClient) {
this.isClient = isClient;
}

public void setChannel(Channel channel) {
if (this.channel != null && this.channel.isActive())
log.warn("An active channel is already attached to this agent");
Expand All @@ -25,7 +31,7 @@ public void setChannel(Channel channel) {
}

public void sendRequest(Message message) {
if (currenState.hasAgency()) {
if (currenState.hasAgency(isClient)) {
currenState = currenState.nextState(message);
} else {
//TODO
Expand Down Expand Up @@ -69,7 +75,7 @@ public final void sendNextMessage() {
}

public final boolean hasAgency() {
return currenState.hasAgency();
return currenState.hasAgency(isClient);
}

public final synchronized void addListener(T agentListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
public interface State {
State nextState(Message message);

boolean hasAgency();
boolean hasAgency(boolean isClient);

default Message handleInbound(byte[] bytes) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class BlockfetchAgent extends Agent<BlockfetchAgentListener> {
private long errorBlks;

public BlockfetchAgent() {
this(true);
}
public BlockfetchAgent(boolean isClient) {
super(isClient);
this.currenState = Idle;

this.startTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ else if (message instanceof ClientDone)
}

@Override
public boolean hasAgency() {
return true;
public boolean hasAgency(boolean isClient) {
return isClient;
}
},
Busy {
Expand All @@ -33,8 +33,8 @@ else if (message instanceof StartBatch)
}

@Override
public boolean hasAgency() {
return false;
public boolean hasAgency(boolean isClient) {
return !isClient;
}
},
Streaming {
Expand All @@ -49,8 +49,8 @@ public State nextState(Message message) {
}

@Override
public boolean hasAgency() {
return false;
public boolean hasAgency(boolean isClient) {
return !isClient;
}
},
Done {
Expand All @@ -60,7 +60,7 @@ public State nextState(Message message) {
}

@Override
public boolean hasAgency() {
public boolean hasAgency(boolean isClient) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public class LocalChainSyncAgent extends Agent<LocalChainSyncAgentListener> {
private int counter = 0;

public LocalChainSyncAgent(Point[] knownPoints) {
this(knownPoints, true);
}
public LocalChainSyncAgent(Point[] knownPoints, boolean isClient) {
super(isClient);
this.currenState = Idle;
this.knownPoints = knownPoints;

Expand All @@ -29,6 +33,10 @@ public LocalChainSyncAgent(Point[] knownPoints) {
}

public LocalChainSyncAgent(Point[] knownPoints, long stopSlotNo, int agentNo) {
this(knownPoints,stopSlotNo, agentNo, true);
}
public LocalChainSyncAgent(Point[] knownPoints, long stopSlotNo, int agentNo, boolean isClient) {
super(isClient);
this.currenState = Idle;
this.knownPoints = knownPoints;
this.stopAt = stopSlotNo;
Expand Down
Loading