Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GetMinMaxOffset and GetPartitionsId #775

Closed
wants to merge 18 commits into from
14 changes: 14 additions & 0 deletions kafka/include/userver/kafka/consumer_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <functional>

#include <userver/kafka/message.hpp>
#include <userver/kafka/topic.hpp>

USERVER_NAMESPACE_BEGIN

Expand Down Expand Up @@ -114,6 +115,19 @@ class ConsumerScope final {
/// has stopped and leaved the group
void AsyncCommit();

/// @brief Retrieves the minimum and maximum offsets for the specified Kafka topic and partition.
/// @warning This is a blocking call
/// @param topic The name of the Kafka topic.
/// @param partition The partition number of the Kafka topic.
/// @return A struct with minimum and maximum offsets for the given topic and partition.
OffsetRange GetOffsetRange(const std::string& topic, std::int32_t partition) const;

/// @brief Retrieves the partition IDs for the specified Kafka topic.
/// @warning This is a blocking call
/// @param topic The name of the Kafka topic.
/// @return A vector of partition IDs for the given topic.
std::vector<std::uint32_t> GetPartitionIds(const std::string& topic) const;

private:
friend class impl::Consumer;

Expand Down
8 changes: 8 additions & 0 deletions kafka/include/userver/kafka/impl/consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ class Consumer final {
/// understanding
void AsyncCommit();

/// @brief Retrieves the low and high offsets for the specified kafka topic and partition.
/// @see ConsumerScope::GetOffsetRange for better commitment process
OffsetRange GetOffsetRange(const std::string& topic, std::int32_t partition) const;

/// @brief Retrieves the partition IDs for the specified kafka topic.
/// @see ConsumerScope::GetPartitionIds for better commitment process
std::vector<std::uint32_t> GetPartitionIds(const std::string& topic) const;

/// @brief Adds consumer name to current span.
void ExtendCurrentSpan() const;

Expand Down
14 changes: 14 additions & 0 deletions kafka/include/userver/kafka/topic.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

USERVER_NAMESPACE_BEGIN

namespace kafka {

struct OffsetRange final {
std::int64_t high{};
std::int64_t low{};
};

} // namespace kafka

USERVER_NAMESPACE_END
8 changes: 8 additions & 0 deletions kafka/src/kafka/consumer_scope.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ void ConsumerScope::Stop() noexcept { consumer_.Stop(); }

void ConsumerScope::AsyncCommit() { consumer_.AsyncCommit(); }

OffsetRange ConsumerScope::GetOffsetRange(const std::string& topic, std::int32_t partition) const {
return consumer_.GetOffsetRange(topic, partition);
}

std::vector<std::uint32_t> ConsumerScope::GetPartitionIds(const std::string& topic) const {
return consumer_.GetPartitionIds(topic);
}

} // namespace kafka

USERVER_NAMESPACE_END
8 changes: 8 additions & 0 deletions kafka/src/kafka/impl/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ void Consumer::AsyncCommit() {
}).Get();
}

OffsetRange Consumer::GetOffsetRange(const std::string& topic, std::int32_t partition) const {
consumer_->GetOffsetRange(topic, partition);
}

std::vector<std::uint32_t> Consumer::GetPartitionIds(const std::string& topic) const {
consumer_->GetPartitionIds(topic);
}

void Consumer::Stop() noexcept {
if (processing_.exchange(false) && poll_task_.IsValid()) {
UINVARIANT(consumer_, "Stopping already stopped consumer");
Expand Down
54 changes: 54 additions & 0 deletions kafka/src/kafka/impl/consumer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <userver/utils/fast_scope_guard.hpp>
#include <userver/utils/span.hpp>

#include <kafka/impl/error_buffer.hpp>
#include <kafka/impl/holders_aliases.hpp>
#include <kafka/impl/log_level.hpp>

Expand Down Expand Up @@ -292,6 +293,59 @@ void ConsumerImpl::Commit() { rd_kafka_commit(consumer_.GetHandle(), nullptr, /*

void ConsumerImpl::AsyncCommit() { rd_kafka_commit(consumer_.GetHandle(), nullptr, /*async=*/1); }

OffsetRange ConsumerImpl::GetOffsetRange(const std::string& topic, std::int32_t partition) const {
OffsetRange offset_range;
auto err = rd_kafka_query_watermark_offsets(
consumer_.GetHandle(),
topic.с_str(),
partition,
&offset_range.low,
&offset_range.high,
/*timeout_ms=*/-1
);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
PrintErrorAndThrow("get offsets", rd_kafka_err2str(err));
}

if (offset_range.low == RD_KAFKA_OFFSET_INVALID || offset_range.high == RD_KAFKA_OFFSET_INVALID) {
PrintErrorAndThrow("get offsets", fmt::format("invalid offset for topic {} partition {}", topic, partition));
}

return offset_range;
}

std::vector<std::uint32_t> ConsumerImpl::GetPartitionIds(const std::string& topic) const {
MetadataHolder metadata{[&consumer_] {
const rd_kafka_metadata_t* raw_metadata{nullptr};
auto err = rd_kafka_metadata(consumer_.GetHandle(), 0, nullptr, &raw_metadata, /*timeout_ms=*/-1);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
PrintErrorAndThrow("fetch metadata", rd_kafka_err2str(err));
}
return raw_metadata;
}()};

userver::utils::span<const rd_kafka_metadata_topic> topics{
metadata->topics, static_cast<std::size_t>(metadata->topic_cnt)};
const auto* topic_it =
std::find_if(topics.begin(), topics.end(), [&topic](const rd_kafka_metadata_topic& topic_raw) {
return topic == topic_raw.topic;
});
if (topic_it == topics.end()) {
PrintErrorAndThrow("find topic", fmt::format("{} not found", topic));
}

userver::utils::span<const rd_kafka_metadata_partition> partitions{
topic_it->partitions, static_cast<std::size_t>(topic_it->partition_cnt)};
std::vector<std::uint32_t> partition_ids;
partition_ids.reserve(partitions.size());

for (const auto& partition : partitions) {
partition_ids.push_back(partition.id);
}

return partition_ids;
}

EventHolder ConsumerImpl::PollEvent() {
return EventHolder{rd_kafka_queue_poll(consumer_.GetQueue(), /*timeout_ms=*/0)};
}
Expand Down
7 changes: 7 additions & 0 deletions kafka/src/kafka/impl/consumer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <userver/engine/single_consumer_event.hpp>
#include <userver/kafka/impl/holders.hpp>
#include <userver/kafka/message.hpp>
#include <userver/kafka/topic.hpp>

#include <kafka/impl/holders_aliases.hpp>

Expand Down Expand Up @@ -37,6 +38,12 @@ class ConsumerImpl final {
/// @brief Schedules the commitment task.
void AsyncCommit();

/// @brief Retrieves the low and high offsets for the specified kafka topic and partition.
OffsetRange GetOffsetRange(const std::string& topic, std::int32_t partition) const;

/// @brief Retrieves the partition IDs for the specified kafka topic.
std::vector<std::uint32_t> GetPartitionIds(const std::string& topic) const;

/// @brief Effectively calls `PollMessage` until `deadline` is reached
/// and no more than `max_batch_size` messages polled.
MessageBatch PollBatch(std::size_t max_batch_size, engine::Deadline deadline);
Expand Down
6 changes: 6 additions & 0 deletions kafka/src/kafka/impl/error_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ void PrintErrorAndThrow(std::string_view failed_action, const ErrorBuffer& err_b
throw std::runtime_error{full_error};
}

void PrintErrorAndThrow(std::string_view failed_action, std::string_view reason) {
const auto full_error = fmt::format("Failed to {}: {}", failed_action, reason);
LOG_ERROR() << full_error;
throw std::runtime_error{full_error};
}

} // namespace kafka::impl

USERVER_NAMESPACE_END
1 change: 1 addition & 0 deletions kafka/src/kafka/impl/error_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ constexpr std::size_t kErrorBufferSize = 512;
using ErrorBuffer = std::array<char, kErrorBufferSize>;

[[noreturn]] void PrintErrorAndThrow(std::string_view failed_action, const ErrorBuffer& err_buf);
[[noreturn]] void PrintErrorAndThrow(std::string_view failed_action, std::string_view reason);

} // namespace kafka::impl

Expand Down
1 change: 1 addition & 0 deletions kafka/src/kafka/impl/holders_aliases.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ using ErrorHolder = HolderBase<rd_kafka_error_t, &rd_kafka_error_destroy>;
using EventHolder = HolderBase<rd_kafka_event_t, &rd_kafka_event_destroy>;
using QueueHolder = HolderBase<rd_kafka_queue_t, &rd_kafka_queue_destroy>;
using TopicPartitionsListHolder = HolderBase<rd_kafka_topic_partition_list_t, &rd_kafka_topic_partition_list_destroy>;
using MetadataHolder = HolderBase<const rd_kafka_metadata_t, &rd_kafka_metadata_destroy>;

using ConsumerHolder = KafkaClientHolder<ClientType::kConsumer>;
using ProducerHolder = KafkaClientHolder<ClientType::kProducer>;
Expand Down
Loading