Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GetMinMaxOffset and GetPartitionsId #775

Closed
wants to merge 18 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ components_manager:
consumer-task-processor:
thread_name: consumer
worker_threads: 4
consumer-blocking-task-processor:
thread_name: consumer-blocking
worker_threads: 1
default_task_processor: main-task-processor

components:
Expand Down
3 changes: 3 additions & 0 deletions kafka/functional_tests/integrational_tests/static_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ components_manager:
consumer-task-processor:
thread_name: consumer
worker_threads: 4
consumer-blocking-task-processor:
thread_name: consumer-blocking
worker_threads: 1
producer-task-processor:
thread_name: producer
worker_threads: 4
Expand Down
22 changes: 22 additions & 0 deletions kafka/include/userver/kafka/consumer_scope.hpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#pragma once

#include <atomic>
#include <functional>

#include <userver/kafka/message.hpp>
#include <userver/kafka/offset_range.hpp>

USERVER_NAMESPACE_BEGIN

Expand Down Expand Up @@ -114,6 +116,26 @@ class ConsumerScope final {
/// has stopped and left the group
void AsyncCommit();

/// @brief Retrieves the minimum and maximum offsets for the specified topic and partition.
/// @throws OffsetRangeException if the offsets could not be retrieved or if the returned offsets are invalid
/// @warning This is a blocking call
/// @param topic The name of the topic.
/// @param partition The partition number of the topic.
/// @param timeout Optional time limit for the request; defaults to `std::nullopt`.
/// NOTE(review): the behavior when no timeout is given is decided by the implementation -- confirm.
/// @returns A struct with minimum and maximum offsets for the given topic and partition.
OffsetRange GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) const;

/// @brief Retrieves the partition IDs for the specified topic.
/// @throws GetMetadataException if the metadata could not be fetched
/// @throws TopicNotFoundException if the topic was not found
/// @warning This is a blocking call
/// @param topic The name of the topic.
/// @param timeout Optional time limit for the request; defaults to `std::nullopt`.
/// NOTE(review): the behavior when no timeout is given is decided by the implementation -- confirm.
/// @returns A vector of partition IDs for the given topic.
std::vector<std::uint32_t>
GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const;

private:
friend class impl::Consumer;

Expand Down
39 changes: 39 additions & 0 deletions kafka/include/userver/kafka/exceptions.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <cstdint>
#include <stdexcept>
#include <string_view>

USERVER_NAMESPACE_BEGIN

Expand Down Expand Up @@ -68,6 +70,43 @@ class UnknownPartitionException final : public SendException {
UnknownPartitionException();
};

/// @brief Base exception for failures while retrieving a partition's offset range.
class OffsetRangeException : public std::runtime_error {
public:
    /// Plain-message constructors are inherited from std::runtime_error.
    using std::runtime_error::runtime_error;

    /// @brief Creates an exception whose message combines a description with the
    /// topic and partition the request was made for.
    /// @param what description of the failure
    /// @param topic topic whose offsets were requested
    /// @param partition partition whose offsets were requested
    OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition);
};

/// @brief Exception reporting a timeout while fetching offsets.
class OffsetRangeTimeoutException final : public OffsetRangeException {
public:
    /// @param topic topic whose offsets were requested
    /// @param partition partition whose offsets were requested
    OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition);

private:
    /// Fixed description used as the leading part of the exception message.
    static constexpr const char* kWhat = "Timeout while fetching offsets.";
};

/// @brief Exception thrown when the requested topic is not found.
class TopicNotFoundException final : public std::runtime_error {
public:
    /// Constructed exactly like std::runtime_error (message only).
    using std::runtime_error::runtime_error;
};

/// @brief Exception thrown when fetching topic metadata fails.
class GetMetadataException final : public std::runtime_error {
public:
    /// Plain-message constructors are inherited from std::runtime_error.
    using std::runtime_error::runtime_error;

    /// @brief Creates an exception whose message combines a description with the topic name.
    /// @param what description of the failure
    /// @param topic topic whose metadata was requested
    GetMetadataException(std::string_view what, std::string_view topic);
};

/// @brief Exception reporting a timeout while getting topic metadata.
/// @note This type derives directly from std::runtime_error, not from
/// GetMetadataException, so a handler for GetMetadataException does not catch it.
class GetMetadataTimeoutException final : public std::runtime_error {
    /// Fixed description used as the leading part of the exception message.
    static constexpr const char* kWhat = "Timeout while getting metadata.";

public:
    /// @param topic topic whose metadata was requested
    /// Marked `explicit` so that a topic name never converts silently into an exception object.
    explicit GetMetadataTimeoutException(std::string_view topic);
};

} // namespace kafka

USERVER_NAMESPACE_END
16 changes: 16 additions & 0 deletions kafka/include/userver/kafka/impl/consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ class Consumer final {
/// @param topics stands for topics list that consumer subscribes to
/// after ConsumerScope::Start called
/// @param consumer_task_processor -- task processor for message batches
/// polling
/// @param consumer_blocking_task_processor -- task processor for blocking consumer operations
/// All callbacks are invoked in `main_task_processor`
Consumer(
const std::string& name,
const std::vector<std::string>& topics,
engine::TaskProcessor& consumer_task_processor,
engine::TaskProcessor& consumer_blocking_task_processor,
engine::TaskProcessor& main_task_processor,
const ConsumerConfiguration& consumer_configuration,
const Secret& secrets,
Expand Down Expand Up @@ -96,6 +98,19 @@ class Consumer final {
/// understanding
void AsyncCommit();

/// @brief Retrieves the low and high offsets for the specified topic and partition.
/// @param topic name of the topic
/// @param partition partition number within the topic
/// @param timeout optional time limit for the request; defaults to `std::nullopt`
/// @see ConsumerScope::GetOffsetRange for the full description of the call and its errors
OffsetRange GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) const;

/// @brief Retrieves the partition IDs for the specified topic.
/// @param topic name of the topic
/// @param timeout optional time limit for the request; defaults to `std::nullopt`
/// @see ConsumerScope::GetPartitionIds for the full description of the call and its errors
std::vector<std::uint32_t>
GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const;

/// @brief Adds consumer name to current span.
void ExtendCurrentSpan() const;

Expand All @@ -111,6 +126,7 @@ class Consumer final {
const ConsumerExecutionParams execution_params;

engine::TaskProcessor& consumer_task_processor_;
engine::TaskProcessor& consumer_operation_task_processor_;
engine::TaskProcessor& main_task_processor_;

ConfHolder conf_;
Expand Down
20 changes: 20 additions & 0 deletions kafka/include/userver/kafka/offset_range.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include <cstdint>

USERVER_NAMESPACE_BEGIN

namespace kafka {

/// @brief Represents the range of offsets.
///
/// Both fields are 64-bit because Kafka offsets are 64-bit values; a 32-bit
/// field would silently truncate offsets of long-lived, high-volume partitions.
struct OffsetRange final {
    /// @brief The low watermark offset. It indicates the earliest available offset in Kafka.
    std::uint64_t low{};

    /// @brief The high watermark offset. It indicates the next offset that will be written in Kafka.
    std::uint64_t high{};
};

} // namespace kafka

USERVER_NAMESPACE_END
1 change: 1 addition & 0 deletions kafka/src/kafka/consumer_component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ConsumerComponent::ConsumerComponent(
config.Name(),
config["topics"].As<std::vector<std::string>>(),
context.GetTaskProcessor("consumer-task-processor"),
context.GetTaskProcessor("consumer-blocking-task-processor"),
context.GetTaskProcessor("main-task-processor"),
config.As<impl::ConsumerConfiguration>(),
context.FindComponent<components::Secdist>().Get().Get<impl::BrokerSecrets>().GetSecretByComponentName(
Expand Down
13 changes: 13 additions & 0 deletions kafka/src/kafka/consumer_scope.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ void ConsumerScope::Stop() noexcept { consumer_.Stop(); }

void ConsumerScope::AsyncCommit() { consumer_.AsyncCommit(); }

// Delegates to the underlying consumer, passing every argument through unchanged.
OffsetRange ConsumerScope::GetOffsetRange(
    const std::string& topic,
    std::uint32_t partition,
    std::optional<std::chrono::milliseconds> timeout
) const {
    auto range = consumer_.GetOffsetRange(topic, partition, timeout);
    return range;
}

// Delegates to the underlying consumer, passing every argument through unchanged.
std::vector<std::uint32_t> ConsumerScope::GetPartitionIds(
    const std::string& topic,
    std::optional<std::chrono::milliseconds> timeout
) const {
    auto partition_ids = consumer_.GetPartitionIds(topic, timeout);
    return partition_ids;
}

} // namespace kafka

USERVER_NAMESPACE_END
14 changes: 14 additions & 0 deletions kafka/src/kafka/exceptions.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <userver/kafka/exceptions.hpp>

#include <fmt/format.h>

USERVER_NAMESPACE_BEGIN

namespace kafka {
Expand All @@ -19,6 +21,18 @@ UnknownTopicException::UnknownTopicException() : SendException(kWhat) {}

UnknownPartitionException::UnknownPartitionException() : SendException(kWhat) {}

OffsetRangeException::OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition)
: std::runtime_error(fmt::format("{} topic: '{}', partition: {}", what, topic, partition)) {}

OffsetRangeTimeoutException::OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition)
: OffsetRangeException(kWhat, topic, partition) {}

GetMetadataException::GetMetadataException(std::string_view what, std::string_view topic)
: std::runtime_error(fmt::format("{} topic: '{}'", what, topic)) {}

GetMetadataTimeoutException::GetMetadataTimeoutException(std::string_view topic)
: std::runtime_error(fmt::format("{} topic: '{}'", kWhat, topic)) {}

} // namespace kafka

USERVER_NAMESPACE_END
38 changes: 34 additions & 4 deletions kafka/src/kafka/impl/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Consumer::Consumer(
const std::string& name,
const std::vector<std::string>& topics,
engine::TaskProcessor& consumer_task_processor,
engine::TaskProcessor& consumer_blocking_task_processor,
engine::TaskProcessor& main_task_processor,
const ConsumerConfiguration& configuration,
const Secret& secrets,
Expand All @@ -62,8 +63,10 @@ Consumer::Consumer(
topics_(topics),
execution_params(params),
consumer_task_processor_(consumer_task_processor),
consumer_operation_task_processor_(consumer_blocking_task_processor),
main_task_processor_(main_task_processor),
conf_(Configuration{name, configuration, secrets}.Release()) {
conf_(Configuration{name, configuration, secrets}.Release()),
consumer_(std::make_unique<ConsumerImpl>(name_, conf_, topics_, stats_)) {
/// To check configuration validity
[[maybe_unused]] auto _ = ConsumerHolder{conf_};
}
Expand All @@ -88,6 +91,7 @@ void Consumer::DumpMetric(utils::statistics::Writer& writer) const {

void Consumer::RunConsuming(ConsumerScope::Callback callback) {
consumer_ = std::make_unique<ConsumerImpl>(name_, conf_, topics_, stats_);
consumer_->StartConsuming();

LOG_INFO() << fmt::format("Started messages polling");

Expand Down Expand Up @@ -166,10 +170,37 @@ void Consumer::AsyncCommit() {
}).Get();
}

// Runs the offset-range query as a task on consumer_operation_task_processor_
// and returns the task's result.
OffsetRange Consumer::GetOffsetRange(
    const std::string& topic,
    std::uint32_t partition,
    std::optional<std::chrono::milliseconds> timeout
) const {
    // NOTE(review): `topic` and `timeout` are captured by reference; this relies on
    // Get() below not returning before the task has finished -- confirm for cancellation paths.
    auto fetch_offset_range = [this, &topic, partition, &timeout] {
        ExtendCurrentSpan();

        return consumer_->GetOffsetRange(topic, partition, timeout);
    };

    return utils::Async(consumer_operation_task_processor_, "consumer_getting_offset", fetch_offset_range).Get();
}

// Runs the partition-id query as a task on consumer_operation_task_processor_
// and returns the task's result.
std::vector<std::uint32_t> Consumer::GetPartitionIds(
    const std::string& topic,
    std::optional<std::chrono::milliseconds> timeout
) const {
    // NOTE(review): `topic` and `timeout` are captured by reference; this relies on
    // Get() below not returning before the task has finished -- confirm for cancellation paths.
    auto fetch_partition_ids = [this, &topic, &timeout] {
        ExtendCurrentSpan();

        return consumer_->GetPartitionIds(topic, timeout);
    };

    return utils::Async(consumer_operation_task_processor_, "consumer_getting_partition_ids", fetch_partition_ids)
        .Get();
}

void Consumer::Stop() noexcept {
if (processing_.exchange(false) && poll_task_.IsValid()) {
UINVARIANT(consumer_, "Stopping already stopped consumer");

LOG_INFO() << "Stopping consumer poll task";
poll_task_.SyncCancel();

Expand All @@ -178,7 +209,6 @@ void Consumer::Stop() noexcept {
// 1. This is blocking.
// 2. This calls testpoints
consumer_->StopConsuming();
consumer_.reset();
}).Get();
TESTPOINT(fmt::format("tp_{}_stopped", name_), {});
LOG_INFO() << "Consumer stopped";
Expand Down
Loading
Loading