diff --git a/lib/streamy/dispatcher.rb b/lib/streamy/dispatcher.rb index d6dbbd8..980402c 100644 --- a/lib/streamy/dispatcher.rb +++ b/lib/streamy/dispatcher.rb @@ -8,6 +8,10 @@ def dispatch Streamy.message_bus.deliver(**message_params) end + def self.dispatch_many(events) + Streamy.message_bus.deliver_many(events.map(&:to_message)) + end + private attr_reader :event diff --git a/lib/streamy/event.rb b/lib/streamy/event.rb index 1082d24..eaa20e3 100644 --- a/lib/streamy/event.rb +++ b/lib/streamy/event.rb @@ -16,6 +16,10 @@ def self.publish(**args) new(**args).publish end + def self.publish_many(events) + Streamy.dispatcher.dispatch_many(events) + end + priority :standard def publish diff --git a/lib/streamy/message_buses/kafka_message_bus.rb b/lib/streamy/message_buses/kafka_message_bus.rb index 2936b6b..902bbfa 100644 --- a/lib/streamy/message_buses/kafka_message_bus.rb +++ b/lib/streamy/message_buses/kafka_message_bus.rb @@ -23,6 +23,11 @@ def deliver(key:, topic:, payload:, priority:) end end + def deliver_many(messages) + messages = messages.map { |message| message.except(:priority) } + sync_producer.produce_many_sync(messages) + end + def shutdown async_producer.close if async_producer? sync_producers.map(&:close) diff --git a/lib/streamy/message_buses/message_bus.rb b/lib/streamy/message_buses/message_bus.rb index c45faec..165289f 100644 --- a/lib/streamy/message_buses/message_bus.rb +++ b/lib/streamy/message_buses/message_bus.rb @@ -4,6 +4,10 @@ class MessageBus def deliver(key:, topic:, payload:, priority:) # NOOP: Implement delivery logic end + + def deliver_many(messages) + # NOOP: Implement delivery logic + end end end end diff --git a/lib/streamy/test_dispatcher.rb b/lib/streamy/test_dispatcher.rb index bdd316a..1b0754a 100644 --- a/lib/streamy/test_dispatcher.rb +++ b/lib/streamy/test_dispatcher.rb @@ -12,5 +12,10 @@ def dispatch events << event_params messages << message_params end + + def self.dispatch_many(events) + self.events += events.map(&:to_params) + self.messages += events.map(&:to_message) + end end end