From 5fac81041cc7052110a58f9ca5563cb0a280c919 Mon Sep 17 00:00:00 2001 From: Ed Robinson Date: Thu, 8 Aug 2024 11:07:47 +0100 Subject: [PATCH 1/2] Add a message bus for transactional outbox deliveries A service that updates database entities, and also writes a related event to kafka can have some issues if we decide to wrap the database updates in a transaction, then some error or delay occurs when writing and event to kafka. Especially if the event is written synchronously. This can also cause lock conntention and increased resource useage on the database server at scale. * https://microservices.io/patterns/data/transactional-outbox.html * https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/transactional-outbox.html One solution is to write the event stream to a dedicated database table, and have an additonal process to handle writing the events to kafka. This means that the application doesn't need to manage it's own connections to kafka, and transactions can be used in the normal way without any downsides or performance degredation. This change provides a new OutboxMessageBus that can be configured with an active record model. e.g. migration: ``` class CreateKafkaOutboxEvents < ActiveRecord::Migration[7.0] def change create_table :kafka_outbox_events do |t| t.string :topic t.string :key t.column :payload, :longblob # for avro - text would be more appropriate for JSON t.timestamps end add_index :kafka_outbox_events, :topic end end ``` config/initializers/streamy.rb: ``` require "streamy/message_buses/outbox_message_bus" class KafkaOutboxEvent < ActiveRecord::Base; end Streamy.message_bus = Streamy::MessageBuses::OutboxMessageBus.new(model: KafkaOutboxEvent) ``` This implimentation only allows for the use of a single table as the outbox. If we wanted to e.g. use a table per topic then the implimentation will need to be a bit more complex. For now, I suspect that indexing on the topic collum will be good enough, as we can run multiple consuming workers each selecting a different topic concurrently. We will only be able to use a single worker to select rows (with locking) in the consuming process where the backend is MySQL 5.7 however with an upgrade to MySQL 8+ we can make use of `SKIP LOCKED` to increase concurrency if required. --- .../message_buses/outbox_message_bus.rb | 22 ++++ test/message_buses/outbox_message_bus_test.rb | 101 ++++++++++++++++++ 2 files changed, 123 insertions(+) create mode 100644 lib/streamy/message_buses/outbox_message_bus.rb create mode 100644 test/message_buses/outbox_message_bus_test.rb diff --git a/lib/streamy/message_buses/outbox_message_bus.rb b/lib/streamy/message_buses/outbox_message_bus.rb new file mode 100644 index 0000000..552c284 --- /dev/null +++ b/lib/streamy/message_buses/outbox_message_bus.rb @@ -0,0 +1,22 @@ +require "streamy/kafka_configuration" +require "waterdrop" +require "active_support/core_ext/hash/indifferent_access" +require "active_support/json" + +module Streamy + module MessageBuses + class OutboxMessageBus < MessageBus + def initialize(config) + @model = config[:model] + end + + def deliver(key:, topic:, payload:, priority:) + @model.create(key: key, topic: topic, payload: payload) + end + + def deliver_many(messages) + @model.create(messages.map { |message| message.except(:priority) }) + end + end + end +end diff --git a/test/message_buses/outbox_message_bus_test.rb b/test/message_buses/outbox_message_bus_test.rb new file mode 100644 index 0000000..b61a7b8 --- /dev/null +++ b/test/message_buses/outbox_message_bus_test.rb @@ -0,0 +1,101 @@ +require "test_helper" +require "waterdrop" +require "streamy/message_buses/outbox_message_bus" + +module Streamy + class OutboxMessageBusTest < Minitest::Test + attr_reader :bus + + def setup + @model = mock("outbox_model") + @bus = MessageBuses::OutboxMessageBus.new(model: @model) + end + + def example_delivery(priority) + bus.deliver( + payload: payload.to_s, + key: "prk-sg-001", + topic: "charcuterie", + priority: priority + ) + end + + def payload + { + type: "sausage", + body: { meat: "pork", herbs: "sage" }, + event_time: "2018" + } + end + + def expected_event(key: "prk-sg-001") + { + payload: { + type: "sausage", + body: { + meat: "pork", + herbs: "sage" + }, + event_time: "2018" + }.to_s, + key: key, + topic: "charcuterie" + } + end + + def test_standard_priority_deliver + @model.expects(:create).with(expected_event) + example_delivery(:standard) + end + + def test_low_priority_deliver + @model.expects(:create).with(expected_event) + example_delivery(:low) + end + + def test_essential_priority_deliver + @model.expects(:create).with(expected_event) + example_delivery(:essential) + end + + def test_all_priority_delivery + @model.expects(:create).with(expected_event) + example_delivery(:essential) + + @model.expects(:create).with(expected_event) + example_delivery(:low) + + @model.expects(:create).with(expected_event) + example_delivery(:standard) + end + + def test_batch_delivery + @model.expects(:create).with([ + expected_event(key: "prk-sg-001"), + expected_event(key: "prk-sg-002"), + expected_event(key: "prk-sg-003") + ]) + + bus.deliver_many([ + { + payload: payload.to_s, + key: "prk-sg-001", + topic: "charcuterie", + priority: :standard + }, + { + payload: payload.to_s, + key: "prk-sg-002", + topic: "charcuterie", + priority: :standard + }, + { + payload: payload.to_s, + key: "prk-sg-003", + topic: "charcuterie", + priority: :standard + } + ]) + end + end +end From abd1ba1787908a7abe86acd09a849772c1491ba9 Mon Sep 17 00:00:00 2001 From: Ed Robinson Date: Thu, 8 Aug 2024 12:21:15 +0100 Subject: [PATCH 2/2] Add dependency on ostruct Ruby 3.4 moves OpenStruct to a bundled gem Ruby 3.5 will raise errors e.g. https://github.com/rails/rails/pull/51468 --- lib/streamy/event_handler.rb | 1 + streamy.gemspec | 1 + test/avro_deserializer_test.rb | 1 + 3 files changed, 3 insertions(+) diff --git a/lib/streamy/event_handler.rb b/lib/streamy/event_handler.rb index 561fd5a..e8b4328 100644 --- a/lib/streamy/event_handler.rb +++ b/lib/streamy/event_handler.rb @@ -1,4 +1,5 @@ require "active_support/core_ext/hash/indifferent_access" +require "ostruct" module Streamy class EventHandler diff --git a/streamy.gemspec b/streamy.gemspec index 020fd81..3293c9b 100644 --- a/streamy.gemspec +++ b/streamy.gemspec @@ -45,4 +45,5 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength spec.add_dependency "avro_turf", "~> 1.3.0" spec.add_dependency "waterdrop", ">= 2.4.10", "< 3.0.0" spec.add_dependency "webmock", "~> 3.3" + spec.add_dependency "ostruct" end diff --git a/test/avro_deserializer_test.rb b/test/avro_deserializer_test.rb index 3eb4600..68a81bd 100644 --- a/test/avro_deserializer_test.rb +++ b/test/avro_deserializer_test.rb @@ -1,6 +1,7 @@ require "test_helper" require "avro_turf/test/fake_confluent_schema_registry_server" require "webmock/minitest" +require "ostruct" module Streamy class AvroDeserializerTest < Minitest::Test