diff --git a/src/v/compat/raft_generator.h b/src/v/compat/raft_generator.h
index 8d0ca63c96069..271502746cb74 100644
--- a/src/v/compat/raft_generator.h
+++ b/src/v/compat/raft_generator.h
@@ -347,7 +347,7 @@ struct instance_generator {
            {raft::reply_result::success,
             raft::reply_result::failure,
             raft::reply_result::group_unavailable,
-            raft::reply_result::timeout}),
+            raft::reply_result::follower_busy}),
         .may_recover = tests::random_bool(),
       };
     }
diff --git a/src/v/compat/raft_json.h b/src/v/compat/raft_json.h
index 2c6d2a96a45fc..49ea00d363d7a 100644
--- a/src/v/compat/raft_json.h
+++ b/src/v/compat/raft_json.h
@@ -106,7 +106,7 @@ inline void read_value(const json::Value& rd, raft::append_entries_reply& out) {
         obj.result = raft::reply_result::group_unavailable;
         break;
     case 3:
-        obj.result = raft::reply_result::timeout;
+        obj.result = raft::reply_result::follower_busy;
         break;
     default:
         vassert(false, "invalid result {}", result);
diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index 0c59f32976950..9700b4afce3f4 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -338,6 +338,13 @@ consensus::success_reply consensus::update_follower_index(
         // current node may change it.
         return success_reply::yes;
     }
+
+    if (unlikely(r.value().result == reply_result::follower_busy)) {
+        // ignore this response, the follower reported it was busy
+        vlog(_ctxlog.trace, "Follower busy on node {}", node.id());
+        return success_reply::no;
+    }
+
     const auto& config = _configuration_manager.get_latest();
     if (!config.contains(node)) {
         // We might have sent an append_entries just before removing
@@ -373,12 +380,6 @@ consensus::success_reply consensus::update_follower_index(
           physical_node);
         return success_reply::no;
     }
-
-    if (unlikely(reply.result == reply_result::timeout)) {
-        // ignore this response, timed out on the receiver node
-        vlog(_ctxlog.trace, "Append entries request timedout at node {}", node);
-        return success_reply::no;
-    }
     if (unlikely(reply.result == reply_result::group_unavailable)) {
         // ignore this response since group is not yet bootstrapped at the
         // follower
diff --git a/src/v/raft/fundamental.h b/src/v/raft/fundamental.h
index 3364ba168f38a..fb3c5adc6be67 100644
--- a/src/v/raft/fundamental.h
+++ b/src/v/raft/fundamental.h
@@ -32,7 +32,7 @@ enum class reply_result : uint8_t {
     success,
     failure,
     group_unavailable,
-    timeout
+    follower_busy
 };
 
 /**
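Reviewer note on the enum change above: `follower_busy` occupies the slot previously used by `timeout` (value 3 in the serde mapping shown in raft_json.h), so the rename is a reinterpretation of the same wire value rather than a protocol change. A minimal sketch of the sender-side contract the rest of this patch implements — only `reply_result` and its enumerators come from the patch, the other names are illustrative:

```cpp
#include <cstdint>

enum class reply_result : uint8_t {
    success,
    failure,
    group_unavailable,
    follower_busy // previously `timeout`
};

// Hypothetical per-follower bookkeeping. A busy reply proves the follower is
// alive, so the leader refreshes its liveness timestamp, but the reply must
// not be counted as replication progress or trigger a step down.
struct follower_liveness {
    uint64_t last_reply_timestamp{0};
};

inline bool counts_as_progress(
  follower_liveness& f, reply_result r, uint64_t now) {
    switch (r) {
    case reply_result::success:
        f.last_reply_timestamp = now;
        return true;
    case reply_result::follower_busy:
        f.last_reply_timestamp = now; // alive, just slow
        return false;
    case reply_result::failure:
    case reply_result::group_unavailable:
        return false; // handled by dedicated recovery paths
    }
    return false;
}
```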
diff --git a/src/v/raft/heartbeat_manager.cc b/src/v/raft/heartbeat_manager.cc
index 78d1478988d2f..e67ba131c4d5d 100644
--- a/src/v/raft/heartbeat_manager.cc
+++ b/src/v/raft/heartbeat_manager.cc
@@ -373,14 +373,18 @@ void heartbeat_manager::process_reply(
           n);
         return;
     }
-
-    if (unlikely(result == reply_result::timeout)) {
+    /**
+     * This is here for completeness; it should never be triggered, as
+     * followers do not reply with the busy error code when processing
+     * lightweight heartbeats.
+     */
+    if (unlikely(result == reply_result::follower_busy)) {
         vlog(
-          hbeatlog.debug,
-          "Heartbeat request for group {} timed out on the node {}",
+          hbeatlog.error,
+          "Follower reported busy for group {} on node {} when processing "
+          "lightweight heartbeat",
           group,
           n);
-        return;
     }
     if (unlikely(target != consensus->self().id())) {
         vlog(
@@ -438,14 +442,17 @@ void heartbeat_manager::process_reply(
               n);
             continue;
         }
-
-        if (unlikely(m.result == reply_result::timeout)) {
+        /**
+         * A busy follower still updates the last received reply timestamp:
+         * the reply shows the replica is alive and able to process requests;
+         * it may simply be slow, e.g. its op lock is contended.
+         */
+        if (unlikely(m.result == reply_result::follower_busy)) {
             vlog(
-              hbeatlog.debug,
-              "Heartbeat request for group {} timed out on the node {}",
+              hbeatlog.trace,
+              "Follower busy when processing full heartbeat for group {} on {}",
              m.group,
              n);
-            continue;
        }
 
        if (unlikely(reply.target() != consensus->self().id())) {
@@ -555,7 +562,7 @@ void heartbeat_manager::process_reply(
                 continue;
             }
 
-            if (unlikely(m.result == reply_result::timeout)) {
+            if (unlikely(m.result == reply_result::follower_busy)) {
                 vlog(
                   hbeatlog.debug,
                   "Heartbeat request for group {} timed out on the node {}",
diff --git a/src/v/raft/service.h b/src/v/raft/service.h
index 0bb0bc4fc4ffc..9a92076564f50 100644
--- a/src/v/raft/service.h
+++ b/src/v/raft/service.h
@@ -411,7 +411,7 @@ class service final : public raftgen_service {
               return ss::with_timeout(timeout, std::move(f))
                 .handle_exception_type([group](const ss::timed_out_error&) {
                     return append_entries_reply{
-                      .group = group, .result = reply_result::timeout};
+                      .group = group, .result = reply_result::follower_busy};
                 });
           });
 
@@ -447,11 +447,11 @@ class service final : public raftgen_service {
             auto f = dispatch_full_heartbeat(
               source_node, target_node, m, full_hb);
             f = ss::with_timeout(timeout, std::move(f))
-                  .handle_exception_type(
-                    [group = full_hb.group](const ss::timed_out_error&) {
-                        return full_heartbeat_reply{
-                          .group = group, .result = reply_result::timeout};
-                    });
+                  .handle_exception_type([group = full_hb.group](
+                                           const ss::timed_out_error&) {
+                      return full_heartbeat_reply{
+                        .group = group, .result = reply_result::follower_busy};
+                  });
 
             futures.push_back(std::move(f));
         }
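The service changes above convert a server-side wait timeout into a `follower_busy` reply rather than an RPC error, so the leader still learns the follower is alive. A self-contained sketch of that pattern; `busy_capable_reply` and `cap_with_busy_reply` are illustrative stand-ins for `append_entries_reply`/`full_heartbeat_reply`, while the Seastar calls are real:

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/timed_out_error.hh>
#include <seastar/core/with_timeout.hh>

struct busy_capable_reply {
    long group;
    bool busy;
};

inline seastar::future<busy_capable_reply> cap_with_busy_reply(
  seastar::lowres_clock::time_point deadline,
  seastar::future<busy_capable_reply> f,
  long group) {
    return seastar::with_timeout(deadline, std::move(f))
      .handle_exception_type([group](const seastar::timed_out_error&) {
          // The follower-side work may still finish later; the reply only
          // tells the leader "alive, but not done yet", so the leader can
          // refresh liveness without treating this as progress.
          return busy_capable_reply{.group = group, .busy = true};
      });
}
```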
diff --git a/src/v/raft/tests/BUILD b/src/v/raft/tests/BUILD
index f1d3502f1b64c..58c42f0d33674 100644
--- a/src/v/raft/tests/BUILD
+++ b/src/v/raft/tests/BUILD
@@ -72,6 +72,22 @@ redpanda_test_cc_library(
     ],
 )
 
+redpanda_test_cc_library(
+    name = "failure_injectable_log",
+    srcs = ["failure_injectable_log.cc"],
+    hdrs = [
+        "failure_injectable_log.h",
+    ],
+    include_prefix = "raft/tests",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//src/v/base",
+        "//src/v/storage",
+        "//src/v/test_utils:gtest",
+        "@seastar",
+    ],
+)
+
 redpanda_test_cc_library(
     name = "raft_fixture",
     srcs = ["raft_fixture.cc"],
@@ -81,6 +97,7 @@ redpanda_test_cc_library(
     include_prefix = "raft/tests",
     visibility = ["//visibility:public"],
     deps = [
+        ":failure_injectable_log",
         "//src/v/base",
         "//src/v/bytes:iobuf",
         "//src/v/bytes:iobuf_parser",
diff --git a/src/v/raft/tests/CMakeLists.txt b/src/v/raft/tests/CMakeLists.txt
index 081fcbb9d4846..d7818567d24b8 100644
--- a/src/v/raft/tests/CMakeLists.txt
+++ b/src/v/raft/tests/CMakeLists.txt
@@ -35,8 +35,14 @@ rp_test(
   ARGS "-- -c 8"
 )
 
+
+v_cc_library(
+  NAME raft_fixture
+  SRCS raft_fixture.cc
+  failure_injectable_log.cc
+  DEPS Seastar::seastar v::raft v::gtest_main)
+
 set(gsrcs
-  raft_fixture.cc
   basic_raft_fixture_test.cc
   stm_manager_test.cc
   raft_reconfiguration_test.cc
@@ -53,7 +59,7 @@ rp_test(
   TIMEOUT 2000
   BINARY_NAME gtest_raft
   SOURCES ${gsrcs}
-  LIBRARIES v::raft v::storage_test_utils v::model_test_utils v::features v::gtest_main
+  LIBRARIES v::raft v::storage_test_utils v::model_test_utils v::features v::gtest_main v::raft_fixture
   LABELS raft
   ARGS "-- -c 8 -m 4G"
 )
@@ -68,7 +74,3 @@ rp_test(
   LABELS raft
 )
 
-v_cc_library(
-  NAME raft_fixture
-  SRCS raft_fixture.cc
-  DEPS Seastar::seastar v::raft v::gtest_main)
diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc
index 2050b11ce0687..f26468a55176a 100644
--- a/src/v/raft/tests/basic_raft_fixture_test.cc
+++ b/src/v/raft/tests/basic_raft_fixture_test.cc
@@ -836,3 +836,29 @@ TEST_F_CORO(raft_fixture, leadership_transfer_delay) {
     ASSERT_LE_CORO(election_time * 1.0, transfer_time * tolerance_multiplier);
     ASSERT_GE_CORO(election_time * 1.0, transfer_time / tolerance_multiplier);
 }
+
+TEST_F_CORO(raft_fixture, test_no_stepdown_on_append_entries_timeout) {
+    config::shard_local_cfg().replicate_append_timeout_ms.set_value(1s);
+    co_await create_simple_group(3);
+    auto leader_id = co_await wait_for_leader(10s);
+    for (auto& [id, n] : nodes()) {
+        if (id != leader_id) {
+            n->f_injectable_log()->set_append_delay([]() { return 5s; });
+        }
+    }
+
+    auto& leader_node = node(leader_id);
+    auto term_before = leader_node.raft()->term();
+    auto r = co_await leader_node.raft()->replicate(
+      make_batches(1, 10, 128),
+      replicate_options(consistency_level::quorum_ack, 10s));
+    ASSERT_FALSE_CORO(r.has_error());
+    for (auto& [_, n] : nodes()) {
+        n->f_injectable_log()->set_append_delay(std::nullopt);
+    }
+
+    leader_id = co_await wait_for_leader(10s);
+    auto& new_leader_node = node(leader_id);
+    ASSERT_EQ_CORO(term_before, new_leader_node.raft()->term());
+    ASSERT_TRUE_CORO(new_leader_node.raft()->is_leader());
+}
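The new test above pins the intended behavior: with appends delayed on followers past `replicate_append_timeout_ms`, the leader's term must not change. The delay generator is a plain callable, so tests can also inject variable delays instead of the fixed `5s` used above. A sketch, assuming the generator returns a `std::chrono` duration as the `return 5s;` in the test suggests (the `jittered_delay` helper is hypothetical):

```cpp
#include <chrono>
#include <functional>
#include <random>

using append_delay_generator = std::function<std::chrono::milliseconds()>;

inline append_delay_generator jittered_delay(
  std::chrono::milliseconds base, std::chrono::milliseconds jitter) {
    return
      [base, jitter, eng = std::mt19937{std::random_device{}()}]() mutable {
          std::uniform_int_distribution<int64_t> dist(0, jitter.count());
          return base + std::chrono::milliseconds(dist(eng));
      };
}

// usage in a test body (hypothetical):
//   node->f_injectable_log()->set_append_delay(jittered_delay(
//     std::chrono::milliseconds(1000), std::chrono::milliseconds(500)));
```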
diff --git a/src/v/raft/tests/failure_injectable_log.cc b/src/v/raft/tests/failure_injectable_log.cc
new file mode 100644
index 0000000000000..d55e12afb9a2d
--- /dev/null
+++ b/src/v/raft/tests/failure_injectable_log.cc
@@ -0,0 +1,243 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+#include "raft/tests/failure_injectable_log.h"
+namespace raft {
+
+failure_injectable_log::failure_injectable_log(
+  ss::shared_ptr<storage::log> underlying_log) noexcept
+  : storage::log(underlying_log->config().copy())
+  , _underlying_log(std::move(underlying_log)) {}
+
+ss::future<> failure_injectable_log::start(
+  std::optional<storage::truncate_prefix_config> cfg) {
+    return _underlying_log->start(cfg);
+}
+
+ss::future<>
+failure_injectable_log::housekeeping(storage::housekeeping_config cfg) {
+    return _underlying_log->housekeeping(cfg);
+}
+
+ss::future<> failure_injectable_log::truncate(storage::truncate_config cfg) {
+    return _underlying_log->truncate(cfg);
+}
+
+ss::future<>
+failure_injectable_log::truncate_prefix(storage::truncate_prefix_config tpc) {
+    return _underlying_log->truncate_prefix(tpc);
+}
+ss::future<> failure_injectable_log::gc(storage::gc_config cfg) {
+    return _underlying_log->gc(cfg);
+}
+
+ss::future<> failure_injectable_log::apply_segment_ms() {
+    return _underlying_log->apply_segment_ms();
+}
+
+ss::future<model::record_batch_reader>
+failure_injectable_log::make_reader(storage::log_reader_config cfg) {
+    return _underlying_log->make_reader(cfg);
+}
+namespace {
+struct delay_introducing_appender : public storage::log_appender::impl {
+    delay_introducing_appender(
+      storage::log_appender underlying, append_delay_generator generator)
+      : _underlying(std::move(underlying))
+      , _append_delay_generator(std::move(generator)) {}
+
+    /// non-owning reference - do not steal the iobuf
+    ss::future<ss::stop_iteration> operator()(model::record_batch& b) final {
+        co_await ss::sleep(_append_delay_generator());
+        co_return co_await _underlying(b);
+    }
+
+    ss::future<storage::append_result> end_of_stream() final {
+        return _underlying.end_of_stream();
+    }
+    storage::log_appender _underlying;
+    append_delay_generator _append_delay_generator;
+};
+} // namespace
+
+storage::log_appender
+failure_injectable_log::make_appender(storage::log_append_config cfg) {
+    if (_append_delay_generator) {
+        return storage::log_appender(
+          std::make_unique<delay_introducing_appender>(
+            _underlying_log->make_appender(cfg), *_append_delay_generator));
+    }
+
+    return _underlying_log->make_appender(cfg);
+}
+
+ss::future<> failure_injectable_log::close() {
+    return _underlying_log->close();
+}
+
+ss::future<> failure_injectable_log::remove() {
+    return _underlying_log->remove();
+}
+
+ss::future<> failure_injectable_log::flush() {
+    return _underlying_log->flush();
+}
+
+ss::future<std::optional<storage::timequery_result>>
+failure_injectable_log::timequery(storage::timequery_config cfg) {
+    return _underlying_log->timequery(cfg);
+}
+
+ss::lw_shared_ptr<const storage::offset_translator_state>
+failure_injectable_log::get_offset_translator_state() const {
+    return _underlying_log->get_offset_translator_state();
+}
+
+model::offset_delta
+failure_injectable_log::offset_delta(model::offset o) const {
+    return _underlying_log->offset_delta(o);
+}
+
+model::offset failure_injectable_log::from_log_offset(model::offset o) const {
+    return _underlying_log->from_log_offset(o);
+}
+
+model::offset failure_injectable_log::to_log_offset(model::offset o) const {
+    return _underlying_log->to_log_offset(o);
+}
+
+bool failure_injectable_log::is_new_log() const {
+    return _underlying_log->is_new_log();
+}
+
+size_t failure_injectable_log::segment_count() const {
+    return _underlying_log->segment_count();
+}
+
+storage::offset_stats failure_injectable_log::offsets() const {
+    return _underlying_log->offsets();
+}
+
+size_t failure_injectable_log::get_log_truncation_counter() const noexcept {
+    return _underlying_log->get_log_truncation_counter();
+}
+
+model::offset failure_injectable_log::find_last_term_start_offset() const {
+    return _underlying_log->find_last_term_start_offset();
+}
+
+model::timestamp failure_injectable_log::start_timestamp() const {
+    return _underlying_log->start_timestamp();
+}
+
+std::ostream& failure_injectable_log::print(std::ostream& o) const {
+    return _underlying_log->print(o);
+}
+
+std::optional<model::term_id>
+failure_injectable_log::get_term(model::offset o) const {
+    return _underlying_log->get_term(o);
+}
+
+std::optional<model::offset>
+failure_injectable_log::get_term_last_offset(model::term_id t) const {
+    return _underlying_log->get_term_last_offset(t);
+}
+
+std::optional<model::offset>
+failure_injectable_log::index_lower_bound(model::offset o) const {
+    return _underlying_log->index_lower_bound(o);
+}
+
+ss::future<model::offset>
+failure_injectable_log::monitor_eviction(ss::abort_source& as) {
+    return _underlying_log->monitor_eviction(as);
+}
+
+size_t failure_injectable_log::size_bytes() const {
+    return _underlying_log->size_bytes();
+}
+
+uint64_t
+failure_injectable_log::size_bytes_after_offset(model::offset o) const {
+    return _underlying_log->size_bytes_after_offset(o);
+}
+
+ss::future<std::optional<storage::log::offset_range_size_result_t>>
+failure_injectable_log::offset_range_size(
+  model::offset first, model::offset last, ss::io_priority_class io_priority) {
+    return _underlying_log->offset_range_size(first, last, io_priority);
+}
+
+ss::future<std::optional<storage::log::offset_range_size_result_t>>
+failure_injectable_log::offset_range_size(
+  model::offset first,
+  offset_range_size_requirements_t target,
+  ss::io_priority_class io_priority) {
+    return _underlying_log->offset_range_size(first, target, io_priority);
+}
+
+bool failure_injectable_log::is_compacted(
+  model::offset first, model::offset last) const {
+    return _underlying_log->is_compacted(first, last);
+}
+
+void failure_injectable_log::set_overrides(
+  storage::ntp_config::default_overrides overrides) {
+    mutable_config().set_overrides(overrides);
+    return _underlying_log->set_overrides(overrides);
+}
+
+bool failure_injectable_log::notify_compaction_update() {
+    return _underlying_log->notify_compaction_update();
+}
+
+int64_t failure_injectable_log::compaction_backlog() const {
+    return _underlying_log->compaction_backlog();
+}
+
+ss::future<storage::usage_report>
+failure_injectable_log::disk_usage(storage::gc_config cfg) {
+    return _underlying_log->disk_usage(cfg);
+}
+
+ss::future<storage::reclaimable_offsets>
+failure_injectable_log::get_reclaimable_offsets(storage::gc_config cfg) {
+    return _underlying_log->get_reclaimable_offsets(cfg);
+}
+
+void failure_injectable_log::set_cloud_gc_offset(model::offset o) {
+    return _underlying_log->set_cloud_gc_offset(o);
+}
+
+const storage::segment_set& failure_injectable_log::segments() const {
+    return _underlying_log->segments();
+}
+storage::segment_set& failure_injectable_log::segments() {
+    return _underlying_log->segments();
+}
+
+ss::future<> failure_injectable_log::force_roll(ss::io_priority_class iop) {
+    return _underlying_log->force_roll(iop);
+}
+
+storage::probe& failure_injectable_log::get_probe() {
+    return _underlying_log->get_probe();
+}
+
+size_t failure_injectable_log::reclaimable_size_bytes() const {
+    return _underlying_log->reclaimable_size_bytes();
+}
+
+std::optional<model::offset>
+failure_injectable_log::retention_offset(storage::gc_config cfg) const {
+    return _underlying_log->retention_offset(cfg);
+}
+
+} // namespace raft
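The wrapper above is a pure pass-through decorator: every `storage::log` virtual delegates to `_underlying_log`, and only `make_appender` is intercepted. The same shape extends to other fault types; an appender that fails every N-th batch might look like the following sketch (illustrative only, not part of the patch, and assuming the `log_appender::impl` signatures shown above):

```cpp
#include "storage/log.h"

#include <stdexcept>

namespace raft {
struct failing_appender final : public storage::log_appender::impl {
    failing_appender(storage::log_appender underlying, size_t fail_every)
      : _underlying(std::move(underlying))
      , _fail_every(fail_every) {}

    ss::future<ss::stop_iteration> operator()(model::record_batch& b) final {
        if (++_count % _fail_every == 0) {
            return ss::make_exception_future<ss::stop_iteration>(
              std::runtime_error("injected append failure"));
        }
        return _underlying(b);
    }

    ss::future<storage::append_result> end_of_stream() final {
        return _underlying.end_of_stream();
    }

    storage::log_appender _underlying;
    size_t _fail_every;
    size_t _count{0};
};
} // namespace raft
```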
diff --git a/src/v/raft/tests/failure_injectable_log.h b/src/v/raft/tests/failure_injectable_log.h
new file mode 100644
index 0000000000000..b8d1b14fc5738
--- /dev/null
+++ b/src/v/raft/tests/failure_injectable_log.h
@@ -0,0 +1,142 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+#pragma once
+#include "storage/log.h"
+#include "storage/offset_translator_state.h"
+
+namespace raft {
+
+using append_delay_generator = std::function<std::chrono::milliseconds()>;
+
+class failure_injectable_log final : public storage::log {
+public:
+    // sets the generator for the delay to be injected when appending a record
+    // batch to the log
+    void set_append_delay(std::optional<append_delay_generator> generator) {
+        _append_delay_generator = std::move(generator);
+    }
+
+public:
+    explicit failure_injectable_log(
+      ss::shared_ptr<storage::log> underlying_log) noexcept;
+
+    failure_injectable_log(failure_injectable_log&&) noexcept = delete;
+    failure_injectable_log& operator=(failure_injectable_log&&) noexcept
+      = delete;
+    failure_injectable_log(const failure_injectable_log&) = delete;
+    failure_injectable_log& operator=(const failure_injectable_log&) = delete;
+    ~failure_injectable_log() noexcept final = default;
+
+    ss::future<>
+    start(std::optional<storage::truncate_prefix_config> cfg) final;
+    ss::future<> housekeeping(storage::housekeeping_config cfg) final;
+
+    ss::future<> truncate(storage::truncate_config) final;
+
+    ss::future<> truncate_prefix(storage::truncate_prefix_config) final;
+    ss::future<> gc(storage::gc_config) final;
+    ss::future<> apply_segment_ms() final;
+
+    ss::future<model::record_batch_reader>
+    make_reader(storage::log_reader_config) final;
+
+    storage::log_appender make_appender(storage::log_append_config) final;
+
+    ss::future<> close() final;
+
+    ss::future<> remove() final;
+
+    ss::future<> flush() final;
+
+    ss::future<std::optional<storage::timequery_result>>
+    timequery(storage::timequery_config) final;
+
+    ss::lw_shared_ptr<const storage::offset_translator_state>
+    get_offset_translator_state() const final;
+
+    model::offset_delta offset_delta(model::offset) const final;
+
+    model::offset from_log_offset(model::offset) const final;
+
+    model::offset to_log_offset(model::offset) const final;
+
+    bool is_new_log() const final;
+
+    size_t segment_count() const final;
+
+    storage::offset_stats offsets() const final;
+
+    size_t get_log_truncation_counter() const noexcept final;
+
+    model::offset find_last_term_start_offset() const final;
+
+    model::timestamp start_timestamp() const final;
+
+    std::ostream& print(std::ostream& o) const final;
+
+    std::optional<model::term_id> get_term(model::offset) const final;
+
+    std::optional<model::offset>
+    get_term_last_offset(model::term_id) const final;
+
+    std::optional<model::offset> index_lower_bound(model::offset o) const final;
+
+    ss::future<model::offset> monitor_eviction(ss::abort_source&) final;
+
+    size_t size_bytes() const final;
+
+    uint64_t size_bytes_after_offset(model::offset o) const final;
+
+    ss::future<std::optional<offset_range_size_result_t>>
+    offset_range_size(
+      model::offset first,
+      model::offset last,
+      ss::io_priority_class io_priority) final;
+
+    ss::future<std::optional<offset_range_size_result_t>> offset_range_size(
+      model::offset first,
+      offset_range_size_requirements_t target,
+      ss::io_priority_class io_priority) final;
+
+    bool is_compacted(model::offset first, model::offset last) const final;
+
+    void set_overrides(storage::ntp_config::default_overrides) final;
+
+    bool notify_compaction_update() final;
+
+    int64_t compaction_backlog() const final;
+
+    ss::future<storage::usage_report> disk_usage(storage::gc_config) final;
+
+    ss::future<storage::reclaimable_offsets>
+    get_reclaimable_offsets(storage::gc_config cfg) final;
+
+    void set_cloud_gc_offset(model::offset) final;
+
+    const storage::segment_set& segments() const final;
+    storage::segment_set& segments() final;
+
+    ss::future<> force_roll(ss::io_priority_class) final;
+
+    storage::probe& get_probe() final;
+
+    size_t reclaimable_size_bytes() const final;
+
+    std::optional<model::offset>
+    retention_offset(storage::gc_config) const final;
+
+private:
+    ss::shared_ptr<storage::log> _underlying_log;
+    std::optional<append_delay_generator> _append_delay_generator;
+    friend std::ostream&
+    operator<<(std::ostream& o, const failure_injectable_log& lg) {
+        return lg.print(o);
+    }
+};
+} // namespace raft
diff --git a/src/v/raft/tests/heartbeat_bench.cc b/src/v/raft/tests/heartbeat_bench.cc
index 67b26755dcb81..d8a211bc8b4c0 100644
--- a/src/v/raft/tests/heartbeat_bench.cc
+++ b/src/v/raft/tests/heartbeat_bench.cc
@@ -31,7 +31,7 @@ struct fixture {
         return random_generators::random_choice(std::vector<raft::reply_result>{
           raft::reply_result::success,
           raft::reply_result::failure,
-          raft::reply_result::timeout,
+          raft::reply_result::follower_busy,
          raft::reply_result::group_unavailable});
     }
diff --git a/src/v/raft/tests/heartbeats_test.cc b/src/v/raft/tests/heartbeats_test.cc
index 60c887638e853..8c4c9487ce205 100644
--- a/src/v/raft/tests/heartbeats_test.cc
+++ b/src/v/raft/tests/heartbeats_test.cc
@@ -62,7 +62,7 @@ raft::reply_result random_reply_status() {
     return random_generators::random_choice(std::vector<raft::reply_result>{
       raft::reply_result::success,
       raft::reply_result::failure,
-      raft::reply_result::timeout,
+      raft::reply_result::follower_busy,
       raft::reply_result::group_unavailable});
 }
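The fixture changes below route test RPCs through the real `raft::service` instead of calling `consensus` directly, which is what exercises the new timeout-to-`follower_busy` mapping end to end. `raft::service` is templated on a group manager and a shard manager; reduced to its essentials, the duck-typed contract the fixture's stubs satisfy looks like this (stand-in types, illustrative only — the real stubs are `fixture_group_manager` and `fixture_shard_manager` in raft_fixture.h below):

```cpp
#include <memory>
#include <optional>

struct consensus; // the service only passes this around by pointer
using group_id = long;
using shard_id = unsigned;

// Group manager: resolve which consensus instance serves a group. The
// fixture hosts a single group, so it can return one stored pointer.
struct group_manager_shape {
    std::shared_ptr<consensus> consensus_for(group_id) { return raft; }
    std::shared_ptr<consensus> raft;
};

// Shard manager: map a group to the owning shard. Single-shard tests can
// always answer "this shard".
struct shard_manager_shape {
    std::optional<shard_id> shard_for(group_id) { return 0u; }
};
```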
diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc
index c4f632ec9cab1..15a1dbf80c02e 100644
--- a/src/v/raft/tests/raft_fixture.cc
+++ b/src/v/raft/tests/raft_fixture.cc
@@ -28,6 +28,7 @@
 #include "raft/heartbeat_manager.h"
 #include "raft/heartbeats.h"
 #include "raft/state_machine_manager.h"
+#include "raft/tests/failure_injectable_log.h"
 #include "raft/timeout_jitter.h"
 #include "raft/types.h"
 #include "random/generators.h"
@@ -85,13 +86,27 @@ ss::future<iobuf> channel::exchange(msg_type type, iobuf request) {
 }
 
 bool channel::is_valid() const { return _node && _node->raft() != nullptr; }
 
-ss::lw_shared_ptr<consensus> channel::raft() {
+raft::service<fixture_group_manager, fixture_shard_manager>&
+channel::get_service() {
     if (!_node || _node->raft() == nullptr) {
         throw std::runtime_error("no raft group");
     }
-    return _node->raft();
+    return _node->get_service();
 }
 
+namespace {
+struct test_ctx : rpc::streaming_context {
+    ss::future<ssx::semaphore_units> reserve_memory(size_t) final {
+        co_return ssx::semaphore_units();
+    }
+    const rpc::header& get_header() const final { return _header; };
+
+    void signal_body_parse() final {}
+    void body_parse_exception(std::exception_ptr) final {}
+
+    rpc::header _header{};
+};
+} // namespace
@@ -100,107 +115,90 @@ ss::future<> channel::dispatch_loop() {
     while (!_as.abort_requested()) {
         co_await _new_messages.wait([this] { return !_messages.empty(); });
         }
         auto msg = std::move(_messages.front());
         _messages.pop_front();
+        ssx::spawn_with_gate(_gate, [this, msg = std::move(msg)]() mutable {
+            return do_dispatch_message(std::move(msg));
+        });
+    }
+}
 
+ss::future<> channel::do_dispatch_message(msg msg) {
+    try {
         iobuf_parser req_parser(std::move(msg.req_data));
+        test_ctx ctx{};
+        switch (msg.type) {
+        case msg_type::vote: {
+            auto req = co_await serde::read_async<vote_request>(req_parser);
+            auto resp = co_await get_service().vote(std::move(req), ctx);
+            iobuf resp_buf;
+            co_await serde::write_async(resp_buf, std::move(resp));
+            msg.resp_data.set_value(std::move(resp_buf));
+            break;
+        }
+        case msg_type::append_entries: {
+            auto req = co_await serde::read_async<append_entries_request>(
+              req_parser);
+            auto resp = co_await get_service().append_entries(
+              std::move(req), ctx);
+            iobuf resp_buf;
+            co_await serde::write_async(resp_buf, std::move(resp));
+            msg.resp_data.set_value(std::move(resp_buf));
+            break;
+        }
+        case msg_type::heartbeat: {
+            auto req = co_await serde::read_async<heartbeat_request>(
+              req_parser);
-        try {
-            switch (msg.type) {
-            case msg_type::vote: {
-                auto req = co_await serde::read_async<vote_request>(req_parser);
-                auto resp = co_await raft()->vote(std::move(req));
-                iobuf resp_buf;
-                co_await serde::write_async(resp_buf, std::move(resp));
-                msg.resp_data.set_value(std::move(resp_buf));
-                break;
-            }
-            case msg_type::append_entries: {
-                auto req = co_await serde::read_async<append_entries_request>(
-                  req_parser);
-                auto resp = co_await raft()->append_entries(std::move(req));
-                iobuf resp_buf;
-                co_await serde::write_async(resp_buf, std::move(resp));
-                msg.resp_data.set_value(std::move(resp_buf));
-                break;
-            }
-            case msg_type::heartbeat: {
-                auto req = co_await serde::read_async<heartbeat_request>(
-                  req_parser);
-                heartbeat_reply reply;
-                for (auto& hb : req.heartbeats) {
-                    auto resp = co_await raft()->append_entries(
-                      append_entries_request(
-                        hb.node_id,
-                        hb.meta,
-                        model::make_memory_record_batch_reader(
-                          ss::circular_buffer<model::record_batch>{}),
-                        0,
-                        flush_after_append::no));
-                    reply.meta.push_back(resp);
-                }
-
-                iobuf resp_buf;
-                co_await serde::write_async(resp_buf, std::move(reply));
-                msg.resp_data.set_value(std::move(resp_buf));
-                break;
-            }
-            case msg_type::heartbeat_v2: {
-                auto req = co_await serde::read_async<heartbeat_request_v2>(
-                  req_parser);
-                heartbeat_reply_v2 reply(raft()->self().id(), req.source());
-
-                for (auto& hb : req.full_heartbeats()) {
-                    auto resp = co_await raft()->full_heartbeat(
-                      hb.group, req.source(), req.target(), hb.data);
-
-                    reply.add(resp.group, resp.result, resp.data);
-                }
-                req.for_each_lw_heartbeat(
-                  [this, &req, &reply](raft::group_id g) {
-                      auto result = raft()->lightweight_heartbeat(
-                        req.source(), req.target());
-                      reply.add(g, result);
-                  });
-
-                iobuf resp_buf;
-                co_await serde::write_async(resp_buf, std::move(reply));
-                msg.resp_data.set_value(std::move(resp_buf));
-                break;
-            }
-            case msg_type::install_snapshot: {
-                auto req = co_await serde::read_async<install_snapshot_request>(
-                  req_parser);
-                auto resp = co_await raft()->install_snapshot(std::move(req));
-                iobuf resp_buf;
-                co_await serde::write_async(resp_buf, std::move(resp));
-                msg.resp_data.set_value(std::move(resp_buf));
-                break;
-            }
-            case msg_type::timeout_now: {
-                auto req = co_await serde::read_async<timeout_now_request>(
-                  req_parser);
-                auto resp = co_await raft()->timeout_now(std::move(req));
-                iobuf resp_buf;
-                co_await serde::write_async(resp_buf, std::move(resp));
-                msg.resp_data.set_value(std::move(resp_buf));
-                break;
-            }
-            case msg_type::transfer_leadership: {
-                auto req
-                  = co_await serde::read_async<transfer_leadership_request>(
-                    req_parser);
-                auto resp = co_await raft()->transfer_leadership(
-                  std::move(req));
-                iobuf resp_buf;
-                co_await serde::write_async(resp_buf, std::move(resp));
-                msg.resp_data.set_value(std::move(resp_buf));
-                break;
-            }
-            }
-        } catch (...) {
-            msg.resp_data.set_to_current_exception();
+            auto reply = co_await get_service().heartbeat(std::move(req), ctx);
+
+            iobuf resp_buf;
+            co_await serde::write_async(resp_buf, std::move(reply));
+            msg.resp_data.set_value(std::move(resp_buf));
+            break;
+        }
+        case msg_type::heartbeat_v2: {
+            auto req = co_await serde::read_async<heartbeat_request_v2>(
+              req_parser);
+            auto reply = co_await get_service().heartbeat_v2(
+              std::move(req), ctx);
+
+            iobuf resp_buf;
+            co_await serde::write_async(resp_buf, std::move(reply));
+            msg.resp_data.set_value(std::move(resp_buf));
+            break;
+        }
+        case msg_type::install_snapshot: {
+            auto req = co_await serde::read_async<install_snapshot_request>(
+              req_parser);
+            auto resp = co_await get_service().install_snapshot(
+              std::move(req), ctx);
+            iobuf resp_buf;
+            co_await serde::write_async(resp_buf, std::move(resp));
+            msg.resp_data.set_value(std::move(resp_buf));
+            break;
+        }
+        case msg_type::timeout_now: {
+            auto req = co_await serde::read_async<timeout_now_request>(
+              req_parser);
+            auto resp = co_await get_service().timeout_now(std::move(req), ctx);
+            iobuf resp_buf;
+            co_await serde::write_async(resp_buf, std::move(resp));
+            msg.resp_data.set_value(std::move(resp_buf));
+            break;
+        }
+        case msg_type::transfer_leadership: {
+            auto req = co_await serde::read_async<transfer_leadership_request>(
+              req_parser);
+            auto resp = co_await get_service().transfer_leadership(
+              std::move(req), ctx);
+            iobuf resp_buf;
+            co_await serde::write_async(resp_buf, std::move(resp));
+            msg.resp_data.set_value(std::move(resp_buf));
+            break;
+        }
+        }
+    } catch (...) {
+        msg.resp_data.set_to_current_exception();
     }
 }
-
 in_memory_test_protocol::in_memory_test_protocol(
   raft_node_map& node_map, prefix_logger& logger)
   : _nodes(node_map)
@@ -257,9 +255,9 @@ static constexpr msg_type map_msg_type() {
 }
 
 template<typename ReqT, typename RespT>
-ss::future<result<RespT>>
-in_memory_test_protocol::dispatch(model::node_id id, ReqT req) {
-    _gate.hold();
+ss::future<result<RespT>> in_memory_test_protocol::dispatch(
+  model::node_id id, ReqT req, rpc::client_opts opts) {
+    auto h = _gate.hold();
     auto it = _channels.find(id);
     if (it == _channels.end()) {
         auto node = _nodes.node_for(id);
@@ -288,54 +286,61 @@ in_memory_test_protocol::dispatch(model::node_id id, ReqT req) {
     }
 
     try {
-        auto resp = co_await node_channel.exchange(msg_type, std::move(buffer));
+        auto f = node_channel.exchange(msg_type, std::move(buffer));
+        opts.resource_units.release();
+        auto resp = co_await ss::with_timeout(
+          opts.timeout.timeout_at(), std::move(f));
         iobuf_parser parser(std::move(resp));
         co_return co_await serde::read_async<RespT>(parser);
+    } catch (const ss::timed_out_error&) {
+        co_return rpc::errc::client_request_timeout;
     } catch (const seastar::gate_closed_exception&) {
         co_return errc::shutting_down;
     }
 }
 
 ss::future<result<vote_reply>> in_memory_test_protocol::vote(
-  model::node_id id, vote_request req, rpc::client_opts) {
-    return dispatch<vote_request, vote_reply>(id, req);
+  model::node_id id, vote_request req, rpc::client_opts opts) {
+    return dispatch<vote_request, vote_reply>(id, req, std::move(opts));
 };
 
 ss::future<result<append_entries_reply>>
 in_memory_test_protocol::append_entries(
-  model::node_id id, append_entries_request req, rpc::client_opts) {
+  model::node_id id, append_entries_request req, rpc::client_opts opts) {
     return dispatch<append_entries_request, append_entries_reply>(
-      id, std::move(req));
+      id, std::move(req), std::move(opts));
 };
 
 ss::future<result<heartbeat_reply>> in_memory_test_protocol::heartbeat(
-  model::node_id id, heartbeat_request req, rpc::client_opts) {
-    return dispatch<heartbeat_request, heartbeat_reply>(id, std::move(req));
+  model::node_id id, heartbeat_request req, rpc::client_opts opts) {
+    return dispatch<heartbeat_request, heartbeat_reply>(
+      id, std::move(req), std::move(opts));
 }
 
 ss::future<result<heartbeat_reply_v2>> in_memory_test_protocol::heartbeat_v2(
-  model::node_id id, heartbeat_request_v2 req, rpc::client_opts) {
+  model::node_id id, heartbeat_request_v2 req, rpc::client_opts opts) {
     return dispatch<heartbeat_request_v2, heartbeat_reply_v2>(
-      id, std::move(req));
+      id, std::move(req), std::move(opts));
 }
 
 ss::future<result<install_snapshot_reply>>
 in_memory_test_protocol::install_snapshot(
-  model::node_id id, install_snapshot_request req, rpc::client_opts) {
+  model::node_id id, install_snapshot_request req, rpc::client_opts opts) {
     return dispatch<install_snapshot_request, install_snapshot_reply>(
-      id, std::move(req));
+      id, std::move(req), std::move(opts));
 }
 
 ss::future<result<timeout_now_reply>> in_memory_test_protocol::timeout_now(
-  model::node_id id, timeout_now_request req, rpc::client_opts) {
-    return dispatch<timeout_now_request, timeout_now_reply>(id, std::move(req));
+  model::node_id id, timeout_now_request req, rpc::client_opts opts) {
    return dispatch<timeout_now_request, timeout_now_reply>(
+      id, std::move(req), std::move(opts));
 }
 
 ss::future<result<transfer_leadership_reply>>
 in_memory_test_protocol::transfer_leadership(
-  model::node_id id, transfer_leadership_request req, rpc::client_opts) {
+  model::node_id id, transfer_leadership_request req, rpc::client_opts opts) {
     return dispatch<transfer_leadership_request, transfer_leadership_reply>(
-      id, std::move(req));
+      id, std::move(req), std::move(opts));
 }
 
 raft_node_instance::raft_node_instance(
@@ -395,12 +400,20 @@ raft_node_instance::raft_node_instance(
   , _enable_longest_log_detection(enable_longest_log_detection)
   , _election_timeout(std::move(election_timeout))
   , _heartbeat_interval(std::move(heartbeat_interval))
-  , _with_offset_translation(with_offset_translation) {
+  , _with_offset_translation(with_offset_translation)
+  , _service(
+      ss::default_scheduling_group(),
+      ss::default_smp_service_group(),
+      std::ref(_group_manager),
+      _shard_manager,
+      _heartbeat_interval(),
+      _id) {
     config::shard_local_cfg().disable_metrics.set_value(true);
 }
 
 ss::future<>
 raft_node_instance::initialise(std::vector<vnode> initial_nodes) {
+    co_await _group_manager.start_single();
     _hb_manager = std::make_unique<heartbeat_manager>(
       _heartbeat_interval,
       consensus_client_protocol(_buffered_protocol),
@@ -433,13 +446,13 @@ raft_node_instance::initialise(std::vector<vnode> initial_nodes) {
       test_group,
       _with_offset_translation ? model::offset_translator_batch_types()
                                : std::vector<model::record_batch_type>{});
-
+    _f_log = ss::make_shared<failure_injectable_log>(std::move(log));
     _raft = ss::make_lw_shared<consensus>(
       _id,
       test_group,
       raft::group_configuration(std::move(initial_nodes), _revision),
       timeout_jitter(_election_timeout),
-      log,
+      _f_log,
       scheduling_config(
         ss::default_scheduling_group(), ss::default_priority_class()),
       config::mock_binding(1s),
@@ -451,6 +464,7 @@ raft_node_instance::initialise(std::vector<vnode> initial_nodes) {
       _recovery_mem_quota,
       _recovery_scheduler,
       _features.local());
+    _group_manager.local().raft = _raft;
     co_await _hb_manager->register_group(_raft);
 }
 
@@ -474,6 +488,8 @@ ss::future<> raft_node_instance::stop() {
     vlog(_logger.debug, "stopping protocol");
     co_await _buffered_protocol->stop();
     co_await _protocol->stop();
+    // release the f_log pointer before stopping raft
+    _f_log = nullptr;
     vlog(_logger.debug, "stopping raft");
     co_await _raft->stop();
     vlog(_logger.debug, "stopping recovery throttle");
@@ -484,6 +500,10 @@ ss::future<> raft_node_instance::stop() {
     co_await _hb_manager->stop();
     vlog(_logger.debug, "stopping feature table");
     _raft = nullptr;
+
+    // the group manager must be stopped before storage, as consensus holds
+    // units of the storage resources semaphore
+    co_await _group_manager.stop();
     vlog(_logger.debug, "stopping storage");
     co_await _storage.stop();
 }
diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h
index 3baf664a2b462..08458e2b12db5 100644
--- a/src/v/raft/tests/raft_fixture.h
+++ b/src/v/raft/tests/raft_fixture.h
@@ -24,7 +24,9 @@
 #include "raft/fwd.h"
 #include "raft/heartbeat_manager.h"
 #include "raft/recovery_memory_quota.h"
+#include "raft/service.h"
 #include "raft/state_machine_manager.h"
+#include "raft/tests/failure_injectable_log.h"
 #include "raft/types.h"
 #include "ssx/sformat.h"
 #include "storage/api.h"
@@ -60,6 +62,20 @@ struct msg {
     ss::promise<iobuf> resp_data;
 };
 
 class raft_node_instance;
+/**
+ * Dummy shard and group managers allowing the fixture to be used with the
+ * Raft RPC service implementation.
+ */
+struct fixture_group_manager {
+    ss::lw_shared_ptr<consensus> consensus_for(raft::group_id) { return raft; }
+    ss::lw_shared_ptr<consensus> raft;
+};
+
+struct fixture_shard_manager {
+    std::optional<ss::shard_id> shard_for(raft::group_id) {
+        return ss::this_shard_id();
+    }
+};
 
 struct channel {
     explicit channel(raft_node_instance&);
@@ -73,7 +89,8 @@ struct channel {
     bool is_valid() const;
 
 private:
-    ss::lw_shared_ptr<consensus> raft();
+    ss::future<> do_dispatch_message(msg);
+    raft::service<fixture_group_manager, fixture_shard_manager>& get_service();
     ss::weak_ptr<raft_node_instance> _node;
     ss::chunked_fifo<msg> _messages;
     ss::gate _gate;
@@ -130,7 +147,8 @@ class in_memory_test_protocol : public consensus_client_protocol::impl {
 
 private:
     template<typename ReqT, typename RespT>
-    ss::future<result<RespT>> dispatch(model::node_id, ReqT req);
+    ss::future<result<RespT>>
+    dispatch(model::node_id, ReqT req, rpc::client_opts);
     ss::gate _gate;
     absl::flat_hash_map<model::node_id, std::unique_ptr<channel>> _channels;
     std::vector<dispatch_callback_t> _on_dispatch_handlers;
@@ -147,6 +165,8 @@ inline model::timeout_clock::time_point default_timeout() {
  */
 class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
 public:
+    using service_t
+      = raft::service<fixture_group_manager, fixture_shard_manager>;
     using leader_update_clb_t
       = ss::noncopyable_function<void(leader_info)>;
     raft_node_instance(
@@ -246,6 +266,10 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
 
     storage::kvstore& get_kvstore() { return _storage.local().kvs(); }
 
+    ss::shared_ptr<failure_injectable_log> f_injectable_log() { return _f_log; }
+
+    service_t& get_service() { return _service; }
+
 private:
     model::node_id _id;
     model::revision_id _revision;
@@ -268,6 +292,10 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
     config::binding<std::chrono::milliseconds> _election_timeout;
     config::binding<std::chrono::milliseconds> _heartbeat_interval;
     bool _with_offset_translation;
+    ss::sharded<fixture_group_manager> _group_manager;
+    fixture_shard_manager _shard_manager{};
+    service_t _service;
+    ss::shared_ptr<failure_injectable_log> _f_log;
 };
 
 class raft_fixture
diff --git a/src/v/raft/tests/type_serialization_tests.cc b/src/v/raft/tests/type_serialization_tests.cc
index 014a05f69be8d..06d8e22eb9697 100644
--- a/src/v/raft/tests/type_serialization_tests.cc
+++ b/src/v/raft/tests/type_serialization_tests.cc
@@ -299,7 +299,7 @@ SEASTAR_THREAD_TEST_CASE(heartbeat_response_with_failures) {
       .last_flushed_log_index = model::offset{},
       .last_dirty_log_index = model::offset{},
       .last_term_base_offset = model::offset{},
-      .result = raft::reply_result::timeout});
+      .result = raft::reply_result::follower_busy});
 
     /**
      * Two other replies are successful
diff --git a/src/v/raft/types.cc b/src/v/raft/types.cc
index e5c675e01b9e6..5752cc389b521 100644
--- a/src/v/raft/types.cc
+++ b/src/v/raft/types.cc
@@ -279,8 +279,8 @@ std::ostream& operator<<(std::ostream& o, const reply_result& r) {
     case reply_result::group_unavailable:
         o << "group_unavailable";
         return o;
-    case reply_result::timeout:
-        o << "timeout";
+    case reply_result::follower_busy:
+        o << "follower_busy";
         return o;
     }
     __builtin_unreachable();
diff --git a/src/v/storage/ntp_config.h b/src/v/storage/ntp_config.h
index eca03e6f9ef24..47c67916f17ec 100644
--- a/src/v/storage/ntp_config.h
+++ b/src/v/storage/ntp_config.h
@@ -365,6 +365,17 @@ class ntp_config {
               : default_cloud_topic_enabled;
     }
 
+    ntp_config copy() const {
+        return {
+          _ntp,
+          _base_dir,
+          _overrides ? std::make_unique<default_overrides>(*_overrides)
+                     : nullptr,
+          _revision_id,
+          _topic_rev,
+          _remote_rev};
+    }
+
 private:
     model::ntp _ntp;
     /// \brief currently this is the basedir. In the future