Skip to content

Commit

Permalink
feat kafka: add consumer GetPartitionIds and GetOffsetRange
Browse files Browse the repository at this point in the history
------------------------
Note: by creating a PR or an issue you automatically agree to the CLA. See [CONTRIBUTING.md](https://github.com/userver-framework/userver/blob/develop/CONTRIBUTING.md). Feel free to remove this note, the agreement holds.

Tests: протестировано CI

Pull Request resolved: #775
commit_hash:421e9456d2970f4f31b1a35cd7b3bb88d55d3899
  • Loading branch information
Володин Кирилл Сергеевич authored and fdr400 committed Dec 9, 2024
1 parent 8182be6 commit 7ee9044
Show file tree
Hide file tree
Showing 19 changed files with 350 additions and 20 deletions.
1 change: 1 addition & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -2146,6 +2146,7 @@
"kafka/include/userver/kafka/impl/holders.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/impl/holders.hpp",
"kafka/include/userver/kafka/impl/stats.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/impl/stats.hpp",
"kafka/include/userver/kafka/message.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/message.hpp",
"kafka/include/userver/kafka/offset_range.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/offset_range.hpp",
"kafka/include/userver/kafka/producer.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/producer.hpp",
"kafka/include/userver/kafka/producer_component.hpp":"taxi/uservices/userver/kafka/include/userver/kafka/producer_component.hpp",
"kafka/library.yaml":"taxi/uservices/userver/kafka/library.yaml",
Expand Down
2 changes: 1 addition & 1 deletion kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ userver_module(kafka
"TESTSUITE_KAFKA_SERVER_HOST=[::1]"
"TESTSUITE_KAFKA_SERVER_PORT=8099"
"TESTSUITE_KAFKA_CONTROLLER_PORT=8100"
"TESTSUITE_KAFKA_CUSTOM_TOPICS=lt-1:4,lt-2:4,tt-1:1,tt-2:1,tt-3:1,tt-4:1,tt-5:1,tt-6:1,tt-7:1,tt-8:1"
"TESTSUITE_KAFKA_CUSTOM_TOPICS=bt:4,lt-1:4,lt-2:4,tt-1:1,tt-2:1,tt-3:1,tt-4:1,tt-5:1,tt-6:1,tt-7:1,tt-8:1"
)

target_compile_options(${PROJECT_NAME} PRIVATE "-Wno-ignored-qualifiers")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ components_manager:
consumer-task-processor:
thread_name: consumer
worker_threads: 4
consumer-blocking-task-processor:
thread_name: consumer-blocking
worker_threads: 1
default_task_processor: main-task-processor

components:
Expand Down
3 changes: 3 additions & 0 deletions kafka/functional_tests/integrational_tests/static_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ components_manager:
consumer-task-processor:
thread_name: consumer
worker_threads: 4
consumer-blocking-task-processor:
thread_name: consumer-blocking
worker_threads: 1
producer-task-processor:
thread_name: producer
worker_threads: 4
Expand Down
23 changes: 23 additions & 0 deletions kafka/include/userver/kafka/consumer_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <functional>

#include <userver/kafka/message.hpp>
#include <userver/kafka/offset_range.hpp>

USERVER_NAMESPACE_BEGIN

Expand Down Expand Up @@ -114,6 +115,28 @@ class ConsumerScope final {
/// has stopped and left the group
void AsyncCommit();

/// @brief Retrieves the lowest and highest offsets for the specified topic and partition.
/// @throws OffsetRangeException if offsets could not be retrieved
/// @warning This is a blocking call
/// @param topic The name of the topic.
/// @param partition The partition number of the topic.
/// @param timeout Optional time limit for the request. NOTE(review): presumably the
/// implementation falls back to its own default when `std::nullopt` is passed -- confirm.
/// @returns Lowest and highest offsets for the given topic and partition.
/// @see OffsetRange for more explanation
OffsetRange GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) const;

/// @brief Retrieves the partition IDs for the specified topic.
/// @throws GetMetadataException if failed to fetch metadata
/// @throws TopicNotFoundException if topic not found
/// @warning This is a blocking call
/// @param topic The name of the topic.
/// @param timeout Optional time limit for the request. NOTE(review): presumably the
/// implementation falls back to its own default when `std::nullopt` is passed -- confirm.
/// @returns A vector of partition IDs for the given topic.
std::vector<std::uint32_t>
GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const;

private:
friend class impl::Consumer;

Expand Down
39 changes: 39 additions & 0 deletions kafka/include/userver/kafka/exceptions.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <cstdint>
#include <stdexcept>
#include <string_view>

USERVER_NAMESPACE_BEGIN

Expand Down Expand Up @@ -68,6 +70,43 @@ class UnknownPartitionException final : public SendException {
UnknownPartitionException();
};

/// @brief Exception thrown when there is an error retrieving the offset range.
class OffsetRangeException : public std::runtime_error {
public:
    /// Message-only constructors are taken over from std::runtime_error.
    using runtime_error::runtime_error;

    /// @brief Creates an error that describes the failure together with
    /// the topic and partition it relates to.
    /// @param what description of the failure
    /// @param topic name of the topic
    /// @param partition partition number of the topic
    OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition);
};

/// @brief OffsetRangeException flavour reporting a timeout while fetching offsets.
class OffsetRangeTimeoutException final : public OffsetRangeException {
public:
    /// @param topic name of the topic
    /// @param partition partition number of the topic
    OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition);

private:
    /// Fixed description used for this exception.
    static constexpr const char* kWhat = "Timeout while fetching offsets.";
};

/// @brief Exception thrown when the requested topic is not found.
class TopicNotFoundException final : public std::runtime_error {
public:
    /// Message-only constructors are taken over from std::runtime_error.
    using runtime_error::runtime_error;
};

/// @brief Exception thrown when fetching metadata.
class GetMetadataException : public std::runtime_error {
public:
    /// Message-only constructors are taken over from std::runtime_error.
    using runtime_error::runtime_error;

    /// @brief Creates an error that describes the failure together with
    /// the topic it relates to.
    /// @param what description of the failure
    /// @param topic name of the topic
    GetMetadataException(std::string_view what, std::string_view topic);
};

/// @brief GetMetadataException flavour reporting a timeout while getting metadata.
class GetMetadataTimeoutException final : public GetMetadataException {
    /// Fixed description used for this exception.
    static constexpr const char* kWhat = "Timeout while getting metadata.";

public:
    /// @param topic name of the topic
    // `explicit` prevents a topic name from silently converting into an exception object.
    explicit GetMetadataTimeoutException(std::string_view topic);
};

} // namespace kafka

USERVER_NAMESPACE_END
21 changes: 19 additions & 2 deletions kafka/include/userver/kafka/impl/consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ class Consumer final {
///
/// @param topics stands for topics list that consumer subscribes to
/// after ConsumerScope::Start called
/// @param consumer_task_processor -- task processor for message batches
/// polling
/// @param consumer_task_processor -- task processor for message batches polling
/// @param consumer_blocking_task_processor -- task processor for consumer blocking operations
/// All callbacks are invoked in `main_task_processor`
Consumer(
const std::string& name,
const std::vector<std::string>& topics,
engine::TaskProcessor& consumer_task_processor,
engine::TaskProcessor& consumer_blocking_task_processor,
engine::TaskProcessor& main_task_processor,
const ConsumerConfiguration& consumer_configuration,
const Secret& secrets,
Expand All @@ -81,6 +82,21 @@ class Consumer final {
/// @see impl/stats.hpp
void DumpMetric(utils::statistics::Writer& writer) const;

/// @cond
/// @brief Retrieves the low and high offsets for the specified topic and partition.
/// @see ConsumerScope::GetOffsetRange for the documented contract
OffsetRange GetOffsetRange(
const std::string& topic,
std::uint32_t partition,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) const;

/// @brief Retrieves the partition IDs for the specified topic.
/// @see ConsumerScope::GetPartitionIds for the documented contract
std::vector<std::uint32_t>
GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const;
/// @endcond

private:
friend class kafka::ConsumerScope;

Expand Down Expand Up @@ -111,6 +127,7 @@ class Consumer final {
const ConsumerExecutionParams execution_params;

engine::TaskProcessor& consumer_task_processor_;
engine::TaskProcessor& consumer_blocking_task_processor_;
engine::TaskProcessor& main_task_processor_;

ConfHolder conf_;
Expand Down
22 changes: 22 additions & 0 deletions kafka/include/userver/kafka/offset_range.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include <cstdint>

USERVER_NAMESPACE_BEGIN

namespace kafka {

/// @brief Represents the range of offsets for a certain topic partition.
///
/// Offsets are stored as 64-bit values so that offsets larger than
/// 2^32 - 1 are represented without truncation.
struct OffsetRange final {
    /// @brief The low watermark offset. It indicates the earliest available offset in Kafka.
    /// @note low offset is guaranteed to be committed
    std::uint64_t low{};

    /// @brief The high watermark offset. It indicates the next offset that will be written in Kafka.
    /// @note high offset is not required to be committed yet
    std::uint64_t high{};
};

} // namespace kafka

USERVER_NAMESPACE_END
1 change: 1 addition & 0 deletions kafka/src/kafka/consumer_component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ConsumerComponent::ConsumerComponent(
config.Name(),
config["topics"].As<std::vector<std::string>>(),
context.GetTaskProcessor("consumer-task-processor"),
context.GetTaskProcessor("consumer-blocking-task-processor"),
context.GetTaskProcessor("main-task-processor"),
config.As<impl::ConsumerConfiguration>(),
context.FindComponent<components::Secdist>().Get().Get<impl::BrokerSecrets>().GetSecretByComponentName(
Expand Down
13 changes: 13 additions & 0 deletions kafka/src/kafka/consumer_scope.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ void ConsumerScope::Stop() noexcept { consumer_.Stop(); }

void ConsumerScope::AsyncCommit() { consumer_.AsyncCommit(); }

// Delegates the request to `consumer_` and hands its result back unchanged.
OffsetRange ConsumerScope::GetOffsetRange(
    const std::string& topic,
    std::uint32_t partition,
    std::optional<std::chrono::milliseconds> timeout
) const {
    OffsetRange offsets = consumer_.GetOffsetRange(topic, partition, timeout);
    return offsets;
}

// Delegates the request to `consumer_` and hands its result back unchanged.
std::vector<std::uint32_t>
ConsumerScope::GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout) const {
    std::vector<std::uint32_t> partition_ids = consumer_.GetPartitionIds(topic, timeout);
    return partition_ids;
}

} // namespace kafka

USERVER_NAMESPACE_END
13 changes: 13 additions & 0 deletions kafka/src/kafka/exceptions.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <userver/kafka/exceptions.hpp>

#include <fmt/format.h>

USERVER_NAMESPACE_BEGIN

namespace kafka {
Expand All @@ -19,6 +21,17 @@ UnknownTopicException::UnknownTopicException() : SendException(kWhat) {}

UnknownPartitionException::UnknownPartitionException() : SendException(kWhat) {}

OffsetRangeException::OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition)
: std::runtime_error(fmt::format("{} topic: '{}', partition: {}", what, topic, partition)) {}

OffsetRangeTimeoutException::OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition)
: OffsetRangeException(kWhat, topic, partition) {}

GetMetadataException::GetMetadataException(std::string_view what, std::string_view topic)
: std::runtime_error(fmt::format("{} topic: '{}'", what, topic)) {}

GetMetadataTimeoutException::GetMetadataTimeoutException(std::string_view topic) : GetMetadataException(kWhat, topic) {}

} // namespace kafka

USERVER_NAMESPACE_END
41 changes: 37 additions & 4 deletions kafka/src/kafka/impl/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Consumer::Consumer(
const std::string& name,
const std::vector<std::string>& topics,
engine::TaskProcessor& consumer_task_processor,
engine::TaskProcessor& consumer_blocking_task_processor,
engine::TaskProcessor& main_task_processor,
const ConsumerConfiguration& configuration,
const Secret& secrets,
Expand All @@ -62,8 +63,10 @@ Consumer::Consumer(
topics_(topics),
execution_params(params),
consumer_task_processor_(consumer_task_processor),
consumer_blocking_task_processor_(consumer_blocking_task_processor),
main_task_processor_(main_task_processor),
conf_(Configuration{name, configuration, secrets}.Release()) {
conf_(Configuration{name, configuration, secrets}.Release()),
consumer_(std::make_unique<ConsumerImpl>(name_, conf_, topics_, stats_)) {
/// To check configuration validity
[[maybe_unused]] auto _ = ConsumerHolder{conf_};
}
Expand All @@ -87,7 +90,11 @@ void Consumer::DumpMetric(utils::statistics::Writer& writer) const {
}

void Consumer::RunConsuming(ConsumerScope::Callback callback) {
// note: Consumer must be recreated after each stop,
// because stop invalidates some internal consumer state (in librdkafka).
// Nevertheless, it is possible to use blocking consumer methods after stop.
consumer_ = std::make_unique<ConsumerImpl>(name_, conf_, topics_, stats_);
consumer_->StartConsuming();

LOG_INFO() << fmt::format("Started messages polling");

Expand Down Expand Up @@ -166,10 +173,37 @@ void Consumer::AsyncCommit() {
}).Get();
}

// Runs the offset query as a task on `consumer_blocking_task_processor_`
// and waits for its result.
// NOTE(review): `consumer_` is reassigned in RunConsuming; confirm this call
// cannot overlap with a (re)start of the consumer.
OffsetRange Consumer::GetOffsetRange(
    const std::string& topic,
    std::uint32_t partition,
    std::optional<std::chrono::milliseconds> timeout
) const {
    // Reference captures are used because the task is awaited right below,
    // before the arguments go out of scope.
    auto fetch_offsets = [this, &topic, partition, &timeout] {
        ExtendCurrentSpan();

        return consumer_->GetOffsetRange(topic, partition, timeout);
    };

    return utils::Async(consumer_blocking_task_processor_, "consumer_getting_offset", fetch_offsets).Get();
}

// Runs the partition query as a task on `consumer_blocking_task_processor_`
// and waits for its result.
// NOTE(review): `consumer_` is reassigned in RunConsuming; confirm this call
// cannot overlap with a (re)start of the consumer.
std::vector<std::uint32_t>
Consumer::GetPartitionIds(const std::string& topic, std::optional<std::chrono::milliseconds> timeout) const {
    // Reference captures are used because the task is awaited right below,
    // before the arguments go out of scope.
    auto fetch_partition_ids = [this, &topic, &timeout] {
        ExtendCurrentSpan();

        return consumer_->GetPartitionIds(topic, timeout);
    };

    return utils::Async(consumer_blocking_task_processor_, "consumer_getting_partition_ids", fetch_partition_ids)
        .Get();
}

void Consumer::Stop() noexcept {
if (processing_.exchange(false) && poll_task_.IsValid()) {
UINVARIANT(consumer_, "Stopping already stopped consumer");

LOG_INFO() << "Stopping consumer poll task";
poll_task_.SyncCancel();

Expand All @@ -178,7 +212,6 @@ void Consumer::Stop() noexcept {
// 1. This is blocking.
// 2. This calls testpoints
consumer_->StopConsuming();
consumer_.reset();
}).Get();
TESTPOINT(fmt::format("tp_{}_stopped", name_), {});
LOG_INFO() << "Consumer stopped";
Expand Down
Loading

0 comments on commit 7ee9044

Please sign in to comment.