Handy little queues and producers that make using GenStage
or Broadway
a breeze.
Getting events into a GenStage
or Broadway
pipeline is more difficult than it seems at first glance.
Any message producer based on GenStage
needs to track demand,
and emit events as it receives them if there is surplus demand.
Also, any producer implementation needs to provide back-pressure,
a key part of GenStage
and Broadway
's design.
That's why a standalone blocking queue process is most the most ideal producer for these libraries.
- Decouples message producers and consumers, so producers do not need to wait for the consumer to be ready
- Blocks when full, so producers don't keep producing messages if the consumer is overwhelmed
- Tracks demand, and immediately emits events if demand is pending
This library also provides a blocking queue implementation that can be used in pure Elixir.
This library is available in Hex, and the package can be installed
by adding queutils
to your list of dependencies in mix.exs
:
def deps do
[
{:queutils, "~> 1.2"}
]
end
Documentation can be generated with ExDoc and can be found online at https://hexdocs.pm/queutils.
If you just want a queue to communicate between processes, use a Queutils.BlockingQueue
.
This module implements a queue with a fixed length,
and any calls to Queutils.BlockingQueue.push/2
will block until the queue has room again.
If you're working with GenStage
, you probably want to use Queutils.BlockingProducer
.
This module is just like a Queutils.BlockingQueue
,
but it also provides callbacks that let a GenStage
consumer subscribe to it.
This way you can push messages into a GenStage
pipeline.
Lastly, a Queutils.BlockingQueueProducer
acts just like a Queutils.BlockingProducer
,
except you need to provide the queue yourself.
This is the module to use if you're working with Broadway
,
because Broadway
stages needs to start up their producers themselves.
Add Queutils.BlockingQueue
to your application supervisor's start/2
function, like this:
def start(_type, _args) do
children = [
{Queutils.BlockingQueue, name: MessageQueue, max_length: 10_000},
]
opts = [strategy: :one_for_one, name: MyApplication.Supervisor]
Supervisor.start_link(children, opts)
end
You can now push messages to the queue like this:
:ok = Queutils.BlockingQueue.push(MessageQueue, :my_message)
and pop from it like this:
:my_message = Queutils.BlockingQueue.pop(MessageQueue)
Add Queutils.BlockingProducer
to your application supervisor's start/2
function, like this:
def start(_type, _args) do
children = [
{Queutils.BlockingProducer, name: MessageProducer, max_length: 10_000}
]
opts = [strategy: :one_for_one, name: MyApplication.Supervisor]
Supervisor.start_link(children, opts)
end
Then, subscribe a GenStage
to it.
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [MessageProducer]}
end
You can now push messages to the queue like this:
:ok = Queutils.BlockingProducer.push(MessageProducer, :my_message)
Add Queutils.BlockingQueue
to your application supervisor's start/2
function,
just like we're using it with plain Elixir:
def start(_type, _args) do
children = [
{Queutils.BlockingQueue, name: MessageQueue, max_length: 10_000},
]
opts = [strategy: :one_for_one, name: MyApplication.Supervisor]
Supervisor.start_link(children, opts)
end
Then, add a Queutils.BlockingQueueProducer
as your Broadway
stage's producer,
pointing it to the queue you just created.
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {Queutils.BlockingQueueProducer, queue: MessageQueue},
transformer: {MyApplication.Transformer, :transform, []}
],
processors: [
default: []
]
)
end
You will need to add a :transformer
option to your Broadway
stage in order to wrap messages in a Broadway.Message
struct.
It's easy, but needs to be done.
See Broadway
's Custom Producers documentation for details.
You can now push to the queue like this, and your Broadway
stage will pick it up:
:ok = Queutils.Blockingqueue.push(MessageQueue, :my_message)
This project was developed by Rosa Richter. You can get in touch with her on Keybase.io.
Questions and pull requests are more than welcome. I follow Elixir's tenet of bad documentation being a bug, so if anything is unclear, please file an issue! Ideally, my answer to your question will be in an update to the docs.
Please see CONTRIBUTING.md for all the details you could ever want about helping me with this project.
Note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms.
MIT License
Copyright 2020 Rosa Richter.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.