
Conversation

@goutamvenkat-anyscale (Contributor)

Description

This change defers the start of the write (the datasink's on_write_start) until the schema has been read from the first bundle. This is needed for datasinks like Iceberg, which require the schema to be mutated before the underlying data is written to the store.
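For illustration, here is a minimal sketch of the kind of sink this enables, assuming the optional-schema on_write_start signature this PR introduces; the SchemaAwareSink class and its _evolve_table_schema helper are hypothetical, not part of Ray Data.

from typing import Iterable, Optional

import pyarrow as pa

from ray.data import Datasink
from ray.data.block import Block


class SchemaAwareSink(Datasink):
    """Hypothetical sink that must see the dataset schema before any data lands."""

    def on_write_start(self, schema: Optional[pa.Schema] = None) -> None:
        # With this change, Ray Data invokes on_write_start lazily and passes
        # the PyArrow schema read from the first input bundle.
        if schema is not None:
            self._evolve_table_schema(schema)  # e.g., Iceberg-style schema evolution

    def _evolve_table_schema(self, schema: pa.Schema) -> None:
        # Placeholder: reconcile the target table's schema with `schema`
        # before any write tasks run.
        ...

    def write(self, blocks: Iterable[Block], ctx) -> None:
        # Per-task write logic goes here once the schema has been prepared.
        ...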


@goutamvenkat-anyscale marked this pull request as ready for review on December 8, 2025 19:46
@goutamvenkat-anyscale requested a review from a team as a code owner on December 8, 2025 19:46
@gemini-code-assist (bot, Contributor) left a comment

Code Review

This pull request refactors the write datasink logic to defer on_write_start until the first data bundle is available, allowing schema-dependent initialization. This is a significant and useful change. My review focuses on the correctness of this new deferred execution flow and related refactorings.

I've found a critical issue in dataset.py where a refactoring of the Schema.types method has removed a necessary check, which will cause failures for Arrow-backed datasets. I've also pointed out a minor code cleanup opportunity by removing some redundant imports.

Overall, the changes are well-structured, but the identified critical issue should be addressed before merging.
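For illustration, the removed check being referred to might look like the guard below; this is an assumed reconstruction, not the actual dataset.py code, and _convert_to_pa_type is stubbed out here.

import pyarrow as pa


def _convert_to_pa_type(dtype) -> pa.DataType:
    # Stand-in for dataset.py's pandas-dtype-to-Arrow conversion helper.
    return pa.from_numpy_dtype(dtype)


def _to_arrow_type(dtype) -> pa.DataType:
    # Arrow-backed datasets already report pa.DataType column types, so they
    # must bypass conversion; dropping this guard breaks them.
    if isinstance(dtype, pa.DataType):
        return dtype
    return _convert_to_pa_type(dtype)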

@goutamvenkat-anyscale added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels on Dec 8, 2025
Signed-off-by: Goutam <[email protected]>
@goutamvenkat-anyscale (Contributor, Author)

/gemini summary

@gemini-code-assist (Contributor)

Summary of Changes

This pull request significantly refactors the write path in Ray Data to support datasinks that require schema information before data is written. It introduces a deferred on_write_start callback that receives the PyArrow schema of the first data bundle, enabling advanced features like schema evolution for systems such as Iceberg. The changes also improve the handling of empty datasets and ensure consistent schema extraction and operator fusion, making the write process more robust and flexible.

Highlights

  • Deferred Write Execution: Write operations are now deferred until the schema from the first data bundle is available. This is critical for datasinks like Iceberg that require schema mutation before writing data.
  • New on_start Callback Mechanism: A new on_start callback has been introduced in MapOperator and its subclasses. This callback is invoked with the PyArrow schema of the first input bundle, allowing datasinks to perform schema-dependent initialization.
  • Datasink on_write_start Update: The on_write_start method for various datasinks (BigQuery, ClickHouse, Iceberg, Lance) has been updated to accept an optional PyArrow schema, aligning with the new deferred execution model.
  • Lazy _map_transformer_ref Initialization: The _map_transformer_ref in MapOperator is now lazily initialized. This ensures that on_start callbacks can modify the transformer's state before it is serialized and used by worker actors.
  • Refactored File Datasink Pre-flight Checks: Pre-flight checks for _FileDatasink (handling SaveMode.ERROR, IGNORE, OVERWRITE) have been moved to a new _pre_flight_check method, which is executed before the write operation begins. This ensures that checks are performed upfront, and on_write_start is simplified to only handle directory creation.
  • Schema Extraction from Blocks: A new remote function _get_arrow_schema_from_block was added to efficiently extract PyArrow schema from a block (including Pandas blocks) by converting a 1-row sample, avoiding fetching full block data to the driver.
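A rough sketch of that schema-extraction idea follows; the num_cpus=0 hint and the exact BlockAccessor calls are assumptions, and this is illustrative rather than the merged implementation.

import pyarrow as pa
import ray
from ray.data.block import Block, BlockAccessor


@ray.remote(num_cpus=0)
def _get_arrow_schema_from_block(block: Block) -> pa.Schema:
    # Slice a single row and convert it to Arrow so Pandas blocks go through
    # the same conversion path as real writes, without pulling full block
    # data back to the driver.
    accessor = BlockAccessor.for_block(block)
    sample = accessor.slice(0, min(1, accessor.num_rows()), copy=False)
    return BlockAccessor.for_block(sample).to_arrow().schema

On the driver, the operator then only needs to ray.get the small schema result of this task rather than the block itself, which is what addresses the overhead concern raised in the review.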
Changelog
  • python/ray/data/_internal/datasource/bigquery_datasink.py
    • Added TYPE_CHECKING import for pyarrow as pa.
    • Modified on_write_start to accept an optional pa.Schema argument.
  • python/ray/data/_internal/datasource/clickhouse_datasink.py
    • Added import pyarrow as pa.
    • Modified on_write_start to accept an optional pa.Schema argument.
  • python/ray/data/_internal/datasource/iceberg_datasink.py
    • Added import pyarrow as pa within TYPE_CHECKING block.
    • Modified on_write_start to accept an optional pa.Schema argument.
  • python/ray/data/_internal/datasource/lance_datasink.py
    • Modified on_write_start to accept an optional pa.Schema argument.
  • python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
    • Added TYPE_CHECKING import for pyarrow as pa.
    • Added on_start parameter to __init__ and passed it to the super constructor.
    • Added a line to trigger the large UDF warning check by accessing _map_transformer_ref to ensure lazy initialization occurs before actors are scaled.
    • Added a call to self._notify_first_input(bundle) in _add_bundled_input.
  • python/ray/data/_internal/execution/operators/map_operator.py
    • Added TYPE_CHECKING import for pyarrow as pa.
    • Introduced a new remote function _get_arrow_schema_from_block to extract schema from a block without fetching data to the driver.
    • Added on_start parameter to __init__ and create methods.
    • Implemented lazy initialization for _map_transformer_ref using a property.
    • Added _notify_first_input method to invoke the on_start callback with the schema from the first bundle.
    • Added _get_schema_from_bundle method to extract PyArrow schema from a RefBundle, handling both Arrow and Pandas blocks.
    • Modified _warn_large_udf to use the lazily initialized __map_transformer_ref.
    • Ensured _add_bundled_input calls _notify_first_input.
  • python/ray/data/_internal/execution/operators/task_pool_map_operator.py
    • Added TYPE_CHECKING import for pyarrow as pa.
    • Added on_start parameter to __init__ and passed it to the super constructor.
    • Added a call to self._notify_first_input(bundle) in _add_bundled_input.
  • python/ray/data/_internal/logical/rules/operator_fusion.py
    • Implemented logic to fuse on_start callbacks from upstream and downstream operators.
    • Passed the fused on_start callback to MapOperator.create.
  • python/ray/data/_internal/planner/plan_write_op.py
    • Extracted datasink variable for clarity.
    • Set up the on_start callback for datasinks, passing datasink.on_write_start to MapOperator.create.
  • python/ray/data/dataset.py
    • Removed the direct call to datasink.on_write_start() in write_datasink.
    • Replaced the _FileDatasink pre-flight check with a call to datasink._pre_flight_check().
    • Modified _convert_to_pa_type to correctly handle TensorDtype by passing dtype._dtype to the recursive call for nested types.
  • python/ray/data/datasource/datasink.py
    • Added TYPE_CHECKING import for pyarrow as pa.
    • Modified on_write_start to accept an optional pa.Schema argument and updated its docstring to reflect deferred invocation and schema provision.
  • python/ray/data/datasource/file_datasink.py
    • Introduced _pre_flight_check method to handle SaveMode logic (ERROR, IGNORE, OVERWRITE) before execution starts, returning True if the write should be skipped.
    • Updated on_write_start to only handle directory creation, as SaveMode checks are now managed by _pre_flight_check.
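For concreteness, a simplified sketch of that pre-flight flow follows; the local SaveMode enum, the filesystem calls, and the attribute names are assumptions made to keep the example self-contained.

import enum
import logging
import os

logger = logging.getLogger(__name__)


class SaveMode(enum.Enum):
    # Local stand-in for Ray Data's SaveMode enum, for illustration only.
    APPEND = "append"
    OVERWRITE = "overwrite"
    IGNORE = "ignore"
    ERROR = "error"


class FileDatasinkSketch:
    """Illustrative only; mirrors the described _FileDatasink behavior."""

    def __init__(self, path: str, mode: SaveMode = SaveMode.APPEND):
        self.path = path
        self.mode = mode
        self.has_created_dir = False

    def _pre_flight_check(self) -> bool:
        """Run SaveMode checks before execution starts.

        Returns True if the write should be skipped entirely.
        """
        if os.path.exists(self.path):
            if self.mode == SaveMode.ERROR:
                raise ValueError(f"Path {self.path} already exists")
            if self.mode == SaveMode.IGNORE:
                logger.info(f"Ignoring write because {self.path} already exists")
                return True
            if self.mode == SaveMode.OVERWRITE:
                # The real datasink would delete the existing files here.
                pass
        return False

    def on_write_start(self, schema=None) -> None:
        # After the refactor, on_write_start only creates the output directory;
        # SaveMode handling already happened in _pre_flight_check.
        os.makedirs(self.path, exist_ok=True)
        self.has_created_dir = True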
Activity
  • The author goutamvenkat-anyscale requested a summary of the pull request.
  • gemini-code-assist[bot] identified a critical regression where a PyArrow schema check was removed, potentially causing type conversion failures.
  • gemini-code-assist[bot] also pointed out redundant imports in dataset.py.
  • cursor[bot] reported a high-severity bug regarding the missing PyArrow schema check, echoing gemini-code-assist[bot]'s finding.
  • cursor[bot] reported a high-severity bug where empty datasets would skip on_write_start, leading to AttributeError in on_write_complete for datasinks like Iceberg.
  • cursor[bot] reported a medium-severity bug in ActorPoolMapOperator where actors were started before the on_start callback could modify the transformer state.
  • cursor[bot] reported a medium-severity bug concerning inconsistent Arrow type conversions for TensorDtype across different code paths.
  • alexeykudinkin suggested an alternative approach for schema extraction: slicing a single row and converting that block to Arrow.
  • goutamvenkat-anyscale raised concerns about the overhead of ray.get(block_ref) in the suggested schema extraction method.
  • goutamvenkat-anyscale later confirmed the removal of the problematic schema extraction function.
  • alexeykudinkin recommended making the _map_transformer_ref initialization lazy.
  • alexeykudinkin suggested moving the _notify_first_input call into add_bundled_input.
  • goutamvenkat-anyscale noted that _add_bundled_input is an abstract method, implying replication would be needed.
  • alexeykudinkin inquired about the specific scenario causing issues with empty dataset writes.
  • goutamvenkat-anyscale clarified that empty dataset writes to Iceberg would crash on_write_complete due to an uninitialized transaction object.
  • goutamvenkat-anyscale later removed the special handling for empty datasets, likely due to the _pre_flight_check refactoring.
  • alexeykudinkin questioned why certain logic remained forked and advocated for consolidation.
  • alexeykudinkin suggested adding self._on_start_called = False for better state management.
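A rough sketch of how the suggested _on_start_called flag could gate the first-input notification is shown below; the method names follow the summary above, while everything else is assumed for illustration.

class FirstInputNotifierSketch:
    """Illustrative only; not the actual MapOperator implementation."""

    def __init__(self, on_start=None):
        # Callback to invoke with the first bundle's PyArrow schema (or None).
        self._on_start = on_start
        # Reviewer-suggested explicit state flag.
        self._on_start_called = False

    def _notify_first_input(self, bundle) -> None:
        # Called from _add_bundled_input; fires the on_start callback exactly
        # once, before tasks for that input are dispatched.
        if self._on_start is None or self._on_start_called:
            return
        self._on_start_called = True
        self._on_start(self._get_schema_from_bundle(bundle))

    def _get_schema_from_bundle(self, bundle):
        # See the schema-extraction sketch above; would return a
        # pyarrow.Schema, or None if the bundle carries no usable schema.
        return getattr(bundle, "schema", None)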

Comment on lines 237 to 245
def _get_schema_from_bundle(self, bundle: RefBundle) -> Optional["pa.Schema"]:
    """Extract PyArrow schema from a RefBundle.

    For Arrow schemas, returns directly. For Pandas blocks, runs a lightweight
    remote task to convert a 1-row sample to Arrow and extract the schema.
    This ensures schema consistency with actual block conversion logic without
    fetching block data to the driver.
    """
    import pyarrow as pa

Why is this an Operator's method (we're not using self, there's no overriding in subclasses, etc)

Comment on lines 5260 to 5264
 if isinstance(datasink, _FileDatasink):
-    if not datasink.has_created_dir and datasink.mode == SaveMode.IGNORE:
+    if datasink._pre_flight_check():
         logger.info(
             f"Ignoring write because {datasink.path} already exists"
         )

Just make this on_write_start, don't create another method (unnecessarily)

@alexeykudinkin merged commit b8f22e9 into ray-project:master on Dec 10, 2025
6 checks passed
Comment on lines +5261 to +5262
datasink.on_write_start()
if datasink._skip_write:

Leave a TODO to clean this up -- there'd be no special-casing for skipped writes

@goutamvenkat-anyscale deleted the goutam/first_bundle_schema_write branch on December 10, 2025 01:36

Development

Successfully merging this pull request may close these issues: Ray fails to serialize self-reference objects.
