Skip to content

Commit

Permalink
Safer close, heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
eduard93 committed Dec 9, 2020
1 parent bd1ba37 commit 5921dee
Showing 1 changed file with 40 additions and 5 deletions.
45 changes: 40 additions & 5 deletions src/isc/rabbitmq/API.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ public API(String host, int port, String user, String pass, String virtualHost,
factory.setUsername(user);
factory.setPassword(pass);
factory.setVirtualHost(virtualHost);
//factory.setAutomaticRecoveryEnabled(true);
}

//factory.setAutomaticRecoveryEnabled(true);
factory.setRequestedHeartbeat(0);


_connection = factory.newConnection();

_channel = _connection.createChannel();
try {
// Do we need to declare queue?
Expand All @@ -50,7 +54,7 @@ public API(String host, int port, String user, String pass, String virtualHost,
// Check that queue exists
// Method throws exception if queue does not exist or is exclusive
// Correct exception text: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'queue'
com.rabbitmq.client.AMQP.Queue.DeclareOk declareOk = _channel.queueDeclarePassive(queue);
AMQP.Queue.DeclareOk declareOk = _channel.queueDeclarePassive(queue);
}
} catch (java.io.IOException ex) {
// Exception closes the channel.
Expand All @@ -70,8 +74,34 @@ public API(String host, int port, String user, String pass, String virtualHost,

}

if (exchange != null) {
_exchange = exchange;
try {
AMQP.Exchange.DeclareOk declareOk = _channel.exchangeDeclarePassive(exchange);
} catch (java.io.IOException ex) {
// Exception closes the channel.
// So we need to create new one.
// _channel.basicRecover() doesn't do the trick
_channel = _connection.createChannel();

Boolean durableBool = (durable != 0);
Boolean autoDelete = false;
Boolean passive = false;
// exchange - name of the exchange
// type - direct, topic, fanout, headers. See https://lostechies.com/derekgreer/2012/03/28/rabbitmq-for-windows-exchange-types/
// passive - if true, works the same as exchangeDeclarePassive
// durable - true if we are declaring a durable exchange (the exchange will survive a server restart)
// autoDelete - true if we are declaring an autodelete exchange (server will delete it when no longer in use)
// arguments - other properties (construction arguments) for the exchange

AMQP.Exchange.DeclareOk declareOk = _channel.exchangeDeclare(exchange, "direct", passive, durableBool, autoDelete, null); // , exclusive, autoDelete, null
}
} else {
_exchange = "";
}

_queue = queue;
_exchange = exchange != null ? exchange : "";
//_exchange = exchange != null ? exchange : "";
}

public void sendMessageId(byte[] msg, String correlationId, String messageId) throws Exception {
Expand Down Expand Up @@ -160,8 +190,13 @@ public Boolean isOpen()
}

public void close()throws Exception {
_channel.close();
_connection.close();
try {
_channel.close();
} catch ( Exception ex) {}

try {
_connection.close();
} catch ( Exception ex) {}
}

private AMQP.BasicProperties createProperties(String correlationId, String messageId) throws Exception
Expand Down

0 comments on commit 5921dee

Please sign in to comment.