Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web Socket Implementation Fixes #3773

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions src/BizHawk.Client.Common/Api/ClientWebSocketWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,63 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

namespace BizHawk.Client.Common
{
public struct ClientWebSocketWrapper
public class ClientWebSocketWrapper
{
private ClientWebSocket? _w;

private List<string> _receivedMessages;

Uri _uri;

/// <summary>calls <see cref="ClientWebSocket.State"/> getter (unless closed/disposed, then <see cref="WebSocketState.Closed"/> is always returned)</summary>
public WebSocketState State => _w?.State ?? WebSocketState.Closed;

public ClientWebSocketWrapper(Uri uri, CancellationToken? cancellationToken = null)
public ClientWebSocketWrapper(Uri uri, int bufferSize, int maxMessages)
{
_uri = uri;
_w = new ClientWebSocket();
_w.ConnectAsync(uri, cancellationToken ?? CancellationToken.None).Wait();
_receivedMessages = new List<string>();
try{
Connect(bufferSize, maxMessages).Wait();
}
catch(Exception ex){}
}

/// <summary>calls <see cref="ClientWebSocket.CloseAsync"/></summary>
/// <remarks>also calls <see cref="ClientWebSocket.Dispose"/> (wrapper property <see cref="State"/> will continue to work, method calls will throw <see cref="ObjectDisposedException"/>)</remarks>
public Task Close(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken? cancellationToken = null)
{
if (_w == null) throw new ObjectDisposedException(nameof(_w));
var task = _w.CloseAsync(closeStatus, statusDescription, cancellationToken ?? CancellationToken.None);
var task = _w.CloseOutputAsync(closeStatus, statusDescription, cancellationToken ?? CancellationToken.None);
_w.Dispose();
_w = null;
return task;
}

/// <summary>calls <see cref="ClientWebSocket.ReceiveAsync"/></summary>
public Task<WebSocketReceiveResult> Receive(ArraySegment<byte> buffer, CancellationToken? cancellationToken = null)
=> _w?.ReceiveAsync(buffer, cancellationToken ?? CancellationToken.None)
?? throw new ObjectDisposedException(nameof(_w));
public async Task Receive(int bufferSize, int maxMessages){
var buffer = new ArraySegment<byte>(new byte[bufferSize]);
while (_w != null && _w.State == WebSocketState.Open)
{
WebSocketReceiveResult result;
result = await _w.ReceiveAsync(buffer, CancellationToken.None);
if (maxMessages == 0 || _receivedMessages.Count < maxMessages)
_receivedMessages.Add(Encoding.UTF8.GetString(buffer.Array,0,result.Count));
}
}

/// <summary>calls <see cref="ClientWebSocket.ReceiveAsync"/></summary>
public string Receive(int bufferCap, CancellationToken? cancellationToken = null)
{
if (_w == null) throw new ObjectDisposedException(nameof(_w));
var buffer = new byte[bufferCap];
var result = Receive(new ArraySegment<byte>(buffer), cancellationToken ?? CancellationToken.None).Result;
return Encoding.UTF8.GetString(buffer, 0, result.Count);
public async Task Connect(int bufferSize, int maxMessages){
if (_w == null){
_w = new ClientWebSocket();
}
if(_w != null && _w.State != WebSocketState.Open){
_w.ConnectAsync(_uri, CancellationToken.None).Wait();
Receive(bufferSize, maxMessages);
}
}

/// <summary>calls <see cref="ClientWebSocket.SendAsync"/></summary>
Expand All @@ -62,5 +80,13 @@ public Task Send(string message, bool endOfMessage, CancellationToken? cancellat
cancellationToken ?? CancellationToken.None
);
}

public string GetMessage()
{
if (_receivedMessages == null || _receivedMessages.Count == 0) return "";
string returnThis = _receivedMessages[0];
_receivedMessages.RemoveAt(0);
return returnThis;
}
}
}
2 changes: 0 additions & 2 deletions src/BizHawk.Client.Common/Api/Interfaces/ICommApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ public interface ICommApi : IExternalApi

SocketServer? Sockets { get; }

#if ENABLE_WEBSOCKETS
WebSocketServer WebSockets { get; }
#endif

string? HttpTest();

Expand Down
2 changes: 1 addition & 1 deletion src/BizHawk.Client.Common/Api/WebSocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace BizHawk.Client.Common
{
public sealed class WebSocketServer
{
public ClientWebSocketWrapper Open(Uri uri, CancellationToken? cancellationToken = null) => new ClientWebSocketWrapper(uri, cancellationToken);
public ClientWebSocketWrapper Open(Uri uri, int bufferSize, int maxMessages) => new ClientWebSocketWrapper(uri, bufferSize, maxMessages);
}
}
31 changes: 17 additions & 14 deletions src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
using System.ComponentModel;
using System.Linq;
using System.Text;
using System.Net.WebSockets;

using NLua;
using System.Threading.Tasks;

namespace BizHawk.Client.Common
{
Expand Down Expand Up @@ -254,20 +256,22 @@ private void CheckHttp()
}
}

#if ENABLE_WEBSOCKETS
[LuaMethod("ws_open", "Opens a websocket and returns the id so that it can be retrieved later.")]
[LuaMethod("ws_open", "Opens a websocket and returns the id so that it can be retrieved later. If an id is provided, reconnects to the ")]
[LuaMethodExample("local ws_id = comm.ws_open(\"wss://echo.websocket.org\");")]
public string WebSocketOpen(string uri)
public string WebSocketOpen(string uri, string guid = null, int bufferSize = 1024, int maxMessages = 20)
{
var wsServer = APIs.Comm.WebSockets;
var localGuid = guid == null ? new Guid() : Guid.Parse(guid);
if (wsServer == null)
{
Log("WebSocket server is somehow not available");
Log("WebSocket server is not available");
return null;
}
var guid = new Guid();
_websockets[guid] = wsServer.Open(new Uri(uri));
return guid.ToString();
if (guid == null)
_websockets[localGuid] = wsServer.Open(new Uri(uri),bufferSize, maxMessages);
else
_websockets[localGuid].Connect(bufferSize, maxMessages);
return localGuid.ToString();
}

[LuaMethod("ws_send", "Send a message to a certain websocket id (boolean flag endOfMessage)")]
Expand All @@ -280,11 +284,11 @@ public void WebSocketSend(
if (_websockets.TryGetValue(Guid.Parse(guid), out var wrapper)) wrapper.Send(content, endOfMessage);
}

[LuaMethod("ws_receive", "Receive a message from a certain websocket id and a maximum number of bytes to read")]
[LuaMethodExample("local ws = comm.ws_receive(ws_id, str_len);")]
public string WebSocketReceive(string guid, int bufferCap)
[LuaMethod("ws_receive", "Get a receive message from a certain websocket id")]
[LuaMethodExample("local ws = comm.ws_receive(ws_id);")]
public string WebSocketReceive(string guid)
=> _websockets.TryGetValue(Guid.Parse(guid), out var wrapper)
? wrapper.Receive(bufferCap)
? wrapper.GetMessage()
: null;

[LuaMethod("ws_get_status", "Get a websocket's status")]
Expand All @@ -298,11 +302,10 @@ public string WebSocketReceive(string guid, int bufferCap)
[LuaMethodExample("local ws_status = comm.ws_close(ws_id, close_status);")]
public void WebSocketClose(
string guid,
WebSocketCloseStatus status,
int status,
string closeMessage)
{
if (_websockets.TryGetValue(Guid.Parse(guid), out var wrapper)) wrapper.Close(status, closeMessage);
if (_websockets.TryGetValue(Guid.Parse(guid), out var wrapper)) wrapper.Close((WebSocketCloseStatus)status, closeMessage);
}
#endif
}
}