Skip to content

Conversation

@dejii
Copy link
Contributor

@dejii dejii commented Oct 7, 2025

Pending review from the Google team (see #54494 (comment))

related: #54494


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@boring-cyborg boring-cyborg bot added area:dev-tools area:providers backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch kind:documentation provider:google Google (including GCP) related issues labels Oct 7, 2025
Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for raising the PR again!

Copy link
Contributor

@vincbeck vincbeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

@potiuk
Copy link
Member

potiuk commented Oct 13, 2025

@VladaZakharova - can you take a look please?

@VladaZakharova
Copy link
Contributor

Hi there, thank you for pinging me here, this PR actually introduces very important set of changes for Google provider, that I think should be discussed before merging it.

In general changes introduced here make sense and will be good to have in Google provider as well, but from Google provider's point of view the structure itself of the files and folders where to store those changes should be discussed in a more detailed way.
I think from our team's side we can introduce some design document as a guideline how to add this new functionality in already existing provider's structure.

For now, I would kindly ask you to NOT merge these changes. Later will be good to adjust this PR to the design doc that we will share and then review this PR again.

Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requesting changes for Google team discussion to complete.

@dejii
Copy link
Contributor Author

dejii commented Oct 23, 2025

I think from our team's side we can introduce some design document as a guideline how to add this new functionality in already existing provider's structure.

Just to add some context here that may be helpful – using a different folder structure will cause common.messaging to be unable to detect the queue.

References:

MESSAGE_QUEUE_PROVIDERS = [create_class_by_name(name)() for name in providers_manager.queue_class_names]

def _discover_queues(self) -> None:
"""Retrieve all queues defined in the providers."""
for provider_package, provider in self._provider_dict.items():
if provider.data.get("queues"):
for queue_class_name in provider.data["queues"]:
if _correctness_check(provider_package, queue_class_name, provider):
self._queue_class_name_set.add(queue_class_name)

So, changing the folder structure would ultimately require updates to the common package or introducing a special-case implementation specifically for the Google provider.

@vincbeck
Copy link
Contributor

I think from our team's side we can introduce some design document as a guideline how to add this new functionality in already existing provider's structure.

Just to add some context here that may be helpful – using a different folder structure will cause common.messaging to be unable to detect the queue.

References:

MESSAGE_QUEUE_PROVIDERS = [create_class_by_name(name)() for name in providers_manager.queue_class_names]

def _discover_queues(self) -> None:
"""Retrieve all queues defined in the providers."""
for provider_package, provider in self._provider_dict.items():
if provider.data.get("queues"):
for queue_class_name in provider.data["queues"]:
if _correctness_check(provider_package, queue_class_name, provider):
self._queue_class_name_set.add(queue_class_name)

So, changing the folder structure would ultimately require updates to the common package or introducing a special-case implementation specifically for the Google provider.

I do not understand, why do you need to change the common messaging structure?

@dejii
Copy link
Contributor Author

dejii commented Oct 23, 2025

I do not understand, why do you need to change the common messaging structure?

I do not intend to change the structure, I was only addressing #56445 (comment) about maintaining the existing provider's structure. The only thing introduced in this PR is a queues/ folder and it's a prerequisite for the common messaging system providers. I hope this clarifies it.

@vincbeck
Copy link
Contributor

I do not understand, why do you need to change the common messaging structure?

I do not intend to change the structure, I was only addressing #56445 (comment) about maintaining the existing provider's structure. The only thing introduced in this PR is a queues/ folder and it's a prerequisite for the common messaging system providers. I hope this clarifies it.

My bad :)

@VladaZakharova
Copy link
Contributor

VladaZakharova commented Nov 25, 2025

Hi there everyone. Thank you for waiting! We have discussed this inside the team and wanted to propose some changes.

  1. Change the parent class of the desired trigger to BaseEventTrigger. This means, that for current implementation for PubSub it will require the change of base class to BaseEventTrigger, which basically will only extend the current funtionality and will not break any code for the trigger existing logic, but will fix the need to add ignore tag as here: https://github.com/apache/airflow/pull/56445/files#diff-35c447dab2b24d97f627e395892d4810a15a4faffc619ebfad03e02c1bf5242eR68
  2. Implement a new class in providers/google/src/airflow/providers/google/event_scheduling/events/…..py that will inherit from BaseMessageQueueProvider and have a method. Naming should be like PubSubMessageQueueEventTriggerContainer Since right now the suggested name has Provider in it, which is very much confusing and can mistake user. New suggested name will fix this misunderstanding.
  3. Also, since the idea itself is to implement logic for event-driven scheduling, the proposed idea includes this kind of naming convention for the folders/files. In this way, it will be more clear for the user where to add new assets or events when extending logic for event scheduling in Airflow:
    https://screenshot.googleplex.com/GauARo3y86HyvcA

@dejii
Copy link
Contributor Author

dejii commented Nov 25, 2025

Thanks for taking a look into this! I have one question, how do you suggest we resolve this #56445 (comment)? Are we going to make changes to the discovery implementation to support the google provider?

Also the link to the image provided isn't publicly accessible.

@VladaZakharova
Copy link
Contributor

Sorry, my bad :)
GauARo3y86HyvcA

Regarding that comment, let me think...

@VladaZakharova
Copy link
Contributor

Regarding the comment: yes, unfortunately, this will need some changes in the existing code. Overall, this is a proposal for the common structure. Still, as a part of the Community, we want to be on the same page with other providers, so the proposed idea will include the changes in other providers as well. Good that we have only 2 I think for now :) But still, this is a proposal, as we respect the Community's decisions. From Google side it looks like current structure can be not very clear for the user, but if we will all follow one idea, then Google will also respect this and follow already implemented idea for event scheduling.

@potiuk @vincbeck

Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I not understand wrong, we don't need to change the common queue interface even the PubSubMessageQueueEventTriggerContainer is placed in different directory structure as screenshot described. The Provider Manager is still able to retrieve it as long as the queues of get_provider_info return the correct path to import.

Here is example of RedisPubSubMessageQueueProvider

"queues": ["airflow.providers.redis.queues.redis.RedisPubSubMessageQueueProvider"],
"sensors": [

we just need to set the queues to ["airflow.providers.google.cloud.event_scheduling.pubsub.PubSubMessageQueueEventTriggerContainer"] for this case.

@dejii
Copy link
Contributor Author

dejii commented Nov 25, 2025

@jason810496 good call out, that solves it then. Thanks!

@dejii dejii force-pushed the feat/pubsub-message-queue-provider branch from df668b2 to 847b25a Compare January 14, 2026 01:13
@dejii
Copy link
Contributor Author

dejii commented Jan 14, 2026

@VladaZakharova I have updated the PR to follow the proposed structure and naming convention for folders/files. Would you mind taking a look when you get a chance? Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:dev-tools area:providers backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch kind:documentation provider:google Google (including GCP) related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants