@@ -27,19 +27,46 @@ use curvine_common::proto::{
2727 CancelLoadRequest , CancelLoadResponse , LoadState , LoadTaskReportRequest , LoadTaskRequest ,
2828 LoadTaskResponse ,
2929} ;
30- use curvine_common:: state:: { WorkerAddress , TtlAction } ;
30+ use curvine_common:: state:: { TtlAction , WorkerAddress } ;
31+ use curvine_ufs:: fs:: UfsClient ;
3132use dashmap:: DashMap ;
3233use log:: { error, info, warn} ;
3334use orpc:: client:: ClientFactory ;
3435use orpc:: common:: DurationUnit ;
3536use orpc:: io:: net:: InetAddr ;
3637use orpc:: message:: { Builder , RequestStatus } ;
3738use orpc:: runtime:: { RpcRuntime , Runtime } ;
38- use orpc:: { try_err, try_log, try_option, CommonError , CommonResult } ;
39+ use orpc:: { try_err, try_log, try_option, CommonResult } ;
3940use std:: sync:: { Arc , Mutex } ;
4041use thiserror:: Error ;
41- use curvine_common:: FsResult ;
42- use curvine_ufs:: fs:: UfsClient ;
42+
43+ /// Directory processing context
44+ #[ derive( Clone ) ]
45+ pub struct DirectoryProcessContext {
46+ pub job_id : String ,
47+ pub source_path : String ,
48+ pub uri : CurvineURI ,
49+ pub ttl_duration_unit : Option < DurationUnit > ,
50+ pub recursive : bool ,
51+ }
52+
53+ impl DirectoryProcessContext {
54+ pub fn new (
55+ job_id : String ,
56+ source_path : String ,
57+ uri : CurvineURI ,
58+ ttl_duration_unit : Option < DurationUnit > ,
59+ recursive : bool ,
60+ ) -> Self {
61+ Self {
62+ job_id,
63+ source_path,
64+ uri,
65+ ttl_duration_unit,
66+ recursive,
67+ }
68+ }
69+ }
4370
4471/// Load the Task Manager
4572#[ derive( Clone ) ]
@@ -118,7 +145,7 @@ impl LoadManager {
118145 std:: io:: ErrorKind :: InvalidInput ,
119146 format ! ( "Cannot cancel job in state {}" , job. state) ,
120147 )
121- . into ( ) ) ;
148+ . into ( ) ) ;
122149 }
123150
124151 // Update status is Cancel
@@ -256,35 +283,52 @@ impl LoadManager {
256283 let ( uri, target_path, source_mtime) = self . validate_and_prepare_job ( path) . await ?;
257284
258285 // Check if reload is needed
259- let should_reload = self . check_file_modification ( path, & target_path, source_mtime) . await ?;
286+ let should_reload = self
287+ . check_file_modification ( path, & target_path, source_mtime)
288+ . await ?;
260289 if !should_reload {
261290 if let Some ( existing_job) = self . find_existing_job ( path) {
262- return Ok ( ( existing_job. job_id . clone ( ) , existing_job. target_path . clone ( ) ) ) ;
291+ return Ok ( (
292+ existing_job. job_id . clone ( ) ,
293+ existing_job. target_path . clone ( ) ,
294+ ) ) ;
263295 }
264296 }
265297
266298 // Remove duplicate jobs and create new job
267299 self . remove_duplicate_jobs_for_path ( path) . await ;
268- let ( job, ttl_duration_unit) = self . create_job_with_ttl ( path, & target_path, recursive, ttl) ?;
300+ let ( job, ttl_duration_unit) =
301+ self . create_job_with_ttl ( path, & target_path, recursive, ttl) ?;
269302 let job_id = job. job_id . clone ( ) ;
270-
303+
271304 // Add job to storage
272305 self . jobs . insert ( job_id. clone ( ) , job) ;
273306
274307 // Execute job asynchronously
275- self . execute_job_async ( job_id. clone ( ) , target_path. clone ( ) , path. to_string ( ) , uri, ttl_duration_unit, recursive) . await ;
308+ self . execute_job_async (
309+ job_id. clone ( ) ,
310+ target_path. clone ( ) ,
311+ path. to_string ( ) ,
312+ uri,
313+ ttl_duration_unit,
314+ recursive,
315+ )
316+ . await ;
276317
277318 Ok ( ( job_id, target_path) )
278319 }
279320
280- async fn validate_and_prepare_job ( & self , path : & str ) -> CommonResult < ( CurvineURI , String , i64 ) > {
321+ async fn validate_and_prepare_job (
322+ & self ,
323+ path : & str ,
324+ ) -> CommonResult < ( CurvineURI , String , i64 ) > {
281325 let uri = CurvineURI :: new ( path) ?;
282326 let mut ufs_manager = UfsManager :: new ( self . fs_client . clone ( ) ) ;
283327 let target_path = ufs_manager. get_curvine_path ( & uri) . await ?;
284328 let ufs_client = ufs_manager. get_client ( & uri) . await ?;
285329 let file_status = try_err ! ( ufs_client. get_file_status( & uri) . await ) ;
286330 let source_mtime = try_option ! ( file_status, "Source file not found: {}" , uri) . mtime ;
287-
331+
288332 Ok ( ( uri, target_path, source_mtime) )
289333 }
290334
@@ -323,7 +367,7 @@ impl LoadManager {
323367 } else {
324368 None
325369 } ;
326-
370+
327371 Ok ( ( job, ttl_duration_unit) )
328372 }
329373
@@ -370,7 +414,9 @@ impl LoadManager {
370414 if let Some ( mut job) = jobs_clone. get_mut ( & job_id) {
371415 job. update_state (
372416 LoadState :: Failed ,
373- Some ( "Path is a directory but recursive flag is not set" . to_string ( ) ) ,
417+ Some (
418+ "Path is a directory but recursive flag is not set" . to_string ( ) ,
419+ ) ,
374420 ) ;
375421 }
376422 return ;
@@ -386,21 +432,26 @@ impl LoadManager {
386432 source_path,
387433 target_path,
388434 ttl_duration_unit,
389- ) . await ;
435+ )
436+ . await ;
390437 } else {
391438 // Process directory
439+ let context = DirectoryProcessContext :: new (
440+ job_id,
441+ source_path,
442+ uri,
443+ ttl_duration_unit,
444+ recursive,
445+ ) ;
392446 Self :: process_directory (
393447 jobs_clone. clone ( ) ,
394448 fs_clone_1,
395449 client_factory_clone,
396450 ufs_manager,
397451 ufs_client,
398- job_id,
399- source_path,
400- uri,
401- ttl_duration_unit,
402- recursive,
403- ) . await ;
452+ context,
453+ )
454+ . await ;
404455 }
405456 }
406457 Err ( e) => {
@@ -466,7 +517,10 @@ impl LoadManager {
466517 } ;
467518
468519 let ( ttl_ms, ttl_action) = if let Some ( duration_unit) = ttl_duration_unit {
469- ( Some ( duration_unit. as_millis ( ) as i64 ) , Some ( TtlAction :: Delete . into ( ) ) )
520+ (
521+ Some ( duration_unit. as_millis ( ) as i64 ) ,
522+ Some ( TtlAction :: Delete . into ( ) ) ,
523+ )
470524 } else {
471525 ( None , None )
472526 } ;
@@ -478,7 +532,7 @@ impl LoadManager {
478532 ttl_ms,
479533 ttl_action,
480534 } ;
481-
535+
482536 let msg = Builder :: new_rpc ( RpcCode :: SubmitLoadTask )
483537 . request ( RequestStatus :: Rpc )
484538 . proto_header ( request)
@@ -495,7 +549,10 @@ impl LoadManager {
495549 target_path,
496550 worker. worker_id ,
497551 ) ;
498- info ! ( "Added sub-task {} for job {}" , task_response. task_id, job_id) ;
552+ info ! (
553+ "Added sub-task {} for job {}" ,
554+ task_response. task_id, job_id
555+ ) ;
499556 }
500557 }
501558 Err ( e) => {
@@ -516,23 +573,22 @@ impl LoadManager {
516573 client_factory : Arc < ClientFactory > ,
517574 mut ufs_manager : UfsManager ,
518575 ufs_client : Arc < UfsClient > ,
519- job_id : String ,
520- source_path : String ,
521- uri : CurvineURI ,
522- ttl_duration_unit : Option < DurationUnit > ,
523- recursive : bool ,
576+ context : DirectoryProcessContext ,
524577 ) {
525578 // List directory files
526- let files = match ufs_client. list_directory ( & uri, recursive) . await {
579+ let files = match ufs_client
580+ . list_directory ( & context. uri , context. recursive )
581+ . await
582+ {
527583 Ok ( files) => {
528584 if files. is_empty ( ) {
529- if let Some ( mut job) = jobs. get_mut ( & job_id) {
585+ if let Some ( mut job) = jobs. get_mut ( & context . job_id ) {
530586 job. update_state (
531587 LoadState :: Failed ,
532588 Some ( format ! (
533589 "Found {} files to process in directory {}" ,
534590 files. len( ) ,
535- source_path
591+ context . source_path
536592 ) ) ,
537593 ) ;
538594 }
@@ -542,7 +598,7 @@ impl LoadManager {
542598 }
543599 Err ( e) => {
544600 error ! ( "Failed to process directory: {}" , e) ;
545- if let Some ( mut job) = jobs. get_mut ( & job_id) {
601+ if let Some ( mut job) = jobs. get_mut ( & context . job_id ) {
546602 job. update_state (
547603 LoadState :: Failed ,
548604 Some ( format ! ( "Failed to process directory: {}" , e) ) ,
@@ -552,7 +608,11 @@ impl LoadManager {
552608 }
553609 } ;
554610
555- info ! ( "Found {} files to process in directory {}" , files. len( ) , source_path) ;
611+ info ! (
612+ "Found {} files to process in directory {}" ,
613+ files. len( ) ,
614+ context. source_path
615+ ) ;
556616
557617 // Process each file in directory
558618 for file_path in files {
@@ -563,7 +623,7 @@ impl LoadManager {
563623 continue ;
564624 }
565625 } ;
566-
626+
567627 let file_target_path = match ufs_manager. get_curvine_path ( & file_uri) . await {
568628 Ok ( path) => path,
569629 Err ( e) => {
@@ -576,7 +636,7 @@ impl LoadManager {
576636 Ok ( w) => w,
577637 Err ( e) => {
578638 error ! ( "Failed to choose workers: {}" , e) ;
579- if let Some ( mut job_ref) = jobs. get_mut ( & job_id) {
639+ if let Some ( mut job_ref) = jobs. get_mut ( & context . job_id ) {
580640 job_ref. update_state (
581641 LoadState :: Failed ,
582642 Some ( format ! ( "Failed to choose workers: {}" , e) ) ,
@@ -587,7 +647,7 @@ impl LoadManager {
587647 } ;
588648
589649 // Update worker assignment
590- if let Some ( mut job_ref) = jobs. get_mut ( & job_id) {
650+ if let Some ( mut job_ref) = jobs. get_mut ( & context . job_id ) {
591651 job_ref. assign_worker ( worker. clone ( ) ) ;
592652 job_ref. update_state (
593653 LoadState :: Loading ,
@@ -607,14 +667,17 @@ impl LoadManager {
607667 }
608668 } ;
609669
610- let ( ttl_ms, ttl_action) = if let Some ( duration_unit) = ttl_duration_unit {
611- ( Some ( duration_unit. as_millis ( ) as i64 ) , Some ( TtlAction :: Delete . into ( ) ) )
670+ let ( ttl_ms, ttl_action) = if let Some ( duration_unit) = context. ttl_duration_unit {
671+ (
672+ Some ( duration_unit. as_millis ( ) as i64 ) ,
673+ Some ( TtlAction :: Delete . into ( ) ) ,
674+ )
612675 } else {
613676 ( None , None )
614677 } ;
615678
616679 let request = LoadTaskRequest {
617- job_id : job_id. clone ( ) ,
680+ job_id : context . job_id . clone ( ) ,
618681 source_path : file_path. clone ( ) ,
619682 target_path : file_target_path. clone ( ) ,
620683 ttl_ms,
@@ -631,7 +694,7 @@ impl LoadManager {
631694 match worker_client. rpc ( msg) . await {
632695 Ok ( response) => {
633696 let task_response: LoadTaskResponse = response. parse_header ( ) . unwrap ( ) ;
634- if let Some ( mut job) = jobs. get_mut ( & job_id) {
697+ if let Some ( mut job) = jobs. get_mut ( & context . job_id ) {
635698 job. add_sub_task (
636699 task_response. task_id . clone ( ) ,
637700 file_path. clone ( ) ,
@@ -641,7 +704,7 @@ impl LoadManager {
641704 info ! (
642705 "Added sub-task {} for job {}, sub_path {}" ,
643706 task_response. task_id,
644- job_id,
707+ context . job_id,
645708 file_uri. to_local_path( ) . unwrap_or_default( )
646709 ) ;
647710 }
@@ -665,22 +728,24 @@ impl LoadManager {
665728 }
666729
667730 let target_mtime = try_log ! ( self . master_fs. file_status( target_path) , {
668- warn!( "Failed to get target file status for {}, proceeding with load" , target_path) ;
731+ warn!(
732+ "Failed to get target file status for {}, proceeding with load" ,
733+ target_path
734+ ) ;
669735 return Ok ( true ) ;
670- } ) . mtime ;
736+ } )
737+ . mtime ;
671738
672739 if source_mtime != target_mtime {
673740 for entry in self . jobs . iter ( ) {
674741 let job = entry. value ( ) ;
675742 if job. source_path == source_path {
676743 return match job. state {
677744 LoadState :: Loading | LoadState :: Pending => {
678- try_log ! ( self . cancel_job( job. job_id. clone( ) ) . await ) ;
679- Ok ( true )
680- }
681- LoadState :: Completed | LoadState :: Failed | LoadState :: Canceled => {
745+ let _ = try_log ! ( self . cancel_job( job. job_id. clone( ) ) . await ) ;
682746 Ok ( true )
683747 }
748+ LoadState :: Completed | LoadState :: Failed | LoadState :: Canceled => Ok ( true ) ,
684749 } ;
685750 }
686751 }
@@ -695,7 +760,8 @@ impl LoadManager {
695760 for entry in self . jobs . iter ( ) {
696761 let job = entry. value ( ) ;
697762 if job. source_path == source_path
698- && ( job. state == LoadState :: Failed || job. state == LoadState :: Canceled ) {
763+ && ( job. state == LoadState :: Failed || job. state == LoadState :: Canceled )
764+ {
699765 jobs_to_remove. push ( job. job_id . clone ( ) ) ;
700766 }
701767 }
@@ -777,7 +843,6 @@ impl LoadManager {
777843 }
778844}
779845
780-
781846/// Load manager error
782847#[ derive( Debug , Error ) ]
783848pub enum LoadManagerError {
0 commit comments