Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
## Bugfixes

* (Python) Fixed Java YAML provider fails on Windows ([#35617](https://github.com/apache/beam/issues/35617)).
* (Python) Fixed transform naming conflict when executing DataTransform on a dictionary of PColls ([#30445][https://github.com/apache/beam/issues/30445])
* Fixed BigQueryIO creating temporary datasets in wrong project when temp_dataset is specified with a different project than the pipeline project. For some jobs, temporary datasets will now be created in the correct project (Python) ([#35813](https://github.com/apache/beam/issues/35813)).
* (Go) Fix duplicates due to reads after blind writes to Bag State ([#35869](https://github.com/apache/beam/issues/35869)).
* Earlier Go SDK versions can avoid the issue by not reading in the same call after a blind write.
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/dataframe/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def expand(self, input_pcolls):
for tag in input_dict
}
input_frames: dict[Any, frame_base.DeferredFrame] = {
k: convert.to_dataframe(pc, proxies[k])
k: convert.to_dataframe(pc, proxies[k], str(k))
for k, pc in input_dict.items()
} # noqa: F821

Expand Down
20 changes: 20 additions & 0 deletions sdks/python/apache_beam/dataframe/transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,26 @@ def check(actual):
lambda x: {'res': 3 * x}, proxy, yield_elements='pandas')
assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut')

def test_multiple_dataframes_transforms(self):
expected_output = ["Bryan", "DKER2"]

def transform_func(a, b):
b["name"] = "DKER2"
return a, b

with beam.Pipeline() as p:
pcol1 = p | "Create1" >> beam.Create([beam.Row(name="Bryan")])
pcol2 = p | "Create2" >> beam.Create([beam.Row(name="common")])

result = ({
"a": pcol1, "b": pcol2
}
|
"TransformDF" >> transforms.DataframeTransform(transform_func)
| "Flatten" >> beam.Flatten()
| transforms.DataframeTransform(lambda df: df.name))
assert_that(result, equal_to(expected_output))

def test_cat(self):
# verify that cat works with a List[Series] since this is
# missing from doctests
Expand Down
Loading