Skip to content

Commit

Permalink
feat: adding option to pass channel type to socket
Browse files Browse the repository at this point in the history
useful when using steam or epic relay and you want to use their reliability
  • Loading branch information
James-Frowen committed Apr 27, 2024
1 parent 2e4149f commit 5418d39
Show file tree
Hide file tree
Showing 12 changed files with 611 additions and 83 deletions.
2 changes: 1 addition & 1 deletion Assets/Mirage/Runtime/SocketLayer/ByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Mirage.SocketLayer
{
/// <summary>
/// Warpper around a byte[] that belongs to a <see cref="Pool{T}"/>
/// Wrapper around a byte[] that belongs to a <see cref="Pool{T}"/>
/// </summary>
public sealed class ByteBuffer : IDisposable
{
Expand Down
5 changes: 5 additions & 0 deletions Assets/Mirage/Runtime/SocketLayer/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public class Config
/// Enable if the Socket you are using has its own Reliable layer. For example using Websocket, which is TCP.
/// </summary>
public bool DisableReliableLayer = false;

/// <summary>
/// Passes channel to socket rather than Peer handling reliable layer. This is useful when using a relay which has its own reliable layer, such as Steam or Epic Relay
/// </summary>
public bool PassthroughReliableLayer = false;
#endregion
}
}
8 changes: 4 additions & 4 deletions Assets/Mirage/Runtime/SocketLayer/Connection/Batch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ public void Flush()

public class ArrayBatch : Batch
{
private readonly Action<byte[], int> _send;
private readonly IRawConnection _connection;
private readonly PacketType _packetType;

private readonly byte[] _batch;
private int _batchLength;

public ArrayBatch(int maxPacketSize, Action<byte[], int> send, PacketType reliable)
public ArrayBatch(int maxPacketSize, IRawConnection connection, PacketType reliable)
: base(maxPacketSize)
{
_batch = new byte[maxPacketSize];
_send = send;
_connection = connection;
_packetType = reliable;
}

Expand All @@ -84,7 +84,7 @@ protected override void CreateNewBatch()

protected override void SendAndReset()
{
_send.Invoke(_batch, _batchLength);
_connection.SendRaw(_batch, _batchLength);
_batchLength = 0;
}
}
Expand Down
7 changes: 6 additions & 1 deletion Assets/Mirage/Runtime/SocketLayer/Connection/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Mirage.SocketLayer
{
internal abstract class Connection : IConnection
internal abstract class Connection : IConnection, IRawConnection
{
protected readonly ILogger _logger;
protected readonly int _maxPacketSize;
Expand Down Expand Up @@ -73,6 +73,11 @@ protected Connection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Co
_metrics = metrics;
}

void IRawConnection.SendRaw(byte[] packet, int length)
{
_peer.Send(this, packet, length);
}

public override string ToString()
{
return $"[{EndPoint}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

namespace Mirage.SocketLayer
{

/// <summary>
/// Connection that does not run its own reliablity layer, good for TCP sockets
/// Connection that does not run its own reliability layer, good for TCP sockets
/// </summary>
internal sealed class NoReliableConnection : Connection
{
Expand All @@ -15,19 +16,14 @@ internal sealed class NoReliableConnection : Connection
internal NoReliableConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, ILogger logger, Metrics metrics)
: base(peer, endPoint, dataHandler, config, maxPacketSize, time, logger, metrics)
{
_nextBatchReliable = new ArrayBatch(maxPacketSize, SendBatchInternal, PacketType.Reliable);
_nextBatchReliable = new ArrayBatch(maxPacketSize, this, PacketType.Reliable);

if (maxPacketSize > ushort.MaxValue)
{
throw new ArgumentException($"Max package size can not bigger than {ushort.MaxValue}. NoReliableConnection uses 2 bytes for message length, maxPacketSize over that value will mean that message will be incorrectly batched.");
}
}

private void SendBatchInternal(byte[] batch, int length)
{
_peer.Send(this, batch, length);
}

// just sue SendReliable for unreliable/notify
// note: we dont need to pass in that it is reliable, receiving doesn't really care what channel it is
public override void SendUnreliable(byte[] packet, int offset, int length) => SendReliable(packet, offset, length);
Expand Down
149 changes: 149 additions & 0 deletions Assets/Mirage/Runtime/SocketLayer/Connection/PassthroughConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
using System;
using UnityEngine;

namespace Mirage.SocketLayer
{
internal class PassthroughConnection : Connection, IRawConnection
{
private const int HEADER_SIZE = 1 + Batch.MESSAGE_LENGTH_SIZE;

private readonly Batch _reliableBatch;
private readonly Batch _unreliableBatch;
private readonly AckSystem _ackSystem;

public PassthroughConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, Pool<ByteBuffer> bufferPool, ILogger logger, Metrics metrics)
: base(peer, endPoint, dataHandler, config, maxPacketSize, time, logger, metrics)
{
_reliableBatch = new ArrayBatch(maxPacketSize, this, PacketType.Reliable);
_unreliableBatch = new ArrayBatch(maxPacketSize, this, PacketType.Unreliable);
_ackSystem = new AckSystem(this, config, maxPacketSize, time, bufferPool, logger, metrics);

if (maxPacketSize > ushort.MaxValue)
{
throw new ArgumentException($"Max package size can not bigger than {ushort.MaxValue}. NoReliableConnection uses 2 bytes for message length, maxPacketSize over that value will mean that message will be incorrectly batched.");
}
}

public override void SendUnreliable(byte[] message, int offset, int length)
{
ThrowIfNotConnectedOrConnecting();

if (length + HEADER_SIZE > _maxPacketSize)
{
throw new ArgumentException($"Message is bigger than MTU, size:{length} but max message size is {_maxPacketSize - HEADER_SIZE}");
}

_unreliableBatch.AddMessage(message, offset, length);
_metrics?.OnSendMessageUnreliable(length);
}

/// <summary>
/// Use <see cref="INotifyCallBack"/> version for non-alloc
/// </summary>
public override INotifyToken SendNotify(byte[] packet, int offset, int length)
{
ThrowIfNotConnectedOrConnecting();
var token = _ackSystem.SendNotify(packet, offset, length);
_metrics?.OnSendMessageNotify(length);
return token;
}

/// <summary>
/// Use <see cref="INotifyCallBack"/> version for non-alloc
/// </summary>
public override void SendNotify(byte[] packet, int offset, int length, INotifyCallBack callBacks)
{
ThrowIfNotConnectedOrConnecting();
_ackSystem.SendNotify(packet, offset, length, callBacks);
_metrics?.OnSendMessageNotify(length);
}

/// <summary>
/// single message, batched by AckSystem
/// </summary>
/// <param name="message"></param>
public override void SendReliable(byte[] message, int offset, int length)
{
ThrowIfNotConnectedOrConnecting();

if (length + HEADER_SIZE > _maxPacketSize)
{
throw new ArgumentException($"Message is bigger than MTU, size:{length} but max message size is {_maxPacketSize - HEADER_SIZE}");
}

_reliableBatch.AddMessage(message, offset, length);
_metrics?.OnSendMessageReliable(length);
}

internal override void ReceiveUnreliablePacket(Packet packet)
{
HandleReliableBatched(packet.Buffer.array, 1, packet.Length, PacketType.Unreliable);
}

internal override void ReceiveReliablePacket(Packet packet)
{
HandleReliableBatched(packet.Buffer.array, 1, packet.Length, PacketType.Reliable);
}

internal override void ReceiveReliableFragment(Packet packet) => throw new NotSupportedException();

internal override void ReceiveNotifyPacket(Packet packet)
{
var segment = _ackSystem.ReceiveNotify(packet.Buffer.array, packet.Length);
if (segment != default)
{
_metrics?.OnReceiveMessageNotify(packet.Length);
_dataHandler.ReceiveMessage(this, segment);
}
}

internal override void ReceiveNotifyAck(Packet packet)
{
_ackSystem.ReceiveAck(packet.Buffer.array);
}

public override void FlushBatch()
{
_ackSystem.Update();
_reliableBatch.Flush();
_unreliableBatch.Flush();
}

internal override bool IsValidSize(Packet packet)
{
const int minPacketSize = 1;

var length = packet.Length;
if (length < minPacketSize)
return false;

// Min size of message given to Mirage
const int minMessageSize = 2;

const int minCommandSize = 2;
const int minUnreliableSize = 1 + minMessageSize;

switch (packet.Type)
{
case PacketType.Command:
return length >= minCommandSize;

case PacketType.Reliable:
case PacketType.Unreliable:
return length >= minUnreliableSize;

case PacketType.Notify:
return length >= AckSystem.NOTIFY_HEADER_SIZE + minMessageSize;
case PacketType.Ack:
return length >= AckSystem.ACK_HEADER_SIZE;
case PacketType.ReliableFragment:
// not supported
return false;

default:
case PacketType.KeepAlive:
return true;
}
}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 2 additions & 14 deletions Assets/Mirage/Runtime/SocketLayer/Connection/ReliableConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,23 @@ namespace Mirage.SocketLayer
/// <summary>
/// Objects that represents a connection to/from a server/client. Holds state that is needed to update, send, and receive data
/// </summary>
internal sealed class ReliableConnection : Connection, IRawConnection, IDisposable
internal sealed class ReliableConnection : Connection, IDisposable
{
private readonly AckSystem _ackSystem;
private readonly Batch _unreliableBatch;
private readonly Pool<ByteBuffer> _bufferPool;

internal ReliableConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, Pool<ByteBuffer> bufferPool, ILogger logger, Metrics metrics)
: base(peer, endPoint, dataHandler, config, maxPacketSize, time, logger, metrics)
{
_bufferPool = bufferPool;
_unreliableBatch = new ArrayBatch(_maxPacketSize, SendBatchInternal, PacketType.Unreliable);
_unreliableBatch = new ArrayBatch(_maxPacketSize, this, PacketType.Unreliable);
_ackSystem = new AckSystem(this, config, maxPacketSize, time, bufferPool, logger, metrics);
}

private void SendBatchInternal(byte[] batch, int length)
{
_peer.Send(this, batch, length);
}

public void Dispose()
{
_ackSystem.Dispose();
}

void IRawConnection.SendRaw(byte[] packet, int length)
{
_peer.Send(this, packet, length);
}

/// <summary>
/// Use <see cref="INotifyCallBack"/> version for non-alloc
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions Assets/Mirage/Runtime/SocketLayer/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,10 @@ private Connection CreateNewConnection(IEndPoint newEndPoint)
{
connection = new NoReliableConnection(this, endPoint, _dataHandler, _config, _maxPacketSize, _time, _logger, _metrics);
}
else if (_config.PassthroughReliableLayer)
{
connection = new PassthroughConnection(this, endPoint, _dataHandler, _config, _maxPacketSize, _time, _bufferPool, _logger, _metrics);
}
else
{
connection = new ReliableConnection(this, endPoint, _dataHandler, _config, _maxPacketSize, _time, _bufferPool, _logger, _metrics);
Expand Down
Loading

0 comments on commit 5418d39

Please sign in to comment.