diff --git a/src/NetMQ.Tests/CleanupTests.cs b/src/NetMQ.Tests/CleanupTests.cs new file mode 100644 index 0000000..bee7b83 --- /dev/null +++ b/src/NetMQ.Tests/CleanupTests.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using NetMQ.Sockets; +using NUnit.Framework; + +namespace NetMQ.Tests +{ + [TestFixture] + public class CleanupTests + { + [Test] + public void Block() + { + const int count = 1000; + + NetMQConfig.Linger = TimeSpan.FromSeconds(0.5); + + using (var client = new DealerSocket(">tcp://localhost:5557")) + { + // Sending a lot of messages + client.Options.SendHighWatermark = count; + for (int i = 0; i < count; i++) + { + client.SendFrame("Hello"); + } + } + + Stopwatch stopwatch = Stopwatch.StartNew(); + NetMQConfig.Cleanup(); + stopwatch.Stop(); + + Assert.Greater(stopwatch.ElapsedMilliseconds, 500); + } + + [Test] + public void NoBlock() + { + const int count = 1000; + + NetMQConfig.Linger = TimeSpan.FromSeconds(0.5); + + using (var client = new DealerSocket(">tcp://localhost:5557")) + { + // Sending a lot of messages + client.Options.SendHighWatermark = count; + for (int i = 0; i < count; i++) + { + client.SendFrame("Hello"); + } + } + + Stopwatch stopwatch = Stopwatch.StartNew(); + NetMQConfig.Cleanup(false); + stopwatch.Stop(); + + Assert.Less(stopwatch.ElapsedMilliseconds, 500); + } + + [Test] + public void NoBlockNoDispose() + { + var client = new DealerSocket(">tcp://localhost:5557"); + NetMQConfig.Cleanup(false); + } + } +} diff --git a/src/NetMQ.Tests/NetMQ.Tests.csproj b/src/NetMQ.Tests/NetMQ.Tests.csproj index e2b817a..929c3a5 100644 --- a/src/NetMQ.Tests/NetMQ.Tests.csproj +++ b/src/NetMQ.Tests/NetMQ.Tests.csproj @@ -72,6 +72,7 @@ + @@ -91,6 +92,7 @@ + diff --git a/src/NetMQ.Tests/NetMQ3.5.Tests.csproj b/src/NetMQ.Tests/NetMQ3.5.Tests.csproj index 84d12db..646106d 100644 --- a/src/NetMQ.Tests/NetMQ3.5.Tests.csproj +++ b/src/NetMQ.Tests/NetMQ3.5.Tests.csproj @@ -73,6 +73,7 @@ + @@ -84,6 +85,7 @@ + diff --git a/src/NetMQ.Tests/NetMQMonitorTests.cs b/src/NetMQ.Tests/NetMQMonitorTests.cs index 14cb99b..0fa517a 100644 --- a/src/NetMQ.Tests/NetMQMonitorTests.cs +++ b/src/NetMQ.Tests/NetMQMonitorTests.cs @@ -8,7 +8,7 @@ namespace NetMQ.Tests { - [TestFixture] + [TestFixture(Category = "Monitor")] public class NetMQMonitorTests { [Test] @@ -16,7 +16,7 @@ public void Monitoring() { using (var rep = new ResponseSocket()) using (var req = new RequestSocket()) - using (var monitor = new NetMQMonitor(rep, "inproc://rep.inproc", SocketEvents.Accepted | SocketEvents.Listening)) + using (var monitor = new NetMQMonitor(rep, $"inproc://rep.inproc", SocketEvents.Accepted | SocketEvents.Listening)) { var listening = false; var accepted = false; @@ -28,6 +28,8 @@ public void Monitoring() var monitorTask = Task.Factory.StartNew(monitor.Start); + Thread.Sleep(10); + var port = rep.BindRandomPort("tcp://127.0.0.1"); req.Connect("tcp://127.0.0.1:" + port); @@ -47,8 +49,8 @@ public void Monitoring() Thread.Sleep(200); - Assert.IsTrue(monitorTask.IsCompleted); - } + Assert.IsTrue(monitorTask.IsCompleted); + } } #if !NET35 @@ -102,6 +104,7 @@ public void ErrorCodeTest() monitor.Timeout = TimeSpan.FromMilliseconds(100); var monitorTask = Task.Factory.StartNew(monitor.Start); + Thread.Sleep(10); var port = rep.BindRandomPort("tcp://127.0.0.1"); diff --git a/src/NetMQ.Tests/Setup.cs b/src/NetMQ.Tests/Setup.cs new file mode 100644 index 0000000..202014e --- /dev/null +++ b/src/NetMQ.Tests/Setup.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using NUnit.Framework; + +namespace NetMQ.Tests +{ + [SetUpFixture] + public class Setup + { + [TearDown] + public void TearDown() + { + NetMQConfig.Cleanup(false); + } + } +} diff --git a/src/NetMQ/Core/Command.cs b/src/NetMQ/Core/Command.cs index e08f2ed..bcf810e 100644 --- a/src/NetMQ/Core/Command.cs +++ b/src/NetMQ/Core/Command.cs @@ -52,7 +52,7 @@ public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] ob /// Get the argument to this command. /// [CanBeNull] - public object Arg { get; private set; } + public object Arg { get; private set; } /// /// Override of ToString, which returns a string in the form [ command-type, destination ]. diff --git a/src/NetMQ/Core/CommandType.cs b/src/NetMQ/Core/CommandType.cs index 86a8d02..93d8413 100644 --- a/src/NetMQ/Core/CommandType.cs +++ b/src/NetMQ/Core/CommandType.cs @@ -101,6 +101,11 @@ internal enum CommandType /// Sent by reaper thread to the term thread when all the sockets /// have successfully been deallocated. /// - Done + Done, + + /// + /// Send to reaper to stop the reaper immediatly + /// + ForceStop } } \ No newline at end of file diff --git a/src/NetMQ/Core/Ctx.cs b/src/NetMQ/Core/Ctx.cs index ba3b4a3..0bebe48 100644 --- a/src/NetMQ/Core/Ctx.cs +++ b/src/NetMQ/Core/Ctx.cs @@ -36,8 +36,8 @@ namespace NetMQ.Core /// Internal analog of the public class. internal sealed class Ctx { - private const int DefaultIOThreads = 1; - private const int DefaultMaxSockets = 1024; + internal const int DefaultIOThreads = 1; + internal const int DefaultMaxSockets = 1024; #region Nested class: Endpoint @@ -217,8 +217,12 @@ public void Terminate() foreach (var socket in m_sockets) socket.Stop(); - if (m_sockets.Count == 0) - m_reaper.Stop(); + if (!Block) + { + m_reaper.ForceStop(); + } + else if (m_sockets.Count == 0) + m_reaper.Stop(); } finally { @@ -234,8 +238,7 @@ public void Terminate() Debug.Assert(found); Debug.Assert(command.CommandType == CommandType.Done); - Monitor.Enter(m_slotSync); - Debug.Assert(m_sockets.Count == 0); + Monitor.Enter(m_slotSync); } else Monitor.Enter(m_slotSync); diff --git a/src/NetMQ/Core/Reaper.cs b/src/NetMQ/Core/Reaper.cs index 1c7b4d3..7f93818 100644 --- a/src/NetMQ/Core/Reaper.cs +++ b/src/NetMQ/Core/Reaper.cs @@ -114,6 +114,11 @@ public void Stop() SendStop(); } + public void ForceStop() + { + SendForceStop(); + } + /// /// Handle input-ready events, by receiving and processing any commands /// that are waiting in the mailbox. @@ -168,6 +173,14 @@ protected override void ProcessStop() } } + protected override void ProcessForceStop() + { + m_terminating = true; + SendDone(); + m_poller.RemoveHandle(m_mailboxHandle); + m_poller.Stop(); + } + /// /// Add the given socket to the list to be reaped (terminated). /// @@ -196,6 +209,6 @@ protected override void ProcessReaped() m_poller.RemoveHandle(m_mailboxHandle); m_poller.Stop(); } - } + } } } diff --git a/src/NetMQ/Core/Utils/Poller.cs b/src/NetMQ/Core/Utils/Poller.cs index 5f29b2c..5b1cf7b 100644 --- a/src/NetMQ/Core/Utils/Poller.cs +++ b/src/NetMQ/Core/Utils/Poller.cs @@ -135,8 +135,7 @@ public void Destroy() if (!m_stopped) { try - { - m_stopping = true; + { m_workerThread.Join(); } catch (Exception) diff --git a/src/NetMQ/Core/ZObject.cs b/src/NetMQ/Core/ZObject.cs index 4cd434c..1efa4d6 100644 --- a/src/NetMQ/Core/ZObject.cs +++ b/src/NetMQ/Core/ZObject.cs @@ -125,6 +125,11 @@ protected void SendStop() m_ctx.SendCommand(m_threadId, new Command(this, CommandType.Stop)); } + protected void SendForceStop() + { + m_ctx.SendCommand(m_threadId, new Command(this, CommandType.ForceStop)); + } + /// /// Send the Plug command, incrementing the destinations sequence-number if incSeqnum is true. /// @@ -322,6 +327,10 @@ public void ProcessCommand([NotNull] Command cmd) ProcessReaped(); break; + case CommandType.ForceStop: + ProcessForceStop(); + break; + default: throw new ArgumentException(); } @@ -333,6 +342,12 @@ protected virtual void ProcessStop() throw new NotSupportedException(); } + /// Not supported on the ZObject class. + protected virtual void ProcessForceStop() + { + throw new NotSupportedException(); + } + /// Not supported on the ZObject class. protected virtual void ProcessPlug() { diff --git a/src/NetMQ/NetMQConfig.cs b/src/NetMQ/NetMQConfig.cs index aaab9fb..51320b9 100644 --- a/src/NetMQ/NetMQConfig.cs +++ b/src/NetMQ/NetMQConfig.cs @@ -8,29 +8,48 @@ public static class NetMQConfig private static TimeSpan s_linger; private static Ctx s_ctx; - private static object s_settingsSync; - + private static int s_threadPoolSize = Ctx.DefaultIOThreads; + private static int s_maxSockets = Ctx.DefaultMaxSockets; + private static readonly object s_sync; + static NetMQConfig() - { - s_ctx = new Ctx(); - s_ctx.Block = false; - s_settingsSync = new object(); - s_linger = TimeSpan.Zero; - - // Register to destory the context when application exit - AppDomain.CurrentDomain.ProcessExit += OnCurrentDomainOnProcessExit; + { + s_sync = new object(); + s_linger = TimeSpan.Zero; } - private static void OnCurrentDomainOnProcessExit(object sender, EventArgs args) + internal static Ctx Context { - s_ctx.Terminate(); + get + { + lock (s_sync) + { + if (s_ctx == null) + { + s_ctx = new Ctx(); + s_ctx.IOThreadCount = s_threadPoolSize; + s_ctx.MaxSockets = s_maxSockets; + } + + return s_ctx; + } + } } - internal static Ctx Context + /// + /// Cleanup library resources, call this method when your process is shutting-down. + /// + /// Set to true when you want to make sure sockets send all pending messages + public static void Cleanup(bool block = true) { - get + lock (s_sync) { - return s_ctx; + if (s_ctx != null) + { + s_ctx.Block = block; + s_ctx.Terminate(); + s_ctx = null; + } } } @@ -52,14 +71,14 @@ public static TimeSpan Linger { get { - lock (s_settingsSync) + lock (s_sync) { return s_linger; } } set { - lock (s_settingsSync) + lock (s_sync) { s_linger = value; } @@ -72,10 +91,20 @@ public static TimeSpan Linger /// public static int ThreadPoolSize { - get { return s_ctx.IOThreadCount; } + get + { + lock (s_sync) + return s_threadPoolSize; + } set { - s_ctx.IOThreadCount = value; + lock (s_sync) + { + s_threadPoolSize = value; + + if (s_ctx != null) + s_ctx.IOThreadCount = value; + } } } @@ -86,9 +115,20 @@ public static int MaxSockets { get { - return s_ctx.MaxSockets; + lock (s_sync) + return s_maxSockets; + } + set + { + lock (s_sync) + { + s_maxSockets = value; + + if (s_ctx != null) + s_ctx.MaxSockets = value; + } + } - set { s_ctx.MaxSockets = value; } - } + } } }