Add a message bus for transactional outbox deliveries
A service that updates database entities and also writes a related
event to Kafka can run into problems if the database updates are
wrapped in a transaction and an error or delay occurs while writing
the event to Kafka, especially if the event is written synchronously.

This can also cause lock contention and increased resource usage
on the database server at scale.

* https://microservices.io/patterns/data/transactional-outbox.html
* https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/transactional-outbox.html

One solution is to write the event stream to a dedicated database
table, and have an additional process handle writing the events
to Kafka.

This means that the application doesn't need to manage its own
connections to Kafka, and transactions can be used in the normal
way without the downsides or performance degradation described above.
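
As a rough sketch of the flow described above, the following uses in-memory stand-ins rather than a real database or Kafka producer. `FakeDatabase`, `place_order` and `relay` are hypothetical names that are not part of this change; the point is only that the entity write and the outbox write succeed or fail together, while publishing happens in a separate step.

```ruby
# In-memory stand-in for a database with two tables. A real application
# would use Active Record models and a real transaction.
class FakeDatabase
  attr_reader :orders, :outbox

  def initialize
    @orders = []
    @outbox = []
  end

  # Applies every write made inside the block, or none of them if it raises.
  def transaction
    orders_before = @orders.dup
    outbox_before = @outbox.dup
    yield
  rescue StandardError
    @orders = orders_before
    @outbox = outbox_before
    raise
  end
end

# The entity and its event are written in the same transaction.
# No Kafka connection is involved at this point.
def place_order(db, id)
  db.transaction do
    db.orders << { id: id }
    db.outbox << { topic: "orders", key: id.to_s, payload: "order #{id} placed" }
    raise ArgumentError, "invalid order" if id.negative?
  end
end

# The separate relay process: drains the outbox and publishes each row.
def relay(db, publisher)
  publisher.call(db.outbox.shift) until db.outbox.empty?
end
```

A failed `place_order` leaves neither an order nor an event behind, so the relay never publishes an event for an update that was rolled back.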

This change provides a new OutboxMessageBus that can be configured with
an Active Record model.

e.g.

migration:
```
class CreateKafkaOutboxEvents < ActiveRecord::Migration[7.0]
  def change
    create_table :kafka_outbox_events do |t|
      t.string :topic
      t.string :key
      # longblob suits Avro; text would be more appropriate for JSON
      t.column :payload, :longblob
      t.timestamps
    end

    add_index :kafka_outbox_events, :topic
  end
end
```

config/initializers/streamy.rb:
```
require "streamy/message_buses/outbox_message_bus"
class KafkaOutboxEvent < ActiveRecord::Base; end
Streamy.message_bus = Streamy::MessageBuses::OutboxMessageBus.new(model: KafkaOutboxEvent)
```
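
To illustrate what a bus configured this way does with each delivery, here is a minimal sketch using stand-ins. `RecordingModel` and `SketchOutboxBus` are hypothetical and only imitate the behaviour of the model and the bus; `reject` is used so the sketch needs nothing beyond core Ruby.

```ruby
# Stand-in for an Active Record model: records the attributes given to create.
class RecordingModel
  attr_reader :rows

  def initialize
    @rows = []
  end

  # Accepts one attributes hash or an array of them.
  def create(attributes)
    list = attributes.is_a?(Array) ? attributes : [attributes]
    @rows.concat(list)
  end
end

# Imitates the outbox bus: every delivery becomes a row, and the
# priority is dropped because the outbox table has no column for it.
class SketchOutboxBus
  def initialize(config)
    @model = config[:model]
  end

  def deliver(key:, topic:, payload:, priority:)
    @model.create(key: key, topic: topic, payload: payload)
  end

  def deliver_many(messages)
    @model.create(messages.map { |message| message.reject { |name, _| name == :priority } })
  end
end

model = RecordingModel.new
bus = SketchOutboxBus.new(model: model)
bus.deliver(key: "prk-sg-001", topic: "charcuterie", payload: "{}", priority: :low)
bus.deliver_many([
  { key: "prk-sg-002", topic: "charcuterie", payload: "{}", priority: :standard }
])
```

After these calls `model.rows` holds two hashes with only `key`, `topic` and `payload`.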

This implementation only allows for the use of a single table as the
outbox.  If we wanted to e.g. use a table per topic then the
implementation would need to be a bit more complex.

For now, I suspect that indexing on the topic column will
be good enough, as we can run multiple consuming workers
each selecting a different topic concurrently.
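
A per-topic worker could look roughly like the sketch below. It is not part of this change: `OutboxRow` and `drain_topic` are hypothetical, and an in-memory array stands in for the outbox table.

```ruby
# Hypothetical in-memory stand-in for a row in the outbox table.
OutboxRow = Struct.new(:id, :topic, :key, :payload)

# Publishes up to batch_size of the oldest rows for one topic, removing each
# row only after the publisher has accepted it. Returns the number published.
def drain_topic(rows, topic, publisher, batch_size: 100)
  batch = rows.select { |row| row.topic == topic }.sort_by(&:id).first(batch_size)
  batch.each do |row|
    publisher.call(row)
    rows.delete(row)
  end
  batch.size
end
```

Running one such worker per topic means that workers never compete for the same rows, which is what makes the topic index useful here.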

Where the backend is MySQL 5.7, we will only be able to use a single
worker to select rows (with locking) in the consuming process.
With an upgrade to MySQL 8+ we can make use of `SKIP LOCKED`
to increase concurrency if required.
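
For illustration, the batch-selection query a consuming worker might issue could be built as below. The table and column names come from the example migration; `outbox_batch_sql` is a hypothetical helper, and the query is assumed to run inside a transaction with the topic bound to the `?` placeholder.

```ruby
# Builds the row-selection query for one topic. With skip_locked: false
# (MySQL 5.7) a second worker blocks until the first commits; with
# skip_locked: true (MySQL 8.0+) it skips the locked rows instead.
def outbox_batch_sql(limit:, skip_locked: false)
  lock_clause = skip_locked ? "FOR UPDATE SKIP LOCKED" : "FOR UPDATE"
  <<~SQL.strip
    SELECT id, topic, `key`, payload
    FROM kafka_outbox_events
    WHERE topic = ?
    ORDER BY id
    LIMIT #{Integer(limit)}
    #{lock_clause}
  SQL
end

puts outbox_batch_sql(limit: 50, skip_locked: true)
```

`key` is quoted with backticks because it is a reserved word in MySQL.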
errm committed Aug 8, 2024
1 parent 2c66bb8 commit 5fac810
Showing 2 changed files with 123 additions and 0 deletions.
22 changes: 22 additions & 0 deletions lib/streamy/message_buses/outbox_message_bus.rb
@@ -0,0 +1,22 @@
require "streamy/kafka_configuration"
require "waterdrop"
require "active_support/core_ext/hash/indifferent_access"
require "active_support/json"

module Streamy
  module MessageBuses
    class OutboxMessageBus < MessageBus
      def initialize(config)
        @model = config[:model]
      end

      def deliver(key:, topic:, payload:, priority:)
        @model.create(key: key, topic: topic, payload: payload)
      end

      def deliver_many(messages)
        @model.create(messages.map { |message| message.except(:priority) })
      end
    end
  end
end
101 changes: 101 additions & 0 deletions test/message_buses/outbox_message_bus_test.rb
@@ -0,0 +1,101 @@
require "test_helper"
require "waterdrop"
require "streamy/message_buses/outbox_message_bus"

module Streamy
  class OutboxMessageBusTest < Minitest::Test
    attr_reader :bus

    def setup
      @model = mock("outbox_model")
      @bus = MessageBuses::OutboxMessageBus.new(model: @model)
    end

    def example_delivery(priority)
      bus.deliver(
        payload: payload.to_s,
        key: "prk-sg-001",
        topic: "charcuterie",
        priority: priority
      )
    end

    def payload
      {
        type: "sausage",
        body: { meat: "pork", herbs: "sage" },
        event_time: "2018"
      }
    end

    def expected_event(key: "prk-sg-001")
      {
        payload: {
          type: "sausage",
          body: {
            meat: "pork",
            herbs: "sage"
          },
          event_time: "2018"
        }.to_s,
        key: key,
        topic: "charcuterie"
      }
    end

    def test_standard_priority_deliver
      @model.expects(:create).with(expected_event)
      example_delivery(:standard)
    end

    def test_low_priority_deliver
      @model.expects(:create).with(expected_event)
      example_delivery(:low)
    end

    def test_essential_priority_deliver
      @model.expects(:create).with(expected_event)
      example_delivery(:essential)
    end

    def test_all_priority_delivery
      @model.expects(:create).with(expected_event)
      example_delivery(:essential)

      @model.expects(:create).with(expected_event)
      example_delivery(:low)

      @model.expects(:create).with(expected_event)
      example_delivery(:standard)
    end

    def test_batch_delivery
      @model.expects(:create).with([
        expected_event(key: "prk-sg-001"),
        expected_event(key: "prk-sg-002"),
        expected_event(key: "prk-sg-003")
      ])

      bus.deliver_many([
        {
          payload: payload.to_s,
          key: "prk-sg-001",
          topic: "charcuterie",
          priority: :standard
        },
        {
          payload: payload.to_s,
          key: "prk-sg-002",
          topic: "charcuterie",
          priority: :standard
        },
        {
          payload: payload.to_s,
          key: "prk-sg-003",
          topic: "charcuterie",
          priority: :standard
        }
      ])
    end
  end
end
