diff --git a/Jenkinsfile b/Jenkinsfile
index 95fb388301c..ffba4787589 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -49,7 +49,6 @@ pipeline {
choice(name: 'nodeLabel', choices: [ 'ubuntu', 's390x', 'arm', 'Windows' ])
choice(name: 'jdkVersion', choices: ['jdk_17_latest', 'jdk_21_latest', 'jdk_24_latest', 'jdk_17_latest_windows', 'jdk_21_latest_windows', 'jdk_24_latest_windows'])
booleanParam(name: 'deployEnabled', defaultValue: false)
- booleanParam(name: 'parallelTestsEnabled', defaultValue: true)
booleanParam(name: 'sonarEnabled', defaultValue: false)
booleanParam(name: 'testsEnabled', defaultValue: true)
}
@@ -131,20 +130,12 @@ pipeline {
}
when { expression { return params.testsEnabled } }
steps {
+ echo 'Running tests'
sh 'java -version'
sh 'mvn -version'
-
// all tests is very very long (10 hours on Apache Jenkins)
// sh 'mvn -B -e test -pl activemq-unit-tests -Dactivemq.tests=all'
- script {
- if (params.parallelTestsEnabled == 'true') {
- sh 'echo "Running parallel-tests ..."'
- sh 'mvn -B -e -fae -Pparallel-tests test -Dsurefire.rerunFailingTestsCount=3'
- } else {
- sh 'echo "Running tests ..."'
- sh 'mvn -B -e -fae test -Dsurefire.rerunFailingTestsCount=3'
- }
- }
+ sh 'mvn -B -e -fae test -Dsurefire.rerunFailingTestsCount=3'
}
post {
always {
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index ac5398ac4e2..3c9d3f4659b 100644
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -28,6 +28,7 @@
activemq-mqtt
jar
ActiveMQ :: MQTT Protocol
+
The ActiveMQ MQTT Protocol Implementation
@@ -212,21 +213,56 @@
maven-surefire-plugin
- 1
- false
-javaagent:${org.mockito:mockito-core:jar}
alphabetical
-
- target
-
-
-
- **/PahoMQTNioTTest.java
-
+ plain
+ org.apache.activemq.transport.mqtt.ParallelTest
+
+ true
+ true
+ false
+ ${project.build.directory}/
+
+
+ true
+
+
+ true
+ true
+ true
+ true
+ true
+ true
+
+
+ **/PahoMQTNioTTest.java
+
+
+
+ parallel
+ test
+
+ test
+
+
+
+
+ org.apache.activemq.transport.mqtt.ParallelTest
+ 2C
+ false
+ 600
+
+ ${project.build.directory}/parallel-tests-${surefire.forkNumber}/
+
+ 20000
+
+
+
+
+
org.apache.activemq.protobuf
activemq-protobuf
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
index a7c790e7b3b..96acd0e6c43 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
@@ -47,15 +47,16 @@
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Tests various use cases that require authentication or authorization over MQTT
*/
+@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTAuthTest extends MQTTAuthTestSupport {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
index 994bff4e670..62f367aa7ba 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java
@@ -41,12 +41,14 @@
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests the functionality of the MQTTCodec class.
*/
+@Category(ParallelTest.class)
public class MQTTCodecTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTCodecTest.class);
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java
index 2f7e7d7853a..0bb23dd0891 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java
@@ -42,9 +42,12 @@
import org.apache.activemq.util.ByteSequence;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
/**
*
*/
+@Category(ParallelTest.class)
public class MQTTCompositeQueueRetainedTest extends MQTTTestSupport {
// configure composite topic
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
index 59f122dc9ca..f3e89af8ddd 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
@@ -35,6 +35,7 @@
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -45,6 +46,7 @@
* Test that connection attempts that don't send a CONNECT frame will
* get cleaned up by the inactivity monitor.
*/
+@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTConnectTest extends MQTTTestSupport {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
index e5282b31ad5..73b304d9862 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
@@ -31,9 +31,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.junit.experimental.categories.Category;
/**
* Test that the maxFrameSize configuration value is applied across the transports.
*/
+@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTMaxFrameSizeTest extends MQTTTestSupport {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java
index b6dd9f91581..e634f0da1af 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;
+import org.junit.experimental.categories.Category;
+
/**
* Run the basic tests with the NIO Transport.
*/
+@Category(ParallelTest.class)
public class MQTTNIOSSLTest extends MQTTTest {
@Override
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOTest.java
index abb5d6c4b82..9bb4b0ee9ac 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;
+import org.junit.experimental.categories.Category;
+
/**
* Run the basic tests with the NIO Transport.
*/
+@Category(ParallelTest.class)
public class MQTTNIOTest extends MQTTTest {
@Override
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java
index 6d75ab7e067..c550f683923 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java
@@ -29,7 +29,9 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+@Category(ParallelTest.class)
public class MQTTOverlapedSubscriptionsTest {
private BrokerService brokerService;
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTPingReqTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTPingReqTest.java
index 7e8f070c6ea..fc27ead4ee1 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTPingReqTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTPingReqTest.java
@@ -52,10 +52,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.junit.experimental.categories.Category;
/**
* Test to show that a PINGRESP will only be sent for a PINGREQ
* packet after a CONNECT packet has been received.
*/
+@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTPingReqTest extends MQTTTestSupport {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java
index 3a1fd20d8fb..0f1c93d3362 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java
@@ -37,12 +37,14 @@
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
/**
* Tests for various usage scenarios of the protocol converter
*/
+@Category(ParallelTest.class)
public class MQTTProtocolConverterTest {
private MQTTTransport transport;
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
index 609c49d225b..8b41e0d2894 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;
+import org.junit.experimental.categories.Category;
+
/**
* Run the basic tests with the NIO Transport.
*/
+@Category(ParallelTest.class)
public class MQTTSSLTest extends MQTTTest {
@Override
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java
index 0b7f9581797..3696c399697 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java
@@ -34,9 +34,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.junit.experimental.categories.Category;
/**
* Test that all previous QoS 2 subscriptions are recovered on Broker restart.
*/
+@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTSubscriptionRecoveryTest extends MQTTTestSupport {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 7cf1523beef..b75b0499b9d 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -65,9 +65,13 @@
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Category(ParallelTest.class)
public class MQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
index 881ad634243..4c23cee7eee 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
@@ -39,6 +39,7 @@
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext;
+import org.apache.activemq.util.IOHelper;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
@@ -53,8 +54,6 @@ public class MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
- public static final String KAHADB_DIRECTORY = "target/activemq-data/";
-
protected BrokerService brokerService;
protected int port;
protected String jmsUri = "vm://localhost";
@@ -143,7 +142,7 @@ protected BrokerService createBroker(boolean deleteAllMessages) throws Exception
brokerService.setPersistent(isPersistent());
if (isPersistent()) {
KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
+ kaha.setDirectory(new File(IOHelper.getDefaultDataDirectory() + "/" + getTestName()));
brokerService.setPersistenceAdapter(kaha);
}
brokerService.setAdvisorySupport(advisorySupport);
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
index d84ce894c1f..c7d601bb10b 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
@@ -42,6 +42,7 @@
import org.fusesource.mqtt.client.Topic;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@
/**
* Run the basic tests with the NIO Transport.
*/
+@Category(ParallelTest.class)
public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionsTest.class);
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTWillTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTWillTest.java
index 0c81c5a47ef..48f955b2808 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTWillTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTWillTest.java
@@ -18,12 +18,14 @@
import org.fusesource.mqtt.client.*;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+@Category(ParallelTest.class)
public class MQTTWillTest extends MQTTTestSupport {
@Test(timeout = 60 * 1000)
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTNIOTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTNIOTest.java
index 88c8780df87..b5102bd5236 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTNIOTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTNIOTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.activemq.transport.mqtt;
+import org.junit.experimental.categories.Category;
+
/**
* Test the NIO transport with this Test group
*/
+@Category(ParallelTest.class)
public class PahoMQTTNIOTest extends PahoMQTTTest {
@Override
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
index 362ded3dcc1..1c010d745d2 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -47,6 +47,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.junit.experimental.categories.Category;
+
+@Category(ParallelTest.class)
public class PahoMQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java
index be9e8b3361d..1c3b527bc09 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java
@@ -22,12 +22,14 @@
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import static org.junit.Assert.assertEquals;
+@Category(ParallelTest.class)
public class PahoVirtualTopicMQTTTest extends PahoMQTTTest {
@Override
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/ParallelTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/ParallelTest.java
new file mode 100644
index 00000000000..dfeb9081264
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/ParallelTest.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+
+/**
+ * Marker interface used with {@code @Category(ParallelTest.class)} to opt a
+ * test class or method into the {@code all-parallel} Maven profile. Only tests
+ * explicitly tagged with this category execute when the profile is enabled,
+ * which allows a gradual migration toward full parallelism.
+ */
+public interface ParallelTest {
+}
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java
index e777385f1b8..2db79bd928e 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java
@@ -17,10 +17,13 @@
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
+import org.apache.activemq.transport.mqtt.ParallelTest;
+import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
+@Category(ParallelTest.class)
public class MQTTAutoNioSslTest extends MQTTTest {
@Override
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java
index f7023a3ba04..55fe032f0c1 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java
@@ -17,10 +17,13 @@
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
+import org.apache.activemq.transport.mqtt.ParallelTest;
+import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
+@Category(ParallelTest.class)
public class MQTTAutoNioTest extends MQTTTest {
@Override
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslAuthTest.java
index 4fae9c44c68..d2d731fa9a0 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslAuthTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslAuthTest.java
@@ -36,6 +36,10 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import org.junit.experimental.categories.Category;
+import org.apache.activemq.transport.mqtt.ParallelTest;
+
+@Category(ParallelTest.class)
@RunWith(Parameterized.class)
public class MQTTAutoSslAuthTest extends MQTTTestSupport {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java
index e31f49495b5..0d490e1f122 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java
@@ -17,10 +17,13 @@
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
+import org.apache.activemq.transport.mqtt.ParallelTest;
+import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
+@Category(ParallelTest.class)
public class MQTTAutoSslTest extends MQTTTest {
@Override
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java
index 7471f6e38f6..dcdf265df4a 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java
@@ -17,10 +17,13 @@
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
+import org.apache.activemq.transport.mqtt.ParallelTest;
+import org.junit.experimental.categories.Category;
/**
* Run the basic tests with the NIO Transport.
*/
+@Category(ParallelTest.class)
public class MQTTAutoTest extends MQTTTest {
@Override
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 8008f6fb93d..b0d45f8b2ce 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -43,6 +43,8 @@
import junit.framework.Test;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
@@ -254,26 +256,23 @@ public void testReceiveTopicWithPrefetch1() throws Exception {
}
final List subscriptions = getDestinationConsumers(broker, destination);
- Thread.sleep(1000);
- assertTrue("prefetch extension..",
+ assertTrue("prefetch extension..", Wait.waitFor(() ->
subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
- allMatch(e -> e == 4));
+ allMatch(e -> e == 4)
+ , TimeUnit.SECONDS.toMillis(5), 100));
assertNull(consumer.receiveNoWait());
message.acknowledge();
- assertTrue("prefetch extension back to 0", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return subscriptions.stream().
+ assertTrue("prefetch extension back to 0", Wait.waitFor(() ->
+ subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
- allMatch(e -> e == 0);
- }
- }));
+ allMatch(e -> e == 0)
+ ));
}
@@ -299,29 +298,23 @@ public void testReceiveQueueWithPrefetch1() throws Exception {
final List subscriptions = getDestinationConsumers(broker, destination);
- assertTrue("prefetch extension..", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return subscriptions.stream().
- filter(s -> s instanceof QueueSubscription).
- mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()).
- allMatch(e -> e == 4);
- }
- }));
+ assertTrue("prefetch extension..", Wait.waitFor(() ->
+ subscriptions.stream().
+ filter(s -> s instanceof QueueSubscription).
+ mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()).
+ allMatch(e -> e == 4)
+ ));
assertNull(consumer.receiveNoWait());
message.acknowledge();
- assertTrue("prefetch extension back to 0", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return subscriptions.stream().
+ assertTrue("prefetch extension back to 0", Wait.waitFor(() ->
+ subscriptions.stream().
filter(s -> s instanceof QueueSubscription).
mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()).
- allMatch(e -> e == 0);
- }
- }));
+ allMatch(e -> e == 0)
+ ));
}
public void initCombosForTestDurableConsumerSelectorChange() {
@@ -429,10 +422,9 @@ public void onMessage(Message m) {
});
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
@@ -463,10 +455,9 @@ public void onMessage(Message m) {
sendMessages(session, destination, 4);
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
@@ -551,7 +542,7 @@ public void onMessage(Message m) {
});
assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
+ assertNoAdditionalMessages(counter, 5);
// assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack and dups_ok mode
assertEquals(5, counter.get());
@@ -637,11 +628,9 @@ public void onMessage(Message m) {
});
assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
- // close from onMessage with Auto_ack will ack
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
@@ -676,10 +665,9 @@ public void onMessage(Message m) {
sendMessages(session, destination, 4);
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void initCombosForTestMessageListenerWithConsumer() {
@@ -712,10 +700,9 @@ public void onMessage(Message m) {
sendMessages(session, destination, 4);
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
@@ -795,19 +782,16 @@ public void testPrefetch1MessageNotDispatched() throws Exception {
MessageConsumer consumer2 = session2.createConsumer(destination);
// Wait for consumer2 to fully register with the broker
- assertTrue("consumer2 registered", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return getDestinationConsumers(broker, destination).size() == 2;
- }
- }, 5000));
+ assertTrue("consumer2 registered", Wait.waitFor(() ->
+ getDestinationConsumers(broker, destination).size() == 2
+ , TimeUnit.SECONDS.toMillis(5), 100));
// Pick up the first message.
- Message message1 = consumer.receive(1000);
+ Message message1 = consumer.receive(10_000);
assertNotNull(message1);
// Pick up the 2nd messages.
- Message message2 = consumer2.receive(5000);
+ Message message2 = consumer2.receive(10_000);
assertNotNull(message2);
session.commit();
@@ -1019,26 +1003,27 @@ public void testAckOfExpired() throws Exception {
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(destination);
- producer.setTimeToLive(500);
+ final int ttl = 500;
+ producer.setTimeToLive(ttl);
final int count = 4;
for (int i = 0; i < count; i++) {
- TextMessage message = sendSession.createTextMessage("" + i);
+ final TextMessage message = sendSession.createTextMessage("" + i);
producer.send(message);
}
- // let first bunch in queue expire
- Thread.sleep(1000);
+ // let first bunch expire - messages expire based on TTL
+ Thread.sleep(ttl * 2L);
producer.setTimeToLive(0);
for (int i = 0; i < count; i++) {
- TextMessage message = sendSession.createTextMessage("no expiry" + i);
+ final TextMessage message = sendSession.createTextMessage("no expiry" + i);
producer.send(message);
}
- ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
+ final ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
for (int i=0; i {
+ final DestinationViewMBean view = createView(destination);
+ return view.getInFlightCount() == 0 &&
+ view.getDispatchCount() == 8 &&
+ view.getDequeueCount() == 8 &&
+ view.getExpiredCount() == 4;
+ }, TimeUnit.SECONDS.toMillis(5), 100));
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
@@ -1066,4 +1053,10 @@ protected DestinationViewMBean createView(ActiveMQDestination destination) throw
}
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
}
+
+ private void assertNoAdditionalMessages(final AtomicInteger counter, final int expected) throws Exception {
+ assertFalse("unexpected additional messages received", Wait.waitFor(
+ (Wait.Condition) () -> counter.get() > expected,
+ TimeUnit.SECONDS.toMillis(2), 50));
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
index 48c0ba098c7..4cfc6ea5d91 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
@@ -35,9 +36,11 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.test.annotations.ParallelTest;
@@ -136,6 +139,16 @@ public void testBatchSendBrowseReceive() throws Exception {
producer.send(outbound[i]);
}
+ // Wait for messages to be fully processed by the broker before browsing
+ final int expectedCount = outbound.length;
+ assertTrue("messages arrived in queue", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ final Queue queueView = (Queue) broker.getDestination(destination);
+ return queueView != null && queueView.getDestinationStatistics().getMessages().getCount() == expectedCount;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100)));
+
QueueBrowser browser = session.createBrowser(destination);
Enumeration> enumeration = browser.getEnumeration();
@@ -149,6 +162,16 @@ public void testBatchSendBrowseReceive() throws Exception {
producer.send(outbound[i]);
}
+ // Wait for second batch of messages to be fully processed by the broker before browsing
+ final int expectedCount2 = outbound.length * 2;
+ assertTrue("second batch arrived in queue", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ final Queue queueView = (Queue) broker.getDestination(destination);
+ return queueView != null && queueView.getDestinationStatistics().getMessages().getCount() == expectedCount2;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100)));
+
// verify second batch is visible to browse
browser = session.createBrowser(destination);
enumeration = browser.getEnumeration();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index abc25438f3d..d4de2ee84af 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -26,6 +26,8 @@
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -331,14 +333,24 @@ private void doTestManyMessageConsumerWithSend(boolean transacted) throws Except
// https://issues.apache.org/jira/browse/AMQ-4224
public void testBrokerZeroPrefetchConfig() throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(brokerZeroQueue);
+ final MessageProducer producer = session.createProducer(brokerZeroQueue);
producer.send(session.createTextMessage("Msg1"));
// now lets receive it
- MessageConsumer consumer = session.createConsumer(brokerZeroQueue);
+ final MessageConsumer consumer = session.createConsumer(brokerZeroQueue);
- TextMessage answer = (TextMessage)consumer.receive(5000);
+ // Wait for broker subscription to be created and policy applied (same as testBrokerZeroPrefetchConfigWithConsumerControl)
+ final ActiveMQDestination transformedDest = ActiveMQDestination.transform(brokerZeroQueue);
+ org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
+ && !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
+
+ final TextMessage answer = (TextMessage)consumer.receive(TimeUnit.SECONDS.toMillis(5));
assertNotNull("Consumer should have read a message", answer);
assertEquals("Should have received a message!", answer.getText(), "Msg1");
}
@@ -358,7 +370,7 @@ public boolean isSatisified() throws Exception {
return broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
}
- }, 5000, 100);
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
diff --git a/pom.xml b/pom.xml
index 96c4d70bd61..71a7a461cb3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,6 +131,7 @@
1.45
3.8.6
3.5.3
+ 1.5.1
*
org.apache.activemq*
@@ -962,6 +963,7 @@
org.apache.maven.plugins
maven-surefire-plugin
+ ${maven-surefire-plugin-version}
true
1
@@ -973,6 +975,18 @@
-enableassertions ${surefire.argLine} ${maven.surefire.allow.securitymanager}
+
+
+ org.apache.maven.surefire
+ surefire-junit47
+ ${maven-surefire-plugin-version}
+
+
+ me.fabriciorby
+ maven-surefire-junit5-tree-reporter
+ ${maven-surefire-junit5-tree-reporter-version}
+
+
org.apache.felix