Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GetMinMaxOffset and GetPartitionsId #775

Closed
wants to merge 18 commits into from
14 changes: 14 additions & 0 deletions kafka/include/userver/kafka/consumer_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,20 @@ class ConsumerScope final {
/// has stopped and left the group
void AsyncCommit();

/// @brief Retrieves the minimum and maximum offsets for the specified Kafka topic and partition.
/// @warning This is a blocking call (the underlying broker query uses an infinite timeout).
/// @param topic The name of the Kafka topic.
/// @param partition The partition number of the Kafka topic.
/// @return A pair of integers representing the minimum and maximum offsets for the given topic and partition.
/// The first value is the minimum offset, and the second is the maximum offset.
/// On failure both values are -1.
std::pair<std::int64_t, std::int64_t> GetMinMaxOffset(const std::string& topic, std::int32_t partition);
fdr400 marked this conversation as resolved.
Show resolved Hide resolved
fdr400 marked this conversation as resolved.
Show resolved Hide resolved

/// @brief Retrieves the partition IDs for the specified Kafka topic.
/// @warning This is a blocking call (the underlying metadata query uses an infinite timeout).
/// @param topic The name of the Kafka topic.
/// @return A vector of partition IDs for the given topic; empty if the metadata
/// could not be fetched or the topic is unknown.
std::vector<std::int32_t> GetPartitionsId(const std::string& topic);
fdr400 marked this conversation as resolved.
Show resolved Hide resolved

private:
friend class impl::Consumer;

Expand Down
8 changes: 8 additions & 0 deletions kafka/src/kafka/consumer_scope.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ void ConsumerScope::Stop() noexcept { consumer_.Stop(); }

void ConsumerScope::AsyncCommit() { consumer_.AsyncCommit(); }

// Blocking: forwards to impl::Consumer, which queries the broker for the
// low/high watermark offsets of the given topic partition.
std::pair<std::int64_t, std::int64_t> ConsumerScope::GetMinMaxOffset(const std::string& topic, std::int32_t partition) {
return consumer_.GetMinMaxOffset(topic, partition);
}

// Blocking: forwards to impl::Consumer, which fetches cluster metadata and
// extracts the partition IDs of the given topic.
// NOTE: brace spacing fixed to match the sibling GetMinMaxOffset definition.
std::vector<std::int32_t> ConsumerScope::GetPartitionsId(const std::string& topic) {
return consumer_.GetPartitionsId(topic);
}

} // namespace kafka

USERVER_NAMESPACE_END
49 changes: 49 additions & 0 deletions kafka/src/kafka/impl/consumer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <fmt/format.h>
#include <fmt/ranges.h>

#include <boost/range/iterator_range.hpp>
fdr400 marked this conversation as resolved.
Show resolved Hide resolved

#include <userver/kafka/impl/configuration.hpp>
#include <userver/kafka/impl/stats.hpp>
#include <userver/logging/log.hpp>
Expand Down Expand Up @@ -292,6 +294,53 @@ void ConsumerImpl::Commit() { rd_kafka_commit(consumer_.GetHandle(), nullptr, /*

void ConsumerImpl::AsyncCommit() { rd_kafka_commit(consumer_.GetHandle(), nullptr, /*async=*/1); }

std::pair<std::int64_t, std::int64_t> ConsumerImpl::GetMinMaxOffset(const std::string& topic, std::int32_t partition) {
std::int64_t offset_high{-1};
std::int64_t offset_low{-1};

auto err = rd_kafka_query_watermark_offsets(
consumer_.GetHandle(), topic.data(), partition, &offset_low, &offset_high, /*timeout_ms=*/-1
);
fdr400 marked this conversation as resolved.
Show resolved Hide resolved
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
LOG_ERROR() << fmt::format("Failed to get offsets: {}", rd_kafka_err2str(err));
return {-1, -1};
fdr400 marked this conversation as resolved.
Show resolved Hide resolved
}

if (offset_high == RD_KAFKA_OFFSET_INVALID) {
LOG_ERROR() << fmt::format("Invalid offset for topic {} partition {}", topic, partition);
return {-1, -1};
}

return {offset_low, offset_high};
}

std::vector<std::int32_t> ConsumerImpl::GetPartitionsId(const std::string& topic) {
const rd_kafka_metadata_t* raw_metadata{nullptr};
auto err = rd_kafka_metadata(consumer_.GetHandle(), 0, nullptr, &raw_metadata, /*timeout_ms=*/-1);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
LOG_ERROR() << fmt::format("Failed to fetch metadata: ", rd_kafka_err2str(err));
return {};
fdr400 marked this conversation as resolved.
Show resolved Hide resolved
}
MetadataHolder metadata{raw_metadata};
fdr400 marked this conversation as resolved.
Show resolved Hide resolved

auto topics = boost::make_iterator_range(metadata->topics, metadata->topics + metadata->topic_cnt);
auto* topic_it = std::find_if(topics.begin(), topics.end(), [&topic](const rd_kafka_metadata_topic& topic_raw) {
return topic == topic_raw.topic;
});
if (topic_it == topics.end()) {
LOG_ERROR() << fmt::format("Topic not found: {}", topic);
return {};
}

std::vector<std::int32_t> partitions_id;
fdr400 marked this conversation as resolved.
Show resolved Hide resolved
fdr400 marked this conversation as resolved.
Show resolved Hide resolved
for (const auto& partition :
boost::make_iterator_range(topic_it->partitions, topic_it->partitions + topic_it->partition_cnt)) {
fdr400 marked this conversation as resolved.
Show resolved Hide resolved
partitions_id.push_back(partition.id);
}

return partitions_id;
}

EventHolder ConsumerImpl::PollEvent() {
return EventHolder{rd_kafka_queue_poll(consumer_.GetQueue(), /*timeout_ms=*/0)};
}
Expand Down
6 changes: 6 additions & 0 deletions kafka/src/kafka/impl/consumer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ class ConsumerImpl final {
/// @brief Schedules the commitment task.
void AsyncCommit();

/// @brief Retrieves the minimum and maximum offsets for the specified Kafka topic and partition.
/// Blocks until the broker responds (infinite timeout); returns {-1, -1} on failure.
std::pair<std::int64_t, std::int64_t> GetMinMaxOffset(const std::string& topic, std::int32_t partition);

/// @brief Retrieves the partition IDs for the specified Kafka topic.
/// Blocks until the broker responds (infinite timeout); returns an empty
/// vector if metadata cannot be fetched or the topic is unknown.
std::vector<std::int32_t> GetPartitionsId(const std::string& topic);

/// @brief Effectively calls `PollMessage` until `deadline` is reached
/// and no more than `max_batch_size` messages polled.
MessageBatch PollBatch(std::size_t max_batch_size, engine::Deadline deadline);
Expand Down
1 change: 1 addition & 0 deletions kafka/src/kafka/impl/holders_aliases.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ using ErrorHolder = HolderBase<rd_kafka_error_t, &rd_kafka_error_destroy>;
using EventHolder = HolderBase<rd_kafka_event_t, &rd_kafka_event_destroy>;
using QueueHolder = HolderBase<rd_kafka_queue_t, &rd_kafka_queue_destroy>;
using TopicPartitionsListHolder = HolderBase<rd_kafka_topic_partition_list_t, &rd_kafka_topic_partition_list_destroy>;
// Owns a cluster-metadata object; releases it via rd_kafka_metadata_destroy.
using MetadataHolder = HolderBase<const rd_kafka_metadata_t, &rd_kafka_metadata_destroy>;

using ConsumerHolder = KafkaClientHolder<ClientType::kConsumer>;
using ProducerHolder = KafkaClientHolder<ClientType::kProducer>;
Expand Down
Loading