Skip to content

Commit

Permalink
Merge pull request #2 from somdoron/PGM
Browse files Browse the repository at this point in the history
Fix PGM crashing on network reset
  • Loading branch information
drewnoakes committed Mar 3, 2016
2 parents 2755c5b + ce0a41e commit ec2cf11
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/NetMQ/Core/SessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ private void StartConnecting(bool wait)
case Address.PgmProtocol:
case Address.EpgmProtocol:
{
var pgmSender = new PgmSender(m_ioThread, m_options, m_addr);
var pgmSender = new PgmSender(m_ioThread, m_options, m_addr, wait);
pgmSender.Init((PgmAddress)m_addr.Resolved);
SendAttach(this, pgmSender);
return;
Expand Down
128 changes: 109 additions & 19 deletions src/NetMQ/Core/Transports/Pgm/PgmSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ namespace NetMQ.Core.Transports.Pgm
{
internal sealed class PgmSender : IOObject, IEngine, IProactorEvents
{
/// <summary>
/// ID of the timer used to delay the reconnection. Value is 1.
/// </summary>
private const int ReconnectTimerId = 1;

private readonly Options m_options;
private readonly Address m_addr;
private readonly bool m_delayedStart;
private readonly V1Encoder m_encoder;

private AsyncSocket m_socket;
Expand All @@ -24,6 +30,7 @@ internal sealed class PgmSender : IOObject, IEngine, IProactorEvents
private enum State
{
Idle,
Delaying,
Connecting,
Active,
ActiveSendingIdle,
Expand All @@ -32,17 +39,21 @@ private enum State

private State m_state;
private PgmAddress m_pgmAddress;
private SessionBase m_session;
private int m_currentReconnectIvl;

public PgmSender([NotNull] IOThread ioThread, [NotNull] Options options, [NotNull] Address addr)
public PgmSender([NotNull] IOThread ioThread, [NotNull] Options options, [NotNull] Address addr, bool delayedStart)
: base(ioThread)
{
m_options = options;
m_addr = addr;
m_delayedStart = delayedStart;
m_encoder = null;
m_outBuffer = null;
m_outBufferSize = 0;
m_writeSize = 0;
m_encoder = new V1Encoder(0, m_options.Endian);
m_currentReconnectIvl = m_options.ReconnectIvl;

m_state = State.Idle;
}
Expand All @@ -68,6 +79,7 @@ public void Init([NotNull] PgmAddress pgmAddress)

public void Plug(IOThread ioThread, SessionBase session)
{
m_session = session;
m_encoder.SetMsgSource(session);

// get the first message from the session because we don't want to send identities
Expand All @@ -81,16 +93,39 @@ public void Plug(IOThread ioThread, SessionBase session)
msg.Close();
}

AddSocket(m_socket);
AddSocket(m_socket);

if (!m_delayedStart)
{
StartConnecting();
}
else
{
m_state = State.Delaying;
AddTimer(GetNewReconnectIvl(), ReconnectTimerId);
}
}

private void StartConnecting()
{
m_state = State.Connecting;
m_socket.Connect(m_pgmAddress.Address);

try
{
m_socket.Connect(m_pgmAddress.Address);
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.InvalidArgument)
Error();
else
throw ex;
}
}

public void Terminate()
{
RemoveSocket(m_socket);
m_encoder.SetMsgSource(null);
Destroy();
}

public void ActivateOut()
Expand All @@ -108,6 +143,23 @@ public void ActivateIn()
Debug.Assert(false);
}

/// <summary>
/// This would be called when a timer expires, although here it only throws a NotSupportedException.
/// </summary>
/// <param name="id">an integer used to identify the timer (not used here)</param>
/// <exception cref="NotImplementedException">This method must not be called on instances of PgmSender.</exception>
public override void TimerEvent(int id)
{
if (m_state == State.Delaying)
{
StartConnecting();
}
else
{
Debug.Assert(false);
}
}

/// <summary>
/// This method is called when a message Send operation has been completed.
/// </summary>
Expand Down Expand Up @@ -142,9 +194,10 @@ public override void OutCompleted(SocketError socketError, int bytesTransferred)
}
else
{
Debug.Assert(false);

throw NetMQException.Create(socketError.ToErrorCode());
if (socketError == SocketError.ConnectionReset)
Error();
else
throw NetMQException.Create(socketError.ToErrorCode());
}
}
else
Expand Down Expand Up @@ -185,29 +238,66 @@ private void BeginSending()
}
catch (SocketException ex)
{
throw NetMQException.Create(ex.SocketErrorCode, ex);
if (ex.SocketErrorCode == SocketError.ConnectionReset)
Error();
else
throw NetMQException.Create(ex.SocketErrorCode, ex);
}
}

private void Error()
{
Debug.Assert(m_session != null);
m_session.Detach();
Destroy();
}

private void Destroy()
{
if (m_state == State.Delaying)
{
CancelTimer(ReconnectTimerId);
}

m_pgmSocket.Dispose();
RemoveSocket(m_socket);
m_encoder.SetMsgSource(null);
}

/// <summary>
/// This method would be called when a message receive operation has been completed, although here it only throws a NotSupportedException.
/// Internal function to return a reconnect back-off delay.
/// Will modify the current_reconnect_ivl used for next call
/// Returns the currently used interval
/// </summary>
/// <param name="socketError">a SocketError value that indicates whether Success or an error occurred</param>
/// <param name="bytesTransferred">the number of bytes that were transferred</param>
/// <exception cref="NotImplementedException">This method must not be called on instances of PgmSender.</exception>
public override void InCompleted(SocketError socketError, int bytesTransferred)
private int GetNewReconnectIvl()
{
throw new NotImplementedException();
// The new interval is the current interval + random value.
int thisInterval = m_currentReconnectIvl + new Random().Next(0, m_options.ReconnectIvl);

// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (m_options.ReconnectIvlMax > 0 &&
m_options.ReconnectIvlMax > m_options.ReconnectIvl)
{
// Calculate the next interval
m_currentReconnectIvl = m_currentReconnectIvl * 2;
if (m_currentReconnectIvl >= m_options.ReconnectIvlMax)
{
m_currentReconnectIvl = m_options.ReconnectIvlMax;
}
}
return thisInterval;
}

/// <summary>
/// This would be called when a timer expires, although here it only throws a NotSupportedException.
/// This method would be called when a message receive operation has been completed, although here it only throws a NotSupportedException.
/// </summary>
/// <param name="id">an integer used to identify the timer (not used here)</param>
/// <param name="socketError">a SocketError value that indicates whether Success or an error occurred</param>
/// <param name="bytesTransferred">the number of bytes that were transferred</param>
/// <exception cref="NotImplementedException">This method must not be called on instances of PgmSender.</exception>
public override void TimerEvent(int id)
public override void InCompleted(SocketError socketError, int bytesTransferred)
{
throw new NotImplementedException();
}
}
}
}
7 changes: 6 additions & 1 deletion src/NetMQ/Core/Transports/Pgm/PgmSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal enum PgmSocketType
/// This is only supported on Windows when Microsoft Message Queueing (MSMQ) is installed.
/// See RFC 3208.
/// </summary>
internal sealed class PgmSocket
internal sealed class PgmSocket : IDisposable
{
public const int ProtocolTypeNumber = 113;
public const ProtocolType PgmProtocolType = (ProtocolType)113;
Expand Down Expand Up @@ -202,5 +202,10 @@ public override string ToString()
sb.Append(m_options).Append(")");
return sb.ToString();
}

public void Dispose()
{
Handle.Dispose();
}
}
}

0 comments on commit ec2cf11

Please sign in to comment.