Wait before exiting in FileTrigger #45859

Closed

2 changes: 2 additions & 0 deletions providers/src/airflow/providers/standard/triggers/file.py
@@ -69,9 +69,11 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
                     mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S")
                     self.log.info("Found File %s last modified: %s", path, mod_time)
                     yield TriggerEvent(True)
+                    await asyncio.sleep(self.poke_interval)
Member

Won't this reduce the triggerer's throughput, since this sleep has to finish before the slot frees up for use again?

You could probably exacerbate/demonstrate this by setting a high poke interval and a triggerer that only runs a single trigger at a time.

Contributor Author

Since it is an async sleep, it will not block the triggerer. The triggerer will use this time to run other async operations. At least that's my understanding.
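
A minimal sketch of the claim above, using plain `asyncio` rather than Airflow code. The `sleeper` and `run_both` names are hypothetical stand-ins for two triggers sharing one event loop. If `asyncio.sleep` blocked the loop, the two coroutines would take roughly twice `delay` in total; because it yields control, they overlap.

```python
import asyncio
import time


async def sleeper(name: str, delay: float, log: list[str]) -> None:
    # Hypothetical stand-in for a trigger that waits after firing.
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # suspends this coroutine, not the event loop
    log.append(f"{name} end")


async def run_both(delay: float = 0.2) -> tuple[list[str], float]:
    # Run two sleepers on the same event loop and time the whole thing.
    log: list[str] = []
    started = time.monotonic()
    await asyncio.gather(sleeper("a", delay, log), sleeper("b", delay, log))
    return log, time.monotonic() - started


if __name__ == "__main__":
    log, elapsed = asyncio.run(run_both())
    print(log, f"{elapsed:.2f}s")
```

This only shows that the event loop stays free. It says nothing about whether a sleeping trigger still counts against a capacity limit.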

Contributor

It might be worth testing to be sure, but I'd also expect the triggerer to move on to another trigger while waiting for this sleep. Otherwise, the poke sleeps themselves would block the triggerer massively.

Member

(Not important now that we are going a different direction, but for posterity)
The main thread won't be blocked, but if you are at capacity, I think you'd see the triggerer is unable to start another trigger until the sleeping one wraps up.
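
A toy model of the capacity concern, assuming (purely for illustration) that each running trigger holds one slot until its coroutine finishes. The `asyncio.Semaphore`, `trigger`, and `simulate` are hypothetical and are not how the triggerer is implemented. The sketch only shows why a post-fire sleep would delay the next trigger when all slots are taken.

```python
import asyncio
import time


async def trigger(
    slots: asyncio.Semaphore, post_fire_sleep: float, fired_at: list[float], t0: float
) -> None:
    async with slots:  # hypothetical capacity slot, held until the coroutine exits
        fired_at.append(time.monotonic() - t0)  # the moment the event "fires"
        await asyncio.sleep(post_fire_sleep)  # the slot stays occupied while sleeping


async def simulate(capacity: int, n_triggers: int, post_fire_sleep: float) -> list[float]:
    # Returns the time (seconds since start) at which each trigger fired.
    slots = asyncio.Semaphore(capacity)
    fired_at: list[float] = []
    t0 = time.monotonic()
    await asyncio.gather(
        *(trigger(slots, post_fire_sleep, fired_at, t0) for _ in range(n_triggers))
    )
    return fired_at


if __name__ == "__main__":
    print(asyncio.run(simulate(capacity=1, n_triggers=2, post_fire_sleep=0.2)))
    print(asyncio.run(simulate(capacity=2, n_triggers=2, post_fire_sleep=0.2)))
```

With one slot, the second trigger fires only after the first one's sleep ends. With two slots, both fire almost immediately.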

                     return
                 for _, _, files in os.walk(self.filepath):
                     if files:
                         yield TriggerEvent(True)
+                        await asyncio.sleep(self.poke_interval)
                         return
             await asyncio.sleep(self.poke_interval)
Contributor

We could move the wait to the top of the loop so that it covers both cases and we only need it in one spot (seems easier to maintain and less prone to regressions). BUT that does mean the behavior changes slightly, since we wouldn't fire immediately if the file is already present.
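
The trade-off can be sketched with two simplified polling loops. These are not the `FileTrigger` code: `exists`, `max_polls`, and the function names are hypothetical, and the file check is replaced by a callable so the example is self-contained.

```python
import asyncio
import time
from typing import AsyncIterator, Callable


async def wait_after_check(
    exists: Callable[[], bool], poke_interval: float, max_polls: int
) -> AsyncIterator[bool]:
    # Check first, sleep only on a miss: fires at once if the file is present.
    for _ in range(max_polls):
        if exists():
            yield True
            return
        await asyncio.sleep(poke_interval)


async def wait_before_check(
    exists: Callable[[], bool], poke_interval: float, max_polls: int
) -> AsyncIterator[bool]:
    # Sleep at the top of the loop: a single wait, but the first fire is delayed.
    for _ in range(max_polls):
        await asyncio.sleep(poke_interval)
        if exists():
            yield True
            return


async def seconds_until_first_event(events: AsyncIterator[bool]) -> float:
    # Time how long the generator takes to produce its first event.
    start = time.monotonic()
    async for _ in events:
        break
    return time.monotonic() - start


if __name__ == "__main__":
    present = lambda: True  # the file already exists
    print(asyncio.run(seconds_until_first_event(wait_after_check(present, 0.2, 3))))
    print(asyncio.run(seconds_until_first_event(wait_before_check(present, 0.2, 3))))
```

With the file already present, the first variant fires almost immediately, while the second waits one full `poke_interval` first.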

Contributor Author

@vincbeck, Jan 27, 2025

Exactly. That's something I want to avoid. To me, firing right when the file is detected is important.

However, there is a discussion on Slack on that topic. I expect this specific issue to be in several different triggers. S3KeyTrigger is an example. As soon as the file is detected, the trigger will exit right away and the next loop will kick in and detect that same file again. Instead of modifying the triggers to handle that, why don't we update the logic in the triggerer to add a sleeping window between two fires of the same trigger (applied only to triggers used for event-driven scheduling)? WDYT?
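
A rough sketch of what such a sleeping window could look like, written as a generic wrapper rather than actual triggerer code. Everything here is hypothetical: `rerun_with_cooldown`, `file_like_trigger`, and the `cooldown` parameter are illustrative names, and the real change would live wherever the triggerer restarts event-driven triggers.

```python
import asyncio
import time
from typing import AsyncIterator, Callable


async def file_like_trigger() -> AsyncIterator[str]:
    # Stand-in for a trigger that fires immediately because the file is still there.
    yield "file present"


async def rerun_with_cooldown(
    make_trigger: Callable[[], AsyncIterator[str]], cooldown: float, max_fires: int
) -> list[float]:
    # Re-run the same trigger, enforcing a minimum gap between two fires.
    fire_times: list[float] = []
    last_fire = None
    t0 = time.monotonic()
    while len(fire_times) < max_fires:
        if last_fire is not None:
            remaining = cooldown - (time.monotonic() - last_fire)
            if remaining > 0:
                await asyncio.sleep(remaining)  # the sleeping window between fires
        fired = False
        async for _event in make_trigger():
            fired = True
            break
        if not fired:
            break  # the trigger ended without firing; stop re-running it
        last_fire = time.monotonic()
        fire_times.append(last_fire - t0)
    return fire_times


if __name__ == "__main__":
    print(asyncio.run(rerun_with_cooldown(file_like_trigger, cooldown=0.2, max_fires=3)))
```

The first fire is still immediate, which keeps the behavior asked for above, and only repeat fires of the same trigger are spaced out.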

Contributor

> Instead of modifying the triggers to handle that, why don't we update the logic in the triggerer to add a sleeping window between two fires of the same trigger (applied only to triggers used for event-driven scheduling)? WDYT?

So you're proposing to close this PR and open a more generic one that solves the edge case for all triggers? If so, I like that plan :)

Contributor Author

Yep :) But I'll keep this one open until we make a decision.