Skip to content

Conversation

derrickaw
Copy link
Collaborator

@derrickaw derrickaw commented Aug 25, 2025

#35742


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@derrickaw
Copy link
Collaborator Author

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a valuable feature by allowing an optional output_schema to be specified for any YAML transform. This enables schema validation and robust error handling across the pipeline. The implementation is well-structured, adding a Validate transform dynamically after a transform's execution. The logic correctly handles various scenarios, including schemaless PCollections, single and multiple outputs, and error handling configurations. The changes are supported by a comprehensive set of new tests and clear documentation. I have a few minor suggestions to improve the documentation and test file formatting.

@derrickaw derrickaw marked this pull request as ready for review August 25, 2025 19:48
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@derrickaw
Copy link
Collaborator Author

assign set of reviewers

Copy link
Contributor

Assigning reviewers:

R: @jrmccluskey for label python.
R: @damccorm for label website.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Contributor

@jrmccluskey jrmccluskey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Largely LGTM, just one nit

# the validation downstream.
clean_schema = SafeLineLoader.strip_metadata(spec)

def enforce_schema(pcoll, label, error_handling_spec):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this function need to be defined within expand_output_schema_transform() ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, updated, thanks!

Copy link
Contributor

@damccorm damccorm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is excellent - I had one broad comment, but overall this is a great change

for (name, convert) in converters.items()
if (converted := convert(getattr(row, name, None))) is not None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would this be None? What condition are we guarding against?

In theory I'd expect the previous conversion to be a bit more correct.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was running through many scenarios during development and ran into an issue with this, but I don't think its needed anymore. Good catch. Thanks.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Figured it out after Validate_with_schema test failed after the revert :) :
So there is a bug in that transform for Null fields. The validator treats it as a failed row if the schema has one thing and the field is None. So we filter out those fields if they are None and let the Validator validate on that row. For example:

BeamSchema_....(name='Bob', score=None, age=25)
During the conversion process to json -> {'name': 'Bob', 'score': None, 'age': 25}
Validation will fail on this row with this schema:
{'type': 'object', 'properties': {'name': {'type': 'string'}, 'age': {'type': 'integer'}, 'score': {'type': 'number'}}}

But if we convert that BeamRow to -> {'name': 'Bob', 'age': 25}
Then it passes fine.

My understanding of the code base is that we would have to update the jsonschema package to allow None, but that seems like a non-starter.

Copy link
Contributor

@damccorm damccorm Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I think this is fine - arguably both should error, but I think making all fields nullable is ok.

Eventually it might be good to add a optional: False field so that we can validate null fields as errors.

I am fine leaving as is for now though

Comment on lines 110 to 111
error_handling:
output: invalid_schema_rows
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't gotten to the point where we actually implement this, but it seems to me like it might be more useful to just have a single error_handling output associated with a transform where we capture all problematic records. In most cases, a user is going to want to write these somewhere for manual inspection/reprocessing and I don't think it matters too much whether it is because the output is an error or the output doesn't match the expected schema.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be done with a flatten step where you unify the error output from schema validation and normal exception handling.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the user may want more control over this and easily filter out the issue in the beginning based on generic issues with the transform versus schema issues.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice, they're both probably going to be data issues which users want to pipe to some exception handling sink, so I think treating them as the same collection probably makes sense.

Copy link
Collaborator Author

@derrickaw derrickaw Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will discuss with group also to finalize. Thanks.

- {sdk: MillWheel, year: 2008}
```

However, a user will more likely want to detect and handle schema errors. This is where adding an `error_handling` configuration inside the `output_schema` comes into play. For example, the following code will
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we stay with the current approach, we should call out the difference between normal error handling and schema error handling here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added another paragraph below, thanks

@derrickaw
Copy link
Collaborator Author

Offline decision is to revamp this design to only have one error_handling. Updates pending...

@derrickaw derrickaw marked this pull request as draft August 29, 2025 03:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants