-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Wait before exiting in FileTrigger
#45859
Conversation
@@ -69,9 +69,11 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]: | |||
mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S") | |||
self.log.info("Found File %s last modified: %s", path, mod_time) | |||
yield TriggerEvent(True) | |||
await asyncio.sleep(self.poke_interval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this slow down the throughput from the triggerer, as we have this sleep that must be performed before the slot frees up for use again?
You could probably exacerbate/demonstrate this by setting a high poke interval and a triggerer that only runs a single trigger at a time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it is an async sleep that will not block the triggerer. The triggerer will use this time to run other async operations. At least that's my understanding
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be worth testing to be sure, but I'd also expect the triggerer to move on to another trigger while waiting for this sleep. Otherwise, the poke sleeps themselves would block the triggerer massively.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Not important now that we are going a different direction, but for posterity)
The main thread won't be blocked, but if you are at capacity, I think you'd see the triggerer is unable to start another trigger until the sleeping one wraps up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to be sure I'm groking the issue and goal of this PR. In the context of event based scheduling, we're trying to give time for the mechanism which is waiting for the file to exist to then remove it or move it before it triggers the workflow again? Otherwise we can fall into a tight loop of firing the trigger repeatedly?
return | ||
for _, _, files in os.walk(self.filepath): | ||
if files: | ||
yield TriggerEvent(True) | ||
await asyncio.sleep(self.poke_interval) | ||
return | ||
await asyncio.sleep(self.poke_interval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could move the wait to the top of the loop so that it covers both cases and we only need it in one spot (seems easier to maintain and less likely for regressions). BUT, that does mean that the functionality changes slightly since we'd not fire immediately if the file is already present.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly. That's something I want to avoid. To me, firing right when the file is detected is important.
However, there is a discussion on Slack on that topic. I expect this specific issue to be in several different triggers. S3KeyTrigger
is an example. As soon as the file is detected, the trigger will exit right away and the next loop will kick in and detect that same file again. Instead of modifying the triggers to handle that, why do not we update the logic in the triggerer to add a sleeping window between 2 fires of the same trigger (that would be applied to only triggers used for event driven scheduling). WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of modifying the triggers to handle that, why do not we update the logic in the triggerer to add a sleeping window between 2 fires of the same trigger (that would be applied to only triggers used for event driven scheduling). WDYT?
So you're proposing to close this PR and open a more generic one to solve the edge case more generally? If so I like that plan :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep :) But I keep this one open until we take a decision
You got it right :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like there are some conflicts to resolve. But otherwise lgtm
Closing this one in favor of https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1737999509008529. |
In the current implementation of FileTrigger, the trigger fires and exits immediately upon detecting the file. This behavior works well in Airflow 2 because triggers are exclusively used for deferrable operators, which are designed to fire only once (as the trigger is deleted after firing).
In Airflow 3, however, triggers are also used to schedule DAGs based on events, and in this context, triggers are long-running, remaining active even after firing. For
FileTrigger
, this creates a scenario where multiple events can be sent simultaneously because the trigger exits immediately after detecting the file. Pausing before exiting gives some breathing room between checks (and potentially give some time for other system to delete the file to avoid other DAG executions).This has no consequence for deferrable operators. The trigger will just leave
self.poke_interval
seconds more in the triggerer. Since it is an async operation, there is no harm.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.