@@ -2,6 +2,10 @@ use std::path::Path;
22use std:: sync:: Arc ;
33
44use anyhow:: { Context , Result } ;
5+ use database:: actions:: fingerprint:: compare_all_pairs;
6+ use database:: actions:: fingerprint:: compute_file_fingerprints;
7+ use database:: actions:: fingerprint:: mark_duplicate_files;
8+ use database:: actions:: fingerprint:: Configuration ;
59use log:: { debug, info, warn} ;
610use tokio:: { sync:: Mutex , task} ;
711use tokio_util:: sync:: CancellationToken ;
@@ -279,6 +283,131 @@ impl Signal for AnalyzeAudioLibraryRequest {
279283 }
280284}
281285
286+ impl ParamsExtractor for DeduplicateAudioLibraryRequest {
287+ type Params = (
288+ Arc < MainDbConnection > ,
289+ Arc < Mutex < TaskTokens > > ,
290+ Arc < dyn Broadcaster > ,
291+ ) ;
292+
293+ fn extract_params ( & self , all_params : & GlobalParams ) -> Self :: Params {
294+ (
295+ Arc :: clone ( & all_params. main_db ) ,
296+ Arc :: clone ( & all_params. task_tokens ) ,
297+ Arc :: clone ( & all_params. broadcaster ) ,
298+ )
299+ }
300+ }
301+
302+ impl Signal for DeduplicateAudioLibraryRequest {
303+ type Params = (
304+ Arc < MainDbConnection > ,
305+ Arc < Mutex < TaskTokens > > ,
306+ Arc < dyn Broadcaster > ,
307+ ) ;
308+ type Response = ( ) ;
309+
310+ async fn handle (
311+ & self ,
312+ ( main_db, task_tokens, broadcaster) : Self :: Params ,
313+ _session : Option < Session > ,
314+ dart_signal : & Self ,
315+ ) -> Result < Option < Self :: Response > > {
316+ let mut tokens = task_tokens. lock ( ) . await ;
317+ if let Some ( token) = tokens. deduplicate_token . take ( ) {
318+ token. cancel ( ) ;
319+ }
320+
321+ let new_token = CancellationToken :: new ( ) ;
322+ tokens. deduplicate_token = Some ( new_token. clone ( ) ) ;
323+ drop ( tokens) ;
324+
325+ let request = dart_signal. clone ( ) ;
326+ let request_path = Arc :: new ( request. path . clone ( ) ) ;
327+ let batch_size = determine_batch_size ( request. workload_factor ) ;
328+ let config = Configuration :: default ( ) ;
329+ let similarity_threshold = request. similarity_threshold ;
330+
331+ let request_path_clone = request_path. clone ( ) ;
332+ task:: spawn_blocking ( move || {
333+ let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
334+
335+ let request_path_clone = request_path_clone. clone ( ) ;
336+ rt. block_on ( async {
337+ // Stage 1: Compute fingerprints (0% - 33%)
338+ let broadcaster_clone = Arc :: clone ( & broadcaster) ;
339+ let progress_path = request_path_clone. to_string ( ) ;
340+
341+ let request_path_clone = request_path_clone. to_string ( ) ;
342+ compute_file_fingerprints (
343+ & main_db,
344+ Path :: new ( & request_path_clone) ,
345+ batch_size,
346+ move |cur, total| {
347+ let progress = cur as f32 / total as f32 * 0.33 ;
348+
349+ broadcaster_clone. broadcast ( & DeduplicateAudioLibraryProgress {
350+ path : progress_path. clone ( ) ,
351+ progress : ( progress * 100.0 ) as i32 ,
352+ total : 100 ,
353+ } )
354+ } ,
355+ Some ( new_token. clone ( ) ) ,
356+ )
357+ . await ?;
358+
359+ // Stage 2: Compare all pairs (33% - 66%)
360+ let broadcaster_clone = Arc :: clone ( & broadcaster) ;
361+ let progress_path = request_path_clone. to_string ( ) ;
362+
363+ compare_all_pairs (
364+ & main_db,
365+ batch_size,
366+ move |cur, total| {
367+ let progress = 0.33 + cur as f32 / total as f32 * 0.33 ;
368+
369+ broadcaster_clone. broadcast ( & DeduplicateAudioLibraryProgress {
370+ path : progress_path. clone ( ) ,
371+ progress : ( progress * 100.0 ) as i32 ,
372+ total : 100 ,
373+ } ) ;
374+ } ,
375+ & config,
376+ Some ( Arc :: new ( new_token. clone ( ) ) ) ,
377+ 1000 ,
378+ )
379+ . await ?;
380+
381+ // Stage 3: Mark duplicates (66% - 100%)
382+ if !new_token. is_cancelled ( ) {
383+ let broadcaster_clone = Arc :: clone ( & broadcaster) ;
384+ let progress_path = request_path_clone. to_string ( ) ;
385+
386+ mark_duplicate_files ( & main_db, similarity_threshold, move |cur, total| {
387+ let progress = 0.66 + cur as f32 / total as f32 * 0.34 ;
388+
389+ broadcaster_clone. broadcast ( & DeduplicateAudioLibraryProgress {
390+ path : progress_path. clone ( ) ,
391+ progress : ( progress * 100.0 ) as i32 ,
392+ total : 100 ,
393+ } ) ;
394+ } )
395+ . await ?;
396+ }
397+
398+ let request_path_clone = request_path_clone. to_string ( ) ;
399+ broadcaster. broadcast ( & DeduplicateAudioLibraryResponse {
400+ path : request_path_clone,
401+ } ) ;
402+
403+ Ok :: < ( ) , anyhow:: Error > ( ( ) )
404+ } )
405+ } ) ;
406+
407+ Ok ( Some ( ( ) ) )
408+ }
409+ }
410+
282411impl ParamsExtractor for CancelTaskRequest {
283412 type Params = ( Arc < Mutex < TaskTokens > > , ) ;
284413
0 commit comments