Skip to content

Commit

Permalink
sea-streamer-kafka 0.3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Nov 19, 2023
1 parent f0fe169 commit 02d7a97
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

### `sea-streamer-kafka` 0.3.2 - 2023-11-19

* Added `KafkaProducer::send_record`, `KafkaProducer::send_message` https://github.com/SeaQL/sea-streamer/pull/17

### `sea-streamer-file` 0.3.8 - 2023-11-17

* Added `FileSource::drain`
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-kafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sea-streamer-kafka"
version = "0.3.1"
version = "0.3.2"
authors = ["Chris Tsang <[email protected]>"]
edition = "2021"
description = "🌊 SeaStreamer Kafka / Redpanda Backend"
Expand Down
20 changes: 19 additions & 1 deletion sea-streamer-kafka/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl KafkaProducer {
.expect("Producer is still inside a transaction, please await the future")
}

/// Send a `FutureRecord` to a stream
/// Send a raw [`FutureRecord`] to a stream
pub fn send_record<K, P>(&self, record: FutureRecord<K, P>) -> KafkaResult<SendFuture>
where
K: rdkafka::message::ToBytes + ?Sized,
Expand All @@ -153,6 +153,24 @@ impl KafkaProducer {
})
}

/// Send a message to a particular (topic, partition).
/// The `timestamp` and `sequence` of [`MessageHeader`] is currently ignored.
pub fn send_message<S: Buffer>(
&self,
header: MessageHeader,
payload: S,
) -> KafkaResult<SendFuture> {
let partition = header
.shard_id()
.id()
.try_into()
.expect("shard_id out of range");
let record = FutureRecord::<(), _>::to(header.stream_key().name())
.partition(partition)
.payload(payload.as_bytes());
self.send_record(record)
}

/// Returns the number of messages that are either waiting to be sent or
/// are sent but are waiting to be acknowledged.
pub fn in_flight_count(&self) -> i32 {
Expand Down

0 comments on commit 02d7a97

Please sign in to comment.