diff --git a/CHANGES.md b/CHANGES.md
index 0394882d8a7a..315268b3b556 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -119,7 +119,6 @@
   - Use the `--element_processing_timeout_minutes` option to reduce the chance of having stalled pipelines due to unexpected cases of slow processing, where slowness might not happen again if processing of the same element is retried.
 * (Python) Adding GCP Spanner Change Stream support for Python (apache_beam.io.gcp.spanner) ([#24103](https://github.com/apache/beam/issues/24103)).
 
-
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
diff --git a/sdks/python/apache_beam/yaml/README.md b/sdks/python/apache_beam/yaml/README.md
index 56b5e7bedff7..68576726b20f 100644
--- a/sdks/python/apache_beam/yaml/README.md
+++ b/sdks/python/apache_beam/yaml/README.md
@@ -62,9 +62,9 @@ or
 pytest -v integration_tests.py::Test
 ```
 
-To run the postcommit tests:
+To run some of the postcommit tests, for example:
 
 ```bash
-pytest -v integration_tests.py --test_files_dir="extended_tests"
+pytest -v integration_tests.py --test_files_dir="extended_tests/messaging"
 ```
diff --git a/sdks/python/apache_beam/yaml/json_utils.py b/sdks/python/apache_beam/yaml/json_utils.py
index 893ebba55103..2d8f32051973 100644
--- a/sdks/python/apache_beam/yaml/json_utils.py
+++ b/sdks/python/apache_beam/yaml/json_utils.py
@@ -287,8 +287,10 @@ def row_to_json(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
         for field in beam_type.row_type.schema.fields
     }
     return lambda row: {
-        name: convert(getattr(row, name))
+        name: converted
         for (name, convert) in converters.items()
+        # Filter out fields whose value is None (i.e. omit nullable fields).
+        if (converted := convert(getattr(row, name, None))) is not None
     }
   elif type_info == "logical_type":
     return lambda value: value
@@ -348,6 +350,9 @@ def validate(row):
     nonlocal validator
     if validator is None:
       validator = jsonschema.validators.validator_for(json_schema)(json_schema)
+    # NOTE: For a row like BeamSchema_...(name='Bob', score=None, age=25), any
+    # fields that are None must be filtered out before validation (yielding
+    # {'age': 25, 'name': 'Bob'}), or the validator will fail.
     validator.validate(convert(row))
 
   return validate
diff --git a/sdks/python/apache_beam/yaml/pipeline.schema.yaml b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
index afcb2e23a663..35625d58d160 100644
--- a/sdks/python/apache_beam/yaml/pipeline.schema.yaml
+++ b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
@@ -45,6 +45,8 @@ $defs:
       properties: { __line__: {}}
       additionalProperties:
         type: string
+      output_schema:
+        type: object
+        additionalProperties: true
     required:
       - type
@@ -129,6 +131,7 @@ $defs:
         name: {}
         input: {}
        output: {}
+        output_schema: { type: object }
         windowing: {}
         resource_hints: {}
         config: { type: object }
diff --git a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
index edaa581214ea..13e56f22edb5 100644
--- a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
+++ b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
@@ -82,3 +82,90 @@ pipelines:
         config:
           elements:
             - {user: bob, timestamp: 3}
+
+# Assign timestamps to Beam Row elements with error handling and an output
+# schema check.
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          name: CreateVisits
+          config:
+            elements:
+              - {user: alice, timestamp: "not-valid"}
+              - {user: bob, timestamp: 3}
+        - type: AssignTimestamps
+          input: CreateVisits
+          config:
+            timestamp: timestamp
+            error_handling:
+              output: invalid_rows
+            output_schema:
+              type: object
+              properties:
+                user:
+                  type: string
+                timestamp:
+                  type: integer
+        - type: MapToFields
+          name: ExtractInvalidTimestamp
+          input: AssignTimestamps.invalid_rows
+          config:
+            language: python
+            fields:
+              user: "element.user"
+              timestamp: "element.timestamp"
+        - type: AssertEqual
+          input: ExtractInvalidTimestamp
+          config:
+            elements:
+              - {user: "alice", timestamp: "not-valid"}
+        - type: AssertEqual
+          input: AssignTimestamps
+          config:
+            elements:
+              - {user: bob, timestamp: 3}
+
+# Assign timestamps to Beam Row elements with error handling and an output
+# schema that declares `timestamp` as boolean, so even the row that passes
+# AssignTimestamps fails schema validation and lands in the error output.
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          name: CreateVisits
+          config:
+            elements:
+              - {user: alice, timestamp: "not-valid"}
+              - {user: bob, timestamp: 3}
+        - type: AssignTimestamps
+          input: CreateVisits
+          config:
+            timestamp: timestamp
+            error_handling:
+              output: invalid_rows
+            output_schema:
+              type: object
+              properties:
+                user:
+                  type: string
+                timestamp:
+                  type: boolean
+        - type: MapToFields
+          name: ExtractInvalidTimestamp
+          input: AssignTimestamps.invalid_rows
+          config:
+            language: python
+            fields:
+              user: "element.user"
+              timestamp: "element.timestamp"
+        - type: AssertEqual
+          input: ExtractInvalidTimestamp
+          config:
+            elements:
+              - {user: "alice", timestamp: "not-valid"}
+              - {user: bob, timestamp: 3}
+        - type: AssertEqual
+          input: AssignTimestamps
+          config:
+            elements: []
+
diff --git a/sdks/python/apache_beam/yaml/tests/create.yaml b/sdks/python/apache_beam/yaml/tests/create.yaml
index 30f276671874..bf346f7667c8 100644
--- a/sdks/python/apache_beam/yaml/tests/create.yaml
+++ b/sdks/python/apache_beam/yaml/tests/create.yaml
@@ -115,3 +115,26 @@ pipelines:
       - {sdk: MapReduce, year: 2004}
       - {}
       - {sdk: MillWheel, year: 2008}
+
+  # Simple Create with output schema check
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {sdk: MapReduce, year: 2004}
+              - {sdk: MillWheel, year: 2008}
+            output_schema:
+              type: object
+              properties:
+                sdk:
+                  type: string
+                year:
+                  type: integer
+        - type: AssertEqual
+          config:
+            elements:
+              - {sdk: MapReduce, year: 2004}
+              - {sdk: MillWheel, year: 2008}
+
diff --git a/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml
index d5ae57a3e8c1..1df599e254a7 100644
--- a/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml
+++ b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml
@@ -92,4 +92,48 @@ pipelines:
             - {name: "Alice", age: 30, score: 95.5}
             - {name: "Bob", age: 25, score: 88.0}
-
+  # Validate a Beam Row with a predefined schema, nulls, and error handling
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {name: "Alice", age: 30, score: 95.5}
+              - {name: "Bob", age: 25}
+              - {name: "Charlie", age: 27, score: "apple"}
+        - type: ValidateWithSchema
+          input: Create
+          config:
+            schema:
+              type: object
+              properties:
+                name:
+                  type: string
+                age:
+                  type: integer
+                score:
+                  type: number
+            error_handling:
+              output: invalid_rows
+        # ValidateWithSchema outputs the element, error message, and
+        # traceback, so MapToFields is needed to easily assert on the failed
+        # rows downstream.
+ - type: MapToFields + input: ValidateWithSchema.invalid_rows + config: + language: python + fields: + name: "element.name" + age: "element.age" + score: "element.score" + - type: AssertEqual + input: ValidateWithSchema + config: + elements: + - {name: "Alice", age: 30, score: 95.5} + - {name: "Bob", age: 25} + - type: AssertEqual + input: MapToFields + config: + elements: + - {name: "Charlie", age: 27, score: "apple"} diff --git a/sdks/python/apache_beam/yaml/yaml_errors.py b/sdks/python/apache_beam/yaml/yaml_errors.py index dace44ca09f6..66e9de058f1a 100644 --- a/sdks/python/apache_beam/yaml/yaml_errors.py +++ b/sdks/python/apache_beam/yaml/yaml_errors.py @@ -21,6 +21,7 @@ import apache_beam as beam from apache_beam.typehints.row_type import RowTypeConstraint +from apache_beam.yaml.yaml_utils import SafeLineLoader class ErrorHandlingConfig(NamedTuple): @@ -35,9 +36,11 @@ class ErrorHandlingConfig(NamedTuple): def exception_handling_args(error_handling_spec): if error_handling_spec: + # error_handling_spec may have come from a yaml file and have metadata. + clean_spec = SafeLineLoader.strip_metadata(error_handling_spec) return { 'dead_letter_tag' if k == 'output' else k: v - for (k, v) in error_handling_spec.items() + for (k, v) in clean_spec.items() } else: return None diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index da454932eb57..a6b2b5704751 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -487,7 +487,7 @@ def expand(self, pcoll): typing_from_runner_api(existing_fields[fld])) -class _Validate(beam.PTransform): +class Validate(beam.PTransform): """Validates each element of a PCollection against a json schema. Args: @@ -982,7 +982,7 @@ def create_mapping_providers(): 'Partition-javascript': _Partition, 'Partition-generic': _Partition, 'StripErrorMetadata': _StripErrorMetadata, - 'ValidateWithSchema': _Validate, + 'ValidateWithSchema': Validate, }), yaml_provider.SqlBackedProvider({ 'Filter-sql': _SqlFilterTransform, diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 70ad9309dc34..3af457b7010b 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -759,9 +759,12 @@ def _unify_element_with_schema(element, target_schema): elif isinstance(element, dict): element_dict = element else: - # This element is not a row, so it can't be unified with a - # row schema. - return element + # This element is not a row-like object. If the target schema has a single + # field, assume this element is the value for that field. 
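+    # For example, a bare element 42 unified with a single-field target
+    # schema like Row(value: int) would become Row(value=42).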
+    if len(target_schema._fields) == 1:
+      return target_schema(**{target_schema._fields[0]: element})
+    else:
+      return element
 
   # Create new element with only the fields that exist in the original
   # element plus None for fields that are expected but missing
diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index 341cd8d65f41..e2390bf12d45 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -34,9 +34,13 @@
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.typehints import schemas
+from apache_beam.typehints import typehints
+from apache_beam.yaml import json_utils
 from apache_beam.yaml import yaml_provider
 from apache_beam.yaml import yaml_utils
 from apache_beam.yaml.yaml_combine import normalize_combine
+from apache_beam.yaml.yaml_mapping import Validate
 from apache_beam.yaml.yaml_mapping import normalize_mapping
 from apache_beam.yaml.yaml_mapping import validate_generic_expressions
 from apache_beam.yaml.yaml_utils import SafeLineLoader
@@ -481,6 +485,15 @@ def expand_transform(spec, scope):
 
 
 def expand_leaf_transform(spec, scope):
+  spec = spec.copy()
+
+  # Check for an optional output_schema to validate the output against.
+  # The idea is to pass this output_schema config to the ValidateWithSchema
+  # transform.
+  output_schema_spec = {}
+  if 'output_schema' in spec.get('config', {}):
+    output_schema_spec = spec.get('config').pop('output_schema')
+
   spec = normalize_inputs_outputs(spec)
   inputs_dict = {
       key: scope.get_pcollection(value)
@@ -507,6 +520,20 @@
   except Exception as exn:
     raise ValueError(
         f"Error applying transform {identify_object(spec)}: {exn}") from exn
+
+  # An optional output_schema was found, so expand it before returning.
+  if output_schema_spec:
+    error_handling_spec = {}
+    # Reuse the original transform's error_handling spec so that all schema
+    # validation errors are routed the same way.
+    if 'error_handling' in spec.get('config', {}):
+      error_handling_spec = spec.get('config').get('error_handling', {})
+
+    outputs = expand_output_schema_transform(
+        spec=output_schema_spec,
+        outputs=outputs,
+        error_handling_spec=error_handling_spec)
+
   if isinstance(outputs, dict):
     # TODO: Handle (or at least reject) nested case.
     return outputs
@@ -522,6 +549,242 @@
       f'{type(outputs)}')
 
 
+def expand_output_schema_transform(spec, outputs, error_handling_spec):
+  """Applies a `Validate` transform to the output of another transform.
+
+  This function is called when an `output_schema` is defined on a transform.
+  It wraps the original transform's output(s) with a `Validate` transform
+  to ensure the data conforms to the specified schema.
+
+  If the original transform has error handling configured, validation errors
+  will be routed to the specified error output. If not, validation failures
+  will cause the pipeline to fail.
+
+  Args:
+    spec (dict): The `output_schema` specification from the YAML config.
+    outputs (beam.PCollection or dict[str, beam.PCollection]): The output(s)
+      from the transform to be validated.
+    error_handling_spec (dict): The `error_handling` configuration from the
+      original transform.
+
+  Returns:
+    The validated PCollection(s). If error handling is enabled, this will be a
+    dictionary containing the 'good' output and any error outputs.
+
+  Raises:
+    ValueError: If `error_handling` is incorrectly specified within the
+      `output_schema` spec itself, or if the main output of a multi-output
+      transform cannot be determined.
+  """
+  if 'error_handling' in spec:
+    raise ValueError(
+        'error_handling config is not supported directly in '
+        'the output_schema. Please use the error_handling config in '
+        'the transform, if possible, or use a ValidateWithSchema transform '
+        'instead.')
+
+  # Strip metadata such as __line__ and __uuid__ as these will interfere with
+  # the validation downstream.
+  clean_schema = SafeLineLoader.strip_metadata(spec)
+
+  # If no error handling is specified for the main transform, warn the user
+  # that the pipeline may fail if any output data fails the output schema
+  # validation.
+  if not error_handling_spec:
+    _LOGGER.warning(
+        "An output_schema config is attached to a transform that has no "
+        "error_handling config specified. Any failure validating the output "
+        "schema will fail the pipeline. To avoid this, either specify an "
+        "error_handling config on a transform that supports it, or remove "
+        "the output_schema config from this transform and add a "
+        "ValidateWithSchema transform downstream of it instead.")
+
+  # The transform produced a single output PCollection.
+  if isinstance(outputs, beam.PCollection):
+    outputs = _enforce_schema(
+        outputs, 'EnforceOutputSchema', error_handling_spec, clean_schema)
+    if isinstance(outputs, dict):
+      main_tag = error_handling_spec.get('main_tag', 'good')
+      main_output = outputs.pop(main_tag)
+      if error_handling_spec:
+        error_output_tag = error_handling_spec.get('output')
+        if error_output_tag in outputs:
+          return {
+              'output': main_output,
+              error_output_tag: outputs.pop(error_output_tag)
+          }
+      return main_output
+
+  # The transform produced several named PCollections, so determine which
+  # one should be validated against the schema.
+  elif isinstance(outputs, dict):
+    main_output_key = _get_main_output_key(spec, outputs)
+
+    validation_result = _enforce_schema(
+        outputs[main_output_key],
+        f'EnforceOutputSchema_{main_output_key}',
+        error_handling_spec,
+        clean_schema)
+    outputs = _integrate_validation_results(
+        outputs, validation_result, main_output_key, error_handling_spec)
+
+  return outputs
+
+
+def _get_main_output_key(spec, outputs):
+  """Determines the main output key from a dictionary of PCollections.
+
+  This is used to identify which output of a multi-output transform should be
+  validated against an `output_schema`.
+
+  The main output is determined using the following precedence:
+  1. An output with the key 'output'.
+  2. An output with the key 'good'.
+  3. The single output if there is only one.
+
+  Args:
+    spec: The transform specification, used for creating informative error
+      messages.
+    outputs: A dictionary mapping output tags to their corresponding
+      PCollections.
+
+  Returns:
+    The key of the main output PCollection.
+
+  Raises:
+    ValueError: If a main output cannot be determined because there are
+      multiple outputs and none are named 'output' or 'good'.
+  """
+  main_output_key = 'output'
+  if main_output_key not in outputs:
+    if 'good' in outputs:
+      main_output_key = 'good'
+    elif len(outputs) == 1:
+      main_output_key = next(iter(outputs.keys()))
+    else:
+      raise ValueError(
+          f"Transform {identify_object(spec)} has outputs "
+          f"{list(outputs.keys())}, but none are named 'output' or 'good'. "
+          "To apply an 'output_schema', please ensure the transform has "
+          "exactly one output, or that the main output is named 'output' "
+          "or 'good'.")
+  return main_output_key
+
+
+def _integrate_validation_results(
+    outputs, validation_result, main_output_key, error_handling_spec):
+  """Integrates the results of a validation transform back into the outputs
+  of the original transform.
+
+  This function handles merging the "good" and "bad" outputs from a
+  `Validate` transform with the existing outputs of the transform that was
+  validated.
+
+  Args:
+    outputs: The original dictionary of output PCollections from the transform.
+    validation_result: The output of the `Validate` transform. This can be a
+      single PCollection (if all elements passed) or a dictionary of
+      PCollections (if error handling was enabled for validation).
+    main_output_key: The key in the `outputs` dictionary corresponding to the
+      PCollection that was validated.
+    error_handling_spec: The error handling configuration of the original
+      transform.
+
+  Returns:
+    The updated dictionary of output PCollections, with validation results
+    integrated.
+
+  Raises:
+    ValueError: If the validation transform produces unexpected outputs.
+  """
+  if not isinstance(validation_result, dict):
+    outputs[main_output_key] = validation_result
+    return outputs
+
+  # The main output from validation is the good output.
+  main_tag = error_handling_spec.get('main_tag', 'good')
+  outputs[main_output_key] = validation_result.pop(main_tag)
+
+  if error_handling_spec:
+    error_output_tag = error_handling_spec['output']
+    if error_output_tag in validation_result:
+      schema_error_pcoll = validation_result.pop(error_output_tag)
+      if error_output_tag in outputs:
+        # The original transform also had an error output. Merge them.
+        outputs[error_output_tag] = (
+            (outputs[error_output_tag], schema_error_pcoll)
+            | f'FlattenErrors_{main_output_key}' >> beam.Flatten())
+      else:
+        # No error output in the original transform, so just add this one.
+        outputs[error_output_tag] = schema_error_pcoll
+
+  # There should be no other outputs from validation.
+  if validation_result:
+    raise ValueError(
+        "Unexpected outputs from validation: "
+        f"{list(validation_result.keys())}")
+
+  return outputs
+
+
+def _enforce_schema(pcoll, label, error_handling_spec, clean_schema):
+  """Applies a schema to PCollection elements if necessary, then validates.
+
+  This function ensures that the input PCollection conforms to a specified
+  schema. If the PCollection is schemaless (i.e., its element_type is Any),
+  it attempts to convert its elements into schema-aware `beam.Row` objects
+  based on the provided `clean_schema`. After ensuring the PCollection has
+  a defined schema, it applies a `Validate` transform to perform the actual
+  schema validation.
+
+  Args:
+    pcoll: The input PCollection to be schema-enforced and validated.
+    label: A string label to be used for the Beam transforms created within
+      this function.
+    error_handling_spec: A dictionary specifying how to handle validation
+      errors.
+    clean_schema: A dictionary representing the schema to enforce and validate
+      against.
+
+  Returns:
+    A PCollection (or PCollectionTuple if error handling is enabled) resulting
+    from the `Validate` transform.
+  """
+  if pcoll.element_type == typehints.Any:
+    _LOGGER.info(
+        "PCollection for %s has no schema (element_type=Any). "
" + "Converting elements to beam.Row based on provided output_schema.", + label) + try: + # Attempt to confer the schemaless elements into schema-aware beam.Row + # objects + beam_schema = json_utils.json_schema_to_beam_schema(clean_schema) + row_type_constraint = schemas.named_tuple_from_schema(beam_schema) + + def to_row(element): + """ + Convert a single element into the row type constraint type. + """ + if isinstance(element, dict): + return row_type_constraint(**element) + elif hasattr(element, '_asdict'): # Handle NamedTuple, beam.Row + return row_type_constraint(**element._asdict()) + else: + raise TypeError( + f"Cannot convert element of type {type(element)} to beam.Row " + f"for validation in {label}. Element: {element}") + + pcoll = pcoll | f'{label}_ConvertToRow' >> beam.Map( + to_row).with_output_types(row_type_constraint) + except Exception as e: + raise ValueError( + f"Failed to prepare schemaless PCollection for \ + validation in {label}: {e}") from e + + # Add Validation step downstream of current transform + return pcoll | label >> Validate( + schema=clean_schema, error_handling=error_handling_spec) + + def expand_composite_transform(spec, scope): spec = normalize_inputs_outputs(normalize_source_sink(spec)) diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index 543f13eeff58..80ac391d6e42 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -687,6 +687,72 @@ def test_flatten_unifies_complex_mixed_schemas(self): categories: []} ''') + def test_output_schema_success(self): + """Test that optional output_schema works.""" + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + _ = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + name: MyCreate + config: + elements: + - {sdk: 'Beam', year: 2016} + - {sdk: 'Flink', year: 2015} + output_schema: + type: object + properties: + sdk: + type: string + year: + type: integer + - type: AssertEqual + name: CheckGood + input: MyCreate + config: + elements: + - {sdk: 'Beam', year: 2016} + - {sdk: 'Flink', year: 2015} + ''') + + def test_output_schema_fails(self): + """ + Test that optional output_schema works by failing the pipeline since main + transform doesn't have error_handling config. 
+    """
+    with self.assertRaises(Exception) as e:
+      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+          pickle_library='cloudpickle')) as p:
+        _ = p | YamlTransform(
+            '''
+            type: composite
+            transforms:
+              - type: Create
+                name: MyCreate
+                config:
+                  elements:
+                    - {sdk: 'Beam', year: 2016}
+                    - {sdk: 'Spark', year: 'date'}
+                    - {sdk: 'Flink', year: 2015}
+                  output_schema:
+                    type: object
+                    properties:
+                      sdk:
+                        type: string
+                      year:
+                        type: integer
+              - type: AssertEqual
+                name: CheckGood
+                input: MyCreate
+                config:
+                  elements:
+                    - {sdk: 'Beam', year: 2016}
+                    - {sdk: 'Flink', year: 2015}
+            ''')
+    self.assertIn("'date' is not of type 'integer'", str(e.exception))
+
 
 class ErrorHandlingTest(unittest.TestCase):
   def test_error_handling_outputs(self):
diff --git a/website/www/site/content/en/documentation/sdks/yaml-errors.md b/website/www/site/content/en/documentation/sdks/yaml-errors.md
index 6edd1751a65b..8a836890a73e 100644
--- a/website/www/site/content/en/documentation/sdks/yaml-errors.md
+++ b/website/www/site/content/en/documentation/sdks/yaml-errors.md
@@ -40,7 +40,7 @@ the following code will write all "good" processed records to one file and
 any "bad" records, along with metadata about what error was encountered, to a
 separate file.
 
-```
+```yaml
 pipeline:
   transforms:
     - type: ReadFromCsv
@@ -87,7 +87,7 @@ Some transforms allow for extra arguments in their error_handling config, e.g.
 for Python functions one can give a `threshold` which limits the relative
 number of records that can be bad before considering the entire pipeline a
 failure
-```
+```yaml
 pipeline:
   transforms:
     - type: ReadFromCsv
@@ -122,7 +122,7 @@ pipeline:
 One can do arbitrary further processing on these failed records if desired,
 e.g.
 
-```
+```yaml
 pipeline:
   transforms:
     - type: ReadFromCsv
@@ -176,7 +176,7 @@ pipeline:
 When using the `chain` syntax, the required error consumption can happen
 in an `extra_transforms` block.
 
-```
+```yaml
 pipeline:
   type: chain
   transforms:
@@ -217,3 +217,5 @@ pipeline:
       config:
         path: /path/to/errors.json
 ```
+
+See the [Beam YAML Schema](https://beam.apache.org/documentation/sdks/yaml-schema/) page for another use of `error_handling`, in the context of output schema validation.
diff --git a/website/www/site/content/en/documentation/sdks/yaml-schema.md b/website/www/site/content/en/documentation/sdks/yaml-schema.md
new file mode 100644
index 000000000000..143a42e230b5
--- /dev/null
+++ b/website/www/site/content/en/documentation/sdks/yaml-schema.md
@@ -0,0 +1,119 @@
+---
+type: languages
+title: "Apache Beam YAML Schema"
+---
+
+
+# Beam YAML Schema
+
+As pipelines grow in size and complexity, it becomes more common to encounter
+data that is malformed, doesn't meet preconditions, or otherwise causes issues
+during processing.
+
+Beam YAML helps the user detect and capture these issues with the optional
+`output_schema` configuration, which is available for any transform in the YAML
+SDK. For example, the following code creates a few "good" records and specifies
+that records output by the `Create` transform must follow the expected
+schema: `sdk` as a string and `year` as an integer.
+
+```yaml
+pipeline:
+  type: chain
+  transforms:
+    - type: Create
+      config:
+        elements:
+          - {sdk: MapReduce, year: 2004}
+          - {sdk: MillWheel, year: 2008}
+        output_schema:
+          type: object
+          properties:
+            sdk:
+              type: string
+            year:
+              type: integer
+    - type: AssertEqual
+      config:
+        elements:
+          - {sdk: MapReduce, year: 2004}
+          - {sdk: MillWheel, year: 2008}
+```
+
+More often, though, a user will want to detect and handle schema errors rather
+than fail the pipeline. If a transform has a built-in `error_handling`
+configuration, the user can specify it, and any schema validation errors found
+will be appended to the transform's `error_handling` output. For example, the
+following code creates a few "good" and "bad" records and declares an output
+schema of `user` as a string and `timestamp` as a boolean. The `alice` row
+fails in the standard way, because its timestamp is not numeric, so the
+`AssignTimestamps` transform itself rejects it. The `bob` row, on the other
+hand, fails the output schema validation: after the `AssignTimestamps`
+transformation, its `timestamp` is still an integer when the schema declares a
+boolean. Both rows therefore end up on the `invalid_rows` error output.
+
+
+```yaml
+pipeline:
+  type: composite
+  transforms:
+    - type: Create
+      name: CreateVisits
+      config:
+        elements:
+          - {user: alice, timestamp: "not-valid"}
+          - {user: bob, timestamp: 3}
+    - type: AssignTimestamps
+      input: CreateVisits
+      config:
+        timestamp: timestamp
+        error_handling:
+          output: invalid_rows
+        output_schema:
+          type: object
+          properties:
+            user:
+              type: string
+            timestamp:
+              type: boolean
+    - type: MapToFields
+      name: ExtractInvalidTimestamp
+      input: AssignTimestamps.invalid_rows
+      config:
+        language: python
+        fields:
+          user: "element.user"
+          timestamp: "element.timestamp"
+    - type: AssertEqual
+      input: ExtractInvalidTimestamp
+      config:
+        elements:
+          - {user: "alice", timestamp: "not-valid"}
+          - {user: bob, timestamp: 3}
+    - type: AssertEqual
+      input: AssignTimestamps
+      config:
+        elements: []
+```
+
+WARNING: If a transform doesn't support the `error_handling` configuration and
+the user chooses to use the optional `output_schema` feature, any validation
+failure will fail the entire pipeline. To still get some form of output schema
+validation in that case, use the `ValidateWithSchema` transform instead.
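+
+As a sketch of that alternative, a standalone validation step might look like
+the following (the `WriteToJson` sink and its `/path/to/errors.json` path are
+illustrative placeholders for whatever should consume the invalid rows):
+
+```yaml
+pipeline:
+  transforms:
+    - type: Create
+      config:
+        elements:
+          - {user: alice, timestamp: "not-valid"}
+          - {user: bob, timestamp: 3}
+    - type: ValidateWithSchema
+      input: Create
+      config:
+        schema:
+          type: object
+          properties:
+            user:
+              type: string
+            timestamp:
+              type: integer
+        error_handling:
+          output: invalid_rows
+    - type: WriteToJson
+      name: WriteInvalid
+      input: ValidateWithSchema.invalid_rows
+      config:
+        path: /path/to/errors.json
+```
+
+Here `ValidateWithSchema` applies the same JSON-schema check as
+`output_schema`, but as an explicit transform with its own `error_handling`
+config, so invalid rows are diverted instead of failing the pipeline.
+
+For more detailed information on error handling, see this [page](https://beam.apache.org/documentation/sdks/yaml-errors/).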