Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] If Else Outcome is incorrectly taking settings from dictionary #171

Closed
mikita-sakalouski opened this issue Oct 18, 2024 · 1 comment
Labels
bug Something isn't working
Milestone

Comments

@mikita-sakalouski
Copy link
Contributor

Describe the bug
When you are putting the if_else_condition_task type in the depends_on, you should provide dict with configuration of outcome lineage if_else_outcome={"is_local": "false", "is_local2": "true"}. The problem is that current implementation takes the first value and apply it for all tasks, instead of taking the value by task_name:

  @property
  def depends_on_names(self) -> Iterator[Dict[str, Optional[str]]]:
      for i in self.depends_on:
          if self.if_else_outcome:
              # Here is the issue
              outcome = list(self.if_else_outcome.values())[0]
          else:
              outcome = None

          if callable(i) and hasattr(i, "__name__"):
              yield {i.__name__: outcome}
          else:
              yield {str(i): outcome}

To Reproduce
Steps to reproduce the behavior:

  1. Create workflow
wf = Workflow(
    "test",
    default_cluster=Cluster.from_existing_cluster("existing_cluster_id"),
    schedule_quartz_expression="* * * * *",
    permissions=WorkflowPermissions(
        owner=User("[email protected]"),
        can_manage_run=[User("[email protected]")],
        can_view=[User("[email protected]")],
        can_manage=[User("[email protected]")],
    ),
    run_as_user="[email protected]",
    tags={"test": "test2"},
    common_task_parameters={"all_tasks1": "test", "all_tasks3": "123"},  # type: ignore
    health={
        "rules": [
            {"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200.0}
        ]
    },  # type: ignore
    trigger={
        "file_arrival": {"url": "<my_url>"},
        "pause_status": "UNPAUSED",
    },  # type: ignore
    parameters=[
        JobsParameters(
            default="value1",
            name="wf_param1",
        )
    ],
)
  1. Create several if_condition_tasks
@workflow.if_else_condition_task()
def is_local():
    return IfElseConditionTask(left="{{job.parameters.environment}}", op="==", right="local")


@workflow.if_else_condition_task()
def is_local2():
    return IfElseConditionTask(left="{{job.parameters.environment}}", op="==", right="local")
  1. Create one normal task and put both if_condition_tasks in dependency
@workflow.spark_python_task(
    depends_on=[is_local2, is_local], if_else_outcome={"is_local": "false", "is_local2": "true"}
)
def sync():
    return build_spark_python_task()
  1. Run brickflow deploy
  2. As result you will get the yaml which said
        - task_key: sync
          depends_on:
            - task_key: is_local2
              outcome: "false"
            - task_key: is_local
              outcome: "false"

Expected behavior
Outcome should be taken correctly based on the key, and not the first one. Generated yaml should look like:

        - task_key: sync
          depends_on:
            - task_key: is_local2
              outcome: "false"
            - task_key: is_local
              outcome: "true"

Screenshots
Screenshot 2024-10-18 at 13 47 17

@mikita-sakalouski
Copy link
Contributor Author

Done in #173

@pariksheet pariksheet modified the milestones: v1.3.3, v1.3.1 Jan 28, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants