Collaborator

@derrickaw derrickaw commented Aug 25, 2025

#35742


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@derrickaw
Collaborator Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a valuable feature by allowing an optional output_schema to be specified for any YAML transform. This enables schema validation and robust error handling across the pipeline. The implementation is well-structured, adding a Validate transform dynamically after a transform's execution. The logic correctly handles various scenarios, including schemaless PCollections, single and multiple outputs, and error handling configurations. The changes are supported by a comprehensive set of new tests and clear documentation. I have a few minor suggestions to improve the documentation and test file formatting.

@derrickaw derrickaw marked this pull request as ready for review August 25, 2025 19:48
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@derrickaw
Collaborator Author

assign set of reviewers

Contributor

Assigning reviewers:

R: @jrmccluskey for label python.
R: @damccorm for label website.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Contributor

@jrmccluskey jrmccluskey left a comment

Largely LGTM, just one nit

# the validation downstream.
clean_schema = SafeLineLoader.strip_metadata(spec)

def enforce_schema(pcoll, label, error_handling_spec):

Contributor

does this function need to be defined within expand_output_schema_transform() ?

Collaborator Author

no, updated, thanks!

Contributor

@damccorm damccorm left a comment

Thanks, this is excellent - I had one broad comment, but overall this is a great change

for (name, convert) in converters.items()
if (converted := convert(getattr(row, name, None))) is not None

Contributor

When would this be None? What condition are we guarding against?

In theory I'd expect the previous conversion to be a bit more correct.

Collaborator Author

I was running through many scenarios during development and ran into an issue with this, but I don't think it's needed anymore. Good catch. Thanks.

Collaborator Author

Figured it out after the Validate_with_schema test failed following the revert :)
There is a bug in that transform for null fields: the validator treats a row as failed if the schema declares a type for a field and the field's value is None. So we filter out the fields that are None and let the validator validate the rest of the row. For example:

BeamSchema_....(name='Bob', score=None, age=25)
During conversion to JSON, this becomes {'name': 'Bob', 'score': None, 'age': 25}
Validation fails on this row with this schema:
{'type': 'object', 'properties': {'name': {'type': 'string'}, 'age': {'type': 'integer'}, 'score': {'type': 'number'}}}

But if we convert that Beam Row to {'name': 'Bob', 'age': 25}
then it passes fine.

My understanding of the code base is that we would have to update the jsonschema package to allow None, but that seems like a non-starter.
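
The behavior described in this comment can be reproduced without Beam. The sketch below is a simplified stand-in, not the PR's code: `validates` mimics only the JSON Schema `type` keyword (where `None` matches only the type `null`, and properties absent from the instance are not checked), and `Row`, `CONVERTERS`, and `row_to_dict` are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Dict, NamedTuple, Optional


class Row(NamedTuple):
    """Hypothetical stand-in for a generated Beam schema row."""
    name: str
    score: Optional[float]
    age: int


# Hypothetical per-field converters; identity functions keep the sketch small.
CONVERTERS: Dict[str, Callable[[Any], Any]] = {
    "name": lambda v: v,
    "score": lambda v: v,
    "age": lambda v: v,
}

# Simplified checks for the JSON Schema "type" keyword (assumption: only
# these types are needed here). bool is excluded from the numeric types.
TYPE_CHECKS: Dict[str, Callable[[Any], bool]] = {
    "string": lambda v: isinstance(v, str),
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "null": lambda v: v is None,
}


def row_to_dict(row: Row, drop_none: bool) -> Dict[str, Any]:
    """Convert a row to a dict, optionally omitting fields whose value is None."""
    if drop_none:
        return {
            name: converted
            for (name, convert) in CONVERTERS.items()
            if (converted := convert(getattr(row, name, None))) is not None
        }
    return {
        name: convert(getattr(row, name, None))
        for (name, convert) in CONVERTERS.items()
    }


def validates(instance: Dict[str, Any], schema: Dict[str, Any]) -> bool:
    """Check only the declared type of each property present in the instance."""
    for name, prop in schema.get("properties", {}).items():
        if name in instance and not TYPE_CHECKS[prop["type"]](instance[name]):
            return False
    return True


SCHEMA = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer"},
        "score": {"type": "number"},
    },
}

bob = Row(name="Bob", score=None, age=25)
with_none = row_to_dict(bob, drop_none=False)
without_none = row_to_dict(bob, drop_none=True)

print(validates(with_none, SCHEMA))     # False: None is not a "number"
print(validates(without_none, SCHEMA))  # True: the absent field is not checked
```

Dropping the field works because no property is listed as required in this schema; a schema with a `required` list naming `score` would reject the filtered row too.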

Contributor

@damccorm damccorm Aug 27, 2025

Ok. I think this is fine - arguably both should error, but I think making all fields nullable is ok.

Eventually it might be good to add an optional: False field so that we can validate null fields as errors.

I am fine leaving as is for now though

Comment on lines 110 to 111
error_handling:
  output: invalid_schema_rows

Contributor

Haven't gotten to the point where we actually implement this, but it seems to me like it might be more useful to just have a single error_handling output associated with a transform where we capture all problematic records. In most cases, a user is going to want to write these somewhere for manual inspection/reprocessing and I don't think it matters too much whether it is because the output is an error or the output doesn't match the expected schema.

Contributor

This could be done with a flatten step where you unify the error output from schema validation and normal exception handling.
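
As a rough illustration of that idea in plain Python rather than Beam (lists stand in for PCollections and `itertools.chain` stands in for a flatten step), both kinds of failure can share one record shape and land in one collection. The record fields `element` and `msg` and every function name below are assumptions made for this sketch, not the PR's implementation.

```python
import itertools
from typing import Any, Callable, Dict, Iterable, List, Tuple

ErrorRecord = Dict[str, Any]


def apply_with_errors(
    fn: Callable[[Any], Any], elements: Iterable[Any]
) -> Tuple[List[Any], List[ErrorRecord]]:
    """Apply fn to each element, routing exceptions to an error list."""
    good, bad = [], []
    for element in elements:
        try:
            good.append(fn(element))
        except Exception as exn:  # broad on purpose: any failure is a bad record
            bad.append({"element": element, "msg": repr(exn)})
    return good, bad


def split_by_schema(
    is_valid: Callable[[Any], bool], elements: Iterable[Any]
) -> Tuple[List[Any], List[ErrorRecord]]:
    """Route elements that fail a schema check to an error list."""
    good, bad = [], []
    for element in elements:
        if is_valid(element):
            good.append(element)
        else:
            bad.append({"element": element, "msg": "does not match output_schema"})
    return good, bad


def parse(line: str) -> Dict[str, Any]:
    """Toy transform: raises on malformed lines, leaves bad years as strings."""
    sdk, year = line.split(",")
    return {"sdk": sdk, "year": int(year) if year.isdigit() else year}


lines = ["MillWheel,2008", "broken-line", "Beam,unknown"]

parsed, transform_errors = apply_with_errors(parse, lines)
valid, schema_errors = split_by_schema(
    lambda row: isinstance(row["year"], int), parsed)

# The "flatten": one error collection regardless of where the failure arose.
all_errors = list(itertools.chain(transform_errors, schema_errors))

print(valid)
print(all_errors)
```

Because both error lists use the same record shape, a single downstream sink can consume them without caring which step rejected the record.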

Collaborator Author

I was thinking the user may want more control over this, so they can easily separate generic issues with the transform from schema issues early on.

Contributor

In practice, they're both probably going to be data issues which users want to pipe to some exception handling sink, so I think treating them as the same collection probably makes sense.

Collaborator Author

@derrickaw derrickaw Aug 26, 2025

Will discuss with group also to finalize. Thanks.

- {sdk: MillWheel, year: 2008}
```

However, a user will more likely want to detect and handle schema errors. This is where adding an `error_handling` configuration inside the `output_schema` comes into play. For example, the following code will

Contributor

If we stay with the current approach, we should call out the difference between normal error handling and schema error handling here.

Collaborator Author

added another paragraph below, thanks

@derrickaw
Collaborator Author

The offline decision is to revamp this design so that there is only one error_handling. Updates pending...

@derrickaw derrickaw marked this pull request as draft August 29, 2025 03:10
@derrickaw
Collaborator Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a valuable feature: an optional output_schema configuration for all YAML transforms. This allows for schema validation on the output of any transform, enhancing data quality and pipeline robustness. The implementation is well-structured, handling various scenarios like single vs. multiple outputs and integrating seamlessly with existing error handling. The addition of new tests and comprehensive documentation, including a new yaml-schema.md page, is commendable. I've identified a couple of minor typos in user-facing text and have provided suggestions for correction. Overall, this is a solid contribution.

@derrickaw
Collaborator Author

Run Python PreCommit 3.12

@derrickaw derrickaw marked this pull request as ready for review September 4, 2025 20:28
@derrickaw derrickaw requested a review from damccorm September 5, 2025 20:32
Contributor

@damccorm damccorm left a comment

Thanks! Some small remaining comments, but overall this is looking good

Comment on lines 597 to 599
"error_handling config on a capable transform or the user can remove the" \
"output_schema config on this transform and add a ValidateWithSchema " \
"transform downstream of the current transform.")

Contributor

Suggested change
- "error_handling config on a capable transform or the user can remove the" \
- "output_schema config on this transform and add a ValidateWithSchema " \
- "transform downstream of the current transform.")
+ "error_handling config on a capable transform. Alternatively, you can remove the " \
+ "output_schema config on this transform and add a ValidateWithSchema " \
+ "transform with separate error handling downstream of the current transform.")

Nit for grammar/clarity
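
One detail worth checking when editing a message split across lines like this: Python joins adjacent string literals with no separator, so a fragment that ends without a trailing space runs into the next one. A minimal sketch using shortened, illustrative fragments (not the exact message from the PR):

```python
# Adjacent string literals are concatenated at compile time, with nothing
# inserted between them.
broken = (
    "Alternatively, you can remove the"
    "output_schema config on this transform.")

# Ending each fragment with a space keeps the words apart.
fixed = (
    "Alternatively, you can remove the "
    "output_schema config on this transform.")

print(broken)
print(fixed)
```

The same applies when the fragments are joined with trailing backslashes instead of parentheses.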

Contributor

(may break the linter, so you might want to apply the change yourself rather than committing from the UI)

Collaborator Author

done, thanks

# The transform produced outputs with many named PCollections and need to
# determine which PCollection should be validated on.
elif isinstance(outputs, dict):
  main_output_key = _get_main_output_key(spec, outputs)

Contributor

This behavior is a little surprising to me. If I have a transform which produces multiple non-error outputs, there are 3 possible behaviors I might expect:

  1. Validate only the main output (what you're doing here)
  2. Validate all non-error outputs (what I was expecting this to do)
  3. Throw and tell the user they should use the ValidateTransform instead

I think any of the 3 are reasonable. I'd probably lean towards (3) because it protects the user from accidental bad behavior, but it is also less convenient. I'll defer to you on what you want to do here (leaving it is fine), but if we do leave it I would recommend:

  1. Making sure we clearly document this (I don't think it's called out right now, and I would document it no matter which behavior we choose)
  2. Explicitly testing this behavior
  3. Adding a warning explaining that only the main output will be validated when we can definitely tell there are multiple outputs (i.e., there are 3+ outputs, or 2 outputs with no error output)
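
The warning in the third recommendation could look roughly like the sketch below. It is hypothetical and does not reproduce the PR's `_get_main_output_key`: the function name `pick_main_output`, the default error tag `errors`, and the rule "the first non-error output is the main one" are all assumptions made for illustration. It also assumes the error output's tag is known, so "multiple outputs" reduces to "more than one non-error output".

```python
import warnings
from typing import Any, Dict


def pick_main_output(outputs: Dict[str, Any], error_tag: str = "errors") -> str:
    """Return the key of the output to validate, warning if others are skipped.

    Hypothetical sketch: treats the first non-error output as the main one.
    """
    candidates = [key for key in outputs if key != error_tag]
    if not candidates:
        raise ValueError("No non-error outputs to validate.")
    main = candidates[0]
    if len(candidates) > 1:
        warnings.warn(
            f"Multiple non-error outputs {candidates}; only {main!r} will be "
            "validated against output_schema.")
    return main


# One non-error output plus an error output: no warning is emitted.
print(pick_main_output({"output": "pcoll_a", "errors": "pcoll_err"}))

# Two non-error outputs: a warning says only the first one is validated.
print(pick_main_output({"good": "pcoll_a", "extra": "pcoll_b", "errors": "pcoll_err"}))
```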

Collaborator Author

I went with keeping it as is.

  1. There is a docstring for the _get_main_output_key function. Is that sufficient?
  2. Done
  3. There is a warning in that _get_main_output_key function. Is that sufficient?

Thanks.

| f'FlattenErrors_{main_output_key}' >> beam.Flatten())
else:
  # No error output in the original transform, so just add this one.
  outputs[error_output_tag] = schema_error_pcoll

Contributor

Is this possible?

Collaborator Author

Not anymore. Thanks.

codecov bot commented Sep 6, 2025

Codecov Report

❌ Patch coverage is 77.21519% with 18 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.81%. Comparing base (35969b3) to head (1b54a9c).
⚠️ Report is 22 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| sdks/python/apache_beam/yaml/yaml_transform.py | 79.45% | 15 Missing ⚠️ |
| sdks/python/apache_beam/yaml/yaml_provider.py | 0.00% | 3 Missing ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #35952      +/-   ##
============================================
+ Coverage     56.79%   56.81%   +0.01%     
  Complexity     3385     3385              
============================================
  Files          1220     1220              
  Lines        185122   185201      +79     
  Branches       3508     3508              
============================================
+ Hits         105148   105216      +68     
- Misses        76649    76660      +11     
  Partials       3325     3325              

| Flag | Coverage Δ |
|---|---|
| python | 81.00% <77.21%> (+<0.01%) ⬆️ |


@derrickaw
Collaborator Author

Run Python_Transforms PreCommit 3.10
