@@ -87,18 +87,19 @@ where
87
87
88
88
Ok ( ( ) )
89
89
}
90
- }
91
90
92
- impl < T > ClusterControllerState < T >
93
- where
94
- T : TransportConnect ,
95
- {
96
- pub async fn run ( & mut self ) -> anyhow:: Result < ( ) > {
91
+ pub async fn on_leader_event ( & mut self , leader_event : LeaderEvent ) -> anyhow:: Result < ( ) > {
97
92
match self {
98
- Self :: Follower => {
99
- futures:: future:: pending :: < ( ) > ( ) . await ;
100
- Ok ( ( ) )
101
- }
93
+ ClusterControllerState :: Follower => Ok ( ( ) ) ,
94
+ ClusterControllerState :: Leader ( leader) => leader. on_leader_event ( leader_event) . await ,
95
+ }
96
+ }
97
+
98
+ /// Runs the cluster controller state related tasks. It returns [`LeaderEvent`] which need to
99
+ /// be processed by calling [`Self::on_leader_event`].
100
+ pub async fn run ( & mut self ) -> anyhow:: Result < LeaderEvent > {
101
+ match self {
102
+ Self :: Follower => futures:: future:: pending :: < anyhow:: Result < _ > > ( ) . await ,
102
103
Self :: Leader ( leader) => leader. run ( ) . await ,
103
104
}
104
105
}
@@ -125,6 +126,15 @@ where
125
126
}
126
127
}
127
128
129
+ /// Events that are emitted by a leading cluster controller that need to be processed explicitly
130
+ /// because their operations are not cancellation safe.
131
+ #[ derive( Debug ) ]
132
+ pub enum LeaderEvent {
133
+ TrimLogs ,
134
+ LogsUpdate ,
135
+ PartitionTableUpdate ,
136
+ }
137
+
128
138
pub struct Leader < T > {
129
139
metadata : Metadata ,
130
140
bifrost : Bifrost ,
@@ -224,48 +234,82 @@ where
224
234
create_log_trim_interval ( & configuration. admin ) ;
225
235
}
226
236
227
- async fn run ( & mut self ) -> anyhow:: Result < ( ) > {
228
- let bifrost_admin = BifrostAdmin :: new (
229
- & self . bifrost ,
230
- & self . metadata_writer ,
231
- & self . metadata_store_client ,
232
- ) ;
233
-
237
+ async fn run ( & mut self ) -> anyhow:: Result < LeaderEvent > {
234
238
loop {
235
239
tokio:: select! {
236
240
_ = self . find_logs_tail_interval. tick( ) => {
237
241
self . logs_controller. find_logs_tail( ) ;
238
242
}
239
243
_ = OptionFuture :: from( self . log_trim_interval. as_mut( ) . map( |interval| interval. tick( ) ) ) => {
240
- let result = self . trim_logs( bifrost_admin) . await ;
241
-
242
- if let Err ( err) = result {
243
- warn!( "Could not trim the logs. This can lead to increased disk usage: {err}" ) ;
244
- }
244
+ return Ok ( LeaderEvent :: TrimLogs ) ;
245
245
}
246
246
result = self . logs_controller. run_async_operations( ) => {
247
247
result?;
248
248
}
249
249
Ok ( _) = self . logs_watcher. changed( ) => {
250
- self . logs_controller. on_logs_update( self . metadata. logs_ref( ) ) ?;
251
- // tell the scheduler about potentially newly provisioned logs
252
- self . scheduler. on_logs_update( self . logs. live_load( ) , self . partition_table. live_load( ) ) . await ?
250
+ return Ok ( LeaderEvent :: LogsUpdate ) ;
251
+
253
252
}
254
253
Ok ( _) = self . partition_table_watcher. changed( ) => {
255
- let partition_table = self . partition_table. live_load( ) ;
256
- let logs = self . logs. live_load( ) ;
257
-
258
- self . logs_controller. on_partition_table_update( partition_table) ;
259
- self . scheduler. on_logs_update( logs, partition_table) . await ?;
254
+ return Ok ( LeaderEvent :: PartitionTableUpdate ) ;
260
255
}
261
256
}
262
257
}
263
258
}
264
259
265
- async fn trim_logs (
266
- & self ,
267
- bifrost_admin : BifrostAdmin < ' _ > ,
268
- ) -> Result < ( ) , restate_bifrost:: Error > {
260
+ pub async fn on_leader_event ( & mut self , leader_event : LeaderEvent ) -> anyhow:: Result < ( ) > {
261
+ match leader_event {
262
+ LeaderEvent :: TrimLogs => {
263
+ self . trim_logs ( ) . await ;
264
+ }
265
+ LeaderEvent :: LogsUpdate => {
266
+ self . on_logs_update ( ) . await ?;
267
+ }
268
+ LeaderEvent :: PartitionTableUpdate => {
269
+ self . on_partition_table_update ( ) . await ?;
270
+ }
271
+ }
272
+
273
+ Ok ( ( ) )
274
+ }
275
+
276
+ async fn on_logs_update ( & mut self ) -> anyhow:: Result < ( ) > {
277
+ self . logs_controller
278
+ . on_logs_update ( self . metadata . logs_ref ( ) ) ?;
279
+ // tell the scheduler about potentially newly provisioned logs
280
+ self . scheduler
281
+ . on_logs_update ( self . logs . live_load ( ) , self . partition_table . live_load ( ) )
282
+ . await ?;
283
+
284
+ Ok ( ( ) )
285
+ }
286
+
287
+ async fn on_partition_table_update ( & mut self ) -> anyhow:: Result < ( ) > {
288
+ let partition_table = self . partition_table . live_load ( ) ;
289
+ let logs = self . logs . live_load ( ) ;
290
+
291
+ self . logs_controller
292
+ . on_partition_table_update ( partition_table) ;
293
+ self . scheduler . on_logs_update ( logs, partition_table) . await ?;
294
+
295
+ Ok ( ( ) )
296
+ }
297
+
298
+ async fn trim_logs ( & self ) {
299
+ let result = self . trim_logs_inner ( ) . await ;
300
+
301
+ if let Err ( err) = result {
302
+ warn ! ( "Could not trim the logs. This can lead to increased disk usage: {err}" ) ;
303
+ }
304
+ }
305
+
306
+ async fn trim_logs_inner ( & self ) -> Result < ( ) , restate_bifrost:: Error > {
307
+ let bifrost_admin = BifrostAdmin :: new (
308
+ & self . bifrost ,
309
+ & self . metadata_writer ,
310
+ & self . metadata_store_client ,
311
+ ) ;
312
+
269
313
let cluster_state = self . cluster_state_watcher . current ( ) ;
270
314
271
315
let mut persisted_lsns_per_partition: BTreeMap <
0 commit comments