Add GetMinMaxOffset and GetPartitionsId #775

Closed
wants to merge 18 commits
21 changes: 21 additions & 0 deletions kafka/include/userver/kafka/consumer_scope.hpp
@@ -3,6 +3,7 @@
#include <functional>

#include <userver/kafka/message.hpp>
#include <userver/kafka/offset_range.hpp>

USERVER_NAMESPACE_BEGIN

@@ -114,6 +115,26 @@ class ConsumerScope final {
/// has stopped and left the group
void AsyncCommit();

/// @brief Retrieves the minimum and maximum offsets for the specified topic and partition.
/// @throws GetOffsetRangeException if the offsets could not be retrieved or the returned offsets are invalid
/// @warning This is a blocking call
/// @param topic The name of the topic.
/// @param partition The partition number of the topic.
/// @returns A struct with minimum and maximum offsets for the given topic and partition.
OffsetRange GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) const;

/// @brief Retrieves the partition IDs for the specified topic.
/// @throws GetMetadataException if metadata could not be fetched, TopicNotFoundException if the topic was not found
/// @warning This is a blocking call
/// @param topic The name of the topic.
/// @returns A vector of partition IDs for the given topic.
std::vector<std::uint32_t>
GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const;

private:
friend class impl::Consumer;

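A minimal usage sketch of the two new `ConsumerScope` methods (not part of the diff): the `scope` reference, the "orders" topic name, and the 3-second timeout are assumptions, and the namespace is abbreviated to `kafka::` as inside the library.

```cpp
#include <chrono>

#include <userver/kafka/consumer_scope.hpp>
#include <userver/logging/log.hpp>

// Hypothetical helper: logs the watermark range of every partition of a topic.
// Both calls below block the current task, so an explicit timeout is passed.
void LogTopicOffsets(kafka::ConsumerScope& scope) {
    using namespace std::chrono_literals;

    for (const auto partition : scope.GetPartitionIds("orders", 3s)) {
        const auto offsets = scope.GetOffsetRange("orders", partition, 3s);
        LOG_INFO() << "orders/" << partition << ": offsets [" << offsets.low << ", " << offsets.high << ")";
    }
}
```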
21 changes: 21 additions & 0 deletions kafka/include/userver/kafka/exceptions.hpp
@@ -68,6 +68,27 @@ class UnknownPartitionException final : public SendException {
UnknownPartitionException();
};

class GetOffsetRangeException : public std::runtime_error {
public:
using std::runtime_error::runtime_error;

GetOffsetRangeException(const char* what) : std::runtime_error(what) {}
};

class TopicNotFoundException final : public std::runtime_error {
public:
using std::runtime_error::runtime_error;

TopicNotFoundException(const char* what) : std::runtime_error(what) {}
};

class GetMetadataException final : public std::runtime_error {
public:
using std::runtime_error::runtime_error;

GetMetadataException(const char* what) : std::runtime_error(what) {}
};

} // namespace kafka

USERVER_NAMESPACE_END
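A hedged sketch of handling the newly added exception types; the caller, topic argument, and log messages are illustrative and not taken from the PR.

```cpp
#include <string>

#include <userver/kafka/consumer_scope.hpp>
#include <userver/kafka/exceptions.hpp>
#include <userver/logging/log.hpp>

// Hypothetical caller: probes a topic and degrades gracefully on failure.
void ProbeTopic(const kafka::ConsumerScope& scope, const std::string& topic) {
    try {
        const auto partitions = scope.GetPartitionIds(topic);
        if (partitions.empty()) return;
        const auto offsets = scope.GetOffsetRange(topic, partitions.front());
        LOG_INFO() << topic << "/" << partitions.front() << " high watermark: " << offsets.high;
    } catch (const kafka::TopicNotFoundException& e) {
        LOG_WARNING() << "Topic does not exist: " << e.what();
    } catch (const kafka::GetMetadataException& e) {
        LOG_WARNING() << "Metadata request failed: " << e.what();
    } catch (const kafka::GetOffsetRangeException& e) {
        LOG_WARNING() << "Watermark query failed: " << e.what();
    }
}
```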
16 changes: 16 additions & 0 deletions kafka/include/userver/kafka/impl/consumer.hpp
@@ -55,12 +55,14 @@ class Consumer final {
/// @param topics stands for topics list that consumer subscribes to
/// after ConsumerScope::Start called
/// @param consumer_task_processor -- task processor for message batches
/// polling
/// @param consumer_operation_task_processor -- task processor for blocking consumer operations
/// All callbacks are invoked in `main_task_processor`
Consumer(
const std::string& name,
const std::vector<std::string>& topics,
engine::TaskProcessor& consumer_task_processor,
engine::TaskProcessor& consumer_operation_task_processor,
engine::TaskProcessor& main_task_processor,
const ConsumerConfiguration& consumer_configuration,
const Secret& secrets,
@@ -96,6 +98,19 @@
/// understanding
void AsyncCommit();

/// @brief Retrieves the low and high offsets for the specified topic and partition.
/// @see ConsumerScope::GetOffsetRange for details
OffsetRange GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) const;

/// @brief Retrieves the partition IDs for the specified topic.
/// @see ConsumerScope::GetPartitionIds for details
std::vector<std::uint32_t>
GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const;

/// @brief Adds consumer name to current span.
void ExtendCurrentSpan() const;

@@ -111,6 +126,7 @@
const ConsumerExecutionParams execution_params;

engine::TaskProcessor& consumer_task_processor_;
engine::TaskProcessor& consumer_operation_task_processor_;
engine::TaskProcessor& main_task_processor_;

ConfHolder conf_;
19 changes: 19 additions & 0 deletions kafka/include/userver/kafka/offset_range.hpp
@@ -0,0 +1,19 @@
#pragma once

#include <cstdint>

USERVER_NAMESPACE_BEGIN

namespace kafka {

struct OffsetRange final {
/// @brief The low watermark offset. It indicates the earliest available offset in Kafka.
std::uint32_t low{};

/// @brief The high watermark offset. It indicates the next offset that will be written in Kafka.
std::uint32_t high{};
};

} // namespace kafka

USERVER_NAMESPACE_END
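Since `high` is the next offset to be written and `low` is the earliest retained offset, `high - low` is the number of messages currently retained in a partition. A small illustrative helper built on that (assumed names, not part of the PR):

```cpp
#include <cstdint>
#include <string>

#include <userver/kafka/consumer_scope.hpp>
#include <userver/kafka/offset_range.hpp>

// Hypothetical helper: rough count of messages currently retained in `topic`,
// summed over all of its partitions.
std::uint64_t CountRetainedMessages(const kafka::ConsumerScope& scope, const std::string& topic) {
    std::uint64_t total{0};
    for (const auto partition : scope.GetPartitionIds(topic)) {
        const kafka::OffsetRange range = scope.GetOffsetRange(topic, partition);
        total += range.high - range.low;
    }
    return total;
}
```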
1 change: 1 addition & 0 deletions kafka/src/kafka/consumer_component.cpp
@@ -22,6 +22,7 @@ ConsumerComponent::ConsumerComponent(
config.Name(),
config["topics"].As<std::vector<std::string>>(),
context.GetTaskProcessor("consumer-task-processor"),
context.GetTaskProcessor("consumer-operation-task-processor"),
context.GetTaskProcessor("main-task-processor"),
config.As<impl::ConsumerConfiguration>(),
context.FindComponent<components::Secdist>().Get().Get<impl::BrokerSecrets>().GetSecretByComponentName(
13 changes: 13 additions & 0 deletions kafka/src/kafka/consumer_scope.cpp
@@ -16,6 +16,19 @@ void ConsumerScope::Stop() noexcept { consumer_.Stop(); }

void ConsumerScope::AsyncCommit() { consumer_.AsyncCommit(); }

OffsetRange ConsumerScope::GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout
) const {
return consumer_.GetOffsetRange(topic, partition, timeout);
}

std::vector<std::uint32_t>
ConsumerScope::GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout) const {
return consumer_.GetPartitionIds(topic, timeout);
}

} // namespace kafka

USERVER_NAMESPACE_END
31 changes: 31 additions & 0 deletions kafka/src/kafka/impl/consumer.cpp
@@ -53,6 +53,7 @@ Consumer::Consumer(
const std::string& name,
const std::vector<std::string>& topics,
engine::TaskProcessor& consumer_task_processor,
engine::TaskProcessor& consumer_operation_task_processor,
engine::TaskProcessor& main_task_processor,
const ConsumerConfiguration& configuration,
const Secret& secrets,
@@ -62,6 +63,7 @@ Consumer::Consumer(
topics_(topics),
execution_params(params),
consumer_task_processor_(consumer_task_processor),
consumer_operation_task_processor_(consumer_operation_task_processor),
main_task_processor_(main_task_processor),
conf_(Configuration{name, configuration, secrets}.Release()) {
/// To check configuration validity
@@ -166,6 +168,35 @@ void Consumer::AsyncCommit() {
}).Get();
}

OffsetRange Consumer::GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout
) const {
return utils::Async(
consumer_operation_task_processor_,
"consumer_getting_offset",
[this, &topic, partition, &timeout] {
ExtendCurrentSpan();

return consumer_->GetOffsetRange(topic, partition, timeout);
}
).Get();
}

std::vector<std::uint32_t>
Consumer::GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout) const {
return utils::Async(
consumer_operation_task_processor_,
"consumer_getting_partition_ids",
[this, &topic, &timeout] {
ExtendCurrentSpan();

return consumer_->GetPartitionIds(topic, timeout);
}
).Get();
}

void Consumer::Stop() noexcept {
if (processing_.exchange(false) && poll_task_.IsValid()) {
UINVARIANT(consumer_, "Stopping already stopped consumer");
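Both methods offload the blocking librdkafka requests to the dedicated `consumer_operation_task_processor_` via `utils::Async(...).Get()`, so coroutine workers of the calling task processor are not stalled. A generic sketch of that offloading pattern (illustrative only, not part of the PR):

```cpp
#include <userver/engine/task/task_processor_fwd.hpp>
#include <userver/utils/async.hpp>

// Runs a blocking operation on a dedicated task processor and awaits the result.
// The lambda body is a stand-in for a synchronous librdkafka request such as
// rd_kafka_query_watermark_offsets().
int RunBlockingOperation(engine::TaskProcessor& blocking_task_processor) {
    return utils::Async(blocking_task_processor, "blocking_operation", [] {
        return 42;  // placeholder for the blocking call's result
    }).Get();
}
```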
71 changes: 71 additions & 0 deletions kafka/src/kafka/impl/consumer_impl.cpp
@@ -1,10 +1,12 @@
#include <kafka/impl/consumer_impl.hpp>

#include <algorithm>
#include <chrono>

#include <fmt/format.h>
#include <fmt/ranges.h>

#include <userver/kafka/exceptions.hpp>
#include <userver/kafka/impl/configuration.hpp>
#include <userver/kafka/impl/stats.hpp>
#include <userver/logging/log.hpp>
@@ -13,6 +15,7 @@
#include <userver/utils/fast_scope_guard.hpp>
#include <userver/utils/span.hpp>

#include <kafka/impl/error_buffer.hpp>
#include <kafka/impl/holders_aliases.hpp>
#include <kafka/impl/log_level.hpp>

@@ -292,6 +295,74 @@ void ConsumerImpl::Commit() { rd_kafka_commit(consumer_.GetHandle(), nullptr, /*

void ConsumerImpl::AsyncCommit() { rd_kafka_commit(consumer_.GetHandle(), nullptr, /*async=*/1); }

OffsetRange ConsumerImpl::GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout
) const {
int64_t low_offset{0};
int64_t high_offset{0};

auto err = rd_kafka_query_watermark_offsets(
consumer_.GetHandle(),
topic.c_str(),
partition,
&low_offset,
&high_offset,
static_cast<int>(timeout.value_or(std::chrono::milliseconds(-1)).count())
);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw GetOffsetRangeException{fmt::format("Failed to get offsets: {}", rd_kafka_err2str(err))};
}

if (low_offset == RD_KAFKA_OFFSET_INVALID || high_offset == RD_KAFKA_OFFSET_INVALID) {
throw GetOffsetRangeException{
fmt::format("Failed to get offsets: invalid offset for topic {} partition {}", topic, partition)
};
}

return {static_cast<std::uint32_t>(low_offset), static_cast<std::uint32_t>(high_offset)};
}

std::vector<std::uint32_t>
ConsumerImpl::GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout) const {
MetadataHolder metadata{[this, &topic, &timeout] {
const rd_kafka_metadata_t* raw_metadata{nullptr};
TopicHolder topic_holder{rd_kafka_topic_new(consumer_.GetHandle(), topic.c_str(), nullptr)};
auto err = rd_kafka_metadata(
consumer_.GetHandle(),
/*all_topics=*/0,
topic_holder.GetHandle(),
&raw_metadata,
static_cast<int>(timeout.value_or(std::chrono::milliseconds(-1)).count())
);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw GetMetadataException{fmt::format("Failed to fetch metadata: {}", rd_kafka_err2str(err))};
}
return raw_metadata;
}()};

utils::span<const rd_kafka_metadata_topic> topics{metadata->topics, static_cast<std::size_t>(metadata->topic_cnt)};
const auto* topic_it =
std::find_if(topics.begin(), topics.end(), [&topic](const rd_kafka_metadata_topic& topic_raw) {
return topic == topic_raw.topic;
});
if (topic_it == topics.end()) {
throw TopicNotFoundException{fmt::format("Failed to find topic: {}", topic)};
}

utils::span<const rd_kafka_metadata_partition> partitions{
topic_it->partitions, static_cast<std::size_t>(topic_it->partition_cnt)};
std::vector<std::uint32_t> partition_ids;
partition_ids.reserve(partitions.size());

for (const auto& partition : partitions) {
partition_ids.push_back(partition.id);
}

return partition_ids;
}

EventHolder ConsumerImpl::PollEvent() {
return EventHolder{rd_kafka_queue_poll(consumer_.GetQueue(), /*timeout_ms=*/0)};
}
12 changes: 12 additions & 0 deletions kafka/src/kafka/impl/consumer_impl.hpp
@@ -9,6 +9,7 @@
#include <userver/engine/single_consumer_event.hpp>
#include <userver/kafka/impl/holders.hpp>
#include <userver/kafka/message.hpp>
#include <userver/kafka/offset_range.hpp>

#include <kafka/impl/holders_aliases.hpp>

@@ -37,6 +38,17 @@ class ConsumerImpl final {
/// @brief Schedules the commitment task.
void AsyncCommit();

/// @brief Retrieves the low and high offsets for the specified topic and partition.
OffsetRange GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) const;

/// @brief Retrieves the partition IDs for the specified topic.
std::vector<std::uint32_t>
GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const;

/// @brief Effectively calls `PollMessage` until `deadline` is reached
/// and no more than `max_batch_size` messages polled.
MessageBatch PollBatch(std::size_t max_batch_size, engine::Deadline deadline);
2 changes: 2 additions & 0 deletions kafka/src/kafka/impl/holders_aliases.hpp
@@ -12,6 +12,8 @@ using ErrorHolder = HolderBase<rd_kafka_error_t, &rd_kafka_error_destroy>;
using EventHolder = HolderBase<rd_kafka_event_t, &rd_kafka_event_destroy>;
using QueueHolder = HolderBase<rd_kafka_queue_t, &rd_kafka_queue_destroy>;
using TopicPartitionsListHolder = HolderBase<rd_kafka_topic_partition_list_t, &rd_kafka_topic_partition_list_destroy>;
using MetadataHolder = HolderBase<const rd_kafka_metadata_t, &rd_kafka_metadata_destroy>;
using TopicHolder = HolderBase<rd_kafka_topic_t, &rd_kafka_topic_destroy>;

using ConsumerHolder = KafkaClientHolder<ClientType::kConsumer>;
using ProducerHolder = KafkaClientHolder<ClientType::kProducer>;
4 changes: 3 additions & 1 deletion kafka/utest/src/kafka/utest/kafka_fixture.cpp
@@ -143,9 +143,11 @@ impl::Consumer KafkaCluster::MakeConsumer(
topics,
engine::current_task::GetTaskProcessor(),
engine::current_task::GetTaskProcessor(),
engine::current_task::GetTaskProcessor(),
configuration,
MakeSecrets(bootstrap_servers_),
std::move(params)};
std::move(params)
};
};

std::vector<Message> KafkaCluster::ReceiveMessages(