diff --git a/lib/streamy/message_buses/outbox_message_bus.rb b/lib/streamy/message_buses/outbox_message_bus.rb new file mode 100644 index 0000000..552c284 --- /dev/null +++ b/lib/streamy/message_buses/outbox_message_bus.rb @@ -0,0 +1,22 @@ +require "streamy/kafka_configuration" +require "waterdrop" +require "active_support/core_ext/hash/indifferent_access" +require "active_support/json" + +module Streamy + module MessageBuses + class OutboxMessageBus < MessageBus + def initialize(config) + @model = config[:model] + end + + def deliver(key:, topic:, payload:, priority:) + @model.create(key: key, topic: topic, payload: payload) + end + + def deliver_many(messages) + @model.create(messages.map { |message| message.except(:priority) }) + end + end + end +end diff --git a/test/message_buses/outbox_message_bus_test.rb b/test/message_buses/outbox_message_bus_test.rb new file mode 100644 index 0000000..b61a7b8 --- /dev/null +++ b/test/message_buses/outbox_message_bus_test.rb @@ -0,0 +1,101 @@ +require "test_helper" +require "waterdrop" +require "streamy/message_buses/outbox_message_bus" + +module Streamy + class OutboxMessageBusTest < Minitest::Test + attr_reader :bus + + def setup + @model = mock("outbox_model") + @bus = MessageBuses::OutboxMessageBus.new(model: @model) + end + + def example_delivery(priority) + bus.deliver( + payload: payload.to_s, + key: "prk-sg-001", + topic: "charcuterie", + priority: priority + ) + end + + def payload + { + type: "sausage", + body: { meat: "pork", herbs: "sage" }, + event_time: "2018" + } + end + + def expected_event(key: "prk-sg-001") + { + payload: { + type: "sausage", + body: { + meat: "pork", + herbs: "sage" + }, + event_time: "2018" + }.to_s, + key: key, + topic: "charcuterie" + } + end + + def test_standard_priority_deliver + @model.expects(:create).with(expected_event) + example_delivery(:standard) + end + + def test_low_priority_deliver + @model.expects(:create).with(expected_event) + example_delivery(:low) + end + + def test_essential_priority_deliver + @model.expects(:create).with(expected_event) + example_delivery(:essential) + end + + def test_all_priority_delivery + @model.expects(:create).with(expected_event) + example_delivery(:essential) + + @model.expects(:create).with(expected_event) + example_delivery(:low) + + @model.expects(:create).with(expected_event) + example_delivery(:standard) + end + + def test_batch_delivery + @model.expects(:create).with([ + expected_event(key: "prk-sg-001"), + expected_event(key: "prk-sg-002"), + expected_event(key: "prk-sg-003") + ]) + + bus.deliver_many([ + { + payload: payload.to_s, + key: "prk-sg-001", + topic: "charcuterie", + priority: :standard + }, + { + payload: payload.to_s, + key: "prk-sg-002", + topic: "charcuterie", + priority: :standard + }, + { + payload: payload.to_s, + key: "prk-sg-003", + topic: "charcuterie", + priority: :standard + } + ]) + end + end +end