Client library for RabbitMQ built on top of amqp_client.
fox lets you create one or several connection pools, monitors the connections inside each pool, and reconnects them if needed.
fox:create_connection_pool(PoolName, ConnectionParams) creates a pool with several connections (5 by default). PoolName should be an atom(), string() or binary(). ConnectionParams should be an #amqp_params_network{} record or a map() with the same fields.
Params = #{host => "localhost",
           port => 5672,
           virtual_host => <<"/">>,
           username => <<"guest">>,
           password => <<"guest">>},
fox:create_connection_pool(my_pool, Params).
fox:close_connection_pool(PoolName) closes all connections and removes the pool.
Before starting a pool you can check your connection params by calling fox:validate_params_network(Params). If the params are valid for connecting to RabbitMQ, the function returns ok; otherwise it returns {error, Reason}.
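The validate-then-create flow could look like the following minimal sketch. The module name fox_pool_example and the functions params/0 and start_pool/0 are hypothetical, and the sketch assumes the fox application is already started. The return value of fox:create_connection_pool/2 is not documented here, so it is passed through unchanged.

```erlang
-module(fox_pool_example).
-export([params/0, start_pool/0]).

%% Connection parameters as a map with the same fields as #amqp_params_network{}.
params() ->
    #{host => "localhost",
      port => 5672,
      virtual_host => <<"/">>,
      username => <<"guest">>,
      password => <<"guest">>}.

%% Validate the parameters first; create the pool only if validation succeeds.
start_pool() ->
    Params = params(),
    case fox:validate_params_network(Params) of
        ok ->
            %% Return value is whatever fox returns (not specified in this document).
            fox:create_connection_pool(my_pool, Params);
        {error, Reason} ->
            %% The invalid_params wrapper is this example's own convention.
            {error, {invalid_params, Reason}}
    end.
```

Checking first means a wrong host or bad credentials surface as a single {error, Reason} before any pool is created.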
Many APIs need a channel. You can get one with fox:get_channel(PoolName). The function returns {ok, ChannelPid} or {error, Reason}.
{ok, Channel} = fox:get_channel(my_pool),
fox:declare_exchange(Channel, <<"my_exchange">>),
fox:declare_queue(Channel, <<"my_queue">>),
fox:bind_queue(Channel, <<"my_queue">>, <<"my_exchange">>, <<"my_key">>).
The amqp_client library communicates with RabbitMQ through amqp_channel:call and amqp_channel:cast. These calls take records such as #'basic.publish'{}, #'exchange.declare'{}, #'queue.bind'{} etc., which is not very convenient.
The full list of records is in rabbit_framing.hrl.
fox provides wrappers for the most frequently used actions. For example, instead of
BPublish = #'basic.publish'{exchange = Exchange, routing_key = RKey},
Message = #amqp_msg{payload = <<"foobar">>},
amqp_channel:cast(Channel, BPublish, Message)
you can use
fox:publish(Channel, Exchange, RKey, <<"foobar">>)
And instead of
BPublish = #'basic.publish'{exchange = Exchange, routing_key = RKey},
Props = #'P_basic'{delivery_mode = 2}, %% persistent message
Message = #amqp_msg{props = Props, payload = <<"foobar">>},
amqp_channel:cast(Channel, BPublish, Message)
you can use
fox:publish(Channel, Exchange, RKey, <<"foobar">>, #{delivery_mode => 2})
There are wrappers for publish, declare_exchange, delete_exchange, declare_queue, delete_queue, bind_queue, unbind_queue and qos.
To publish a message to RabbitMQ use fox:publish. The first argument should be a channel pid or a pool name.
fox:publish(my_pool, Exchange, RoutingKey, <<"Message">>)
The most complicated part of working with amqp_channel is subscription. You need a gen_server process to accept messages, and you pass the pid of this process as an argument to amqp_channel:subscribe/3.
fox works differently. First you create a callback module implementing the fox_subs_worker behaviour, which is similar to a cowboy handler. Then you call fox:subscribe. fox creates a channel and a new process for your module, subscribes to the queues and routes messages to your callback module.
{ok, Ref} = fox:subscribe(my_pool, <<"my_queue">>, my_callback_module, CallbackInitArgs)
The first argument is a pool name. The second argument is a queue: either a queue name (binary()) or a #'basic.consume'{} record. Use basic.consume if you need to define additional parameters for the subscription, such as exclusive, nowait, no_ack etc. The third argument is the callback module. The fourth argument (Args) is used to initialize your callback module. subscribe returns {ok, Ref}, where Ref is the reference() needed for unsubscribe.
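Passing a #'basic.consume'{} record instead of a queue name might look like the sketch below. The module name fox_consume_example and both function names are hypothetical, and my_callback_module stands in for your own callback module. The record itself comes from rabbit_framing.hrl, which amqp_client.hrl includes.

```erlang
-module(fox_consume_example).
-export([consume_spec/1, subscribe_no_ack/0]).

%% Brings in the AMQP framing records, including #'basic.consume'{}.
-include_lib("amqp_client/include/amqp_client.hrl").

%% Build a consume record for the given queue with extra flags set.
consume_spec(Queue) ->
    #'basic.consume'{queue = Queue,
                     no_ack = true,      %% broker does not expect acknowledgements
                     exclusive = true}.  %% request exclusive consumer access

%% Subscribe using the record; the empty list is the Args passed to init/2.
subscribe_no_ack() ->
    Consume = consume_spec(<<"my_queue">>),
    {ok, Ref} = fox:subscribe(my_pool, Consume, my_callback_module, []),
    Ref.
```

With no_ack = true the callback should not send #'basic.ack'{} for delivered messages, since the broker treats them as acknowledged on delivery.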
The fox_subs_worker behaviour includes 3 functions:
init(ChannelPid, Args) gets the channel pid and Args (the fourth argument to fox:subscribe). Here you can do any initialization steps, such as creating exchanges, queues and bindings. init returns a state that is later passed to the other callbacks.
init(Channel, Args) ->
    ok = fox:declare_exchange(Channel, Exchange),
    ok = fox:declare_queue(Channel, Queue),
    ok = fox:bind_queue(Channel, Queue, Exchange, RoutingKey),
    State = ...,
    {ok, State}.
handle(Data, ChannelPid, State) is called each time a new message arrives. Here you can process the message and reply with #'basic.ack'{} or #'basic.reject'{}, or not reply at all. The function should return a new state.
handle({#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Payload}}, ChannelPid, State) ->
    do_something,
    amqp_channel:cast(ChannelPid, #'basic.ack'{delivery_tag = Tag}),
    {ok, State}.
terminate(ChannelPid, State) is called on unsubscribe or when the pool is closed. Here you can free resources and remove exchanges and queues.
terminate(ChannelPid, State) ->
    fox:unbind_queue(ChannelPid, Queue, Exchange, RoutingKey),
    fox:delete_queue(ChannelPid, Queue),
    fox:delete_exchange(ChannelPid, Exchange),
    ok.
Here is a sample callback module: sample_subs_callback
fox:unsubscribe(PoolName, Ref) removes the subscription.
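Putting the three callbacks together, a complete callback module could be sketched as follows. The exchange, queue and routing key names, the counter kept in the state, and the catch-all handle/3 clause are all assumptions made for this example; they are not part of fox itself.

```erlang
-module(my_callback_module).
-behaviour(fox_subs_worker).

-export([init/2, handle/3, terminate/2]).

%% Provides #'basic.deliver'{}, #'basic.ack'{} and #amqp_msg{}.
-include_lib("amqp_client/include/amqp_client.hrl").

%% Hypothetical names used only in this example.
-define(EXCHANGE, <<"my_exchange">>).
-define(QUEUE, <<"my_queue">>).
-define(ROUTING_KEY, <<"my_key">>).

%% Declare the topology and return the initial state (a message counter).
init(ChannelPid, _Args) ->
    ok = fox:declare_exchange(ChannelPid, ?EXCHANGE),
    ok = fox:declare_queue(ChannelPid, ?QUEUE),
    ok = fox:bind_queue(ChannelPid, ?QUEUE, ?EXCHANGE, ?ROUTING_KEY),
    {ok, #{handled => 0}}.

%% Print the payload, acknowledge the delivery, and bump the counter.
handle({#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Payload}},
       ChannelPid, State = #{handled := N}) ->
    io:format("received ~p~n", [Payload]),
    amqp_channel:cast(ChannelPid, #'basic.ack'{delivery_tag = Tag}),
    {ok, State#{handled := N + 1}};
%% Assumption: ignore anything that is not a delivery and keep the state.
handle(_Other, _ChannelPid, State) ->
    {ok, State}.

%% Remove what init/2 created.
terminate(ChannelPid, _State) ->
    fox:unbind_queue(ChannelPid, ?QUEUE, ?EXCHANGE, ?ROUTING_KEY),
    fox:delete_queue(ChannelPid, ?QUEUE),
    fox:delete_exchange(ChannelPid, ?EXCHANGE),
    ok.
```

A subscription using this module would be started with {ok, Ref} = fox:subscribe(my_pool, <<"my_queue">>, my_callback_module, []) and stopped with fox:unsubscribe(my_pool, Ref).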