@@ -418,6 +418,13 @@ void admin_server::register_debug_routes() {
418
418
return get_partition_state_handler (std::move (req));
419
419
});
420
420
421
+ register_route<user>(
422
+ seastar::httpd::debug_json::get_partition_producers,
423
+ [this ](std::unique_ptr<ss::http::request> req)
424
+ -> ss::future<ss::json::json_return_type> {
425
+ return get_producers_state_handler (std::move (req));
426
+ });
427
+
421
428
register_route<superuser>(
422
429
ss::httpd::debug_json::cpu_profile,
423
430
[this ](std::unique_ptr<ss::http::request> req)
@@ -810,6 +817,79 @@ admin_server::get_partition_state_handler(
810
817
co_return ss::json::json_return_type (std::move (response));
811
818
}
812
819
820
+ ss::future<ss::json::json_return_type>
821
+ admin_server::get_producers_state_handler (
822
+ std::unique_ptr<ss::http::request> req) {
823
+ const model::ntp ntp = parse_ntp_from_request (req->param );
824
+ auto timeout = std::chrono::duration_cast<model::timeout_clock::duration>(
825
+ 10s);
826
+ auto result = co_await _tx_gateway_frontend.local ().get_producers (
827
+ cluster::get_producers_request{ntp, timeout});
828
+ if (result.error_code != cluster::tx::errc::none) {
829
+ throw ss::httpd::server_error_exception (fmt::format (
830
+ " Error {} processing partition state for ntp: {}" ,
831
+ result.error_code ,
832
+ ntp));
833
+ }
834
+ vlog (
835
+ adminlog.debug ,
836
+ " producers for {}, size: {}" ,
837
+ ntp,
838
+ result.producers .size ());
839
+ ss::httpd::debug_json::partition_producers producers;
840
+ producers.ntp = fmt::format (" {}" , ntp);
841
+ for (auto & producer : result.producers ) {
842
+ ss::httpd::debug_json::partition_producer_state producer_state;
843
+ producer_state.id = producer.pid .id ();
844
+ producer_state.epoch = producer.pid .epoch ();
845
+ for (const auto & req : producer.inflight_requests ) {
846
+ ss::httpd::debug_json::idempotent_producer_request_state inflight;
847
+ inflight.first_sequence = req.first_sequence ;
848
+ inflight.last_sequence = req.last_sequence ;
849
+ inflight.term = req.term ();
850
+ producer_state.inflight_idempotent_requests .push (
851
+ std::move (inflight));
852
+ }
853
+ for (const auto & req : producer.finished_requests ) {
854
+ ss::httpd::debug_json::idempotent_producer_request_state finished;
855
+ finished.first_sequence = req.first_sequence ;
856
+ finished.last_sequence = req.last_sequence ;
857
+ finished.term = req.term ();
858
+ producer_state.finished_idempotent_requests .push (
859
+ std::move (finished));
860
+ }
861
+ if (producer.last_update ) {
862
+ producer_state.last_update_timestamp
863
+ = producer.last_update .value ()();
864
+ }
865
+ if (producer.tx_begin_offset ) {
866
+ producer_state.transaction_begin_offset
867
+ = producer.tx_begin_offset .value ();
868
+ }
869
+ if (producer.tx_end_offset ) {
870
+ producer_state.transaction_last_offset
871
+ = producer.tx_end_offset .value ();
872
+ }
873
+ if (producer.tx_seq ) {
874
+ producer_state.transaction_sequence = producer.tx_seq .value ();
875
+ }
876
+ if (producer.tx_timeout ) {
877
+ producer_state.transaction_timeout_ms
878
+ = producer.tx_timeout .value ().count ();
879
+ }
880
+ if (producer.coordinator_partition ) {
881
+ producer_state.transaction_coordinator_partition
882
+ = producer.coordinator_partition .value ()();
883
+ }
884
+ if (producer.group_id ) {
885
+ producer_state.transaction_group_id = producer.group_id .value ();
886
+ }
887
+ producers.producers .push (std::move (producer_state));
888
+ co_await ss::coroutine::maybe_yield ();
889
+ }
890
+ co_return ss::json::json_return_type (std::move (producers));
891
+ }
892
+
813
893
ss::future<ss::json::json_return_type> admin_server::get_node_uuid_handler () {
814
894
ss::httpd::debug_json::broker_uuid uuid;
815
895
uuid.node_uuid = ssx::sformat (
0 commit comments