Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Several small changes, resolves #20 and resolves #21 #28

Closed
wants to merge 43 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
deae778
Adds config per environment
arfl Jul 29, 2018
5bf365b
After formatting
arfl Jul 29, 2018
6d43ef8
Cleans up after broken_consumer_raises_test
arfl Jul 29, 2018
0d5ef98
Cleans up after broken_consumer_signals_test
arfl Jul 29, 2018
fda4967
Makes tests less noisz
arfl Jul 29, 2018
b5ac2f9
Cleans up after broken_consumer_test
arfl Jul 29, 2018
7c5a403
Cleans up after broken_consumer_throws_test
arfl Jul 29, 2018
7479eb2
Cleans up after broken_dead_queue_test
arfl Jul 29, 2018
386c382
Cleans up after broken_healthy_consumer_test
arfl Jul 29, 2018
a7f02ac
Cleans up after broken_multiple_services_test
arfl Jul 29, 2018
4794d5a
Removes unneeded aliases
arfl Jul 29, 2018
91d02e6
Cleans up after prefetch_count_2_test
arfl Jul 29, 2018
1851492
Cleans up after prefetch_count_test
arfl Jul 29, 2018
6201ef9
Cleans up after republish_test
arfl Jul 29, 2018
90d8711
Cleans up after shared_connection_test
arfl Jul 29, 2018
5e53467
Cleans up after unit tests
arfl Jul 29, 2018
c300d03
Makes debug info on rabbitmq url to not leak credentials
arfl Jul 29, 2018
588e12e
Dry up test helper
arfl Jul 31, 2018
191613d
Uses Logger.debug instead of info for debug output
arfl Jul 31, 2018
997367e
Allows to specify the type of generated exchanges
arfl Jul 31, 2018
5cb45fb
AMQP.Basic.publish assumes the payload as a binary
elitau Aug 3, 2018
fcd69d3
Added MIT License
elitau Aug 27, 2018
401c4e6
Resolves merge conflict
arfl Sep 2, 2018
ccde4cd
Use sudo for rabbitmqctl executable
arfl Sep 3, 2018
405b2b9
Revert "Use sudo for rabbitmqctl executable"
arfl Sep 19, 2018
21154ab
on_error callback fuer consumer
arfl Sep 19, 2018
17ca493
Throws elixir errors only
arfl Sep 19, 2018
327cdb2
Fix false order of arguments
arfl Sep 21, 2018
ef5b5f5
Terminate callback on consumer
arfl Mar 9, 2019
fcd54af
Connection reset
arfl Mar 10, 2019
29690ea
Removes old Supervisor.Spec import
arfl Mar 10, 2019
7e49848
Better variable names and time to cleanup
arfl Mar 10, 2019
63f5284
Function header, so we do not get warnings
arfl Mar 10, 2019
3f0b691
Function header, so we do not get warnings
arfl Mar 10, 2019
45d7043
Function header, so we do not get warnings
arfl Mar 10, 2019
45fab7d
Revert "Function header, so we do not get warnings"
arfl Mar 10, 2019
6882a37
Close channel before closing a connection
arfl Jun 6, 2019
6d70e16
Increased the dead letter queue message ttl
arfl Aug 12, 2019
1608fb0
Increased the dead letter queue message ttl
arfl Aug 12, 2019
9b754e9
Function call needs parens
arfl Sep 10, 2019
546f84f
Allow retry with a throw(:retry)
arfl Sep 10, 2019
16470e6
After mix format
arfl Sep 10, 2019
66e40c7
Merge branch 'error_callback'
arfl Nov 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["mix.exs", "{config,lib}/**/*.{ex,exs}"]
]
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,9 @@ defmodule Consumer do
connection_id: :connection_identifier,
...
```
#### Specify generated exchanges type

In your `config.exs` put:
```
config :tackle, exchange_type: :topic
```
5 changes: 4 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ use Mix.Config
# config :logger, level: :info
#

# :direct || :fanout || :topic || :headers || :match
# config :tackle, exchange_type: :topic

# It is also possible to import configuration files, relative to this
# directory. For example, you can emulate configuration per environment
# by uncommenting the line below and defining dev.exs, test.exs and such.
# Configuration from the imported file will override the ones defined
# here (which is why it is important to import them last).
#
# import_config "#{Mix.env}.exs"
import_config "#{Mix.env()}.exs"
1 change: 1 addition & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use Mix.Config
8 changes: 8 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use Mix.Config

config :logger,
backends: [{LoggerFileBackend, :error_log}]

config :logger, :error_log,
path: "/dev/null",
level: :error
15 changes: 7 additions & 8 deletions lib/tackle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@ defmodule Tackle do

require Logger

@impl Application
def start(_type, _args) do
import Supervisor.Spec, warn: false

children = [
worker(Tackle.Connection, []),
Tackle.Connection
]

opts = [strategy: :one_for_one, name: Tackle.Supervisor]
Supervisor.start_link(children, opts)
end

def publish(message, options) do
def publish(message, options) when is_binary(message) do
url = options[:url]
exchange = options[:exchange]
routing_key = options[:routing_key]

Logger.debug "Connecting to '#{url}'"
Logger.debug("Connecting to '#{Tackle.DebugHelper.safe_uri(url)}'")
{:ok, connection} = AMQP.Connection.open(url)
channel = Tackle.Channel.create(connection)

Logger.debug "Declaring an exchange '#{exchange}'"
AMQP.Exchange.direct(channel, exchange, durable: true)
Logger.debug("Declaring an exchange '#{exchange}'")
Tackle.Exchange.create_remote_exchange(channel, exchange)

AMQP.Basic.publish(channel, exchange, routing_key, message, persistent: true)

AMQP.Channel.close(channel)
AMQP.Connection.close(connection)
end

Expand All @@ -40,5 +40,4 @@ defmodule Tackle do

Tackle.Republisher.republish(url, queue, exchange, routing_key, count)
end

end
13 changes: 11 additions & 2 deletions lib/tackle/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ defmodule Tackle.Channel do

@prefetch_count 1

def create(connection) do create(connection, @prefetch_count) end
def create(connection, nil) do create(connection, @prefetch_count) end
def create(connection) do
create(connection, @prefetch_count)
end

def create(connection, nil) do
create(connection, @prefetch_count)
end

def create(connection, prefetch_count) do
{:ok, channel} = Channel.open(connection)

Expand All @@ -14,4 +20,7 @@ defmodule Tackle.Channel do
channel
end

def close(channel) do
Channel.close(channel)
end
end
64 changes: 48 additions & 16 deletions lib/tackle/connection.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
defmodule Tackle.Connection do
use Agent
require Logger

@moduledoc """
Holds established connections.
Each connection is identifed by name.

Connection name ':default' is speciall: it is NOT persisted ->
Connection name ':default' is special: it is NOT persisted ->
each open() call with :default connection name opens new connection
(to preserve current behaviour).
"""

def start_link do
Agent.start_link(fn -> %{} end, name: __MODULE__)
def start_link, do: start_link([])

def start_link(opts) do
{cache, opts} = Keyword.pop(opts, :initial_value, %{})
opts = Keyword.merge([name: __MODULE__], opts)

Agent.start_link(fn -> cache end, opts)
end

@doc """
Expand All @@ -20,27 +26,46 @@ defmodule Tackle.Connection do

open(:foo, [])
"""
def open(name, url) do open_(name, url) end
def open(name, url) do
do_open(name, url)
end

def close(conn) do
AMQP.Connection.close(conn)
end

def reset do
get_all()
|> Enum.each(fn {_name, conn} ->
Tackle.Connection.close(conn)
Agent.update(__MODULE__, fn _state -> %{} end)
end)

:ok
end

@doc """
Get a list of opened connections
"""
def get_all do
Agent.get(__MODULE__, fn state -> state |> Map.to_list end)
Agent.get(__MODULE__, fn state -> state |> Map.to_list() end)
end

defp open_(name=:default, url) do
defp do_open(name = :default, url) do
connection = open(url)
Logger.info("Opening new connection #{inspect connection} for id: #{name}")
Logger.info("Opening new connection #{inspect(connection)} for id: #{name}")
connection
end
defp open_(name, url) do

defp do_open(name, url) do
Agent.get(__MODULE__, fn state -> Map.get(state, name) end)
|> case do
nil ->
open_and_persist(name, url)

connection ->
Logger.info("Fetched existing connection #{inspect connection} for id: #{name}")
Logger.info("Fetched existing connection #{inspect(connection)} for id: #{name}")

connection
|> validate(name)
|> reopen_on_validation_failure(name, url)
Expand All @@ -51,35 +76,42 @@ defmodule Tackle.Connection do
case open(url) do
response = {:ok, connection} ->
Agent.update(__MODULE__, fn state -> Map.put(state, name, connection) end)
Logger.info("Opening new connection #{inspect connection} for id: #{name}")
Logger.debug("Opening new connection #{inspect(connection)} for id: #{name}")
response

error ->
Logger.error("Failed to open new connection for id: #{name}: #{error}")
error
end
end

defp validate(connection, name) do
connection |> Map.get(:pid) |> validate_connection_process(connection, name)
connection.pid |> validate_connection_process(connection, name)
end

def reopen_on_validation_failure(state = {:error, _}, name, url) do
Logger.warn("Connection validation failed #{inspect state} for id: #{name}")
Logger.warn("Connection validation failed #{inspect(state)} for id: #{name}")
Agent.update(__MODULE__, fn state -> Map.delete(state, name) end)
open(name, url)
end

def reopen_on_validation_failure(connection, _name, _url) do
{:ok, connection} end
{:ok, connection}
end

defp validate_connection_process(pid, connection, name) when is_pid(pid) do
pid |> Process.alive? |> validate_connection_process_rh(connection, name)
pid |> Process.alive?() |> validate_connection_process_rh(connection, name)
end

defp validate_connection_process(_pid, connection, name) do
false |> validate_connection_process_rh(connection, name)
end

defp validate_connection_process_rh(_alive? = true, connection, _name) do connection end
defp validate_connection_process_rh(_alive? = false, _connection, _name) do
defp validate_connection_process_rh(_alive? = true, connection, _name) do
connection
end

defp validate_connection_process_rh(_alive? = false, _connection, _name) do
{:error, :no_process}
end

Expand Down
Loading