From cce20c5cb59033d9200595c8f1aca2d729398ab1 Mon Sep 17 00:00:00 2001 From: CodingEnthusiast Date: Tue, 30 Jul 2024 07:13:19 +0330 Subject: [PATCH] Introduce a new SPV client (can only sync headers for now) --- .../Clients/ISpvClientSettings.cs | 31 ++ Src/Autarkysoft.Bitcoin/Clients/SpvClient.cs | 225 ++++++++++++++ .../Clients/SpvClientSettings.cs | 144 +++++++++ .../P2PNetwork/ReplyManager.cs | 2 +- .../P2PNetwork/SpvReplyManager.cs | 276 ++++++++++++++++++ 5 files changed, 677 insertions(+), 1 deletion(-) create mode 100644 Src/Autarkysoft.Bitcoin/Clients/ISpvClientSettings.cs create mode 100644 Src/Autarkysoft.Bitcoin/Clients/SpvClient.cs create mode 100644 Src/Autarkysoft.Bitcoin/Clients/SpvClientSettings.cs create mode 100644 Src/Autarkysoft.Bitcoin/P2PNetwork/SpvReplyManager.cs diff --git a/Src/Autarkysoft.Bitcoin/Clients/ISpvClientSettings.cs b/Src/Autarkysoft.Bitcoin/Clients/ISpvClientSettings.cs new file mode 100644 index 0000000..12b6c20 --- /dev/null +++ b/Src/Autarkysoft.Bitcoin/Clients/ISpvClientSettings.cs @@ -0,0 +1,31 @@ +// Autarkysoft.Bitcoin +// Copyright (c) 2020 Autarkysoft +// Distributed under the MIT software license, see the accompanying +// file LICENCE or http://www.opensource.org/licenses/mit-license.php. + +using Autarkysoft.Bitcoin.Blockchain; +using Autarkysoft.Bitcoin.P2PNetwork.Messages; +using System.Collections.Generic; +using System.Net; + +namespace Autarkysoft.Bitcoin.Clients +{ + public interface ISpvClientSettings : IClientSettings + { + /// + /// Returns the blockchain instance to be shared among all node instances + /// + IChain Blockchain { get; } + + int GetRandomNodeAddrs(int count, bool v, List addrs); + + /// + /// Returns if the provided service flags contains services that are needed for syncing based on + /// 's current state. + /// + /// Flags to check + /// True if the required services are available; otherwise false. + bool HasNeededServices(NodeServiceFlags flags); + void RemoveNodeAddr(IPAddress e); + } +} diff --git a/Src/Autarkysoft.Bitcoin/Clients/SpvClient.cs b/Src/Autarkysoft.Bitcoin/Clients/SpvClient.cs new file mode 100644 index 0000000..9c112e0 --- /dev/null +++ b/Src/Autarkysoft.Bitcoin/Clients/SpvClient.cs @@ -0,0 +1,225 @@ +// Autarkysoft.Bitcoin +// Copyright (c) 2020 Autarkysoft +// Distributed under the MIT software license, see the accompanying +// file LICENCE or http://www.opensource.org/licenses/mit-license.php. + +using Autarkysoft.Bitcoin.Blockchain; +using Autarkysoft.Bitcoin.P2PNetwork; +using Autarkysoft.Bitcoin.P2PNetwork.Messages; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Net; +using System.Threading; +using System.Threading.Tasks; + +namespace Autarkysoft.Bitcoin.Clients +{ + public class SpvClient : IDisposable + { + /// + /// Initializes a new instance of using the given parameters. + /// + /// Client settings + public SpvClient(ISpvClientSettings settings) + { + Settings = settings; + connector = new NodeConnector(settings); + + connector.ConnectFailureEvent += Connector_ConnectFailureEvent; + settings.AllNodes.AddRemoveEvent += AllNodes_AddRemoveEvent; + Settings.Blockchain.HeaderSyncEndEvent += Blockchain_HeaderSyncEndEvent; + } + + + private void AllNodes_AddRemoveEvent(object sender, NodePool.AddRemoveEventArgs e) + { + if (isDisposed) + return; + + if (e.Action == NodePool.CollectionAction.Add) + { + if (inQueue > 0) + { + Interlocked.Decrement(ref inQueue); + } + } + else if (e.Action == NodePool.CollectionAction.Remove) + { + if (Settings.Blockchain.State == BlockchainState.HeadersSync) + { + ConnectToPeers(1); + } + else + { + ConnectToPeers(Settings.MaxConnectionCount - Settings.AllNodes.Count - inQueue); + } + } + } + + + private void Connector_ConnectFailureEvent(object sender, IPAddress e) + { + if (isDisposed) + return; + + Settings.RemoveNodeAddr(e); + Debug.Assert(inQueue > 0); + Interlocked.Decrement(ref inQueue); + if (Settings.Blockchain.State == BlockchainState.HeadersSync) + { + ConnectToPeers(1); + } + else + { + ConnectToPeers(Settings.MaxConnectionCount - Settings.AllNodes.Count - inQueue); + } + } + + private void Blockchain_HeaderSyncEndEvent(object sender, EventArgs e) + { + if (isDisposed) + return; + + // Increase the number of connections to max connection and start dowloading blocks + ConnectToPeers(Settings.MaxConnectionCount - Settings.AllNodes.Count - inQueue); + } + + + private readonly NodeConnector connector; + private int inQueue; + + private const int DnsDigCount = 3; + + /// + /// Settings used in this client + /// + public ISpvClientSettings Settings { get; set; } + + + private async Task DigDnsSeeds(bool all) + { + // // TODO: this could be used for DNS seeds + // https://github.com/sipa/bitcoin-seeder/blob/a09d2870d1b7f4dd3c1753bbf4fd0bc3690b7ef9/main.cpp#L165-L174 + List result = new List(); + int[] indices; + if (all) + { + indices = Enumerable.Range(0, Settings.DnsSeeds.Length).ToArray(); + } + else + { + indices = Settings.Rng.GetDistinct(0, Settings.DnsSeeds.Length, Math.Min(DnsDigCount, Settings.DnsSeeds.Length)); + } + + foreach (int i in indices) + { + try + { + IPAddress[] temp = await Dns.GetHostAddressesAsync(Settings.DnsSeeds[i]); + if (!(temp is null)) + { + result.AddRange(temp); + } + } + catch (Exception) { } + } + + return result.ToArray(); + } + + private async void ConnectToPeers(int count) + { + // TODO: should we add a "connectLock" in callers? It could prevent calling this method before inQueue is updated + + if (count <= 0 || isDisposed) + { + return; + } + + List addrs = new List(count); + + int added = Settings.GetRandomNodeAddrs(count, false, addrs); + if (added < count) + { + IPAddress[] ips = await DigDnsSeeds(false); + if (ips is null) + { + // If random dig failed, dig all of them. + ips = await DigDnsSeeds(true); + if (ips is null && added == 0) + { + // This could mean we can not connect to the internet + // TODO: we need some sort of message manager or logger to post results, errors,... to + // TODO: shut down FullClient? + return; + } + } + + // This is like shuffling the whole array without changing the array itself: + int[] indices = Settings.Rng.GetDistinct(0, ips.Length, ips.Length); + int index = 0; + while (added < count && index < indices.Length) + { + NetworkAddressWithTime toAdd = new NetworkAddressWithTime() + { + NodeIP = ips[index++], + NodePort = Settings.DefaultPort + }; + + if (!addrs.Contains(toAdd)) + { + addrs.Add(toAdd); + added++; + } + } + } + + // inQueue has to be incremented here instead of one at a time otherwise if halfway through the list a connection + // fails ConnectToPeers() will be called with a wrong count and will create a connection queue in connector. + Interlocked.Add(ref inQueue, addrs.Count); + foreach (var item in addrs) + { + await Task.Run(() => connector.StartConnect(new IPEndPoint(item.NodeIP, item.NodePort))); + } + } + + /// + /// Start this client by selecting a random peer's IP address and connecting to it. + /// + public void Start() + { + Settings.Blockchain.State = BlockchainState.HeadersSync; + ConnectToPeers(1); + } + + + /// + /// Sends the given message to all connected peers. + /// + /// Message to send + public void Send(Message msg) + { + foreach (var peer in Settings.AllNodes) + { + peer.Send(msg); + } + } + + + private bool isDisposed = false; + + /// + public void Dispose() + { + if (!isDisposed) + { + isDisposed = true; + + Settings.AllNodes.Dispose(); + connector?.Dispose(); + } + } + } +} diff --git a/Src/Autarkysoft.Bitcoin/Clients/SpvClientSettings.cs b/Src/Autarkysoft.Bitcoin/Clients/SpvClientSettings.cs new file mode 100644 index 0000000..357f88b --- /dev/null +++ b/Src/Autarkysoft.Bitcoin/Clients/SpvClientSettings.cs @@ -0,0 +1,144 @@ +// Autarkysoft.Bitcoin +// Copyright (c) 2020 Autarkysoft +// Distributed under the MIT software license, see the accompanying +// file LICENCE or http://www.opensource.org/licenses/mit-license.php. + +using Autarkysoft.Bitcoin.Blockchain; +using Autarkysoft.Bitcoin.P2PNetwork; +using Autarkysoft.Bitcoin.P2PNetwork.Messages; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.NetworkInformation; +using System.Net.Sockets; + +namespace Autarkysoft.Bitcoin.Clients +{ + public class SpvClientSettings : ClientSettingsBase, ISpvClientSettings + { + /// + /// Initializes a new instance of with the given parameters. + /// + /// Network type + /// Maximum number of connections + /// List of peers (can be null) + /// File manager + public SpvClientSettings(NetworkType netType, int maxConnection, NodePool nodes, IFileManager fileMan) + : base(netType, maxConnection, nodes, NodeServiceFlags.NodeNone) + { + Relay = false; + + FileMan = fileMan ?? throw new ArgumentNullException(); + + var c = new Consensus(netType); + Blockchain = new Chain(FileMan, new BlockVerifier(null, c), c, Time, netType); + FileMan = fileMan; + + // TODO: find a better way for this + supportsIpV6 = NetworkInterface.GetAllNetworkInterfaces().All(x => x.Supports(NetworkInterfaceComponent.IPv6)); + } + + + private readonly bool supportsIpV6; + private readonly object addrLock = new object(); + private const string NodeAddrs = "NodeAddrs"; + + /// + public override IReplyManager CreateReplyManager(INodeStatus nodeStatus) => new SpvReplyManager(nodeStatus, this); + + + /// + public IFileManager FileMan { get; } + /// + public IChain Blockchain { get; } + + + private const ulong HdrSyncMask = (ulong)(NodeServiceFlags.NodeNetwork | NodeServiceFlags.NodeNetworkLimited); + /// + public bool IsGoodForHeaderSync(NodeServiceFlags flags) => ((ulong)flags & HdrSyncMask) != 0; + /// + public bool HasNeededServices(NodeServiceFlags flags) + { + return IsGoodForHeaderSync(flags); + } + + + /// + public int GetRandomNodeAddrs(int count, bool skipCheck, List result) + { + if (count <= 0) + { + return 0; + } + + lock (addrLock) + { + byte[] data = FileMan.ReadData(NodeAddrs); + if (data is null || data.Length == 0 || data.Length % NetworkAddressWithTime.Size != 0) + { + // File doesn't exist or is corrupted + return 0; + } + else + { + int total = data.Length / NetworkAddressWithTime.Size; + // This is like shuffling the entire array itself, but we just have the random index + int[] indices = Rng.GetDistinct(0, total, total); + + var stream = new FastStreamReader(data); + int i = 0; + int temp = result.Count; + while (result.Count < count && i < indices.Length) + { + stream.ChangePosition(indices[i] * NetworkAddressWithTime.Size); + var addr = new NetworkAddressWithTime(); + if (addr.TryDeserialize(stream, out _)) + { + if ((skipCheck || + !AllNodes.Contains(addr.NodeIP) && + (supportsIpV6 || addr.NodeIP.AddressFamily != AddressFamily.InterNetworkV6) && + HasNeededServices(addr.NodeServices)) && + !result.Contains(addr)) + { + result.Add(addr); + } + } + i++; + } + + return result.Count - temp; + } + } + } + + /// + public void RemoveNodeAddr(IPAddress ip) + { + lock (addrLock) + { + byte[] data = FileMan.ReadData(NodeAddrs); + if (!(data is null) && data.Length % NetworkAddressWithTime.Size == 0) + { + int total = data.Length / NetworkAddressWithTime.Size; + var reader = new FastStreamReader(data); + for (int i = 0; i < total; i++) + { + var addr = new NetworkAddressWithTime(); + if (addr.TryDeserialize(reader, out _) && addr.NodeIP.Equals(ip)) + { + byte[] result = new byte[data.Length - NetworkAddressWithTime.Size]; + int startPos = i * NetworkAddressWithTime.Size; + int EndPos = startPos + NetworkAddressWithTime.Size; + Buffer.BlockCopy(data, 0, result, 0, startPos); + Buffer.BlockCopy(data, EndPos, result, startPos, data.Length - EndPos); + + FileMan.WriteData(result, NodeAddrs); + break; + } + } + } + } + } + } +} diff --git a/Src/Autarkysoft.Bitcoin/P2PNetwork/ReplyManager.cs b/Src/Autarkysoft.Bitcoin/P2PNetwork/ReplyManager.cs index 1a42da8..0686d6f 100644 --- a/Src/Autarkysoft.Bitcoin/P2PNetwork/ReplyManager.cs +++ b/Src/Autarkysoft.Bitcoin/P2PNetwork/ReplyManager.cs @@ -40,7 +40,7 @@ public ReplyManager(INodeStatus ns, IFullClientSettings cs) : base(ns, cs) /// public override Message GetVersionMsg() { - return GetVersionMsg(new NetworkAddress(fullSettings.Services, nodeStatus.IP, nodeStatus.Port), fullSettings.Blockchain.Height); + return GetVersionMsg(new NetworkAddress(0, nodeStatus.IP, nodeStatus.Port), fullSettings.Blockchain.Height); } private Message[] GetSettingsMessages(Message extraMsg) diff --git a/Src/Autarkysoft.Bitcoin/P2PNetwork/SpvReplyManager.cs b/Src/Autarkysoft.Bitcoin/P2PNetwork/SpvReplyManager.cs new file mode 100644 index 0000000..6bfbfb0 --- /dev/null +++ b/Src/Autarkysoft.Bitcoin/P2PNetwork/SpvReplyManager.cs @@ -0,0 +1,276 @@ +// Autarkysoft.Bitcoin +// Copyright (c) 2020 Autarkysoft +// Distributed under the MIT software license, see the accompanying +// file LICENCE or http://www.opensource.org/licenses/mit-license.php. + +using Autarkysoft.Bitcoin.Blockchain; +using Autarkysoft.Bitcoin.Blockchain.Blocks; +using Autarkysoft.Bitcoin.Clients; +using Autarkysoft.Bitcoin.P2PNetwork.Messages; +using Autarkysoft.Bitcoin.P2PNetwork.Messages.MessagePayloads; +using System; +using System.Collections.Generic; + +namespace Autarkysoft.Bitcoin.P2PNetwork +{ + public class SpvReplyManager : ReplyManagerBase + { + /// + /// Initializes a new instanse of using the given parameters. + /// + /// Node status + /// Client settings + public SpvReplyManager(INodeStatus ns, ISpvClientSettings cs) : base(ns, cs) + { + spvSettings = cs; + } + + + private readonly ISpvClientSettings spvSettings; + + private Message[] GetSettingsMessages(Message extraMsg) + { + var result = new List(7); + if (!(extraMsg is null)) + { + result.Add(extraMsg); + } + + if (!spvSettings.HasNeededServices(nodeStatus.Services)) + { + nodeStatus.SignalDisconnect(); + return null; + } + + // We want the other node to respond to our initial settings quickly to check headers. + // TODO: this may be a bad thing to enfoce on all nodes. Maybe force it based on Blockchain.State + nodeStatus.StartDisconnectTimer(TimeConstants.MilliSeconds.OneMin); + + //result.Add(new Message(new GetAddrPayload(), settings.Network)); + + if (nodeStatus.ProtocolVersion > Constants.P2PBip31ProtVer) + { + // We don't bother sending ping to a node that doesn't support nonce in ping/pong messages. + // This will set default value for latency and this node will be ignored when latency is used later. + result.Add(GetPingMsg()); + } + + if (nodeStatus.ProtocolVersion >= Constants.P2PBip130ProtVer) + { + result.Add(new Message(new SendHeadersPayload(), settings.Network)); + } + + // Always send GetHeaders message during handshake + result.Add(GetLocatorMessage()); + + return result.Count == 0 ? null : result.ToArray(); + } + + + private Message GetLocatorMessage() + { + BlockHeader[] headers = spvSettings.Blockchain.GetBlockHeaderLocator(); + if (headers.Length > GetHeadersPayload.MaximumHashes) + { + // This should never happen but since IBlockchain is a dependency we have to check it here + // to prevent an exception being thrown. + BlockHeader[] temp = new BlockHeader[GetHeadersPayload.MaximumHashes]; + Array.Copy(headers, 0, temp, 0, temp.Length); + headers = temp; + } + return new Message(new GetHeadersPayload(settings.ProtocolVersion, headers, null), settings.Network); + } + + + /// + public override Message[] GetReply(Message msg) + { + if (!msg.TryGetPayloadType(out PayloadType plt)) + { + // Undefined payload type (this is a violation since other node knows our protocol version) + nodeStatus.AddSmallViolation(); + nodeStatus.UpdateTime(); + return null; + } + + if (nodeStatus.HandShake != HandShakeState.Finished && plt != PayloadType.Version && plt != PayloadType.Verack) + { + nodeStatus.AddMediumViolation(); + nodeStatus.UpdateTime(); + return null; + } + + Message[] result = null; + + switch (plt) + { + case PayloadType.GetHeaders: + if (spvSettings.Blockchain.State == BlockchainState.HeadersSync) + { + nodeStatus.UpdateTime(); + // If the client is syncing its headers it can't provide headers + return null; + } + + if (Deser(msg.PayloadData, out GetHeadersPayload getHdrs)) + { + BlockHeader[] hds = spvSettings.Blockchain.GetMissingHeaders(getHdrs.Hashes, getHdrs.StopHash); + if (!(hds is null)) + { + if (hds.Length > HeadersPayload.MaxCount) + { + // This should never happen but since IBlockchain is a dependency we have to check it here + // to prevent an exception being thrown. + BlockHeader[] temp = new BlockHeader[HeadersPayload.MaxCount]; + Array.Copy(hds, 0, temp, 0, temp.Length); + hds = temp; + } + + result = new Message[1] { new Message(new HeadersPayload(hds), settings.Network) }; + } + } + break; + case PayloadType.Headers: + if (Deser(msg.PayloadData, out HeadersPayload hdrs)) + { + if (hdrs.Headers.Length == 0) + { + nodeStatus.UpdateTime(); + // Header locator will always create a request that will fetch at least one header. + // Additionally sending an empty header array is a violation on its own. + nodeStatus.AddMediumViolation(); + return null; + } + + if (spvSettings.Blockchain.ProcessHeaders(hdrs.Headers, nodeStatus)) + { + nodeStatus.ReStartDisconnectTimer(); + result = new Message[1] { GetLocatorMessage() }; + } + } + break; + case PayloadType.Ping: + if (Deser(msg.PayloadData, out PingPayload ping)) + { + result = new Message[1] { new Message(new PongPayload(ping.Nonce), settings.Network) }; + } + break; + case PayloadType.Pong: + if (Deser(msg.PayloadData, out PongPayload pong)) + { + nodeStatus.CheckPing(pong.Nonce); + } + break; + case PayloadType.SendHeaders: + // Empty payload + if (nodeStatus.SendHeaders) + { + // It's a violation if the other node "spams" the same settings more than once. + nodeStatus.AddSmallViolation(); + } + else + { + nodeStatus.SendHeaders = true; + } + break; + case PayloadType.Verack: + result = CheckVerack(); + break; + case PayloadType.Version: + result = CheckVersion(msg); + break; + } + + nodeStatus.UpdateTime(); + return result; + } + + /// + public override Message GetVersionMsg() => GetVersionMsg(new NetworkAddress(0, nodeStatus.IP, nodeStatus.Port), 0); + + private Message[] CheckVerack() + { + // VerackPayload doesn't have a body and won't deserialize anything + // If anything were added to it in the future a TryDeserialize() should be written here + + switch (nodeStatus.HandShake) + { + case HandShakeState.None: + case HandShakeState.SentAndConfirmed: + case HandShakeState.Finished: + nodeStatus.AddMediumViolation(); + break; + case HandShakeState.ReceivedAndReplied: + case HandShakeState.SentAndReceived: + nodeStatus.HandShake = HandShakeState.Finished; + return GetSettingsMessages(null); + case HandShakeState.Sent: + nodeStatus.HandShake = HandShakeState.SentAndConfirmed; + break; + default: + break; + } + + return null; + } + + private Message[] CheckVersion(Message msg) + { + var version = new VersionPayload(); + if (!version.TryDeserialize(new FastStreamReader(msg.PayloadData), out _)) + { + nodeStatus.AddSmallViolation(); + return null; + } + + if (version.Version < Constants.P2PMinProtoVer) + { + nodeStatus.SignalDisconnect(); + return null; + } + + nodeStatus.ProtocolVersion = version.Version; + nodeStatus.Services = version.Services; + nodeStatus.Nonce = version.Nonce; + nodeStatus.UserAgent = version.UserAgent; + nodeStatus.StartHeight = version.StartHeight; + nodeStatus.Relay = version.Relay; + settings.UpdateMyIP(version.ReceivingNodeNetworkAddress.NodeIP); + settings.Time.UpdateTime(version.Timestamp); + + Message[] result = null; + + switch (nodeStatus.HandShake) + { + case HandShakeState.None: + nodeStatus.HandShake = HandShakeState.ReceivedAndReplied; + result = new Message[2] + { + new Message(new VerackPayload(), settings.Network), + GetVersionMsg() + }; + break; + case HandShakeState.Sent: + nodeStatus.HandShake = HandShakeState.SentAndReceived; + result = new Message[1] + { + new Message(new VerackPayload(), settings.Network) + }; + break; + case HandShakeState.SentAndConfirmed: + nodeStatus.HandShake = HandShakeState.Finished; + result = GetSettingsMessages(new Message(new VerackPayload(), settings.Network)); + break; + case HandShakeState.ReceivedAndReplied: + case HandShakeState.SentAndReceived: + case HandShakeState.Finished: + nodeStatus.AddMediumViolation(); + break; + default: + break; + } + + return result; + } + } +}