1 change: 0 additions & 1 deletion CHANGES.md
@@ -119,7 +119,6 @@
- Use the `--element_processing_timeout_minutes` option to reduce the chance of having stalled pipelines due to unexpected cases of slow processing, where slowness might not happen again if processing of the same element is retried.
* (Python) Adding GCP Spanner Change Stream support for Python (apache_beam.io.gcp.spanner) ([#24103](https://github.com/apache/beam/issues/24103)).


## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/yaml/README.md
@@ -62,9 +62,9 @@ or
pytest -v integration_tests.py::<yaml_file_name_without_extension>Test
```

-To run the postcommit tests:
+To run some of the postcommit tests, for example:

```bash
-pytest -v integration_tests.py --test_files_dir="extended_tests"
+pytest -v integration_tests.py --test_files_dir="extended_tests/messaging"
```

7 changes: 6 additions & 1 deletion sdks/python/apache_beam/yaml/json_utils.py
@@ -287,8 +287,10 @@ def row_to_json(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
for field in beam_type.row_type.schema.fields
}
return lambda row: {
-        name: convert(getattr(row, name))
+        name: converted
         for (name, convert) in converters.items()
+        # Filter out fields whose converted value is None, so nullable
+        # fields don't fail downstream JSON schema validation.
+        if (converted := convert(getattr(row, name, None))) is not None
Contributor:

When would this be None? What condition are we guarding against?

In theory I'd expect the previous conversion to be a bit more correct.

Collaborator (Author):

I was running through many scenarios during development and ran into an issue with this, but I don't think it's needed anymore. Good catch. Thanks.

Collaborator (Author):

Figured it out after the validate_with_schema test failed after the revert :). There is a bug in that transform for null fields: the validator treats a row as failed if the schema declares a type for a field and that field is None. So we filter out fields that are None and let the validator run on the remaining fields. For example:

BeamSchema_....(name='Bob', score=None, age=25)
converts to JSON as -> {'name': 'Bob', 'score': None, 'age': 25}
Validation will fail on this row with this schema:
{'type': 'object', 'properties': {'name': {'type': 'string'}, 'age': {'type': 'integer'}, 'score': {'type': 'number'}}}

But if we convert that Beam Row to -> {'name': 'Bob', 'age': 25}
then it passes fine.

My understanding of the code base is that we would have to update the jsonschema package to allow None, but that seems like a non-starter.
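A minimal sketch of that behavior using the jsonschema package directly (the schema and values are taken from the example above; nothing else is assumed):

```python
import jsonschema

schema = {
    'type': 'object',
    'properties': {
        'name': {'type': 'string'},
        'age': {'type': 'integer'},
        'score': {'type': 'number'},
    }
}

# A null field fails validation: None is not a 'number'.
try:
    jsonschema.validate({'name': 'Bob', 'score': None, 'age': 25}, schema)
except jsonschema.ValidationError as e:
    print(e.message)  # None is not of type 'number'

# Dropping the None field passes, since properties are optional by default.
jsonschema.validate({'name': 'Bob', 'age': 25}, schema)
```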

Contributor @damccorm, Aug 27, 2025:

Ok. I think this is fine - arguably both should error, but I think making all fields nullable is ok.

Eventually it might be good to add an `optional: False` field so that we can validate null fields as errors.

I am fine leaving as is for now though

}
elif type_info == "logical_type":
return lambda value: value
@@ -348,6 +350,9 @@ def validate(row):
nonlocal validator
if validator is None:
validator = jsonschema.validators.validator_for(json_schema)(json_schema)
# NOTE: Fields that are None must be filtered out of a row like
# BeamSchema_...(name='Bob', score=None, age=25) before validation, or the
# validator will fail; the row is validated as {'age': 25, 'name': 'Bob'}.
validator.validate(convert(row))

return validate
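For illustration, a minimal sketch of the filtering the new comprehension performs, using a stand-in NamedTuple row and identity converters (generated BeamSchema rows are NamedTuple-like; the real converters depend on the Beam field types):

```python
from typing import NamedTuple, Optional

class Visit(NamedTuple):  # stand-in for a generated BeamSchema row
    name: str
    score: Optional[float]
    age: int

# Identity converters for illustration; row_to_json builds real ones
# from the Beam field types.
converters = {name: (lambda v: v) for name in Visit._fields}

row = Visit(name='Bob', score=None, age=25)
converted_row = {
    name: converted
    for (name, convert) in converters.items()
    # None-valued fields are dropped rather than emitted as JSON null.
    if (converted := convert(getattr(row, name, None))) is not None
}
print(converted_row)  # {'name': 'Bob', 'age': 25}
```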
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/yaml/pipeline.schema.yaml
@@ -45,6 +45,8 @@ $defs:
properties: { __line__: {}}
additionalProperties:
type: string
output_schema:
type: object
additionalProperties: true
required:
- type
@@ -129,6 +131,7 @@ $defs:
name: {}
input: {}
output: {}
output_schema: { type: object }
windowing: {}
resource_hints: {}
config: { type: object }
87 changes: 87 additions & 0 deletions sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
@@ -82,3 +82,90 @@ pipelines:
config:
elements:
- {user: bob, timestamp: 3}

# Assign timestamps to Beam Row elements with error handling and an output
# schema check.
- pipeline:
type: composite
transforms:
- type: Create
name: CreateVisits
config:
elements:
- {user: alice, timestamp: "not-valid"}
- {user: bob, timestamp: 3}
- type: AssignTimestamps
input: CreateVisits
config:
timestamp: timestamp
error_handling:
output: invalid_rows
output_schema:
type: object
properties:
user:
type: string
timestamp:
type: integer
- type: MapToFields
name: ExtractInvalidTimestamp
input: AssignTimestamps.invalid_rows
config:
language: python
fields:
user: "element.user"
timestamp: "element.timestamp"
- type: AssertEqual
input: ExtractInvalidTimestamp
config:
elements:
- {user: "alice", timestamp: "not-valid"}
- type: AssertEqual
input: AssignTimestamps
config:
elements:
- {user: bob, timestamp: 3}

# Assign timestamps to Beam Row elements with error handling and an output
# schema whose timestamp type (boolean) matches no element, so even the row
# with a valid timestamp lands in the error output.
- pipeline:
type: composite
transforms:
- type: Create
name: CreateVisits
config:
elements:
- {user: alice, timestamp: "not-valid"}
- {user: bob, timestamp: 3}
- type: AssignTimestamps
input: CreateVisits
config:
timestamp: timestamp
error_handling:
output: invalid_rows
output_schema:
type: object
properties:
user:
type: string
timestamp:
type: boolean
- type: MapToFields
name: ExtractInvalidTimestamp
input: AssignTimestamps.invalid_rows
config:
language: python
fields:
user: "element.user"
timestamp: "element.timestamp"
- type: AssertEqual
input: ExtractInvalidTimestamp
config:
elements:
- {user: "alice", timestamp: "not-valid"}
- {user: bob, timestamp: 3}
- type: AssertEqual
input: AssignTimestamps
config:
elements: []

23 changes: 23 additions & 0 deletions sdks/python/apache_beam/yaml/tests/create.yaml
@@ -115,3 +115,26 @@ pipelines:
- {sdk: MapReduce, year: 2004}
- {}
- {sdk: MillWheel, year: 2008}

# Simple Create with output schema check
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {sdk: MapReduce, year: 2004}
- {sdk: MillWheel, year: 2008}
output_schema:
type: object
properties:
sdk:
type: string
year:
type: integer
- type: AssertEqual
config:
elements:
- {sdk: MapReduce, year: 2004}
- {sdk: MillWheel, year: 2008}

46 changes: 45 additions & 1 deletion sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml
@@ -92,4 +92,48 @@ pipelines:
- {name: "Alice", age: 30, score: 95.5}
- {name: "Bob", age: 25, score: 88.0}


# Validate a Beam Row with a predefined schema, nulls, and error handling
- pipeline:
type: composite
transforms:
- type: Create
config:
elements:
- {name: "Alice", age: 30, score: 95.5}
- {name: "Bob", age: 25}
- {name: "Charlie", age: 27, score: "apple"}
- type: ValidateWithSchema
input: Create
config:
schema:
type: object
properties:
name:
type: string
age:
type: integer
score:
type: number
error_handling:
output: invalid_rows
# ValidateWithSchema outputs the element, error msg, and traceback, so
# MapToFields is needed to easily assert downstream.
- type: MapToFields
input: ValidateWithSchema.invalid_rows
config:
language: python
fields:
name: "element.name"
age: "element.age"
score: "element.score"
- type: AssertEqual
input: ValidateWithSchema
config:
elements:
- {name: "Alice", age: 30, score: 95.5}
- {name: "Bob", age: 25}
- type: AssertEqual
input: MapToFields
config:
elements:
- {name: "Charlie", age: 27, score: "apple"}
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_errors.py
@@ -21,6 +21,7 @@

import apache_beam as beam
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.yaml.yaml_utils import SafeLineLoader


class ErrorHandlingConfig(NamedTuple):
@@ -35,9 +36,11 @@ class ErrorHandlingConfig(NamedTuple):

def exception_handling_args(error_handling_spec):
if error_handling_spec:
+    # error_handling_spec may have come from a yaml file and carry
+    # SafeLineLoader metadata (e.g. __line__ entries); strip it first.
+    clean_spec = SafeLineLoader.strip_metadata(error_handling_spec)
     return {
         'dead_letter_tag' if k == 'output' else k: v
-        for (k, v) in error_handling_spec.items()
+        for (k, v) in clean_spec.items()
}
else:
return None
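A runnable sketch of what the fix guards against; the metadata key is an assumption based on the `__line__` entry in pipeline.schema.yaml (the real stripping is done by `SafeLineLoader.strip_metadata`):

```python
def exception_handling_args_sketch(error_handling_spec):
    # Stand-in for SafeLineLoader.strip_metadata: drop yaml-loader
    # metadata keys (the '__line__' convention is assumed here).
    clean_spec = {
        k: v
        for (k, v) in error_handling_spec.items()
        if not k.startswith('__')
    }
    return {
        'dead_letter_tag' if k == 'output' else k: v
        for (k, v) in clean_spec.items()
    }

# Without stripping, '__line__' would leak into the exception-handling args.
print(exception_handling_args_sketch({'output': 'invalid_rows', '__line__': 48}))
# -> {'dead_letter_tag': 'invalid_rows'}
```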
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -487,7 +487,7 @@ def expand(self, pcoll):
typing_from_runner_api(existing_fields[fld]))


-class _Validate(beam.PTransform):
+class Validate(beam.PTransform):
"""Validates each element of a PCollection against a json schema.

Args:
@@ -982,7 +982,7 @@ def create_mapping_providers():
'Partition-javascript': _Partition,
'Partition-generic': _Partition,
'StripErrorMetadata': _StripErrorMetadata,
-        'ValidateWithSchema': _Validate,
+        'ValidateWithSchema': Validate,
}),
yaml_provider.SqlBackedProvider({
'Filter-sql': _SqlFilterTransform,
Expand Down
9 changes: 6 additions & 3 deletions sdks/python/apache_beam/yaml/yaml_provider.py
@@ -759,9 +759,12 @@ def _unify_element_with_schema(element, target_schema):
elif isinstance(element, dict):
element_dict = element
else:
-    # This element is not a row, so it can't be unified with a
-    # row schema.
-    return element
+    # This element is not a row-like object. If the target schema has a
+    # single field, assume this element is the value for that field.
+    if len(target_schema._fields) == 1:
+      return target_schema(**{target_schema._fields[0]: element})
+    else:
+      return element

# Create new element with only the fields that exist in the original
# element plus None for fields that are expected but missing
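A minimal sketch of the new single-field fallback, using a NamedTuple as the stand-in row type (the `_fields` and keyword-construction usage in the diff suggests target_schema behaves like a NamedTuple; that is an assumption here):

```python
from typing import NamedTuple

class SingleField(NamedTuple):  # assumed stand-in for a target row schema
    value: int

def unify_sketch(element, target_schema):
    # Non-row elements are wrapped when the schema has exactly one field.
    if len(target_schema._fields) == 1:
        return target_schema(**{target_schema._fields[0]: element})
    return element

print(unify_sketch(42, SingleField))  # SingleField(value=42)
```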