diff --git a/CHANGES.md b/CHANGES.md index 0394882d8a7a..5fa6c0cb4910 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -142,6 +142,7 @@ ## Bugfixes * (Python) Fixed Java YAML provider fails on Windows ([#35617](https://github.com/apache/beam/issues/35617)). +* (Python) Fixed transform naming conflict when executing DataTransform on a dictionary of PColls ([#30445][https://github.com/apache/beam/issues/30445]) * Fixed BigQueryIO creating temporary datasets in wrong project when temp_dataset is specified with a different project than the pipeline project. For some jobs, temporary datasets will now be created in the correct project (Python) ([#35813](https://github.com/apache/beam/issues/35813)). * (Go) Fix duplicates due to reads after blind writes to Bag State ([#35869](https://github.com/apache/beam/issues/35869)). * Earlier Go SDK versions can avoid the issue by not reading in the same call after a blind write. diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index 7128726f5eb1..49fe881ec8e7 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -108,7 +108,7 @@ def expand(self, input_pcolls): for tag in input_dict } input_frames: dict[Any, frame_base.DeferredFrame] = { - k: convert.to_dataframe(pc, proxies[k]) + k: convert.to_dataframe(pc, proxies[k], str(k)) for k, pc in input_dict.items() } # noqa: F821 diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index a2ca2f9d3879..c5ca2b9a359c 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -317,6 +317,26 @@ def check(actual): lambda x: {'res': 3 * x}, proxy, yield_elements='pandas') assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut') + def test_multiple_dataframes_transforms(self): + expected_output = ["Bryan", "DKER2"] + + def transform_func(a, b): + b["name"] = "DKER2" + return a, b + + with beam.Pipeline() as p: + pcol1 = p | "Create1" >> beam.Create([beam.Row(name="Bryan")]) + pcol2 = p | "Create2" >> beam.Create([beam.Row(name="common")]) + + result = ({ + "a": pcol1, "b": pcol2 + } + | + "TransformDF" >> transforms.DataframeTransform(transform_func) + | "Flatten" >> beam.Flatten() + | transforms.DataframeTransform(lambda df: df.name)) + assert_that(result, equal_to(expected_output)) + def test_cat(self): # verify that cat works with a List[Series] since this is # missing from doctests