Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TCP transport not ThreadSafe #111

Open
inf9144 opened this issue Feb 23, 2024 · 4 comments
Open

TCP transport not ThreadSafe #111

inf9144 opened this issue Feb 23, 2024 · 4 comments

Comments

@inf9144
Copy link

inf9144 commented Feb 23, 2024

Hey, the implementation of TCP Transport is not thread safe and you can get in situations where the stream is null while tcp client is connected. Also reconnecting does not follow a proper path. I switched to a custom ITransport implementation which solves thoses problems for me:

public class CustomizedTcpTransport : ITransport, IDisposable {
   private const int _defaultPort = 12201;

   private readonly GraylogSinkOptionsBase _options;
   private readonly SemaphoreSlim _lock;

   private TcpClient? _client;
   private Stream? _stream;

   public delegate CustomizedTcpTransport Factory(GraylogSinkOptionsBase options);
   public CustomizedTcpTransport(GraylogSinkOptionsBase options) {
      _options = options;
      _lock = new(1, 1);
   }

   public Task Send(string message) {
      var payload = new byte[message.Length + 1];
      System.Text.Encoding.UTF8.GetBytes(message.AsSpan(), payload.AsSpan());
      payload[^1] = 0x00;

      return Send(payload);
   }

   private async Task Send(byte[] payload) {
      await _lock.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
      try {
         var stream = await EnsureConnection().ConfigureAwait(continueOnCapturedContext: false);
         await stream.WriteAsync(payload).ConfigureAwait(continueOnCapturedContext: false);
         await stream.FlushAsync().ConfigureAwait(continueOnCapturedContext: false);
      } finally {
         _lock.Release();
      }
   }

   private async Task<Stream> EnsureConnection() {
      if (_client is not null && _client.Connected && _stream is not null)
         return _stream;

      _stream?.Dispose();
      _stream = null;
      _client?.Dispose();
      _client = new TcpClient();

      var iPAddress = await GetIpAddress(_options.HostnameOrAddress ?? throw new InvalidOperationException("Could not find hostname to connect to!"));
      if (iPAddress == null)
         throw new InvalidOperationException($"IP address of host '{_options.HostnameOrAddress}' could not be resolved.");

      var port = _options.Port.GetValueOrDefault(12201);
      var sslHost = _options.UseSsl ? _options.HostnameOrAddress : null;
      await _client.ConnectAsync(iPAddress, port).ConfigureAwait(continueOnCapturedContext: false);
      _stream = _client.GetStream();
      if (!string.IsNullOrWhiteSpace(sslHost)) {
         var sslStream = new SslStream(_stream, false);
         await sslStream.AuthenticateAsClientAsync(sslHost).ConfigureAwait(continueOnCapturedContext: false);
         if (sslStream.RemoteCertificate != null) {
            SelfLog.WriteLine("Remote cert was issued to {0} and is valid from {1} until {2}.", sslStream.RemoteCertificate.Subject, sslStream.RemoteCertificate.GetEffectiveDateString(), sslStream.RemoteCertificate.GetExpirationDateString());
            _stream = (Stream?) (object) sslStream;
         } else {
            SelfLog.WriteLine("Remote certificate is null.");
         }

         if (_stream is null)
            throw new InvalidOperationException($"SSL connection with host '{_options.HostnameOrAddress}' could not be established.");
      }

      return _stream;
   }

   public async Task<IPAddress?> GetIpAddress(string hostNameOrAddress) {
      var addresses = await Dns.GetHostAddressesAsync(hostNameOrAddress).ConfigureAwait(false);
      return addresses.FirstOrDefault(c => c.AddressFamily == AddressFamily.InterNetwork);
   }

   public void Dispose() {
      GC.SuppressFinalize(this);
      _stream?.Dispose();
      _client?.Dispose();
   }
}

Are you open for PRs? I could try to integrate this into you original classes.

@mikeries
Copy link

I believe I've also run into issues with this... I usually get the following exception, but I occasionally see others, including one about connecting to an already connected client.

2024-09-24T18:49:03.6418124Z Oops something going wrong System.AggregateException: One or more errors occurred. (An invalid argument was supplied.)
---> System.Net.Sockets.SocketException (10022): An invalid argument was supplied.
at System.Net.Sockets.Socket.UpdateStatusAfterSocketErrorAndThrowException(SocketError error, Boolean disconnectOnFailure, String callerName)
at System.Net.Sockets.Socket.DoBind(EndPoint endPointSnapshot, SocketAddress socketAddress)
at System.Net.Sockets.Socket.ConnectAsync(SocketAsyncEventArgs e, Boolean userSocket, Boolean saeaCancelable)
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ConnectAsync(Socket socket)
at System.Net.Sockets.Socket.ConnectAsync(EndPoint remoteEP, CancellationToken cancellationToken)
at System.Net.Sockets.Socket.ConnectAsync(EndPoint remoteEP)
at System.Net.Sockets.TcpClient.ConnectAsync(IPAddress address, Int32 port)
at Serilog.Sinks.Graylog.Core.Transport.Tcp.TcpTransportClient.Connect()
at Serilog.Sinks.Graylog.Core.Transport.Tcp.TcpTransportClient.EnsureConnection()
at Serilog.Sinks.Graylog.Core.Transport.Tcp.TcpTransportClient.Send(Byte[] payload)
--- End of inner exception stack trace ---

@mikeries
Copy link

Is there a simple workaround? I'd rather not have to create and maintain a custom fork of this project, but as it stands it is completely unusable for me.

@mikeries
Copy link

mikeries commented Sep 26, 2024

@inf9144 Thanks for the code. I have managed to get it working as a custom transport, but I'm not sure I configured the TransportFactory correctly... it seems a little janky.

Here's what I have. Is there a neater solution?

var graylogOptions = new GraylogSinkOptions()
{
    HostnameOrAddress = "redacted.ip.address",
    Port = 12201,
    TransportType = Serilog.Sinks.Graylog.Core.Transport.TransportType.Custom,
    UseSsl = true
};
graylogOptions.TransportFactory = () => new CustomizedTcpTransport(graylogOptions);

Log.Logger = new LoggerConfiguration()
              .ReadFrom.Configuration(config)
              .WriteTo.Graylog(graylogOptions)
              .CreateLogger();```

@CwjXFH
Copy link

CwjXFH commented Oct 17, 2024

Maybe we have the same problem: #115

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants