Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@

public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
protected SyncCreateDataSource sharedDs;
protected String MASTER_URL = "tcp://localhost:62001";
protected String SLAVE_URL = "tcp://localhost:62002";
protected String findStatement;

protected void setUp() throws Exception {
Expand All @@ -55,7 +53,7 @@ protected void tearDown() throws Exception {
protected void createMaster() throws Exception {
master = new BrokerService();
master.setBrokerName("master");
master.addConnector(MASTER_URL);
master.addConnector("tcp://localhost:0");
master.setUseJmx(false);
master.setPersistent(true);
master.setDeleteAllMessagesOnStartup(true);
Expand Down Expand Up @@ -83,7 +81,7 @@ public void run() {
BrokerService broker = new BrokerService();
broker.setBrokerName("slave");
TransportConnector connector = new TransportConnector();
connector.setUri(new URI(SLAVE_URL));
connector.setUri(new URI("tcp://localhost:" + masterPort));
broker.addConnector(connector);
// no need for broker.setMasterConnectorURI(masterConnectorURI)
// as the db lock provides the slave/master initialisation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
protected CountDownLatch slaveStarted;
protected int inflightMessageCount;
protected int failureCount = 50;
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&useExponentialBackOff=false";
protected int masterPort;
protected String uriString;

@Override
protected void setUp() throws Exception {
// Use ephemeral port for XML-based broker configs
System.setProperty("masterPort", "0");

slaveStarted = new CountDownLatch(1);
slave.set(null);
setMaxTestTime(TimeUnit.MINUTES.toMillis(10));
Expand All @@ -64,6 +68,15 @@ protected void setUp() throws Exception {
failureCount = super.messageCount / 2;
super.topic = isTopic();
createMaster();

// Get the actual port assigned by the OS after master starts
masterPort = master.getTransportConnectors().get(0).getConnectUri().getPort();
uriString = "failover://(tcp://localhost:" + masterPort
+ ")?randomize=false&useExponentialBackOff=false";

// Slave reuses the same port so failover reconnects to the same address
System.setProperty("slavePort", String.valueOf(masterPort));

createSlave();
// wait for thing to connect
Thread.sleep(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
protected DataSource sharedDs;
protected String MASTER_URL = "tcp://localhost:62001";
protected String SLAVE_URL = "tcp://localhost:62002";
File sharedDbDirFile;

@Override
Expand All @@ -54,7 +52,7 @@ protected void tearDown() throws Exception {
protected void createMaster() throws Exception {
master = new BrokerService();
master.setBrokerName("master");
master.addConnector(MASTER_URL);
master.addConnector("tcp://localhost:0");
master.setUseJmx(false);
master.setPersistent(true);
master.setDeleteAllMessagesOnStartup(true);
Expand Down Expand Up @@ -87,7 +85,7 @@ public void run() {
BrokerService broker = new BrokerService();
broker.setBrokerName("slave");
TransportConnector connector = new TransportConnector();
connector.setUri(new URI(SLAVE_URL));
connector.setUri(new URI("tcp://localhost:" + masterPort));
broker.addConnector(connector);
broker.setUseJmx(false);
broker.setPersistent(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;

public class mKahaDbQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
protected String MASTER_URL = "tcp://localhost:62001";
protected String SLAVE_URL = "tcp://localhost:62002";

protected void createMaster() throws Exception {
master = new BrokerService();
master.setBrokerName("master");
master.addConnector(MASTER_URL);
master.addConnector("tcp://localhost:0");
master.setUseJmx(false);
master.setPersistent(true);
master.setDeleteAllMessagesOnStartup(true);
Expand All @@ -59,7 +57,7 @@ public void run() {
BrokerService broker = new BrokerService();
broker.setBrokerName("slave");
TransportConnector connector = new TransportConnector();
connector.setUri(new URI(SLAVE_URL));
connector.setUri(new URI("tcp://localhost:" + masterPort));
broker.addConnector(connector);
// no need for broker.setMasterConnectorURI(masterConnectorURI)
// as the db lock provides the slave/master initialisation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,30 @@
public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
static final String payload = new String(new byte[10 * 1024]);
private static final Logger LOG = LoggerFactory.getLogger(AMQ4485LowLimitTest.class);
final int portBase = 61600;
int numBrokers = 4;
final int[] brokerPorts = new int[numBrokers];
final int numProducers = 10;
final int numMessages = 200;
final int consumerSleepTime = 5;
StringBuilder brokersUrl = new StringBuilder();
HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();
private ArrayList<Throwable> exceptions = new ArrayList<Throwable>();

protected void buildUrlList() throws Exception {
protected void collectBrokerPorts() throws Exception {
for (int i = 0; i < numBrokers; i++) {
brokersUrl.append("tcp://localhost:" + (portBase + i));
final BrokerService broker = brokers.get("B" + i).broker;
brokerPorts[i] = broker.getTransportConnectors().get(0).getConnectUri().getPort();
}
}

protected String buildBrokersUrl() {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < numBrokers; i++) {
sb.append("tcp://localhost:").append(brokerPorts[i]);
if (i != numBrokers - 1) {
brokersUrl.append(',');
sb.append(',');
}
}
return sb.toString();
}

protected BrokerService createBroker(int brokerid) throws Exception {
Expand All @@ -95,11 +103,8 @@ protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws

broker.setUseJmx(true);
broker.setBrokerName("B" + brokerid);
broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));
broker.addConnector(new URI("tcp://localhost:0"));

if (addToNetwork) {
addNetworkConnector(broker);
}
broker.setSchedulePeriodForDestinationPurge(0);
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l);

Expand Down Expand Up @@ -132,12 +137,11 @@ protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws
return broker;
}

private void addNetworkConnector(BrokerService broker) throws Exception {
StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString());
networkConnectorUrl.append(')');
private void addNetworkConnector(BrokerService broker, String brokersUrl) throws Exception {
final String networkConnectorUrl = "static:(" + brokersUrl + ")";

for (int i = 0; i < 2; i++) {
NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
final NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl));
nc.setName("Bridge-" + i);
nc.setNetworkTTL(1);
nc.setDecreaseNetworkConsumerPriority(true);
Expand All @@ -146,6 +150,7 @@ private void addNetworkConnector(BrokerService broker) throws Exception {
nc.setDynamicallyIncludedDestinations(
Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")}));
broker.addNetworkConnector(nc);
broker.startNetworkConnector(nc, null);
}
}

Expand All @@ -156,7 +161,8 @@ public void x_testInterleavedSend() throws Exception {
BrokerService b = createBroker(0, false);
b.start();

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + 0));
final int port = b.getTransportConnectors().get(0).getConnectUri().getPort();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + port);
connectionFactory.setWatchTopicAdvisories(false);

QueueConnection c1 = connectionFactory.createQueueConnection();
Expand Down Expand Up @@ -207,13 +213,19 @@ public void run() {

public void testBrokers() throws Exception {

buildUrlList();

for (int i = 0; i < numBrokers; i++) {
createBroker(i);
}

startAllBrokers();

// Get actual ports after brokers start and add network connectors
collectBrokerPorts();
final String brokersUrl = buildBrokersUrl();
for (int i = 0; i < numBrokers; i++) {
addNetworkConnector(brokers.get("B" + i).broker, brokersUrl);
}

waitForBridgeFormation(numBrokers - 1);

verifyPeerBrokerInfos(numBrokers - 1);
Expand Down Expand Up @@ -266,7 +278,7 @@ public boolean isSatisified() throws Exception {

private void startConsumer(String brokerName, ActiveMQDestination destination) throws Exception {
int id = Integer.parseInt(brokerName.substring(1));
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + id));
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + brokerPorts[id]);
connectionFactory.setWatchTopicAdvisories(false);
QueueConnection queueConnection = connectionFactory.createQueueConnection();
queueConnection.start();
Expand Down Expand Up @@ -303,7 +315,7 @@ private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString());

for (int id = 0; id < nBrokers; id++) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
connectionFactory.setWatchTopicAdvisories(false);

QueueConnection queueConnection = connectionFactory.createQueueConnection();
Expand All @@ -330,7 +342,7 @@ public void onMessage(Message message) {
private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
List<ConsumerState> consumerStates = new LinkedList<ConsumerState>();
for (int id = 0; id < nBrokers; id++) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
connectionFactory.setWatchTopicAdvisories(false);

QueueConnection queueConnection = connectionFactory.createQueueConnection();
Expand Down Expand Up @@ -393,7 +405,7 @@ private void produce(final int numMessages) throws Exception {
@Override
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
connectionFactory.setWatchTopicAdvisories(false);
QueueConnection queueConnection = connectionFactory.createQueueConnection();
queueConnection.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,30 @@
public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends JmsMultipleBrokersTestSupport {
static final String payload = new String(new byte[10 * 1024]);
private static final Logger LOG = LoggerFactory.getLogger(AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.class);
final int portBase = 61600;
final int numBrokers = 4;
final int[] brokerPorts = new int[numBrokers];
final int numProducers = 10;
final int numMessages = 800;
final int consumerSleepTime = 20;
StringBuilder brokersUrl = new StringBuilder();
HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();
private ArrayList<Throwable> exceptions = new ArrayList<Throwable>();

protected void buildUrlList() throws Exception {
protected void collectBrokerPorts() throws Exception {
for (int i = 0; i < numBrokers; i++) {
brokersUrl.append("tcp://localhost:" + (portBase + i));
final BrokerService broker = brokers.get("B" + i).broker;
brokerPorts[i] = broker.getTransportConnectors().get(0).getConnectUri().getPort();
}
}

protected String buildBrokersUrl() {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < numBrokers; i++) {
sb.append("tcp://localhost:").append(brokerPorts[i]);
if (i != numBrokers - 1) {
brokersUrl.append(',');
sb.append(',');
}
}
return sb.toString();
}

protected BrokerService createBroker(int brokerid) throws Exception {
Expand All @@ -86,9 +94,8 @@ protected BrokerService createBroker(int brokerid) throws Exception {

broker.setUseJmx(true);
broker.setBrokerName("B" + brokerid);
broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));
broker.addConnector(new URI("tcp://localhost:0"));

addNetworkConnector(broker);
broker.setSchedulePeriodForDestinationPurge(0);
broker.getSystemUsage().setSendFailIfNoSpace(true);
broker.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
Expand All @@ -113,12 +120,11 @@ protected BrokerService createBroker(int brokerid) throws Exception {
return broker;
}

private void addNetworkConnector(BrokerService broker) throws Exception {
StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString());
networkConnectorUrl.append(')');
private void addNetworkConnector(BrokerService broker, String brokersUrl) throws Exception {
final String networkConnectorUrl = "static:(" + brokersUrl + ")";

for (int i = 0; i < 2; i++) {
NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
final NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl));
nc.setName("Bridge-" + i);
nc.setNetworkTTL(1);
nc.setDecreaseNetworkConsumerPriority(true);
Expand All @@ -127,18 +133,25 @@ private void addNetworkConnector(BrokerService broker) throws Exception {
nc.setDynamicallyIncludedDestinations(
Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")}));
broker.addNetworkConnector(nc);
broker.startNetworkConnector(nc, null);
}
}

public void testBrokers() throws Exception {

buildUrlList();

for (int i = 0; i < numBrokers; i++) {
createBroker(i);
}

startAllBrokers();

// Get actual ports after brokers start and add network connectors
collectBrokerPorts();
final String brokersUrl = buildBrokersUrl();
for (int i = 0; i < numBrokers; i++) {
addNetworkConnector(brokers.get("B" + i).broker, brokersUrl);
}

waitForBridgeFormation(numBrokers - 1);

verifyPeerBrokerInfos(numBrokers - 1);
Expand Down Expand Up @@ -192,7 +205,7 @@ private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString());

for (int id = 0; id < nBrokers; id++) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
connectionFactory.setWatchTopicAdvisories(false);

QueueConnection queueConnection = connectionFactory.createQueueConnection();
Expand All @@ -219,7 +232,7 @@ public void onMessage(Message message) {
private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
List<ConsumerState> consumerStates = new LinkedList<ConsumerState>();
for (int id = 0; id < nBrokers; id++) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
connectionFactory.setWatchTopicAdvisories(false);

QueueConnection queueConnection = connectionFactory.createQueueConnection();
Expand Down Expand Up @@ -281,7 +294,7 @@ private void produce(int numMessages) throws Exception {
@Override
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + brokerPorts[id] + ")");
connectionFactory.setWatchTopicAdvisories(false);
QueueConnection queueConnection = connectionFactory.createQueueConnection();
queueConnection.start();
Expand Down
Loading