diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index b9f0eebf9a..cf1f0bdd07 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -2029,25 +2029,45 @@ public void onClientInternalException(final Throwable error) { * @param error */ public void onAsyncException(Throwable error) { - if (!closed.get() && !closing.get()) { - if (this.exceptionListener != null) { - - if (!(error instanceof JMSException)) { - error = JMSExceptionSupport.create(error); + if (this.exceptionListener != null) { + if (!(error instanceof JMSException)) { + error = JMSExceptionSupport.create(error); + } + final JMSException e = (JMSException) error; + // Submit directly to executor bypassing closed/closing guards + // to ensure exception notifications are never silently dropped + try { + executor.execute(() -> exceptionListener.onException(e)); + } catch (final RejectedExecutionException re) { + LOG.debug("Could not notify exception listener asynchronously (executor terminated), notifying inline: {}", error.getMessage()); + try { + exceptionListener.onException(e); + } catch (final Exception ex) { + LOG.debug("Exception during inline ExceptionListener notification", ex); } - final JMSException e = (JMSException) error; - executeAsync(() -> exceptionListener.onException(e)); - - } else { - LOG.debug("Async exception with no exception listener: {}", error, error); } + } else { + LOG.debug("Async exception with no exception listener: {}", error, error); } } @Override public void onException(final IOException error) { - onAsyncException(error); - executeAsync(() -> { + // Combine JMS ExceptionListener and TransportListener notifications + // into a single async task to prevent a race condition where the + // ExceptionListener (e.g. ConnectionPool) closes the connection and + // shuts down the executor before the TransportListener task is queued. + final Runnable exceptionTask = () -> { + // Notify JMS ExceptionListener first (same as onAsyncException) + if (exceptionListener != null) { + try { + final JMSException jmsError = JMSExceptionSupport.create(error); + exceptionListener.onException(jmsError); + } catch (final Exception e) { + LOG.debug("Exception during JMS ExceptionListener notification", e); + } + } + transportFailed(error); ServiceSupport.dispose(ActiveMQConnection.this.transport); brokerInfoReceived.countDown(); @@ -2059,7 +2079,16 @@ public void onException(final IOException error) { for (final TransportListener listener : transportListeners) { listener.onException(error); } - }); + }; + + // Submit directly to executor bypassing closed/closing guards + // to ensure transport failure handling is never silently dropped + try { + executor.execute(exceptionTask); + } catch (final RejectedExecutionException e) { + LOG.debug("Could not execute exception task asynchronously (executor terminated), executing inline"); + exceptionTask.run(); + } } @Override