Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,22 @@ public abstract class NodeClient {
private Agent[] agents;
private EventLoopGroup workerGroup;
private Session session;
protected NodeClientConfig config;

public NodeClient() {

}

public NodeClient(HandshakeAgent handshakeAgent, Agent... agents) {
/**
* Constructor with NodeClientConfig for configurable connection behavior.
*
* @param config the connection configuration
* @param handshakeAgent the handshake agent
* @param agents the protocol agents
*/
public NodeClient(NodeClientConfig config, HandshakeAgent handshakeAgent, Agent... agents) {
this.sessionListener = new SessionListenerAdapter();
this.config = config != null ? config : NodeClientConfig.defaultConfig();

this.handshakeAgent = handshakeAgent;
this.agents = agents;
Expand All @@ -37,6 +46,16 @@ public NodeClient(HandshakeAgent handshakeAgent, Agent... agents) {
attachHandshakeListener();
}

/**
* Constructor with default configuration (for backward compatibility).
*
* @param handshakeAgent the handshake agent
* @param agents the protocol agents
*/
public NodeClient(HandshakeAgent handshakeAgent, Agent... agents) {
this(NodeClientConfig.defaultConfig(), handshakeAgent, agents);
}

private void attachHandshakeListener() {
handshakeAgent.addListener(new HandshakeAgentListener() {
@Override
Expand Down Expand Up @@ -80,7 +99,7 @@ public void initChannel(Channel ch)

SocketAddress socketAddress = createSocketAddress();

session = new Session(socketAddress, b, handshakeAgent, agents);
session = new Session(socketAddress, b, config, handshakeAgent, agents);
session.setSessionListener(sessionListener);
session.start();
} catch (Exception e) {
Expand All @@ -92,6 +111,16 @@ public boolean isRunning() {
return session != null;
}

/**
* Get the current NodeClientConfig.
* This is primarily for use by subclasses to access configuration options.
*
* @return the current configuration
*/
protected NodeClientConfig getConfig() {
return config;
}

public void shutdown() {
if (showConnectionLog())
log.info("Shutdown connection !!!");
Expand Down Expand Up @@ -190,6 +219,7 @@ public void connected() {
}

private boolean showConnectionLog() {
return log.isDebugEnabled() || (handshakeAgent != null && !handshakeAgent.isSuppressConnectionInfoLog());
return (config != null && config.isEnableConnectionLogging()) &&
(log.isDebugEnabled() || (handshakeAgent != null && !handshakeAgent.isSuppressConnectionInfoLog()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.bloxbean.cardano.yaci.core.network;

import lombok.*;

/**
* Configuration for NodeClient connection behavior.
* This class controls reconnection policies, retry strategies, and logging preferences.
*
* <p>Use the builder pattern to create instances:</p>
* <pre>{@code
* NodeClientConfig config = NodeClientConfig.builder()
* .autoReconnect(false)
* .maxRetryAttempts(3)
* .build();
* }</pre>
*
*/
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@EqualsAndHashCode
@ToString
@Builder(toBuilder = true)
public class NodeClientConfig {

/**
* Whether to automatically reconnect when connection is lost.
* Default: true (maintains backward compatibility for long-running processes like indexers)
* Set to false for short-lived connections (e.g., peer discovery)
*/
@Builder.Default
private final boolean autoReconnect = true;

/**
* Initial delay in milliseconds between retry attempts during connection establishment.
* Default: 8000ms (8 seconds)
*/
@Builder.Default
private final int initialRetryDelayMs = 8000;

/**
* Maximum number of retry attempts before giving up.
* Default: Integer.MAX_VALUE (unlimited retries for backward compatibility)
*/
@Builder.Default
private final int maxRetryAttempts = Integer.MAX_VALUE;

/**
* Whether to enable connection-related logging (connect, disconnect, reconnect messages).
* Default: true
*/
@Builder.Default
private final boolean enableConnectionLogging = true;

/**
* Connection timeout in milliseconds for establishing TCP connections.
* This configures Netty's CONNECT_TIMEOUT_MILLIS option.
* Default: 30000ms (30 seconds)
*
* For short-lived connections like peer discovery, consider using a lower value (e.g., 10000ms).
* For long-running connections, the default is appropriate.
*/
@Builder.Default
private final int connectionTimeoutMs = 30000;

/**
* Creates a default configuration with backward-compatible settings.
* This is equivalent to calling {@code NodeClientConfig.builder().build()}
*
* @return a new NodeClientConfig with default values
*/
public static NodeClientConfig defaultConfig() {
return NodeClientConfig.builder().build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,47 @@ class Session implements Disposable {
private final SocketAddress socketAddress;
private final Bootstrap clientBootstrap;
private Channel activeChannel;
private final AtomicBoolean shouldReconnect; //Not used currently
private final AtomicBoolean shouldReconnect;
private final HandshakeAgent handshakeAgent;
private final Agent[] agents;
private final NodeClientConfig config;

private SessionListener sessionListener;

public Session(SocketAddress socketAddress, Bootstrap clientBootstrap, HandshakeAgent handshakeAgent, Agent[] agents) {
/**
* Constructor with NodeClientConfig for configurable connection behavior.
*
* @param socketAddress the socket address to connect to
* @param clientBootstrap the Netty bootstrap
* @param config the connection configuration
* @param handshakeAgent the handshake agent
* @param agents the protocol agents
*/
public Session(SocketAddress socketAddress, Bootstrap clientBootstrap, NodeClientConfig config,
HandshakeAgent handshakeAgent, Agent[] agents) {
this.socketAddress = socketAddress;
this.clientBootstrap = clientBootstrap;
this.shouldReconnect = new AtomicBoolean(true);
this.config = config != null ? config : NodeClientConfig.defaultConfig();
this.shouldReconnect = new AtomicBoolean(this.config.isAutoReconnect());

this.handshakeAgent = handshakeAgent;
this.agents = agents;
}

/**
* Constructor with default configuration (for backward compatibility).
*
* @param socketAddress the socket address to connect to
* @param clientBootstrap the Netty bootstrap
* @param handshakeAgent the handshake agent
* @param agents the protocol agents
* @deprecated Use {@link #Session(SocketAddress, Bootstrap, NodeClientConfig, HandshakeAgent, Agent[])} instead
*/
@Deprecated
public Session(SocketAddress socketAddress, Bootstrap clientBootstrap, HandshakeAgent handshakeAgent, Agent[] agents) {
this(socketAddress, clientBootstrap, NodeClientConfig.defaultConfig(), handshakeAgent, agents);
}

public void setSessionListener(SessionListener sessionListener) {
if (this.sessionListener != null)
throw new RuntimeException("SessionListener is already there. Can't set another SessionListener");
Expand All @@ -43,15 +69,21 @@ public void setSessionListener(SessionListener sessionListener) {
public Disposable start() throws InterruptedException {
//Create a new connectFuture
ChannelFuture connectFuture = null;
while (connectFuture == null && shouldReconnect.get()) {
// Always try to connect at least once, then retry only if shouldReconnect is true
do {
try {
connectFuture = clientBootstrap.connect(socketAddress).sync();
} catch (Exception e) {
log.error("Connection failed", e);
Thread.sleep(8000);
log.debug("Trying to reconnect !!!");
if (shouldReconnect.get()) {
Thread.sleep(config.getInitialRetryDelayMs());
log.debug("Trying to reconnect !!!");
} else {
// If auto-reconnect is disabled, fail fast
throw e;
}
}
}
} while (connectFuture == null && shouldReconnect.get());

handshakeAgent.reset();
for (Agent agent: agents) {
Expand Down Expand Up @@ -134,6 +166,7 @@ public void handshake() {
}

private boolean showConnectionLog() {
return log.isDebugEnabled() || (handshakeAgent != null && !handshakeAgent.isSuppressConnectionInfoLog());
return config.isEnableConnectionLogging() &&
(log.isDebugEnabled() || (handshakeAgent != null && !handshakeAgent.isSuppressConnectionInfoLog()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,33 @@ public class TCPNodeClient extends NodeClient {
private String host;
private int port;

public TCPNodeClient(String host, int port, HandshakeAgent handshakeAgent, Agent... agents) {
super(handshakeAgent, agents);
/**
* Constructor with NodeClientConfig for configurable connection behavior.
*
* @param host the host to connect to
* @param port the port to connect to
* @param config the connection configuration
* @param handshakeAgent the handshake agent
* @param agents the protocol agents
*/
public TCPNodeClient(String host, int port, NodeClientConfig config, HandshakeAgent handshakeAgent, Agent... agents) {
super(config, handshakeAgent, agents);
this.host = host;
this.port = port;
}

/**
* Constructor with default configuration (for backward compatibility).
*
* @param host the host to connect to
* @param port the port to connect to
* @param handshakeAgent the handshake agent
* @param agents the protocol agents
*/
public TCPNodeClient(String host, int port, HandshakeAgent handshakeAgent, Agent... agents) {
this(host, port, NodeClientConfig.defaultConfig(), handshakeAgent, agents);
}

@Override
protected SocketAddress createSocketAddress() {
return new InetSocketAddress(host, port);
Expand All @@ -46,5 +67,6 @@ protected Class getChannelClass() {
protected void configureChannel(Bootstrap bootstrap) {
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConfig().getConnectionTimeoutMs());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.bloxbean.cardano.yaci.core.protocol.handshake.HandshakeAgent;
import com.bloxbean.cardano.yaci.core.util.OSUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
Expand All @@ -21,11 +22,30 @@
public class UnixSocketNodeClient extends NodeClient {
private String nodeSocketFile;

public UnixSocketNodeClient(String nodeSocketFile, HandshakeAgent handshakeAgent, Agent... agents) {
super(handshakeAgent, agents);
/**
* Constructor with NodeClientConfig for configurable connection behavior.
*
* @param nodeSocketFile the path to the Unix domain socket
* @param config the connection configuration
* @param handshakeAgent the handshake agent
* @param agents the protocol agents
*/
public UnixSocketNodeClient(String nodeSocketFile, NodeClientConfig config, HandshakeAgent handshakeAgent, Agent... agents) {
super(config, handshakeAgent, agents);
this.nodeSocketFile = nodeSocketFile;
}

/**
* Constructor with default configuration (for backward compatibility).
*
* @param nodeSocketFile the path to the Unix domain socket
* @param handshakeAgent the handshake agent
* @param agents the protocol agents
*/
public UnixSocketNodeClient(String nodeSocketFile, HandshakeAgent handshakeAgent, Agent... agents) {
this(nodeSocketFile, NodeClientConfig.defaultConfig(), handshakeAgent, agents);
}

@Override
protected SocketAddress createSocketAddress() {
return new DomainSocketAddress(new File(nodeSocketFile));
Expand All @@ -49,6 +69,6 @@ protected Class getChannelClass() {

@Override
protected void configureChannel(Bootstrap bootstrap) {

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConfig().getConnectionTimeoutMs());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class PeerSharingAgent extends Agent<PeerSharingAgentListener> {
public static final int DEFAULT_REQUEST_AMOUNT = 10;
public static final int MAX_REQUEST_AMOUNT = 100;
public static final long RESPONSE_TIMEOUT_MS = 30000; // 30 seconds
public static final long RESPONSE_TIMEOUT_MS = 5000;

private boolean shutDown;
private Queue<MsgShareRequest> requestQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -100,7 +101,7 @@ private Array serializePeerAddress(PeerAddress peerAddress) {
array.add(new UnsignedInteger(0));

// Convert 4 bytes to word32
ByteBuffer buffer = ByteBuffer.wrap(addressBytes);
ByteBuffer buffer = ByteBuffer.wrap(addressBytes).order(ByteOrder.LITTLE_ENDIAN);
long ipv4AsLong = buffer.getInt() & 0xFFFFFFFFL; // Convert to unsigned
array.add(new UnsignedInteger(ipv4AsLong));

Expand All @@ -111,7 +112,7 @@ private Array serializePeerAddress(PeerAddress peerAddress) {
array.add(new UnsignedInteger(1));

// Convert 16 bytes to 4 word32s
ByteBuffer buffer = ByteBuffer.wrap(addressBytes);
ByteBuffer buffer = ByteBuffer.wrap(addressBytes).order(ByteOrder.LITTLE_ENDIAN);
for (int i = 0; i < 4; i++) {
long word32 = buffer.getInt() & 0xFFFFFFFFL; // Convert to unsigned
array.add(new UnsignedInteger(word32));
Expand All @@ -136,7 +137,7 @@ private PeerAddress deserializePeerAddress(DataItem di) {
int port = ((UnsignedInteger) dataItemList.get(2)).getValue().intValue();

// Convert word32 back to IPv4 address
byte[] addressBytes = ByteBuffer.allocate(4).putInt((int) ipv4Long).array();
byte[] addressBytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt((int) ipv4Long).array();
try {
InetAddress inetAddress = InetAddress.getByAddress(addressBytes);
return PeerAddress.ipv4(inetAddress.getHostAddress(), port);
Expand All @@ -146,15 +147,19 @@ private PeerAddress deserializePeerAddress(DataItem di) {

} else if (type == 1) { // IPv6
byte[] addressBytes = new byte[16];
ByteBuffer buffer = ByteBuffer.wrap(addressBytes);
ByteBuffer buffer = ByteBuffer.wrap(addressBytes).order(ByteOrder.LITTLE_ENDIAN);

// Convert 4 word32s back to 16 bytes
for (int i = 1; i <= 4; i++) {
long word32 = ((UnsignedInteger) dataItemList.get(i)).getValue().longValue();
buffer.putInt((int) word32);
}

int port = ((UnsignedInteger) dataItemList.get(5)).getValue().intValue();
// Handle both V11-12 (8 elements) and V13+ (6 elements) formats
// V11-12: [type, addr1-4, flowInfo, scopeId, port] - port at index 7
// V13+: [type, addr1-4, port] - port at index 5
int portIndex = dataItemList.size() == 8 ? 7 : 5;
int port = ((UnsignedInteger) dataItemList.get(portIndex)).getValue().intValue();

try {
InetAddress inetAddress = InetAddress.getByAddress(addressBytes);
Expand Down
Loading