Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Reusable RabbitMQ Connection in Publisher #46

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ route_key = "hello"
end)
```

You can also use named connection like this:

``` elixir
options = %{
url: "amqp://rabbitmq:5672",
connection_id: :publisher,
exchange: "test-exchange",
routing_key: "test-messages",
}

Tackle.publish("Hi!", options)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test that verifies this? It should probably publish a few times in a row and check the number of open connections.

```

This example will look for already opened connection with name `:publisher` and reuse it.
If the connection is not found, it will be created. Connections are managed by `Tackle.Connection`.

## Consuming messages from an exchange

First, declare a consumer module:
Expand Down
19 changes: 13 additions & 6 deletions lib/tackle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,38 @@ defmodule Tackle do
exchange = options[:exchange]
routing_key = options[:routing_key]
exchange_opts = options[:exchange_opts] || []
connection_id = options[:connection_id] || :default

{_exchange_type, exchange_name} =
exchange
|> Tackle.Util.parse_exchange()

Logger.debug("Connecting to '#{url}'")
{:ok, connection} = AMQP.Connection.open(url)
{:ok, connection} = Tackle.Connection.open(connection_id, url)
channel = Tackle.Channel.create(connection)

try do
Tackle.Exchange.create(channel, exchange, exchange_opts)
Tackle.Exchange.publish(channel, exchange_name, message, routing_key)
after
AMQP.Channel.close(channel)
AMQP.Connection.close(connection)
Tackle.Util.cleanup(connection_id, connection, channel)
end
end

def republish(options) do
url = options[:url]
queue = options[:queue]
exchange = options[:exchange]
exchange_name = options[:exchange]
routing_key = options[:routing_key]
count = options[:count] || 1
connection_id = options[:connection_id] || :default

{:ok, connection} = Tackle.Connection.open(connection_id, url)
channel = Tackle.Channel.create(connection)

Tackle.Republisher.republish(url, queue, exchange, routing_key, count)
try do
Tackle.Republisher.republish(url, queue, exchange_name, routing_key, count)
after
Tackle.Util.cleanup(connection_id, connection, channel)
end
end
end
15 changes: 13 additions & 2 deletions lib/tackle/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ defmodule Tackle.Connection do
open_(name, url)
end

def open(url) do
Logger.debug("Connecting to '#{scrub_url(url)}'")

AMQP.Connection.open(url)
end

defp scrub_url(url) do
url
|> URI.parse()
|> Map.put(:userinfo, nil)
|> URI.to_string()
end

@doc """
Get a list of opened connections
"""
Expand Down Expand Up @@ -101,6 +114,4 @@ defmodule Tackle.Connection do
defp validate_connection_process_rh(_alive? = false, _connection, _name) do
{:error, :no_process}
end

def open(url), do: AMQP.Connection.open(url)
end
4 changes: 3 additions & 1 deletion lib/tackle/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ defmodule Tackle.Consumer do
delay_queue: delay_queue,
dead_queue: dead_queue,
retry_limit: retry_limit,
consumer_tag: consumer_tag
consumer_tag: consumer_tag,
connection_id: connection_id
}

{:ok, state}
Expand Down Expand Up @@ -189,6 +190,7 @@ defmodule Tackle.Consumer do
retry_count = Tackle.DelayedRetry.retry_count_from_headers(headers)

options = [
connection_id: state.connection_id,
persistent: true,
headers: [
retry_count: retry_count + 1
Expand Down
7 changes: 3 additions & 4 deletions lib/tackle/delayed_retry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ defmodule Tackle.DelayedRetry do
def retry_count_from_headers([_ | tail]), do: retry_count_from_headers(tail)

def publish(url, queue, payload, options) do
Logger.info("Connecting to '#{url}'")
connection_id = options[:connection_id] || :default

{:ok, connection} = AMQP.Connection.open(url)
{:ok, connection} = Tackle.Connection.open(connection_id, url)
{:ok, channel} = Channel.open(connection)

try do
:ok = AMQP.Basic.publish(channel, "", queue, payload, options)
after
AMQP.Channel.close(channel)
AMQP.Connection.close(connection)
Tackle.Util.cleanup(connection_id, connection, channel)
end
end
end
22 changes: 13 additions & 9 deletions lib/tackle/republisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ defmodule Tackle.Republisher do
use AMQP
require Logger

def republish(url, queue, exchange, routing_key, count) do
Logger.info("Connecting to '#{url}'")
{:ok, connection} = AMQP.Connection.open(url)
@deprecated "Use Tackle.republish/1 instead"
def republish(url, queue, exchange, routing_key, count) when is_binary(url) do
connection_id = :default
{:ok, connection} = Tackle.Connection.open(connection_id, url)
channel = Tackle.Channel.create(connection)

try do
0..(count - 1)
|> Enum.each(fn idx ->
republish_one_message(channel, queue, exchange, routing_key, idx)
end)
republish(channel, queue, exchange, routing_key, count)
after
AMQP.Channel.close(channel)
AMQP.Connection.close(connection)
Tackle.Util.cleanup(connection_id, connection, channel)
end
end

def republish(channel, queue, exchange, routing_key, count) do
0..(count - 1)
|> Enum.each(fn idx ->
republish_one_message(channel, queue, exchange, routing_key, idx)
end)
end

defp republish_one_message(channel, queue, exchange, routing_key, idx) do
Logger.info("(#{idx}) Fetching message... from '#{inspect(queue)}' queue")

Expand Down
9 changes: 9 additions & 0 deletions lib/tackle/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,13 @@ defmodule Tackle.Util do
name -> {:direct, name}
end
end

def cleanup(:default, connection, channel) do
AMQP.Channel.close(channel)
AMQP.Connection.close(connection)
end

def cleanup(_, _, channel) do
AMQP.Channel.close(channel)
end
end
7 changes: 2 additions & 5 deletions test/integration/republish_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ defmodule Tackle.RepublishTest do
#

MessageTrace.clear("fixed-service")
{:ok, _} = FixedConsumer.start_link()
{:ok, fixed_consumer} = FixedConsumer.start_link()
:timer.sleep(1000)

Tackle.republish(%{
Expand All @@ -99,13 +99,10 @@ defmodule Tackle.RepublishTest do
count: 2
})

GenServer.stop(fixed_consumer)
:timer.sleep(2000)
end

# Since bumping the `amqp` dependency from 1.1.0 - the process is not connecting fast enough to the queue.
# This causes the test to fail. I'm not sure why this is happening, but I'm skipping the test for now.
@tag :skip
@tag :fixme
test "consumes only two messages" do
assert MessageTrace.content("fixed-service") == "Hi there!"
end
Expand Down