@@ -38,6 +38,7 @@
 import json
 import uuid
 import base64
+import inspect
 from collections import OrderedDict
 from typing import Any, Callable, Dict, Iterable, List, Set, Tuple, Union, Type

@@ -373,6 +374,57 @@ def _build_tree_module(module_obj):
         _build_tree_module(root_module)
         pass

+    @staticmethod
+    def _reassign_parameters_objects(
+            manager: PipelineModuleManager,
+            objects: List[AbstractModule],
+            pipeline_args: Dict[str, str] = dict()) -> List[AbstractModule]:
+        """
+        Reassign parameters of operations.
+        """
+        for key, value in pipeline_args.items():
+            #
+            if key in ['-source', '-i', '-input']:
+                data_source = value
+                obj = manager.find_data_source(data_source, AbstractReader, StoreCapabilities.READ)
+                if obj is None:
+                    raise Exception('Input DataSource "{}" not supported!'.format(data_source))
+
+                obj.connectionString = data_source
+                objects.insert(0, obj)
+                continue
+            if key in ['-target', '-o', '-output']:
+                data_source = value
+                obj = manager.find_data_source(data_source, AbstractWriter, StoreCapabilities.CREATE)
+                if obj is None:
+                    raise Exception('Output DataSource "{}" not supported!'.format(data_source))
+
+                obj.connectionString = data_source
+                objects.append(obj)
+                continue
+            #
+            elif key.startswith('--'):
+                key = key[2:].split('.')
+                if len(key) != 3:
+                    continue
+                object_type, stage_id, attribute_name = key
+                if object_type.lower() != 'module':
+                    continue
+
+                logging.debug(
+                    'Assigning parameter of "{}.{}" -> {}={}...'
+                    .format(object_type, stage_id, attribute_name, value))
+
+                for obj in objects:
+                    if obj.stageId == stage_id:
+                        if hasattr(obj, attribute_name):
+                            new_value = ProcessingUtils.cast_value(value, type(getattr(obj, attribute_name)))
+                            setattr(obj, attribute_name, new_value)
+                        else:
+                            setattr(obj, attribute_name, value)
+
+        return objects
+
     @staticmethod
     def _parse_pipeline_objects(manager: PipelineModuleManager,
                                 modules: Dict[str, Type],
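For reference, a minimal sketch of the pipeline_args dictionary that the new _reassign_parameters_objects() helper consumes, and how the extracted static method is called. The key forms (-i/-source/-input, -o/-target/-output, --module.<stageId>.<attribute>) are the ones handled above; the file names, the stage id 'clip_stage' and the attribute 'distance' are hypothetical, and manager/objects would normally be built by _parse_pipeline_objects():

    # Hypothetical command-line style arguments, keyed exactly as the helper expects:
    #   -i / -source / -input          -> prepend a Reader for the given connection string
    #   -o / -target / -output         -> append a Writer for the given connection string
    #   --module.<stageId>.<attribute> -> override one attribute of the stage with that id,
    #                                     cast via ProcessingUtils.cast_value() when it already exists
    pipeline_args = {
        '-i': 'data/input.gpkg',                  # hypothetical input DataSource
        '-o': 'data/output.gpkg',                 # hypothetical output DataSource
        '--module.clip_stage.distance': '100',    # hypothetical stage attribute override
    }
    objects = PipelineManager._reassign_parameters_objects(manager, objects, pipeline_args)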
@@ -399,7 +451,6 @@ def _parse_pipeline_objects(manager: PipelineModuleManager,
             if not os.path.exists(script_file):
                 script_file = os.path.join(os.path.dirname(__file__), script_file)

-            import inspect
             type_def = ModuleManager.import_type_from_file(script_file, class_name, inspect.isclass)

             if type_def is None:
@@ -427,46 +478,8 @@ def _parse_pipeline_objects(manager: PipelineModuleManager,
             objects.append(obj)

         # Reassign parameters of operations.
-        for key, value in pipeline_args.items():
-            #
-            if key in ['-source', '-i', '-input']:
-                data_source = value
-                obj = manager.find_data_source(data_source, AbstractReader, StoreCapabilities.READ)
-                if obj is None:
-                    raise Exception('Input DataSource "{}" not supported!'.format(data_source))
-
-                obj.connectionString = data_source
-                objects.insert(0, obj)
-                continue
-            if key in ['-target', '-o', '-output']:
-                data_source = value
-                obj = manager.find_data_source(data_source, AbstractWriter, StoreCapabilities.CREATE)
-                if obj is None:
-                    raise Exception('Output DataSource "{}" not supported!'.format(data_source))
-
-                obj.connectionString = data_source
-                objects.append(obj)
-                continue
-            #
-            elif key.startswith('--'):
-                key = key[2:].split('.')
-                if len(key) != 3:
-                    continue
-                object_type, stage_id, attribute_name = key
-                if object_type.lower() != 'module':
-                    continue
-
-                logging.debug(
-                    'Assigning parameter of "{}.{}" -> {}={}...'
-                    .format(object_type, stage_id, attribute_name, value))
-
-                for obj in objects:
-                    if obj.stageId == stage_id:
-                        if hasattr(obj, attribute_name):
-                            new_value = ProcessingUtils.cast_value(value, type(getattr(obj, attribute_name)))
-                            setattr(obj, attribute_name, new_value)
-                        else:
-                            setattr(obj, attribute_name, value)
+        objects = \
+            PipelineManager._reassign_parameters_objects(manager, objects, pipeline_args)

         # Fix some possible bad settings.
         for obj in objects:
@@ -621,7 +634,6 @@ def run(self, processing_args, callback: Callable = None, callback_args: Any = N
                 return False

            set_of_objects = [obj for obj in self.objects(recursive=True)]
-           this_manager = self

            if self._pipeline_dir:
                os.environ['PIPELINE_FOLDER'] = self._pipeline_dir
@@ -639,53 +651,6 @@ def run(self, processing_args, callback: Callable = None, callback_args: Any = N
                logging.warning('There is none Output node, the Pipeline does nothing!')
                return False

-           def my_starting_run_function(module_obj: AbstractModule,
-                                        function_args: Any) -> Tuple[SchemaDef, AbstractModule]:
-               """
-               Prepare of task metadata of each Pipeline Operation.
-               """
-               input_function_args = function_args
-               input_schemas = []
-
-               # Calling before to the tree of inputs.
-               for obj in module_obj.ti_.inputs.values():
-                   function_args = my_starting_run_function(obj, input_function_args)
-                   input_schemas.append(function_args[0])
-
-               if hasattr(module_obj, 'pipeline_args'):
-                   return [module_obj.pipeline_args.schema_def, module_obj.pipeline_args.data_source]
-               if len(input_schemas) > 1:
-                   function_args[0] = SchemaDef.merge_all(input_schemas)
-                   if module_obj.className == 'ConnectionJoin':
-                       function_args[1] = None
-
-               # Assign metadata for using when the Pipeline runs.
-               schema_def = function_args[0]
-               parent_obj = function_args[1]
-               schema_def = module_obj.starting_run(schema_def, this_manager, processing_args)
-
-               setattr(module_obj, 'pipeline_args', type('PipelineArgs', (object,), {
-                   'config': this_manager.config,
-                   'pipeline': this_manager,
-                   'data_source': parent_obj,
-                   'schema_def': schema_def,
-                   'processing_args': processing_args
-               })())
-               return [schema_def, module_obj]
-
-           def my_finished_run_function(module_obj: AbstractModule) -> AbstractModule:
-               """
-               Finalization of task for each Pipeline operation.
-               """
-               if hasattr(module_obj, 'pipeline_args'):
-                   module_obj.finished_run(self, processing_args)
-                   delattr(module_obj, 'pipeline_args')
-
-               for obj in module_obj.ti_.inputs.values():
-                   my_finished_run_function(obj)
-
-               return module_obj
-
            # Redefine 'connectionString' of Readers, when defining embebed fileData.
            for reader in readers:
                if hasattr(reader, 'connectionString'):
@@ -707,15 +672,15 @@ def my_finished_run_function(module_obj: AbstractModule) -> AbstractModule:

            # Run workflow, like one IEnumerable stream!
            for writer in writers:
-               my_starting_run_function(writer, [None, None])
+               self._invoke_starting_run(writer, [None, None], processing_args)

                for feature in writer:
                    if callback:
                        callback(self, processing_args, writer, feature, callback_args)

                    feature = None

-               my_finished_run_function(writer)
+               self._invoke_finished_run(writer, processing_args)

            return True
        finally:
@@ -731,3 +696,52 @@ def my_finished_run_function(module_obj: AbstractModule) -> AbstractModule:
            sys.path.remove(self._pipeline_dir)

        return False
+
+    def _invoke_starting_run(
+            self,
+            module_obj: AbstractModule,
+            function_args: Any, processing_args) -> Tuple[SchemaDef, AbstractModule]:
+        """
+        Prepare of task metadata of each Pipeline Operation.
+        """
+        input_function_args = function_args
+        input_schemas = []
+
+        # Calling before to the tree of inputs.
+        for obj in module_obj.ti_.inputs.values():
+            function_args = self._invoke_starting_run(obj, input_function_args, processing_args)
+            input_schemas.append(function_args[0])
+
+        if hasattr(module_obj, 'pipeline_args'):
+            return [module_obj.pipeline_args.schema_def, module_obj.pipeline_args.data_source]
+        if len(input_schemas) > 1:
+            function_args[0] = SchemaDef.merge_all(input_schemas)
+            if module_obj.className == 'ConnectionJoin':
+                function_args[1] = None
+
+        # Assign metadata for using when the Pipeline runs.
+        schema_def = function_args[0]
+        parent_obj = function_args[1]
+        schema_def = module_obj.starting_run(schema_def, self, processing_args)
+
+        setattr(module_obj, 'pipeline_args', type('PipelineArgs', (object,), {
+            'config': self.config,
+            'pipeline': self,
+            'data_source': parent_obj,
+            'schema_def': schema_def,
+            'processing_args': processing_args
+        })())
+        return [schema_def, module_obj]
+
+    def _invoke_finished_run(self, module_obj: AbstractModule, processing_args) -> AbstractModule:
+        """
+        Finalization of task for each Pipeline operation.
+        """
+        if hasattr(module_obj, 'pipeline_args'):
+            module_obj.finished_run(self, processing_args)
+            delattr(module_obj, 'pipeline_args')
+
+        for obj in module_obj.ti_.inputs.values():
+            self._invoke_finished_run(obj, processing_args)
+
+        return module_obj
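As a side note, _invoke_starting_run() attaches a throwaway PipelineArgs namespace to each module before features start flowing, and _invoke_finished_run() removes it again. A minimal sketch of what an operation could read from that namespace while the pipeline runs; MyOperation and its process() method are hypothetical, while the attribute names (config, pipeline, data_source, schema_def, processing_args) are the ones set in this commit:

    class MyOperation(AbstractModule):           # hypothetical operation
        def process(self, feature):
            args = self.pipeline_args            # set by PipelineManager._invoke_starting_run()
            schema = args.schema_def             # SchemaDef returned by starting_run()
            upstream = args.data_source          # parent module feeding this stage
            settings = args.processing_args      # arguments passed to PipelineManager.run()
            # ... transform the feature using the metadata above ...
            return feature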