Skip to content

Conversation

@haiyuan-eng-google
Copy link
Collaborator

Added robustness checks to ensure loop identity before accessing batch processor and flushing.

Please ensure you have read the contribution guide before creating a pull request.

Link to Issue or Description of Change

1. Link to an existing issue (if applicable):

  • Closes: #issue_number
  • Related: #issue_number

2. Or, if no issue exists, describe the change:

If applicable, please follow the issue templates to provide as much detail as
possible.

Problem:
A clear and concise description of what the problem is.

Solution:
A clear and concise description of what you want to happen and why you choose
this solution.

Testing Plan

Please describe the tests that you ran to verify your changes. This is required
for all PRs that are not small documentation or typo fixes.

Unit Tests:

  • I have added or updated unit tests for my change.
  • All unit tests pass locally.

Please include a summary of passed pytest results.

Manual End-to-End (E2E) Tests:

Please provide instructions on how to manually test your changes, including any
necessary setup or configuration. Please provide logs or screenshots to help
reviewers better understand the fix.

Checklist

  • I have read the CONTRIBUTING.md document.
  • I have performed a self-review of my own code.
  • I have commented my code, particularly in hard-to-understand areas.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.
  • I have manually tested my changes end-to-end.
  • Any dependent changes have been merged and published in downstream modules.

Additional context

Add any other context or screenshots about the feature request here.

Added robustness checks to ensure loop identity before accessing batch processor and flushing.
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @haiyuan-eng-google, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly improves the stability of the BigQuery plugin by introducing robust validation for asyncio event loop states. It addresses potential issues where a BatchProcessor might incorrectly reference a closed or mismatched event loop, which can happen in complex asynchronous environments. By validating loop identity before accessing or flushing batch processors, and by actively clearing stale loop states, the plugin becomes more resilient to RuntimeError exceptions and ensures operations are always performed with the correct, active event loop.

Highlights

  • Enhanced Loop State Validation: Implemented checks in _batch_processor_prop, _get_loop_state, and flush methods to ensure the BatchProcessor is associated with the currently running asyncio event loop, preventing the use of stale or invalid loop states.
  • Stale Loop State Handling: Added logic to detect and clear stale loop states from the internal _loop_state_by_loop dictionary, particularly in _get_loop_state and flush, to avoid issues arising from asyncio loop reuse or closure.
  • Improved Robustness: The changes collectively enhance the robustness of the BigQuery plugin by preventing potential RuntimeError exceptions that could occur when attempting to use a BatchProcessor or asyncio.Queue bound to a closed or mismatched event loop.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • src/google/adk/plugins/bigquery_agent_analytics_plugin.py
    • Added a check in _batch_processor_prop to verify that the batch_processor's internal queue loop matches the current running loop before returning the processor.
    • Introduced a robustness check in _get_loop_state to detect if a stored loop state's batch_processor is bound to a different or closed loop, logging a warning and deleting the stale state if a mismatch is found.
    • Modified the flush method to validate the loop identity of the stored state's batch_processor before flushing, logging a warning and removing the stale state if a mismatch occurs.
Activity
  • No specific activity (comments, reviews, progress) has been recorded for this pull request yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces important robustness checks for handling asyncio event loop state in the BigQuery plugin. By validating the loop identity before accessing or flushing the batch processor, it effectively prevents issues arising from stale state when event loops are reused. The changes in _get_loop_state and flush are well-implemented. I've added one suggestion for _batch_processor_prop to make its behavior consistent with the other modifications by also cleaning up stale state.

Comment on lines +1652 to +1656
state = self._loop_state_by_loop[loop]
# Validate loop identity to prevent reuse of closed loops
if state.batch_processor._queue._loop is not loop:
return None
return state.batch_processor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with the changes in _get_loop_state and flush, it would be better to also log a warning and clean up the stale state from _loop_state_by_loop here. Currently, a stale state is detected but not removed, which means subsequent calls to this property will repeatedly find the stale state until it's cleaned up by one of the other methods.

Suggested change
state = self._loop_state_by_loop[loop]
# Validate loop identity to prevent reuse of closed loops
if state.batch_processor._queue._loop is not loop:
return None
return state.batch_processor
state = self._loop_state_by_loop[loop]
# Validate loop identity to prevent reuse of closed loops
if state.batch_processor._queue._loop is not loop:
logger.warning(
"Detected stale loop state for loop %s (id=%s) in property, clearing.",
loop,
id(loop),
)
del self._loop_state_by_loop[loop]
return None
return state.batch_processor

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants