Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2029,25 +2029,45 @@ public void onClientInternalException(final Throwable error) {
* @param error
*/
public void onAsyncException(Throwable error) {
if (!closed.get() && !closing.get()) {
if (this.exceptionListener != null) {

if (!(error instanceof JMSException)) {
error = JMSExceptionSupport.create(error);
if (this.exceptionListener != null) {
if (!(error instanceof JMSException)) {
error = JMSExceptionSupport.create(error);
}
final JMSException e = (JMSException) error;
// Submit directly to executor bypassing closed/closing guards
// to ensure exception notifications are never silently dropped
try {
executor.execute(() -> exceptionListener.onException(e));
} catch (final RejectedExecutionException re) {
LOG.debug("Could not notify exception listener asynchronously (executor terminated), notifying inline: {}", error.getMessage());
try {
exceptionListener.onException(e);
} catch (final Exception ex) {
LOG.debug("Exception during inline ExceptionListener notification", ex);
}
final JMSException e = (JMSException) error;
executeAsync(() -> exceptionListener.onException(e));

} else {
LOG.debug("Async exception with no exception listener: {}", error, error);
}
} else {
LOG.debug("Async exception with no exception listener: {}", error, error);
}
}

@Override
public void onException(final IOException error) {
onAsyncException(error);
executeAsync(() -> {
// Combine JMS ExceptionListener and TransportListener notifications
// into a single async task to prevent a race condition where the
// ExceptionListener (e.g. ConnectionPool) closes the connection and
// shuts down the executor before the TransportListener task is queued.
final Runnable exceptionTask = () -> {
// Notify JMS ExceptionListener first (same as onAsyncException)
if (exceptionListener != null) {
try {
final JMSException jmsError = JMSExceptionSupport.create(error);
exceptionListener.onException(jmsError);
} catch (final Exception e) {
LOG.debug("Exception during JMS ExceptionListener notification", e);
}
}

transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown();
Expand All @@ -2059,7 +2079,16 @@ public void onException(final IOException error) {
for (final TransportListener listener : transportListeners) {
listener.onException(error);
}
});
};

// Submit directly to executor bypassing closed/closing guards
// to ensure transport failure handling is never silently dropped
try {
executor.execute(exceptionTask);
} catch (final RejectedExecutionException e) {
LOG.debug("Could not execute exception task asynchronously (executor terminated), executing inline");
exceptionTask.run();
}
}

@Override
Expand Down
Loading