From 6a1e34beaddd5182ad2ff58beadc014480c0ea13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 16 Dec 2024 18:04:06 +0100 Subject: [PATCH 1/7] raft: renamed reply_result::timeout to follower_busy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `raft::reply_result::follower_busy` is indicating that the follower was unable to process the heartbeat fast enough to generate a response. Renaming the reply from `timeout` will make it less confusing for the reader and differentiate the error code from an RPC timeout. Signed-off-by: Michał Maślanka --- src/v/compat/raft_generator.h | 2 +- src/v/compat/raft_json.h | 2 +- src/v/raft/consensus.cc | 5 ++--- src/v/raft/fundamental.h | 2 +- src/v/raft/heartbeat_manager.cc | 6 +++--- src/v/raft/service.h | 12 ++++++------ src/v/raft/tests/heartbeat_bench.cc | 2 +- src/v/raft/tests/heartbeats_test.cc | 2 +- src/v/raft/tests/type_serialization_tests.cc | 2 +- src/v/raft/types.cc | 4 ++-- 10 files changed, 19 insertions(+), 20 deletions(-) diff --git a/src/v/compat/raft_generator.h b/src/v/compat/raft_generator.h index 8d0ca63c9606..271502746cb7 100644 --- a/src/v/compat/raft_generator.h +++ b/src/v/compat/raft_generator.h @@ -347,7 +347,7 @@ struct instance_generator { {raft::reply_result::success, raft::reply_result::failure, raft::reply_result::group_unavailable, - raft::reply_result::timeout}), + raft::reply_result::follower_busy}), .may_recover = tests::random_bool(), }; } diff --git a/src/v/compat/raft_json.h b/src/v/compat/raft_json.h index 2c6d2a96a45f..49ea00d363d7 100644 --- a/src/v/compat/raft_json.h +++ b/src/v/compat/raft_json.h @@ -106,7 +106,7 @@ inline void read_value(const json::Value& rd, raft::append_entries_reply& out) { obj.result = raft::reply_result::group_unavailable; break; case 3: - obj.result = raft::reply_result::timeout; + obj.result = raft::reply_result::follower_busy; break; default: vassert(false, "invalid result {}", 
result); diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 0c59f3297695..9d577b0d8218 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -373,10 +373,9 @@ consensus::success_reply consensus::update_follower_index( physical_node); return success_reply::no; } - - if (unlikely(reply.result == reply_result::timeout)) { + if (unlikely(reply.result == reply_result::follower_busy)) { // ignore this response, timed out on the receiver node - vlog(_ctxlog.trace, "Append entries request timedout at node {}", node); + vlog(_ctxlog.trace, "Follower busy on node {}", node.id()); return success_reply::no; } if (unlikely(reply.result == reply_result::group_unavailable)) { diff --git a/src/v/raft/fundamental.h b/src/v/raft/fundamental.h index 3364ba168f38..fb3c5adc6be6 100644 --- a/src/v/raft/fundamental.h +++ b/src/v/raft/fundamental.h @@ -32,7 +32,7 @@ enum class reply_result : uint8_t { success, failure, group_unavailable, - timeout + follower_busy }; /** diff --git a/src/v/raft/heartbeat_manager.cc b/src/v/raft/heartbeat_manager.cc index 78d1478988d2..bd7ef8a8afc9 100644 --- a/src/v/raft/heartbeat_manager.cc +++ b/src/v/raft/heartbeat_manager.cc @@ -374,7 +374,7 @@ void heartbeat_manager::process_reply( return; } - if (unlikely(result == reply_result::timeout)) { + if (unlikely(result == reply_result::follower_busy)) { vlog( hbeatlog.debug, "Heartbeat request for group {} timed out on the node {}", @@ -439,7 +439,7 @@ void heartbeat_manager::process_reply( continue; } - if (unlikely(m.result == reply_result::timeout)) { + if (unlikely(m.result == reply_result::follower_busy)) { vlog( hbeatlog.debug, "Heartbeat request for group {} timed out on the node {}", @@ -555,7 +555,7 @@ void heartbeat_manager::process_reply( continue; } - if (unlikely(m.result == reply_result::timeout)) { + if (unlikely(m.result == reply_result::follower_busy)) { vlog( hbeatlog.debug, "Heartbeat request for group {} timed out on the node {}", diff --git 
a/src/v/raft/service.h b/src/v/raft/service.h index 0bb0bc4fc4ff..9a92076564f5 100644 --- a/src/v/raft/service.h +++ b/src/v/raft/service.h @@ -411,7 +411,7 @@ class service final : public raftgen_service { return ss::with_timeout(timeout, std::move(f)) .handle_exception_type([group](const ss::timed_out_error&) { return append_entries_reply{ - .group = group, .result = reply_result::timeout}; + .group = group, .result = reply_result::follower_busy}; }); }); @@ -447,11 +447,11 @@ class service final : public raftgen_service { auto f = dispatch_full_heartbeat( source_node, target_node, m, full_hb); f = ss::with_timeout(timeout, std::move(f)) - .handle_exception_type( - [group = full_hb.group](const ss::timed_out_error&) { - return full_heartbeat_reply{ - .group = group, .result = reply_result::timeout}; - }); + .handle_exception_type([group = full_hb.group]( + const ss::timed_out_error&) { + return full_heartbeat_reply{ + .group = group, .result = reply_result::follower_busy}; + }); futures.push_back(std::move(f)); } diff --git a/src/v/raft/tests/heartbeat_bench.cc b/src/v/raft/tests/heartbeat_bench.cc index 67b26755dcb8..d8a211bc8b4c 100644 --- a/src/v/raft/tests/heartbeat_bench.cc +++ b/src/v/raft/tests/heartbeat_bench.cc @@ -31,7 +31,7 @@ struct fixture { return random_generators::random_choice(std::vector{ raft::reply_result::success, raft::reply_result::failure, - raft::reply_result::timeout, + raft::reply_result::follower_busy, raft::reply_result::group_unavailable}); } diff --git a/src/v/raft/tests/heartbeats_test.cc b/src/v/raft/tests/heartbeats_test.cc index 60c887638e85..8c4c9487ce20 100644 --- a/src/v/raft/tests/heartbeats_test.cc +++ b/src/v/raft/tests/heartbeats_test.cc @@ -62,7 +62,7 @@ raft::reply_result random_reply_status() { return random_generators::random_choice(std::vector{ raft::reply_result::success, raft::reply_result::failure, - raft::reply_result::timeout, + raft::reply_result::follower_busy, raft::reply_result::group_unavailable}); } diff 
--git a/src/v/raft/tests/type_serialization_tests.cc b/src/v/raft/tests/type_serialization_tests.cc index 014a05f69be8..06d8e22eb969 100644 --- a/src/v/raft/tests/type_serialization_tests.cc +++ b/src/v/raft/tests/type_serialization_tests.cc @@ -299,7 +299,7 @@ SEASTAR_THREAD_TEST_CASE(heartbeat_response_with_failures) { .last_flushed_log_index = model::offset{}, .last_dirty_log_index = model::offset{}, .last_term_base_offset = model::offset{}, - .result = raft::reply_result::timeout}); + .result = raft::reply_result::follower_busy}); /** * Two other replies are successful diff --git a/src/v/raft/types.cc b/src/v/raft/types.cc index e5c675e01b9e..5752cc389b52 100644 --- a/src/v/raft/types.cc +++ b/src/v/raft/types.cc @@ -279,8 +279,8 @@ std::ostream& operator<<(std::ostream& o, const reply_result& r) { case reply_result::group_unavailable: o << "group_unavailable"; return o; - case reply_result::timeout: - o << "timeout"; + case reply_result::follower_busy: + o << "follower_busy"; return o; } __builtin_unreachable(); From 95a29dba65fa96ad67cbf84de601e44bec970f8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 17 Dec 2024 08:10:33 +0100 Subject: [PATCH 2/7] storage/ntp_config: added copy method to ntp_config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/storage/ntp_config.h | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/v/storage/ntp_config.h b/src/v/storage/ntp_config.h index eca03e6f9ef2..47c67916f17e 100644 --- a/src/v/storage/ntp_config.h +++ b/src/v/storage/ntp_config.h @@ -365,6 +365,17 @@ class ntp_config { : default_cloud_topic_enabled; } + ntp_config copy() const { + return { + _ntp, + _base_dir, + _overrides ? std::make_unique(*_overrides) + : nullptr, + _revision_id, + _topic_rev, + _remote_rev}; + } + private: model::ntp _ntp; /// \brief currently this is the basedir. 
In the future From 5f69d9b733a8c4c42282bebaa2667e8d0baaaf65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 17 Dec 2024 09:49:57 +0100 Subject: [PATCH 3/7] raft/tests: wired in rpc service into the raft_fixture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wired raft RPC service handler into Raft fixture to make the tests more accurate and cover the service code with tests. Signed-off-by: Michał Maślanka --- src/v/raft/tests/raft_fixture.cc | 208 ++++++++++++++++--------------- src/v/raft/tests/raft_fixture.h | 26 +++- 2 files changed, 134 insertions(+), 100 deletions(-) diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc index c4f632ec9cab..259f7b90c11f 100644 --- a/src/v/raft/tests/raft_fixture.cc +++ b/src/v/raft/tests/raft_fixture.cc @@ -85,13 +85,27 @@ ss::future channel::exchange(msg_type type, iobuf request) { } bool channel::is_valid() const { return _node && _node->raft() != nullptr; } -ss::lw_shared_ptr channel::raft() { +raft::service& +channel::get_service() { if (!_node || _node->raft() == nullptr) { throw std::runtime_error("no raft group"); } - return _node->raft(); + return _node->get_service(); } +namespace { +struct test_ctx : rpc::streaming_context { + ss::future reserve_memory(size_t) final { + co_return ssx::semaphore_units(); + } + const rpc::header& get_header() const final { return _header; }; + + void signal_body_parse() final {} + void body_parse_exception(std::exception_ptr) final {} + + rpc::header _header{}; +}; +} // namespace ss::future<> channel::dispatch_loop() { while (!_as.abort_requested()) { co_await _new_messages.wait([this] { return !_messages.empty(); }); @@ -100,107 +114,90 @@ ss::future<> channel::dispatch_loop() { } auto msg = std::move(_messages.front()); _messages.pop_front(); + ssx::spawn_with_gate(_gate, [this, msg = std::move(msg)]() mutable { + return do_dispatch_message(std::move(msg)); + }); + } +} 
+ss::future<> channel::do_dispatch_message(msg msg) { + try { iobuf_parser req_parser(std::move(msg.req_data)); + test_ctx ctx{}; + switch (msg.type) { + case msg_type::vote: { + auto req = co_await serde::read_async(req_parser); + auto resp = co_await get_service().vote(std::move(req), ctx); + iobuf resp_buf; + co_await serde::write_async(resp_buf, std::move(resp)); + msg.resp_data.set_value(std::move(resp_buf)); + break; + } + case msg_type::append_entries: { + auto req = co_await serde::read_async( + req_parser); + auto resp = co_await get_service().append_entries( + std::move(req), ctx); + iobuf resp_buf; + co_await serde::write_async(resp_buf, std::move(resp)); + msg.resp_data.set_value(std::move(resp_buf)); + break; + } + case msg_type::heartbeat: { + auto req = co_await serde::read_async( + req_parser); + + auto reply = co_await get_service().heartbeat(std::move(req), ctx); - try { - switch (msg.type) { - case msg_type::vote: { - auto req = co_await serde::read_async(req_parser); - auto resp = co_await raft()->vote(std::move(req)); - iobuf resp_buf; - co_await serde::write_async(resp_buf, std::move(resp)); - msg.resp_data.set_value(std::move(resp_buf)); - break; - } - case msg_type::append_entries: { - auto req = co_await serde::read_async( - req_parser); - auto resp = co_await raft()->append_entries(std::move(req)); - iobuf resp_buf; - co_await serde::write_async(resp_buf, std::move(resp)); - msg.resp_data.set_value(std::move(resp_buf)); - break; - } - case msg_type::heartbeat: { - auto req = co_await serde::read_async( - req_parser); - heartbeat_reply reply; - for (auto& hb : req.heartbeats) { - auto resp = co_await raft()->append_entries( - append_entries_request( - hb.node_id, - hb.meta, - model::make_memory_record_batch_reader( - ss::circular_buffer{}), - 0, - flush_after_append::no)); - reply.meta.push_back(resp); - } - - iobuf resp_buf; - co_await serde::write_async(resp_buf, std::move(reply)); - msg.resp_data.set_value(std::move(resp_buf)); - break; 
- } - case msg_type::heartbeat_v2: { - auto req = co_await serde::read_async( - req_parser); - heartbeat_reply_v2 reply(raft()->self().id(), req.source()); - - for (auto& hb : req.full_heartbeats()) { - auto resp = co_await raft()->full_heartbeat( - hb.group, req.source(), req.target(), hb.data); - - reply.add(resp.group, resp.result, resp.data); - } - req.for_each_lw_heartbeat( - [this, &req, &reply](raft::group_id g) { - auto result = raft()->lightweight_heartbeat( - req.source(), req.target()); - reply.add(g, result); - }); - - iobuf resp_buf; - co_await serde::write_async(resp_buf, std::move(reply)); - msg.resp_data.set_value(std::move(resp_buf)); - break; - } - case msg_type::install_snapshot: { - auto req = co_await serde::read_async( - req_parser); - auto resp = co_await raft()->install_snapshot(std::move(req)); - iobuf resp_buf; - co_await serde::write_async(resp_buf, std::move(resp)); - msg.resp_data.set_value(std::move(resp_buf)); - break; - } - case msg_type::timeout_now: { - auto req = co_await serde::read_async( - req_parser); - auto resp = co_await raft()->timeout_now(std::move(req)); - iobuf resp_buf; - co_await serde::write_async(resp_buf, std::move(resp)); - msg.resp_data.set_value(std::move(resp_buf)); - break; - } - case msg_type::transfer_leadership: { - auto req - = co_await serde::read_async( - req_parser); - auto resp = co_await raft()->transfer_leadership( - std::move(req)); - iobuf resp_buf; - co_await serde::write_async(resp_buf, std::move(resp)); - msg.resp_data.set_value(std::move(resp_buf)); - break; - } - } - } catch (...) 
{ - msg.resp_data.set_to_current_exception(); + iobuf resp_buf; + co_await serde::write_async(resp_buf, std::move(reply)); + msg.resp_data.set_value(std::move(resp_buf)); + break; + } + case msg_type::heartbeat_v2: { + auto req = co_await serde::read_async( + req_parser); + auto reply = co_await get_service().heartbeat_v2( + std::move(req), ctx); + + iobuf resp_buf; + co_await serde::write_async(resp_buf, std::move(reply)); + msg.resp_data.set_value(std::move(resp_buf)); + break; + } + case msg_type::install_snapshot: { + auto req = co_await serde::read_async( + req_parser); + auto resp = co_await get_service().install_snapshot( + std::move(req), ctx); + iobuf resp_buf; + co_await serde::write_async(resp_buf, std::move(resp)); + msg.resp_data.set_value(std::move(resp_buf)); + break; + } + case msg_type::timeout_now: { + auto req = co_await serde::read_async( + req_parser); + auto resp = co_await get_service().timeout_now(std::move(req), ctx); + iobuf resp_buf; + co_await serde::write_async(resp_buf, std::move(resp)); + msg.resp_data.set_value(std::move(resp_buf)); + break; } + case msg_type::transfer_leadership: { + auto req = co_await serde::read_async( + req_parser); + auto resp = co_await get_service().transfer_leadership( + std::move(req), ctx); + iobuf resp_buf; + co_await serde::write_async(resp_buf, std::move(resp)); + msg.resp_data.set_value(std::move(resp_buf)); + break; + } + } + } catch (...) 
{ + msg.resp_data.set_to_current_exception(); } } - in_memory_test_protocol::in_memory_test_protocol( raft_node_map& node_map, prefix_logger& logger) : _nodes(node_map) @@ -395,12 +392,20 @@ raft_node_instance::raft_node_instance( , _enable_longest_log_detection(enable_longest_log_detection) , _election_timeout(std::move(election_timeout)) , _heartbeat_interval(std::move(heartbeat_interval)) - , _with_offset_translation(with_offset_translation) { + , _with_offset_translation(with_offset_translation) + , _service( + ss::default_scheduling_group(), + ss::default_smp_service_group(), + std::ref(_group_manager), + _shard_manager, + _heartbeat_interval(), + _id) { config::shard_local_cfg().disable_metrics.set_value(true); } ss::future<> raft_node_instance::initialise(std::vector initial_nodes) { + co_await _group_manager.start_single(); _hb_manager = std::make_unique( _heartbeat_interval, consensus_client_protocol(_buffered_protocol), @@ -451,6 +456,7 @@ raft_node_instance::initialise(std::vector initial_nodes) { _recovery_mem_quota, _recovery_scheduler, _features.local()); + _group_manager.local().raft = _raft; co_await _hb_manager->register_group(_raft); } @@ -484,6 +490,10 @@ ss::future<> raft_node_instance::stop() { co_await _hb_manager->stop(); vlog(_logger.debug, "stopping feature table"); _raft = nullptr; + + // group manager must be stopped before storage as consensus stores the + // units of the storage resources semaphore. 
+ co_await _group_manager.stop(); vlog(_logger.debug, "stopping storage"); co_await _storage.stop(); } diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index 3baf664a2b46..aba243527e37 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -24,6 +24,7 @@ #include "raft/fwd.h" #include "raft/heartbeat_manager.h" #include "raft/recovery_memory_quota.h" +#include "raft/service.h" #include "raft/state_machine_manager.h" #include "raft/types.h" #include "ssx/sformat.h" @@ -60,6 +61,20 @@ struct msg { ss::promise resp_data; }; class raft_node_instance; +/** + * Dummy shard and group managers for the fixture to be used + * with Raft rpc service implementation. + */ +struct fixture_group_manager { + ss::lw_shared_ptr consensus_for(raft::group_id) { return raft; } + ss::lw_shared_ptr raft; +}; + +struct fixture_shard_manager { + std::optional shard_for(raft::group_id) { + return ss::this_shard_id(); + } +}; struct channel { explicit channel(raft_node_instance&); @@ -73,7 +88,8 @@ struct channel { bool is_valid() const; private: - ss::lw_shared_ptr raft(); + ss::future<> do_dispatch_message(msg); + raft::service& get_service(); ss::weak_ptr _node; ss::chunked_fifo _messages; ss::gate _gate; @@ -147,6 +163,8 @@ inline model::timeout_clock::time_point default_timeout() { */ class raft_node_instance : public ss::weakly_referencable { public: + using service_t + = raft::service; using leader_update_clb_t = ss::noncopyable_function; raft_node_instance( @@ -246,6 +264,9 @@ class raft_node_instance : public ss::weakly_referencable { storage::kvstore& get_kvstore() { return _storage.local().kvs(); } + + service_t& get_service() { return _service; } + private: model::node_id _id; model::revision_id _revision; @@ -268,6 +289,9 @@ class raft_node_instance : public ss::weakly_referencable { config::binding _election_timeout; config::binding _heartbeat_interval; bool _with_offset_translation; + ss::sharded _group_manager; + 
fixture_shard_manager _shard_manager{}; + service_t _service; }; class raft_fixture From 7d33bb5659e05a8bd480c4cc25d20a5cd7907f7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 17 Dec 2024 10:43:57 +0100 Subject: [PATCH 4/7] r/tests: handle timeout in in memory rpc protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Propagating timeout to the node sending RPC request is crucial for accurate testing of Raft implementation. Signed-off-by: Michał Maślanka --- src/v/raft/tests/raft_fixture.cc | 43 +++++++++++++++++++------------- src/v/raft/tests/raft_fixture.h | 3 ++- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc index 259f7b90c11f..20f12b8fb3b7 100644 --- a/src/v/raft/tests/raft_fixture.cc +++ b/src/v/raft/tests/raft_fixture.cc @@ -254,9 +254,9 @@ static constexpr msg_type map_msg_type() { } template -ss::future> -in_memory_test_protocol::dispatch(model::node_id id, ReqT req) { - _gate.hold(); +ss::future> in_memory_test_protocol::dispatch( + model::node_id id, ReqT req, rpc::client_opts opts) { + auto h = _gate.hold(); auto it = _channels.find(id); if (it == _channels.end()) { auto node = _nodes.node_for(id); @@ -285,54 +285,61 @@ in_memory_test_protocol::dispatch(model::node_id id, ReqT req) { } try { - auto resp = co_await node_channel.exchange(msg_type, std::move(buffer)); + auto f = node_channel.exchange(msg_type, std::move(buffer)); + opts.resource_units.release(); + auto resp = co_await ss::with_timeout( + opts.timeout.timeout_at(), std::move(f)); iobuf_parser parser(std::move(resp)); co_return co_await serde::read_async(parser); + } catch (const ss::timed_out_error&) { + co_return rpc::errc::client_request_timeout; } catch (const seastar::gate_closed_exception&) { co_return errc::shutting_down; } } ss::future> in_memory_test_protocol::vote( - model::node_id id, vote_request req, 
rpc::client_opts) { - return dispatch(id, req); + model::node_id id, vote_request req, rpc::client_opts opts) { + return dispatch(id, req, std::move(opts)); }; ss::future> in_memory_test_protocol::append_entries( - model::node_id id, append_entries_request req, rpc::client_opts) { + model::node_id id, append_entries_request req, rpc::client_opts opts) { return dispatch( - id, std::move(req)); + id, std::move(req), std::move(opts)); }; ss::future> in_memory_test_protocol::heartbeat( - model::node_id id, heartbeat_request req, rpc::client_opts) { - return dispatch(id, std::move(req)); + model::node_id id, heartbeat_request req, rpc::client_opts opts) { + return dispatch( + id, std::move(req), std::move(opts)); } ss::future> in_memory_test_protocol::heartbeat_v2( - model::node_id id, heartbeat_request_v2 req, rpc::client_opts) { + model::node_id id, heartbeat_request_v2 req, rpc::client_opts opts) { return dispatch( - id, std::move(req)); + id, std::move(req), std::move(opts)); } ss::future> in_memory_test_protocol::install_snapshot( - model::node_id id, install_snapshot_request req, rpc::client_opts) { + model::node_id id, install_snapshot_request req, rpc::client_opts opts) { return dispatch( - id, std::move(req)); + id, std::move(req), std::move(opts)); } ss::future> in_memory_test_protocol::timeout_now( - model::node_id id, timeout_now_request req, rpc::client_opts) { - return dispatch(id, std::move(req)); + model::node_id id, timeout_now_request req, rpc::client_opts opts) { + return dispatch( + id, std::move(req), std::move(opts)); } ss::future> in_memory_test_protocol::transfer_leadership( - model::node_id id, transfer_leadership_request req, rpc::client_opts) { + model::node_id id, transfer_leadership_request req, rpc::client_opts opts) { return dispatch( - id, std::move(req)); + id, std::move(req), std::move(opts)); } raft_node_instance::raft_node_instance( diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index 
aba243527e37..1f63e83cefa9 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -146,7 +146,8 @@ class in_memory_test_protocol : public consensus_client_protocol::impl { private: template - ss::future> dispatch(model::node_id, ReqT req); + ss::future> + dispatch(model::node_id, ReqT req, rpc::client_opts); ss::gate _gate; absl::flat_hash_map> _channels; std::vector _on_dispatch_handlers; From f04995a75105fb5e070accdc28e3b3de4e914836 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 17 Dec 2024 11:47:01 +0100 Subject: [PATCH 5/7] r/tets: added failure injectable log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a wrapper around the `storage::log` allowing us to inject storage layer failures in Raft fixture tests. Signed-off-by: Michał Maślanka --- src/v/raft/tests/BUILD | 17 ++ src/v/raft/tests/CMakeLists.txt | 14 +- src/v/raft/tests/failure_injectable_log.cc | 243 +++++++++++++++++++++ src/v/raft/tests/failure_injectable_log.h | 142 ++++++++++++ src/v/raft/tests/raft_fixture.cc | 7 +- src/v/raft/tests/raft_fixture.h | 3 + 6 files changed, 418 insertions(+), 8 deletions(-) create mode 100644 src/v/raft/tests/failure_injectable_log.cc create mode 100644 src/v/raft/tests/failure_injectable_log.h diff --git a/src/v/raft/tests/BUILD b/src/v/raft/tests/BUILD index f1d3502f1b64..58c42f0d3367 100644 --- a/src/v/raft/tests/BUILD +++ b/src/v/raft/tests/BUILD @@ -72,6 +72,22 @@ redpanda_test_cc_library( ], ) +redpanda_test_cc_library( + name = "failure_injectable_log", + srcs = ["failure_injectable_log.cc"], + hdrs = [ + "failure_injectable_log.h", + ], + include_prefix = "raft/tests", + visibility = ["//visibility:public"], + deps = [ + "//src/v/base", + "//src/v/storage", + "//src/v/test_utils:gtest", + "@seastar", + ], +) + redpanda_test_cc_library( name = "raft_fixture", srcs = ["raft_fixture.cc"], @@ -81,6 +97,7 @@ redpanda_test_cc_library( include_prefix = 
"raft/tests", visibility = ["//visibility:public"], deps = [ + ":failure_injectable_log", "//src/v/base", "//src/v/bytes:iobuf", "//src/v/bytes:iobuf_parser", diff --git a/src/v/raft/tests/CMakeLists.txt b/src/v/raft/tests/CMakeLists.txt index 081fcbb9d484..d7818567d24b 100644 --- a/src/v/raft/tests/CMakeLists.txt +++ b/src/v/raft/tests/CMakeLists.txt @@ -35,8 +35,14 @@ rp_test( ARGS "-- -c 8" ) + +v_cc_library( + NAME raft_fixture + SRCS raft_fixture.cc + failure_injectable_log.cc + DEPS Seastar::seastar v::raft v::gtest_main) + set(gsrcs - raft_fixture.cc basic_raft_fixture_test.cc stm_manager_test.cc raft_reconfiguration_test.cc @@ -53,7 +59,7 @@ rp_test( TIMEOUT 2000 BINARY_NAME gtest_raft SOURCES ${gsrcs} - LIBRARIES v::raft v::storage_test_utils v::model_test_utils v::features v::gtest_main + LIBRARIES v::raft v::storage_test_utils v::model_test_utils v::features v::gtest_main v::raft_fixture LABELS raft ARGS "-- -c 8 -m 4G" ) @@ -68,7 +74,3 @@ rp_test( LABELS raft ) -v_cc_library( - NAME raft_fixture - SRCS raft_fixture.cc - DEPS Seastar::seastar v::raft v::gtest_main) diff --git a/src/v/raft/tests/failure_injectable_log.cc b/src/v/raft/tests/failure_injectable_log.cc new file mode 100644 index 000000000000..d55e12afb9a2 --- /dev/null +++ b/src/v/raft/tests/failure_injectable_log.cc @@ -0,0 +1,243 @@ +// Copyright 2024 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#include "raft/tests/failure_injectable_log.h" +namespace raft { + +failure_injectable_log::failure_injectable_log( + ss::shared_ptr underlying_log) noexcept + : storage::log(underlying_log->config().copy()) + , _underlying_log(std::move(underlying_log)) {} + +ss::future<> failure_injectable_log::start( + std::optional cfg) { + return _underlying_log->start(cfg); +} + +ss::future<> +failure_injectable_log::housekeeping(storage::housekeeping_config cfg) { + return _underlying_log->housekeeping(cfg); +} + +ss::future<> failure_injectable_log::truncate(storage::truncate_config cfg) { + return _underlying_log->truncate(cfg); +} + +ss::future<> +failure_injectable_log::truncate_prefix(storage::truncate_prefix_config tpc) { + return _underlying_log->truncate_prefix(tpc); +} +ss::future<> failure_injectable_log::gc(storage::gc_config cfg) { + return _underlying_log->gc(cfg); +} + +ss::future<> failure_injectable_log::apply_segment_ms() { + return _underlying_log->apply_segment_ms(); +}; + +ss::future +failure_injectable_log::make_reader(storage::log_reader_config cfg) { + return _underlying_log->make_reader(cfg); +} +namespace { +struct delay_introducing_appender : public storage::log_appender::impl { + delay_introducing_appender( + storage::log_appender underlying, append_delay_generator generator) + : _underlying(std::move(underlying)) + , _append_delay_generator(std::move(generator)) {} + + /// non-owning reference - do not steal the iobuf + ss::future operator()(model::record_batch& b) final { + co_await ss::sleep(_append_delay_generator()); + co_return co_await _underlying(b); + } + + ss::future end_of_stream() final { + return _underlying.end_of_stream(); + } + 
storage::log_appender _underlying; + append_delay_generator _append_delay_generator; +}; +} // namespace + +storage::log_appender +failure_injectable_log::make_appender(storage::log_append_config cfg) { + if (_append_delay_generator) { + return storage::log_appender( + std::make_unique( + _underlying_log->make_appender(cfg), *_append_delay_generator)); + } + + return _underlying_log->make_appender(cfg); +} + +ss::future> failure_injectable_log::close() { + return _underlying_log->close(); +} + +ss::future<> failure_injectable_log::remove() { + return _underlying_log->remove(); +} + +ss::future<> failure_injectable_log::flush() { + return _underlying_log->flush(); +} + +ss::future> +failure_injectable_log::timequery(storage::timequery_config cfg) { + return _underlying_log->timequery(cfg); +} + +ss::lw_shared_ptr +failure_injectable_log::get_offset_translator_state() const { + return _underlying_log->get_offset_translator_state(); +} + +model::offset_delta +failure_injectable_log::offset_delta(model::offset o) const { + return _underlying_log->offset_delta(o); +} + +model::offset failure_injectable_log::from_log_offset(model::offset o) const { + return _underlying_log->from_log_offset(o); +} + +model::offset failure_injectable_log::to_log_offset(model::offset o) const { + return _underlying_log->to_log_offset(o); +} + +bool failure_injectable_log::is_new_log() const { + return _underlying_log->is_new_log(); +} + +size_t failure_injectable_log::segment_count() const { + return _underlying_log->segment_count(); +} + +storage::offset_stats failure_injectable_log::offsets() const { + return _underlying_log->offsets(); +} + +size_t failure_injectable_log::get_log_truncation_counter() const noexcept { + return _underlying_log->get_log_truncation_counter(); +} + +model::offset failure_injectable_log::find_last_term_start_offset() const { + return _underlying_log->find_last_term_start_offset(); +} + +model::timestamp failure_injectable_log::start_timestamp() const { + 
return _underlying_log->start_timestamp(); +} + +std::ostream& +failure_injectable_log::failure_injectable_log::print(std::ostream& o) const { + return _underlying_log->print(o); +} + +std::optional +failure_injectable_log::get_term(model::offset o) const { + return _underlying_log->get_term(o); +} + +std::optional +failure_injectable_log::get_term_last_offset(model::term_id t) const { + return _underlying_log->get_term_last_offset(t); +} + +std::optional +failure_injectable_log::index_lower_bound(model::offset o) const { + return _underlying_log->index_lower_bound(o); +} + +ss::future +failure_injectable_log::monitor_eviction(ss::abort_source& as) { + return _underlying_log->monitor_eviction(as); +} + +size_t failure_injectable_log::size_bytes() const { + return _underlying_log->size_bytes(); +} + +uint64_t +failure_injectable_log::size_bytes_after_offset(model::offset o) const { + return _underlying_log->size_bytes_after_offset(o); +} + +ss::future> +failure_injectable_log::offset_range_size( + model::offset first, model::offset last, ss::io_priority_class io_priority) { + return _underlying_log->offset_range_size(first, last, io_priority); +} + +ss::future> +failure_injectable_log::offset_range_size( + model::offset first, + offset_range_size_requirements_t target, + ss::io_priority_class io_priority) { + return _underlying_log->offset_range_size(first, target, io_priority); +} + +bool failure_injectable_log::is_compacted( + model::offset first, model::offset last) const { + return _underlying_log->is_compacted(first, last); +} + +void failure_injectable_log::set_overrides( + storage::ntp_config::default_overrides overrides) { + mutable_config().set_overrides(overrides); + return _underlying_log->set_overrides(overrides); +} + +bool failure_injectable_log::notify_compaction_update() { + return _underlying_log->notify_compaction_update(); +} + +int64_t failure_injectable_log::compaction_backlog() const { + return _underlying_log->compaction_backlog(); +} + 
+ss::future +failure_injectable_log::disk_usage(storage::gc_config cfg) { + return _underlying_log->disk_usage(cfg); +} + +ss::future +failure_injectable_log::get_reclaimable_offsets(storage::gc_config cfg) { + return _underlying_log->get_reclaimable_offsets(cfg); +} + +void failure_injectable_log::set_cloud_gc_offset(model::offset o) { + return _underlying_log->set_cloud_gc_offset(o); +} + +const storage::segment_set& failure_injectable_log::segments() const { + return _underlying_log->segments(); +} +storage::segment_set& failure_injectable_log::segments() { + return _underlying_log->segments(); +} + +ss::future<> failure_injectable_log::force_roll(ss::io_priority_class iop) { + return _underlying_log->force_roll(iop); +} + +storage::probe& failure_injectable_log::get_probe() { + return _underlying_log->get_probe(); +} + +size_t failure_injectable_log::reclaimable_size_bytes() const { + return _underlying_log->reclaimable_size_bytes(); +} + +std::optional +failure_injectable_log::retention_offset(storage::gc_config cfg) const { + return _underlying_log->retention_offset(cfg); +} + +} // namespace raft diff --git a/src/v/raft/tests/failure_injectable_log.h b/src/v/raft/tests/failure_injectable_log.h new file mode 100644 index 000000000000..b8d1b14fc573 --- /dev/null +++ b/src/v/raft/tests/failure_injectable_log.h @@ -0,0 +1,142 @@ +// Copyright 2024 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once +#include "storage/log.h" +#include "storage/offset_translator_state.h" + +namespace raft { + +using append_delay_generator = std::function; + +class failure_injectable_log final : public storage::log { +public: + // sets the generator for the delay to be injected when appending a record + // batch to the log + void set_append_delay(std::optional generator) { + _append_delay_generator = std::move(generator); + } + +public: + explicit failure_injectable_log( + ss::shared_ptr underlying_log) noexcept; + + failure_injectable_log(failure_injectable_log&&) noexcept = delete; + failure_injectable_log& operator=(failure_injectable_log&&) noexcept + = delete; + failure_injectable_log(const failure_injectable_log&) = delete; + failure_injectable_log& operator=(const failure_injectable_log&) = delete; + ~failure_injectable_log() noexcept final = default; + + ss::future<> + start(std::optional cfg) final; + ss::future<> housekeeping(storage::housekeeping_config cfg) final; + + ss::future<> truncate(storage::truncate_config) final; + + ss::future<> truncate_prefix(storage::truncate_prefix_config) final; + ss::future<> gc(storage::gc_config) final; + ss::future<> apply_segment_ms() final; + + ss::future + make_reader(storage::log_reader_config) final; + + storage::log_appender make_appender(storage::log_append_config) final; + + ss::future> close() final; + + ss::future<> remove() final; + + ss::future<> flush() final; + + ss::future> + timequery(storage::timequery_config) final; + + ss::lw_shared_ptr + get_offset_translator_state() const final; + + model::offset_delta offset_delta(model::offset) const final; + + model::offset from_log_offset(model::offset) const final; + + 
model::offset to_log_offset(model::offset) const final; + + bool is_new_log() const final; + + size_t segment_count() const final; + + storage::offset_stats offsets() const final; + + size_t get_log_truncation_counter() const noexcept final; + + model::offset find_last_term_start_offset() const final; + + model::timestamp start_timestamp() const final; + + std::ostream& print(std::ostream& o) const final; + + std::optional get_term(model::offset) const final; + + std::optional + get_term_last_offset(model::term_id) const final; + + std::optional index_lower_bound(model::offset o) const final; + + ss::future monitor_eviction(ss::abort_source&) final; + + size_t size_bytes() const final; + + uint64_t size_bytes_after_offset(model::offset o) const final; + + ss::future> + offset_range_size( + model::offset first, + model::offset last, + ss::io_priority_class io_priority) final; + + ss::future> offset_range_size( + model::offset first, + offset_range_size_requirements_t target, + ss::io_priority_class io_priority) final; + + bool is_compacted(model::offset first, model::offset last) const final; + + void set_overrides(storage::ntp_config::default_overrides) final; + + bool notify_compaction_update() final; + + int64_t compaction_backlog() const final; + + ss::future disk_usage(storage::gc_config) final; + + ss::future + get_reclaimable_offsets(storage::gc_config cfg) final; + + void set_cloud_gc_offset(model::offset) final; + + const storage::segment_set& segments() const final; + storage::segment_set& segments() final; + + ss::future<> force_roll(ss::io_priority_class) final; + + storage::probe& get_probe() final; + + size_t reclaimable_size_bytes() const final; + + std::optional + retention_offset(storage::gc_config) const final; + +private: + ss::shared_ptr _underlying_log; + std::optional _append_delay_generator; + friend std::ostream& + operator<<(std::ostream& o, const failure_injectable_log& lg) { + return lg.print(o); + } +}; +} // namespace raft diff --git 
a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc index 20f12b8fb3b7..15a1dbf80c02 100644 --- a/src/v/raft/tests/raft_fixture.cc +++ b/src/v/raft/tests/raft_fixture.cc @@ -28,6 +28,7 @@ #include "raft/heartbeat_manager.h" #include "raft/heartbeats.h" #include "raft/state_machine_manager.h" +#include "raft/tests/failure_injectable_log.h" #include "raft/timeout_jitter.h" #include "raft/types.h" #include "random/generators.h" @@ -445,13 +446,13 @@ raft_node_instance::initialise(std::vector initial_nodes) { test_group, _with_offset_translation ? model::offset_translator_batch_types() : std::vector{}); - + _f_log = ss::make_shared(std::move(log)); _raft = ss::make_lw_shared( _id, test_group, raft::group_configuration(std::move(initial_nodes), _revision), timeout_jitter(_election_timeout), - log, + _f_log, scheduling_config( ss::default_scheduling_group(), ss::default_priority_class()), config::mock_binding(1s), @@ -487,6 +488,8 @@ ss::future<> raft_node_instance::stop() { vlog(_logger.debug, "stopping protocol"); co_await _buffered_protocol->stop(); co_await _protocol->stop(); + // release f_log pointer before stopping raft + _f_log = nullptr; vlog(_logger.debug, "stopping raft"); co_await _raft->stop(); vlog(_logger.debug, "stopping recovery throttle"); diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index 1f63e83cefa9..08458e2b12db 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -26,6 +26,7 @@ #include "raft/recovery_memory_quota.h" #include "raft/service.h" #include "raft/state_machine_manager.h" +#include "raft/tests/failure_injectable_log.h" #include "raft/types.h" #include "ssx/sformat.h" #include "storage/api.h" @@ -265,6 +266,7 @@ class raft_node_instance : public ss::weakly_referencable { storage::kvstore& get_kvstore() { return _storage.local().kvs(); } + ss::shared_ptr f_injectable_log() { return _f_log; } service_t& get_service() { return _service; } @@ -293,6 +295,7 @@ 
class raft_node_instance : public ss::weakly_referencable { ss::sharded _group_manager; fixture_shard_manager _shard_manager{}; service_t _service; + ss::shared_ptr _f_log; }; class raft_fixture From 8b57b4210166d2e8804c3da39421f79951228769 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 17 Dec 2024 11:48:57 +0100 Subject: [PATCH 6/7] r/heartbeat_manager: do not treat follower busy err as missed heartbeat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a follower is busy it may fail fast when processing full heartbeat requests sent by the leader. In this case a follower RPC handler sets the `follower_busy` result in heartbeat_reply. The leader should still treat a follower replica as online in this case. The replica hosting node must be online to reply with the `follower_busy` error. This way we prevent overly eager leader step downs when follower replicas are slow. Signed-off-by: Michał Maślanka --- src/v/raft/consensus.cc | 12 +++++++----- src/v/raft/heartbeat_manager.cc | 23 +++++++++++++++-------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 9d577b0d8218..9700b4afce3f 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -338,6 +338,13 @@ consensus::success_reply consensus::update_follower_index( // current node may change it.
return success_reply::yes; } + + if (unlikely(r.value().result == reply_result::follower_busy)) { + // ignore this response, the follower was busy + vlog(_ctxlog.trace, "Follower busy on node {}", node.id()); + return success_reply::no; + } + const auto& config = _configuration_manager.get_latest(); if (!config.contains(node)) { // We might have sent an append_entries just before removing @@ -373,11 +380,6 @@ consensus::success_reply consensus::update_follower_index( physical_node); return success_reply::no; } - if (unlikely(reply.result == reply_result::follower_busy)) { - // ignore this response, timed out on the receiver node - vlog(_ctxlog.trace, "Follower busy on node {}", node.id()); - return success_reply::no; - } if (unlikely(reply.result == reply_result::group_unavailable)) { // ignore this response since group is not yet bootstrapped at the // follower diff --git a/src/v/raft/heartbeat_manager.cc b/src/v/raft/heartbeat_manager.cc index bd7ef8a8afc9..e67ba131c4d5 100644 --- a/src/v/raft/heartbeat_manager.cc +++ b/src/v/raft/heartbeat_manager.cc @@ -373,14 +373,18 @@ void heartbeat_manager::process_reply( n); return; } - + /** + * This is here for completeness, it should never be triggered as the + * follower does not reply with the busy error code when processing + * lightweight heartbeats + */ if (unlikely(result == reply_result::follower_busy)) { vlog( - hbeatlog.debug, - "Heartbeat request for group {} timed out on the node {}", + hbeatlog.error, + "Follower reported busy for group {} on node {} when processing " + "lightweight heartbeat", group, n); - return; } if (unlikely(target != consensus->self().id())) { vlog( @@ -438,14 +442,17 @@ void heartbeat_manager::process_reply( n); continue; } - + /** + * A busy follower still updates the last received reply timestamp as + * it indicates that the receiving replica is alive and is able to + * process requests, it may simply be slow and its oplock is contended.
+ */ if (unlikely(m.result == reply_result::follower_busy)) { vlog( - hbeatlog.debug, - "Heartbeat request for group {} timed out on the node {}", + hbeatlog.trace, + "Follower busy when processing full heartbeat for group {} on {}", m.group, n); - continue; } if (unlikely(reply.target() != consensus->self().id())) { From 67e7c6ea213704f8c3edb30028a9f3a0da517da3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 17 Dec 2024 11:52:25 +0100 Subject: [PATCH 7/7] r/tests: added test checking if leadership is stable after timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/raft/tests/basic_raft_fixture_test.cc | 26 +++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc index 2050b11ce068..f26468a55176 100644 --- a/src/v/raft/tests/basic_raft_fixture_test.cc +++ b/src/v/raft/tests/basic_raft_fixture_test.cc @@ -836,3 +836,29 @@ TEST_F_CORO(raft_fixture, leadership_transfer_delay) { ASSERT_LE_CORO(election_time * 1.0, transfer_time * tolerance_multiplier); ASSERT_GE_CORO(election_time * 1.0, transfer_time / tolerance_multiplier); } + +TEST_F_CORO(raft_fixture, test_no_stepdown_on_append_entries_timeout) { + config::shard_local_cfg().replicate_append_timeout_ms.set_value(1s); + co_await create_simple_group(3); + auto leader_id = co_await wait_for_leader(10s); + for (auto& [id, n] : nodes()) { + if (id != leader_id) { + n->f_injectable_log()->set_append_delay([]() { return 5s; }); + } + } + + auto& leader_node = node(leader_id); + auto term_before = leader_node.raft()->term(); + auto r = co_await leader_node.raft()->replicate( + make_batches(1, 10, 128), + replicate_options(consistency_level::quorum_ack, 10s)); + ASSERT_FALSE_CORO(r.has_error()); + for (auto& [_, n] : nodes()) { + n->f_injectable_log()->set_append_delay(std::nullopt); + } + + leader_id = 
co_await wait_for_leader(10s); + auto& new_leader_node = node(leader_id); + ASSERT_EQ_CORO(term_before, new_leader_node.raft()->term()); + ASSERT_TRUE_CORO(new_leader_node.raft()->is_leader()); +}