9
9
10
10
#include " base/vlog.h"
11
11
#include " container/fragmented_vector.h"
12
+ #include " model/namespace.h"
12
13
#include " model/record_batch_types.h"
13
14
#include " model/timeout_clock.h"
14
15
#include " random/generators.h"
@@ -122,11 +123,24 @@ struct ot_state_consumer {
122
123
// / by the set of segment base offset values.
123
124
ss::future<ot_state> arrange_and_compact (
124
125
const fragmented_vector<model::record_batch>& batches,
125
- std::deque<model::offset> arrangement) {
126
+ std::deque<model::offset> arrangement,
127
+ bool simulate_internal_topic_compaction = false ) {
126
128
std::sort (arrangement.begin (), arrangement.end ());
127
- storage::disk_log_builder b1;
129
+ storage::log_config cfg = storage::log_builder_config ();
130
+ auto offset_translator_types = model::offset_translator_batch_types ();
131
+ auto raft_group_id = raft::group_id{0 };
132
+ storage::disk_log_builder b1 (cfg, offset_translator_types, raft_group_id);
133
+
134
+ auto ns = simulate_internal_topic_compaction
135
+ ? model::kafka_internal_namespace
136
+ : model::kafka_namespace;
137
+ model::ntp log_ntp (
138
+ ns,
139
+ model::topic_partition (
140
+ model::topic (random_generators::gen_alphanum_string (8 )),
141
+ model::partition_id{0 }));
128
142
std::exception_ptr error = nullptr ;
129
- co_await b1.start ();
143
+ co_await b1.start (log_ntp );
130
144
try {
131
145
for (const auto & b : batches) {
132
146
co_await b1.add_batch (b.copy ());
@@ -138,11 +152,13 @@ ss::future<ot_state> arrange_and_compact(
138
152
}
139
153
}
140
154
ss::abort_source as;
141
- co_await b1. apply_compaction ( storage::compaction_config (
155
+ auto compact_cfg = storage::compaction_config (
142
156
batches.back ().last_offset (),
143
157
std::nullopt,
144
158
ss::default_priority_class (),
145
- as));
159
+ as);
160
+ co_await b1.apply_sliding_window_compaction (compact_cfg);
161
+ co_await b1.apply_adjacent_merge_compaction (compact_cfg);
146
162
} catch (...) {
147
163
error = std::current_exception ();
148
164
}
@@ -185,11 +201,25 @@ std::deque<model::offset> generate_random_arrangement(
185
201
SEASTAR_THREAD_TEST_CASE (test_compaction_with_different_segment_arrangements) {
186
202
auto batches = generate_random_record_batches (1000 , 10 );
187
203
auto expected_ot
188
- = arrange_and_compact (batches, std::deque<model::offset>{}).get ();
204
+ = arrange_and_compact (batches, std::deque<model::offset>{}, false ).get ();
205
+ std::vector<size_t > num_segments = {10 , 100 , 1000 };
206
+ for (auto num : num_segments) {
207
+ auto arrangement = generate_random_arrangement (batches, num);
208
+ auto actual_ot = arrange_and_compact (batches, arrangement, false ).get ();
209
+ BOOST_REQUIRE (expected_ot.gap_offset == actual_ot.gap_offset );
210
+ BOOST_REQUIRE (expected_ot.gap_length == actual_ot.gap_length );
211
+ }
212
+ }
213
+
214
+ SEASTAR_THREAD_TEST_CASE (
215
+ test_compaction_with_different_segment_arrangements_simulate_internal_topic) {
216
+ auto batches = generate_random_record_batches (1000 , 10 );
217
+ auto expected_ot
218
+ = arrange_and_compact (batches, std::deque<model::offset>{}, true ).get ();
189
219
std::vector<size_t > num_segments = {10 , 100 , 1000 };
190
220
for (auto num : num_segments) {
191
221
auto arrangement = generate_random_arrangement (batches, num);
192
- auto actual_ot = arrange_and_compact (batches, arrangement).get ();
222
+ auto actual_ot = arrange_and_compact (batches, arrangement, true ).get ();
193
223
BOOST_REQUIRE (expected_ot.gap_offset == actual_ot.gap_offset );
194
224
BOOST_REQUIRE (expected_ot.gap_length == actual_ot.gap_length );
195
225
}
0 commit comments