diff --git a/Assets/Mirage/Runtime/SocketLayer/ByteBuffer.cs b/Assets/Mirage/Runtime/SocketLayer/ByteBuffer.cs index 6eaa58b820..32fccf5e83 100644 --- a/Assets/Mirage/Runtime/SocketLayer/ByteBuffer.cs +++ b/Assets/Mirage/Runtime/SocketLayer/ByteBuffer.cs @@ -3,7 +3,7 @@ namespace Mirage.SocketLayer { /// - /// Warpper around a byte[] that belongs to a + /// Wrapper around a byte[] that belongs to a /// public sealed class ByteBuffer : IDisposable { diff --git a/Assets/Mirage/Runtime/SocketLayer/Config.cs b/Assets/Mirage/Runtime/SocketLayer/Config.cs index 922867e4be..c44bc280cd 100644 --- a/Assets/Mirage/Runtime/SocketLayer/Config.cs +++ b/Assets/Mirage/Runtime/SocketLayer/Config.cs @@ -103,6 +103,11 @@ public class Config /// Enable if the Socket you are using has its own Reliable layer. For example using Websocket, which is TCP. /// public bool DisableReliableLayer = false; + + /// + /// Passes channel to socket rather than Peer handling reliable layer. This is useful when using a relay which has its own reliable layer, such as Steam or Epic Relay + /// + public bool PassthroughReliableLayer = false; #endregion } } diff --git a/Assets/Mirage/Runtime/SocketLayer/Connection/Batch.cs b/Assets/Mirage/Runtime/SocketLayer/Connection/Batch.cs index e5f4168cde..4bf51cd602 100644 --- a/Assets/Mirage/Runtime/SocketLayer/Connection/Batch.cs +++ b/Assets/Mirage/Runtime/SocketLayer/Connection/Batch.cs @@ -57,17 +57,17 @@ public void Flush() public class ArrayBatch : Batch { - private readonly Action _send; + private readonly IRawConnection _connection; private readonly PacketType _packetType; private readonly byte[] _batch; private int _batchLength; - public ArrayBatch(int maxPacketSize, Action send, PacketType reliable) + public ArrayBatch(int maxPacketSize, IRawConnection connection, PacketType reliable) : base(maxPacketSize) { _batch = new byte[maxPacketSize]; - _send = send; + _connection = connection; _packetType = reliable; } @@ -84,7 +84,7 @@ protected override void CreateNewBatch() protected override void SendAndReset() { - _send.Invoke(_batch, _batchLength); + _connection.SendRaw(_batch, _batchLength); _batchLength = 0; } } diff --git a/Assets/Mirage/Runtime/SocketLayer/Connection/Connection.cs b/Assets/Mirage/Runtime/SocketLayer/Connection/Connection.cs index 779613c6aa..c1ee43b1d2 100644 --- a/Assets/Mirage/Runtime/SocketLayer/Connection/Connection.cs +++ b/Assets/Mirage/Runtime/SocketLayer/Connection/Connection.cs @@ -4,7 +4,7 @@ namespace Mirage.SocketLayer { - internal abstract class Connection : IConnection + internal abstract class Connection : IConnection, IRawConnection { protected readonly ILogger _logger; protected readonly int _maxPacketSize; @@ -73,6 +73,11 @@ protected Connection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Co _metrics = metrics; } + void IRawConnection.SendRaw(byte[] packet, int length) + { + _peer.Send(this, packet, length); + } + public override string ToString() { return $"[{EndPoint}]"; diff --git a/Assets/Mirage/Runtime/SocketLayer/Connection/NoReliableConnection.cs b/Assets/Mirage/Runtime/SocketLayer/Connection/NoReliableConnection.cs index 646007ebe3..a380d6907d 100644 --- a/Assets/Mirage/Runtime/SocketLayer/Connection/NoReliableConnection.cs +++ b/Assets/Mirage/Runtime/SocketLayer/Connection/NoReliableConnection.cs @@ -3,8 +3,9 @@ namespace Mirage.SocketLayer { + /// - /// Connection that does not run its own reliablity layer, good for TCP sockets + /// Connection that does not run its own reliability layer, good for TCP sockets /// internal sealed class NoReliableConnection : Connection { @@ -15,7 +16,7 @@ internal sealed class NoReliableConnection : Connection internal NoReliableConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, ILogger logger, Metrics metrics) : base(peer, endPoint, dataHandler, config, maxPacketSize, time, logger, metrics) { - _nextBatchReliable = new ArrayBatch(maxPacketSize, SendBatchInternal, PacketType.Reliable); + _nextBatchReliable = new ArrayBatch(maxPacketSize, this, PacketType.Reliable); if (maxPacketSize > ushort.MaxValue) { @@ -23,11 +24,6 @@ internal NoReliableConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHa } } - private void SendBatchInternal(byte[] batch, int length) - { - _peer.Send(this, batch, length); - } - // just sue SendReliable for unreliable/notify // note: we dont need to pass in that it is reliable, receiving doesn't really care what channel it is public override void SendUnreliable(byte[] packet, int offset, int length) => SendReliable(packet, offset, length); diff --git a/Assets/Mirage/Runtime/SocketLayer/Connection/PassthroughConnection.cs b/Assets/Mirage/Runtime/SocketLayer/Connection/PassthroughConnection.cs new file mode 100644 index 0000000000..7203e00919 --- /dev/null +++ b/Assets/Mirage/Runtime/SocketLayer/Connection/PassthroughConnection.cs @@ -0,0 +1,149 @@ +using System; +using UnityEngine; + +namespace Mirage.SocketLayer +{ + internal class PassthroughConnection : Connection, IRawConnection + { + private const int HEADER_SIZE = 1 + Batch.MESSAGE_LENGTH_SIZE; + + private readonly Batch _reliableBatch; + private readonly Batch _unreliableBatch; + private readonly AckSystem _ackSystem; + + public PassthroughConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, Pool bufferPool, ILogger logger, Metrics metrics) + : base(peer, endPoint, dataHandler, config, maxPacketSize, time, logger, metrics) + { + _reliableBatch = new ArrayBatch(maxPacketSize, this, PacketType.Reliable); + _unreliableBatch = new ArrayBatch(maxPacketSize, this, PacketType.Unreliable); + _ackSystem = new AckSystem(this, config, maxPacketSize, time, bufferPool, logger, metrics); + + if (maxPacketSize > ushort.MaxValue) + { + throw new ArgumentException($"Max package size can not bigger than {ushort.MaxValue}. NoReliableConnection uses 2 bytes for message length, maxPacketSize over that value will mean that message will be incorrectly batched."); + } + } + + public override void SendUnreliable(byte[] message, int offset, int length) + { + ThrowIfNotConnectedOrConnecting(); + + if (length + HEADER_SIZE > _maxPacketSize) + { + throw new ArgumentException($"Message is bigger than MTU, size:{length} but max message size is {_maxPacketSize - HEADER_SIZE}"); + } + + _unreliableBatch.AddMessage(message, offset, length); + _metrics?.OnSendMessageUnreliable(length); + } + + /// + /// Use version for non-alloc + /// + public override INotifyToken SendNotify(byte[] packet, int offset, int length) + { + ThrowIfNotConnectedOrConnecting(); + var token = _ackSystem.SendNotify(packet, offset, length); + _metrics?.OnSendMessageNotify(length); + return token; + } + + /// + /// Use version for non-alloc + /// + public override void SendNotify(byte[] packet, int offset, int length, INotifyCallBack callBacks) + { + ThrowIfNotConnectedOrConnecting(); + _ackSystem.SendNotify(packet, offset, length, callBacks); + _metrics?.OnSendMessageNotify(length); + } + + /// + /// single message, batched by AckSystem + /// + /// + public override void SendReliable(byte[] message, int offset, int length) + { + ThrowIfNotConnectedOrConnecting(); + + if (length + HEADER_SIZE > _maxPacketSize) + { + throw new ArgumentException($"Message is bigger than MTU, size:{length} but max message size is {_maxPacketSize - HEADER_SIZE}"); + } + + _reliableBatch.AddMessage(message, offset, length); + _metrics?.OnSendMessageReliable(length); + } + + internal override void ReceiveUnreliablePacket(Packet packet) + { + HandleReliableBatched(packet.Buffer.array, 1, packet.Length, PacketType.Unreliable); + } + + internal override void ReceiveReliablePacket(Packet packet) + { + HandleReliableBatched(packet.Buffer.array, 1, packet.Length, PacketType.Reliable); + } + + internal override void ReceiveReliableFragment(Packet packet) => throw new NotSupportedException(); + + internal override void ReceiveNotifyPacket(Packet packet) + { + var segment = _ackSystem.ReceiveNotify(packet.Buffer.array, packet.Length); + if (segment != default) + { + _metrics?.OnReceiveMessageNotify(packet.Length); + _dataHandler.ReceiveMessage(this, segment); + } + } + + internal override void ReceiveNotifyAck(Packet packet) + { + _ackSystem.ReceiveAck(packet.Buffer.array); + } + + public override void FlushBatch() + { + _ackSystem.Update(); + _reliableBatch.Flush(); + _unreliableBatch.Flush(); + } + + internal override bool IsValidSize(Packet packet) + { + const int minPacketSize = 1; + + var length = packet.Length; + if (length < minPacketSize) + return false; + + // Min size of message given to Mirage + const int minMessageSize = 2; + + const int minCommandSize = 2; + const int minUnreliableSize = 1 + minMessageSize; + + switch (packet.Type) + { + case PacketType.Command: + return length >= minCommandSize; + + case PacketType.Reliable: + case PacketType.Unreliable: + return length >= minUnreliableSize; + + case PacketType.Notify: + return length >= AckSystem.NOTIFY_HEADER_SIZE + minMessageSize; + case PacketType.Ack: + return length >= AckSystem.ACK_HEADER_SIZE; + case PacketType.ReliableFragment: + // not supported + return false; + + default: + case PacketType.KeepAlive: + return true; + } + } + } +} diff --git a/Assets/Mirage/Runtime/SocketLayer/Connection/PassthroughConnection.cs.meta b/Assets/Mirage/Runtime/SocketLayer/Connection/PassthroughConnection.cs.meta new file mode 100644 index 0000000000..323b22f926 --- /dev/null +++ b/Assets/Mirage/Runtime/SocketLayer/Connection/PassthroughConnection.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: bc9c29bb7ffe23349b834ab6a92617c6 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Mirage/Runtime/SocketLayer/Connection/ReliableConnection.cs b/Assets/Mirage/Runtime/SocketLayer/Connection/ReliableConnection.cs index 0c5fb98a2e..e6a5feb20f 100644 --- a/Assets/Mirage/Runtime/SocketLayer/Connection/ReliableConnection.cs +++ b/Assets/Mirage/Runtime/SocketLayer/Connection/ReliableConnection.cs @@ -6,35 +6,23 @@ namespace Mirage.SocketLayer /// /// Objects that represents a connection to/from a server/client. Holds state that is needed to update, send, and receive data /// - internal sealed class ReliableConnection : Connection, IRawConnection, IDisposable + internal sealed class ReliableConnection : Connection, IDisposable { private readonly AckSystem _ackSystem; private readonly Batch _unreliableBatch; - private readonly Pool _bufferPool; internal ReliableConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, Pool bufferPool, ILogger logger, Metrics metrics) : base(peer, endPoint, dataHandler, config, maxPacketSize, time, logger, metrics) { - _bufferPool = bufferPool; - _unreliableBatch = new ArrayBatch(_maxPacketSize, SendBatchInternal, PacketType.Unreliable); + _unreliableBatch = new ArrayBatch(_maxPacketSize, this, PacketType.Unreliable); _ackSystem = new AckSystem(this, config, maxPacketSize, time, bufferPool, logger, metrics); } - private void SendBatchInternal(byte[] batch, int length) - { - _peer.Send(this, batch, length); - } - public void Dispose() { _ackSystem.Dispose(); } - void IRawConnection.SendRaw(byte[] packet, int length) - { - _peer.Send(this, packet, length); - } - /// /// Use version for non-alloc /// diff --git a/Assets/Mirage/Runtime/SocketLayer/Peer.cs b/Assets/Mirage/Runtime/SocketLayer/Peer.cs index 4c22829430..53c73be9f0 100644 --- a/Assets/Mirage/Runtime/SocketLayer/Peer.cs +++ b/Assets/Mirage/Runtime/SocketLayer/Peer.cs @@ -429,6 +429,10 @@ private Connection CreateNewConnection(IEndPoint newEndPoint) { connection = new NoReliableConnection(this, endPoint, _dataHandler, _config, _maxPacketSize, _time, _logger, _metrics); } + else if (_config.PassthroughReliableLayer) + { + connection = new PassthroughConnection(this, endPoint, _dataHandler, _config, _maxPacketSize, _time, _bufferPool, _logger, _metrics); + } else { connection = new ReliableConnection(this, endPoint, _dataHandler, _config, _maxPacketSize, _time, _bufferPool, _logger, _metrics); diff --git a/Assets/Tests/SocketLayer/AckSystem/NoReliableConnectionTest.cs b/Assets/Tests/SocketLayer/AckSystem/NoReliableConnectionTest.cs index 9bb8ecdacf..e01fce9212 100644 --- a/Assets/Tests/SocketLayer/AckSystem/NoReliableConnectionTest.cs +++ b/Assets/Tests/SocketLayer/AckSystem/NoReliableConnectionTest.cs @@ -5,30 +5,28 @@ using NSubstitute; using NUnit.Framework; -namespace Mirage.SocketLayer.Tests.AckSystemTests +namespace Mirage.SocketLayer.Tests { [Category("SocketLayer")] - public class NoReliableConnectionTest + public abstract class ConnectionTestBase { - private const int MAX_PACKET_SIZE = 100; + protected const int MAX_PACKET_SIZE = 100; - private IConnection _connection; - private byte[] _buffer; - private Config _config; - private PeerInstance _peerInstance; - private Pool _bufferPool; - private readonly Random rand = new Random(); - private byte[] _sentArray; + protected IConnection _connection; + protected byte[] _buffer; + protected Config _config; + protected PeerInstance _peerInstance; + protected Pool _bufferPool; + protected readonly Random rand = new Random(); + protected List _sentArrays = new List(); - private ISocket Socket => _peerInstance.socket; + protected ISocket Socket => _peerInstance.socket; + protected abstract Config CreateConfig(); [SetUp] public void Setup() { - _config = new Config - { - DisableReliableLayer = true, - }; + _config = CreateConfig(); _peerInstance = new PeerInstance(_config, maxPacketSize: MAX_PACKET_SIZE); _bufferPool = new Pool(ByteBuffer.CreateNew, MAX_PACKET_SIZE, 0, 100); @@ -41,38 +39,20 @@ public void Setup() } // clear calls, Connect will have sent one + _sentArrays.Clear(); Socket.ClearReceivedCalls(); Socket.When(x => x.Send(Arg.Any(), Arg.Any(), Arg.Any())) .Do(x => { - var arg = (byte[])x.Args()[1]; + var packet = (byte[])x.Args()[1]; + var length = (int)x.Args()[2]; // create copy - _sentArray = arg.ToArray(); + _sentArrays.Add(packet.Take(length).ToArray()); }); } - [Test] - public void IsNoReliableConnection() - { - Assert.That(_connection, Is.TypeOf()); - } - [Test] - public void ThrowsIfTooBig() - { - // 3 byte header, so max size is over max - var bigBuffer = new byte[MAX_PACKET_SIZE - 2]; - - var exception = Assert.Throws(() => - { - _connection.SendReliable(bigBuffer); - }); - - var expected = new ArgumentException($"Message is bigger than MTU, size:{bigBuffer.Length} but max message size is {MAX_PACKET_SIZE - 3}"); - Assert.That(exception, Has.Message.EqualTo(expected.Message)); - } - - private void AssertSentPacket(IEnumerable messageLengths) + protected void AssertSentPacket(PacketType type, IEnumerable messageLengths) { var totalLength = 1 + (2 * messageLengths.Count()) + messageLengths.Sum(); @@ -82,33 +62,118 @@ private void AssertSentPacket(IEnumerable messageLengths) Socket.Received(1).Send(Arg.Any(), Arg.Any(), totalLength); // check packet was correct - CheckMessage(_sentArray, messageLengths); + CheckMessage(type, 0, 1, messageLengths); // clear calls after, so we are ready to process next message Socket.ClearReceivedCalls(); + _sentArrays.Clear(); } - private void CheckMessage(byte[] packet, IEnumerable messageLengths) + protected void CheckMessage(PacketType type, int sentIndex, int sendCount, IEnumerable messageLengths, int skipHeader = 0) { - if (packet[0] != (byte)PacketType.Reliable) - Assert.Fail($"First byte was not Reliable, it was {packet[0]} instead"); + Assert.That(_sentArrays.Count, Is.EqualTo(sendCount)); + var packet = _sentArrays[sentIndex]; + if (packet[0] != (byte)type) + Assert.Fail($"First byte should be the packet type, {type}, it was {(PacketType)packet[0]} instead"); var offset = 1; foreach (var length in messageLengths) { - var ln = ByteUtils.ReadUShort(packet, ref offset); - if (ln != length) - Assert.Fail($"Length at offset {offset - 2} was incorrect.\n Expected:{length}\n But war:{ln}"); + if (skipHeader == 0) + { + var ln = ByteUtils.ReadUShort(packet, ref offset); + if (ln != length) + Assert.Fail($"Length at offset {offset - 2} was incorrect.\n Expected:{length}\n But war:{ln}"); - for (var i = 0; i < length; i++) - { - if (packet[offset + i] != _buffer[i]) - Assert.Fail($"Value at offset {offset + i} was incorrect.\n Expected:{_buffer[i]}\n But war:{packet[offset + i]}"); + for (var i = 0; i < length; i++) + { + if (packet[offset + i] != _buffer[i]) + Assert.Fail($"Value at offset {offset + i} was incorrect.\n Expected:{_buffer[i]}\n But war:{packet[offset + i]}"); + } + offset += length; + } + else + { + offset += skipHeader; + for (var i = 0; i < length; i++) + { + if (packet[offset + i] != _buffer[i]) + Assert.Fail($"Value at offset {offset + i} was incorrect.\n Expected:{_buffer[i]}\n But war:{packet[offset + i]}"); + + } + offset += length; } - offset += length; } + + Assert.That(offset, Is.EqualTo(packet.Length)); + } + + protected void SendIntoBatch(int length, bool reliable, ref int total, List currentBatch) + { + // will write length+2 + var newTotal = total + 2 + length; + if (newTotal > MAX_PACKET_SIZE) + { + Send(reliable, _buffer, length); + // was over max, so should have sent + AssertSentPacket(reliable ? PacketType.Reliable : PacketType.Unreliable, currentBatch); + + currentBatch.Clear(); + // new batch + total = 1 + 2 + length; + } + else + { + Send(reliable, _buffer, length); + Socket.DidNotReceive().Send(Arg.Any(), Arg.Any(), Arg.Any()); + total = newTotal; + } + currentBatch.Add(length); + } + + protected void Send(bool reliable, byte[] buffer, int length) + { + if (reliable) + _connection.SendReliable(buffer, 0, length); + else + _connection.SendUnreliable(buffer, 0, length); + } + } + + [Category("SocketLayer")] + public class NoReliableConnectionTest : ConnectionTestBase + { + private new Connection _connection => (Connection)base._connection; + + protected override Config CreateConfig() + { + return new Config + { + DisableReliableLayer = true, + }; + } + + [Test] + public void IsNoReliableConnection() + { + Assert.That(_connection, Is.TypeOf()); + } + + [Test] + public void ThrowsIfTooBig() + { + // 3 byte header, so max size is over max + var bigBuffer = new byte[MAX_PACKET_SIZE - 2]; + + var exception = Assert.Throws(() => + { + _connection.SendReliable(bigBuffer); + }); + + var expected = new ArgumentException($"Message is bigger than MTU, size:{bigBuffer.Length} but max message size is {MAX_PACKET_SIZE - 3}"); + Assert.That(exception, Has.Message.EqualTo(expected.Message)); } [Test] @@ -130,7 +195,7 @@ public void MessageAreBatched() // should be 97 in buffer now => 1+(length+2)*3 _connection.SendReliable(_buffer, 0, overBatch); - AssertSentPacket(lessThanBatchLengths); + AssertSentPacket(PacketType.Reliable, lessThanBatchLengths); } [Test] @@ -154,7 +219,7 @@ public void MessageAreBatched_Repeat() { _connection.SendReliable(_buffer, 0, length); // was over max, so should have sent - AssertSentPacket(currentBatch); + AssertSentPacket(PacketType.Reliable, currentBatch); currentBatch.Clear(); // new batch @@ -187,7 +252,7 @@ public void FlushSendsMessageInBatch() } _connection.FlushBatch(); - AssertSentPacket(lessThanBatchLengths); + AssertSentPacket(PacketType.Reliable, lessThanBatchLengths); } [Test] @@ -241,7 +306,7 @@ public void SendingToUnreliableUsesReliable() _connection.SendUnreliable(_buffer, 0, counts[1]); _connection.FlushBatch(); - AssertSentPacket(counts); + AssertSentPacket(PacketType.Reliable, counts); } [Test] @@ -252,7 +317,7 @@ public void SendingToNotifyUsesReliable() _connection.SendNotify(_buffer, 0, counts[1]); _connection.FlushBatch(); - AssertSentPacket(counts); + AssertSentPacket(PacketType.Reliable, counts); } [Test] public void SendingToNotifyTokenUsesReliable() @@ -263,7 +328,7 @@ public void SendingToNotifyTokenUsesReliable() _connection.SendNotify(_buffer, 0, counts[1], token); _connection.FlushBatch(); - AssertSentPacket(counts); + AssertSentPacket(PacketType.Reliable, counts); } [Test] @@ -272,6 +337,10 @@ public void NotifyOnDeliveredInvoke() var counts = new List() { 10, 20 }; var token = _connection.SendNotify(_buffer, 0, counts[0]); Assert.That(token, Is.TypeOf()); + + var action = Substitute.For(); + token.Delivered += action; + action.Received(1).Invoke(); } [Test] diff --git a/Assets/Tests/SocketLayer/AckSystem/PassthroughConnectionTest.cs b/Assets/Tests/SocketLayer/AckSystem/PassthroughConnectionTest.cs new file mode 100644 index 0000000000..aa30762cbc --- /dev/null +++ b/Assets/Tests/SocketLayer/AckSystem/PassthroughConnectionTest.cs @@ -0,0 +1,290 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using NSubstitute; +using NUnit.Framework; + +namespace Mirage.SocketLayer.Tests +{ + [Category("SocketLayer")] + public class PassthroughConnectionTest : ConnectionTestBase + { + private new Connection _connection => (Connection)base._connection; + + protected override Config CreateConfig() + { + return new Config + { + PassthroughReliableLayer = true, + }; + } + + [Test] + public void IsNoReliableConnection() + { + Assert.That(_connection, Is.TypeOf()); + } + + [Test] + [TestCase(true)] + [TestCase(false)] + public void ThrowsIfTooBig(bool reliable) + { + // 3 byte header, so max size is over max + var bigBuffer = new byte[MAX_PACKET_SIZE - 2]; + + var exception = Assert.Throws(() => + { + Send(reliable, bigBuffer, bigBuffer.Length); + }); + + var expected = new ArgumentException($"Message is bigger than MTU, size:{bigBuffer.Length} but max message size is {MAX_PACKET_SIZE - 3}"); + Assert.That(exception, Has.Message.EqualTo(expected.Message)); + } + + [Test] + [TestCase(true)] + [TestCase(false)] + public void MessageAreBatched(bool reliable) + { + // max is 100 + + var lessThanBatchLengths = new int[] + { + 20, 40, 30 + }; + var overBatch = 11; + + foreach (var length in lessThanBatchLengths) + { + Send(reliable, _buffer, length); + Socket.DidNotReceive().Send(Arg.Any(), Arg.Any(), Arg.Any()); + } + + // should be 97 in buffer now => 1+(length+2)*3 + Send(reliable, _buffer, overBatch); + AssertSentPacket(reliable ? PacketType.Reliable : PacketType.Unreliable, lessThanBatchLengths); + } + + [Test] + [Repeat(100)] + [TestCase(10)] + [TestCase(100)] + public void MessageAreBatched_Repeat(int messageCount) + { + var lengths = new int[messageCount]; + for (var i = 0; i < messageCount; i++) + lengths[i] = rand.Next(10, MAX_PACKET_SIZE - 3); + + var currentBatch_reliable = new List(); + var currentBatch_unreliable = new List(); + var total_reliable = 1; + var total_unreliable = 1; + foreach (var length in lengths) + { + var reliable = rand.Next(0, 1) == 1; + if (reliable) + SendIntoBatch(length, true, ref total_reliable, currentBatch_reliable); + else + SendIntoBatch(length, false, ref total_unreliable, currentBatch_unreliable); + } + } + + [Test] + [TestCase(true)] + [TestCase(false)] + public void FlushSendsMessageInBatch(bool reliable) + { + // max is 100 + + var lessThanBatchLengths = new int[] + { + 20, 40 + }; + + foreach (var length in lessThanBatchLengths) + { + Send(reliable, _buffer, length); + Socket.DidNotReceive().Send(Arg.Any(), Arg.Any(), Arg.Any()); + } + + _connection.FlushBatch(); + AssertSentPacket(reliable ? PacketType.Reliable : PacketType.Unreliable, lessThanBatchLengths); + } + + [Test] + public void FlushSendsMessageInBatch_BothTypes() + { + // max is 100 + + var lessThanBatchLengths_reliable = new int[] + { + 20, 40 + }; + var lessThanBatchLengths_unreliable = new int[] + { + 15, 35, 20 + }; + + foreach (var length in lessThanBatchLengths_reliable) + { + _connection.SendReliable(_buffer, 0, length); + Socket.DidNotReceive().Send(Arg.Any(), Arg.Any(), Arg.Any()); + } + foreach (var length in lessThanBatchLengths_unreliable) + { + _connection.SendUnreliable(_buffer, 0, length); + Socket.DidNotReceive().Send(Arg.Any(), Arg.Any(), Arg.Any()); + } + + _connection.FlushBatch(); + + var totalLength_reliable = 1 + (2 * lessThanBatchLengths_reliable.Count()) + lessThanBatchLengths_reliable.Sum(); + var totalLength_unreliable = 1 + (2 * lessThanBatchLengths_unreliable.Count()) + lessThanBatchLengths_unreliable.Sum(); + + // only 2 at any length + Socket.Received(2).Send(Arg.Any(), Arg.Any(), Arg.Any()); + // but also check we received length + Socket.Received(1).Send(Arg.Any(), Arg.Any(), totalLength_reliable); + Socket.Received(1).Send(Arg.Any(), Arg.Any(), totalLength_unreliable); + + // check packet was correct + CheckMessage(PacketType.Reliable, 0, 2, lessThanBatchLengths_reliable); + CheckMessage(PacketType.Unreliable, 1, 2, lessThanBatchLengths_unreliable); + + // clear calls after, so we are ready to process next message + Socket.ClearReceivedCalls(); + _sentArrays.Clear(); + } + + [Test] + public void FlushDoesNotSendEmptyMessage() + { + _connection.FlushBatch(); + Socket.DidNotReceive().Send(Arg.Any(), Arg.Any(), Arg.Any()); + _connection.FlushBatch(); + Socket.DidNotReceive().Send(Arg.Any(), Arg.Any(), Arg.Any()); + } + + [Test] + [TestCase(true)] + [TestCase(false)] + public void UnbatchesMessageOnReceive(bool reliable) + { + var receive = _bufferPool.Take(); + receive.array[0] = (byte)(reliable ? PacketType.Reliable : PacketType.Unreliable); + var offset = 1; + AddMessage(receive.array, ref offset, 10); + AddMessage(receive.array, ref offset, 30); + AddMessage(receive.array, ref offset, 20); + + var segments = new List>(); + _peerInstance.dataHandler + .When(x => x.ReceiveMessage(_connection, Arg.Any>())) + .Do(x => segments.Add(x.ArgAt>(1))); + if (reliable) + _connection.ReceiveReliablePacket(new Packet(receive, offset)); + else + _connection.ReceiveUnreliablePacket(new Packet(receive, offset)); + _peerInstance.dataHandler.Received(3).ReceiveMessage(_connection, Arg.Any>()); + + + Assert.That(segments[0].Count, Is.EqualTo(10)); + Assert.That(segments[1].Count, Is.EqualTo(30)); + Assert.That(segments[2].Count, Is.EqualTo(20)); + Assert.That(segments[0].SequenceEqual(new ArraySegment(_buffer, 0, 10))); + Assert.That(segments[1].SequenceEqual(new ArraySegment(_buffer, 0, 30))); + Assert.That(segments[2].SequenceEqual(new ArraySegment(_buffer, 0, 20))); + } + + private void AddMessage(byte[] receive, ref int offset, int size) + { + ByteUtils.WriteUShort(receive, ref offset, (ushort)size); + Buffer.BlockCopy(_buffer, 0, receive, offset, size); + offset += size; + } + + [Test] + public void SendingToUnreliableUsesUnreliable() + { + var counts = new List() { 10, 20 }; + _connection.SendUnreliable(_buffer, 0, counts[0]); + _connection.SendUnreliable(_buffer, 0, counts[1]); + _connection.FlushBatch(); + + AssertSentPacket(PacketType.Unreliable, counts); + } + + [Test] + public void SendingToNotifyUsesUnreliable() + { + var counts = new List() { 10, 20 }; + _connection.SendNotify(_buffer, 0, counts[0]); + _connection.SendNotify(_buffer, 0, counts[1]); + _connection.FlushBatch(); + + // only 1 at any length + Socket.Received(2).Send(Arg.Any(), Arg.Any(), Arg.Any()); + // but also check we received length + Socket.Received(1).Send(Arg.Any(), Arg.Any(), AckSystem.NOTIFY_HEADER_SIZE + counts[0]); + Socket.Received(1).Send(Arg.Any(), Arg.Any(), AckSystem.NOTIFY_HEADER_SIZE + counts[1]); + + // check packet was correct + CheckMessage(PacketType.Notify, 0, 2, counts.Take(1), AckSystem.NOTIFY_HEADER_SIZE - 1); + CheckMessage(PacketType.Notify, 1, 2, counts.Skip(1).Take(1), AckSystem.NOTIFY_HEADER_SIZE - 1); + + // clear calls after, so we are ready to process next message + Socket.ClearReceivedCalls(); + _sentArrays.Clear(); + } + [Test] + public void SendingToNotifyTokenUsesUnreliable() + { + var token = Substitute.For(); + var counts = new List() { 10, 20 }; + _connection.SendNotify(_buffer, 0, counts[0], token); + _connection.SendNotify(_buffer, 0, counts[1], token); + _connection.FlushBatch(); + + // only 1 at any length + Socket.Received(2).Send(Arg.Any(), Arg.Any(), Arg.Any()); + // but also check we received length + Socket.Received(1).Send(Arg.Any(), Arg.Any(), AckSystem.NOTIFY_HEADER_SIZE + counts[0]); + Socket.Received(1).Send(Arg.Any(), Arg.Any(), AckSystem.NOTIFY_HEADER_SIZE + counts[1]); + + // check packet was correct + CheckMessage(PacketType.Notify, 0, 2, counts.Take(1), AckSystem.NOTIFY_HEADER_SIZE - 1); + CheckMessage(PacketType.Notify, 1, 2, counts.Skip(1).Take(1), AckSystem.NOTIFY_HEADER_SIZE - 1); + + // clear calls after, so we are ready to process next message + Socket.ClearReceivedCalls(); + _sentArrays.Clear(); + } + + [Test] + [Ignore("Not implemented")] + public void NotifyOnDeliveredInvokeAfterReceivingReply() + { + var counts = new List() { 10, 20 }; + var token = _connection.SendNotify(_buffer, 0, counts[0]); + + var action = Substitute.For(); + token.Delivered += action; + action.DidNotReceive().Invoke(); + + // todo receive message here, and then check if Delivered is infact called + } + + [Test] + [Ignore("Not implemented")] + public void NotifyTokenOnDeliveredInvokeAfterReceivingReply() + { + var token = Substitute.For(); + var counts = new List() { 10, 20 }; + _connection.SendNotify(_buffer, 0, counts[0], token); + token.DidNotReceive().OnDelivered(); + + // todo receive message here, and then check if Delivered is infact called + } + } +} diff --git a/Assets/Tests/SocketLayer/AckSystem/PassthroughConnectionTest.cs.meta b/Assets/Tests/SocketLayer/AckSystem/PassthroughConnectionTest.cs.meta new file mode 100644 index 0000000000..3071ba5eb7 --- /dev/null +++ b/Assets/Tests/SocketLayer/AckSystem/PassthroughConnectionTest.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: a78f3f5648f4170458c64f606fa76afd +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: