From f1d9e93938881ddc0cbda5363272767da3a9a59d Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Tue, 24 Sep 2024 19:31:05 +0700 Subject: [PATCH 01/20] Log Warning if process function return None --- sdks/python/apache_beam/transforms/core.py | 15 +++++++- .../apache_beam/transforms/core_test.py | 38 +++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index d7415e8d8135..30afd40d76d9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1497,15 +1497,26 @@ def _check_fn_use_yield_and_return(fn): source_code = _get_function_body_without_inners(fn) has_yield = False has_return = False + return_none_warning = ( + "No iterator is returned by the process method in %s.", + fn.__self__.__class__) for line in source_code.split("\n"): - if line.lstrip().startswith("yield ") or line.lstrip().startswith( + lstripped_line = line.lstrip() + if lstripped_line.startswith("yield ") or lstripped_line.startswith( "yield("): has_yield = True - if line.lstrip().startswith("return ") or line.lstrip().startswith( + if lstripped_line.startswith("return ") or lstripped_line.startswith( "return("): has_return = True + if lstripped_line.startswith( + "return None") or lstripped_line.rstrip() == "return": + _LOGGER.warning(return_none_warning) if has_yield and has_return: return True + + if not has_yield and not has_return: + _LOGGER.warning(return_none_warning) + return False except Exception as e: _LOGGER.debug(str(e)) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index b492ab0938cc..54afb365d2d8 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -30,6 +30,8 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms.window import FixedWindows +RETURN_NONE_PARTIAL_WARNING = "No iterator is returned" + class TestDoFn1(beam.DoFn): def process(self, element): @@ -96,6 +98,24 @@ def process(self, element): yield element +class TestDoFn10(beam.DoFn): + """test process returning None explicitly""" + def process(self, element): + return None + + +class TestDoFn11(beam.DoFn): + """test process returning None (no return and no yield)""" + def process(self, element): + pass + + +class TestDoFn12(beam.DoFn): + """test process returning None (return statement without a value)""" + def process(self, element): + return + + class CreateTest(unittest.TestCase): @pytest.fixture(autouse=True) def inject_fixtures(self, caplog): @@ -119,6 +139,24 @@ def test_dofn_with_yield_and_return(self): beam.ParDo(TestDoFn3()) assert warning_text in self._caplog.text + def test_dofn_with_explicit_return_none(self): + with self._caplog.at_level(logging.WARNING): + beam.ParDo(TestDoFn10()) + assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text + assert str(TestDoFn10) in self._caplog.text + + def test_dofn_with_implicit_return_none_missing_return_and_yield(self): + with self._caplog.at_level(logging.WARNING): + beam.ParDo(TestDoFn11()) + assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text + assert str(TestDoFn11) in self._caplog.text + + def test_dofn_with_implicit_return_none_return_without_value(self): + with self._caplog.at_level(logging.WARNING): + beam.ParDo(TestDoFn12()) + assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text + assert str(TestDoFn12) in self._caplog.text + class PartitionTest(unittest.TestCase): def 
test_partition_boundedness(self): From 3cdb62910a62557b7208d5e8a2fb517c42664fe3 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 09:20:23 +0700 Subject: [PATCH 02/20] Fix get function without inner --- sdks/python/apache_beam/transforms/core.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 30afd40d76d9..42523dae9d55 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1463,7 +1463,10 @@ def _get_function_body_without_inners(func): source_lines = inspect.getsourcelines(func)[0] source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) def_line = next(source_lines).strip() - if def_line.startswith("def ") and def_line.endswith(":"): + if def_line.startswith("def "): + while next(source_lines).split("#")[0].split("\"\"\"")[0].endswith(":"): + continue + first_line = next(source_lines) indentation = len(first_line) - len(first_line.lstrip()) final_lines = [first_line[indentation:]] From 9170808d4313ac2e5a81d7393231ce791a68e67f Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 09:34:03 +0700 Subject: [PATCH 03/20] check the first def_line also --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 42523dae9d55..f1001076b2ef 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1464,8 +1464,8 @@ def _get_function_body_without_inners(func): source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) def_line = next(source_lines).strip() if def_line.startswith("def "): - while next(source_lines).split("#")[0].split("\"\"\"")[0].endswith(":"): - continue + while not def_line.split("#")[0].split("\"\"\"")[0].endswith(":"): + def_line = next(source_lines) first_line = next(source_lines) indentation = len(first_line) - len(first_line.lstrip()) From 9370ed24aa4edcde077a4586088eb8af7fe621a4 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 09:37:00 +0700 Subject: [PATCH 04/20] rename variable --- sdks/python/apache_beam/transforms/core.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index f1001076b2ef..2d142e3e628a 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1462,10 +1462,11 @@ def partition_for(self, element, num_partitions, *args, **kwargs): def _get_function_body_without_inners(func): source_lines = inspect.getsourcelines(func)[0] source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) - def_line = next(source_lines).strip() - if def_line.startswith("def "): - while not def_line.split("#")[0].split("\"\"\"")[0].endswith(":"): - def_line = next(source_lines) + first_def_line = next(source_lines).strip() + if first_def_line.startswith("def "): + last_def_line = first_def_line + while not last_def_line.split("#")[0].split("\"\"\"")[0].endswith(":"): + last_def_line = next(source_lines) first_line = next(source_lines) indentation = len(first_line) - len(first_line.lstrip()) @@ -1490,7 +1491,7 @@ def _get_function_body_without_inners(func): return "".join(final_lines) else: - return def_line.rsplit(":")[-1].strip() + return first_def_line.rsplit(":")[-1].strip() def 
_check_fn_use_yield_and_return(fn): From 62196640b6c6f1200975589b4ade96e86cb9c0d8 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 18:12:30 +0700 Subject: [PATCH 05/20] add strip function --- sdks/python/apache_beam/transforms/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 2d142e3e628a..1d596536d2be 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1465,7 +1465,8 @@ def _get_function_body_without_inners(func): first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): last_def_line = first_def_line - while not last_def_line.split("#")[0].split("\"\"\"")[0].endswith(":"): + while not last_def_line.split("#")[0].split("\"\"\"")[0].strip().endswith( + ":"): last_def_line = next(source_lines) first_line = next(source_lines) From 5a3c1a6c7e9d581fdc4a3850c297570c6226c706 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 18:14:12 +0700 Subject: [PATCH 06/20] reformat function --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 1d596536d2be..46ad948bb0a9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1465,8 +1465,8 @@ def _get_function_body_without_inners(func): first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): last_def_line = first_def_line - while not last_def_line.split("#")[0].split("\"\"\"")[0].strip().endswith( - ":"): + while not ( + last_def_line.split("#")[0].split("\"\"\"")[0].strip().endswith(":")): last_def_line = next(source_lines) first_line = next(source_lines) From 3bfc43d77f16961f2883a51679ed1bc83db4f178 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Wed, 25 Sep 2024 18:20:01 +0700 Subject: [PATCH 07/20] refactor code --- sdks/python/apache_beam/transforms/core.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 46ad948bb0a9..a3762adac0cb 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1464,10 +1464,11 @@ def _get_function_body_without_inners(func): source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): - last_def_line = first_def_line - while not ( - last_def_line.split("#")[0].split("\"\"\"")[0].strip().endswith(":")): - last_def_line = next(source_lines) + last_def_line_without_comment = first_def_line.split("#")[0] \ + .split("\"\"\"")[0] + while not last_def_line_without_comment.strip().endswith(":"): + last_def_line_without_comment = next(source_lines).split("#")[0] \ + .split("\"\"\"")[0] first_line = next(source_lines) indentation = len(first_line) - len(first_line.lstrip()) From 278d1863e3d861824dc514ebcbb8057f95edc06f Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Sun, 29 Sep 2024 17:17:09 +0700 Subject: [PATCH 08/20] fix bug in get function body --- sdks/python/apache_beam/transforms/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index a3762adac0cb..3fec653a705b 100644 --- 
a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1461,7 +1461,7 @@ def partition_for(self, element, num_partitions, *args, **kwargs): def _get_function_body_without_inners(func): source_lines = inspect.getsourcelines(func)[0] - source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) + source_lines = dropwhile(lambda x: x.strip().startswith("@"), source_lines) first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): last_def_line_without_comment = first_def_line.split("#")[0] \ From e8193668022fdba8efb2b6fe9413d990e3494c87 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Sun, 29 Sep 2024 18:04:56 +0700 Subject: [PATCH 09/20] retrigger test --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 3fec653a705b..a1af96c865d1 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1504,8 +1504,8 @@ def _check_fn_use_yield_and_return(fn): has_yield = False has_return = False return_none_warning = ( - "No iterator is returned by the process method in %s.", - fn.__self__.__class__) + f"No iterator is returned by the " + f"process method in {fn.__self__.__class__}.") for line in source_code.split("\n"): lstripped_line = line.lstrip() if lstripped_line.startswith("yield ") or lstripped_line.startswith( From 25d5431232b6134713a5b396b9a4330280e1d9bb Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Sun, 29 Sep 2024 18:41:06 +0700 Subject: [PATCH 10/20] retrigger test --- sdks/python/apache_beam/transforms/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index a1af96c865d1..256dbd87ddba 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1461,7 +1461,7 @@ def partition_for(self, element, num_partitions, *args, **kwargs): def _get_function_body_without_inners(func): source_lines = inspect.getsourcelines(func)[0] - source_lines = dropwhile(lambda x: x.strip().startswith("@"), source_lines) + source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) first_def_line = next(source_lines).strip() if first_def_line.startswith("def "): last_def_line_without_comment = first_def_line.split("#")[0] \ From 52f59ff429bced3b7cf6d98790c78656506dd511 Mon Sep 17 00:00:00 2001 From: DKER2 Date: Sun, 17 Aug 2025 21:14:53 +0800 Subject: [PATCH 11/20] fix: unexpected error when transform two pcoll --- .../apache_beam/dataframe/transforms.py | 5 ++- .../apache_beam/dataframe/transforms_test.py | 39 +++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index 852b49c4e2ed..643fb00d443b 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -114,8 +114,9 @@ def expand(self, input_pcolls): for tag in input_dict } input_frames = { - k: convert.to_dataframe(pc, proxies[k]) - for k, pc in input_dict.items() + k: convert.to_dataframe(pc, proxies[k], str(k)) + for k, + pc in input_dict.items() } # type: Dict[Any, DeferredFrame] # noqa: F821 # Apply the function. 
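For context, a minimal sketch (not part of this patch) of the behaviour the hunk above relies on: convert.to_dataframe accepts an explicit label, and giving each input its own label (the dict key, as str(k) does above) keeps the generated conversion transforms uniquely named when several PCollections are converted inside one expand() call. The pipeline, labels, and Row fields below are illustrative assumptions, not code from this series.

# Illustrative sketch only; all names ("users", "addrs", the Row fields)
# are made up for this note.
import apache_beam as beam
from apache_beam.dataframe import convert

with beam.Pipeline() as p:
  users = p | "CreateUsers" >> beam.Create([beam.Row(name="Bryan")])
  addrs = p | "CreateAddrs" >> beam.Create([beam.Row(addr="addr1")])

  # Distinct labels keep the per-input conversion transforms uniquely named;
  # omitting them can yield identical inferred labels when the conversions
  # happen inside a single comprehension, which is the duplicate-transform
  # error this patch addresses.
  df_users = convert.to_dataframe(users, label="users_df")
  df_addrs = convert.to_dataframe(addrs, label="addrs_df")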
diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index a143606cc913..4e6ee050d9c0 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -318,6 +318,45 @@ def check(actual): lambda x: {'res': 3 * x}, proxy, yield_elements='pandas') assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut') + def test_multiple_dataframe_transforms(self): + # Define test data + data1 = [ + beam.Row(id=1, name="abc"), + beam.Row(id=2, name="def"), + beam.Row(id=3, name="ghi") + ] + data2 = [ + beam.Row(addr="addr1"), beam.Row(addr="addr2"), beam.Row(addr="addr3") + ] + + # Create a TestPipeline + with beam.Pipeline() as p: + # Create PCollections for testing + pcol1 = p | "Create1" >> beam.Create(data1) + pcol2 = p | "Create2" >> beam.Create(data2) + + # Apply the DataframeTransform to the PCollections + pcol = ({ + "a": pcol1, "b": pcol2 + } + | "TransformedDF" >> transforms.DataframeTransform( + lambda a, b: a.assign(addr="addr-common"))) + + # Assert the expected output + expected_output = [ + { + "id": 1, "name": "abc", "addr": "addr-common" + }, + { + "id": 2, "name": "def", "addr": "addr-common" + }, + { + "id": 3, "name": "ghi", "addr": "addr-common" + }, + ] + assert_that(pcol | "Map" >> beam.Map(lambda row: row.asdict())) \ + .equal_to(expected_output) + def test_cat(self): # verify that cat works with a List[Series] since this is # missing from doctests From 69ed0856131ea2398ca8bab1815534bfc58a072b Mon Sep 17 00:00:00 2001 From: DKER2 Date: Sun, 17 Aug 2025 21:17:57 +0800 Subject: [PATCH 12/20] revert redundant --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 256dbd87ddba..a3762adac0cb 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1504,8 +1504,8 @@ def _check_fn_use_yield_and_return(fn): has_yield = False has_return = False return_none_warning = ( - f"No iterator is returned by the " - f"process method in {fn.__self__.__class__}.") + "No iterator is returned by the process method in %s.", + fn.__self__.__class__) for line in source_code.split("\n"): lstripped_line = line.lstrip() if lstripped_line.startswith("yield ") or lstripped_line.startswith( From d19483885b2fe14d4fe275752794574a07392c6e Mon Sep 17 00:00:00 2001 From: DKER2 Date: Wed, 20 Aug 2025 21:57:29 +0800 Subject: [PATCH 13/20] fix test --- .../apache_beam/dataframe/transforms_test.py | 51 +++++++------------ 1 file changed, 18 insertions(+), 33 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index 56d578061484..f776375ca2b7 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -317,44 +317,29 @@ def check(actual): lambda x: {'res': 3 * x}, proxy, yield_elements='pandas') assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut') - def test_multiple_dataframe_transforms(self): - # Define test data - data1 = [ - beam.Row(id=1, name="abc"), - beam.Row(id=2, name="def"), - beam.Row(id=3, name="ghi") - ] - data2 = [ - beam.Row(addr="addr1"), beam.Row(addr="addr2"), beam.Row(addr="addr3") - ] - - # Create a TestPipeline + def test_multiple_dataframes_transforms(self): + expected_output = [{ + "id": 
1, "name": "Bryan", "addr": "addr-common" + }, { + "id": 2, "name": "DKER2", "addr": "addr-common" + }] + + def transform_func(a, b): + a["addr"] = "addr-common" + with beam.Pipeline() as p: - # Create PCollections for testing - pcol1 = p | "Create1" >> beam.Create(data1) - pcol2 = p | "Create2" >> beam.Create(data2) + pcol1 = p | "Create1" >> beam.Create( + [beam.Row(id=1, name="Bryan"), beam.Row(id=2, name="DKER2")]) + pcol2 = p | "Create2" >> beam.Create( + [beam.Row(addr="addr1"), beam.Row(addr="addr2")]) - # Apply the DataframeTransform to the PCollections pcol = ({ "a": pcol1, "b": pcol2 } - | "TransformedDF" >> transforms.DataframeTransform( - lambda a, b: a.assign(addr="addr-common"))) - - # Assert the expected output - expected_output = [ - { - "id": 1, "name": "abc", "addr": "addr-common" - }, - { - "id": 2, "name": "def", "addr": "addr-common" - }, - { - "id": 3, "name": "ghi", "addr": "addr-common" - }, - ] - assert_that(pcol | "Map" >> beam.Map(lambda row: row.asdict())) \ - .equal_to(expected_output) + | + "TransformedDF" >> transforms.DataframeTransform(transform_func)) + + assert_that(pcol, equal_to(expected_output)) def test_cat(self): # verify that cat works with a List[Series] since this is From 4acae6974a694c52bea71e59d4df83b7d44af8c6 Mon Sep 17 00:00:00 2001 From: DKER2 Date: Wed, 20 Aug 2025 21:59:42 +0800 Subject: [PATCH 14/20] reformat file --- sdks/python/apache_beam/dataframe/transforms.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index 66d0773ef719..49fe881ec8e7 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -109,8 +109,7 @@ def expand(self, input_pcolls): } input_frames: dict[Any, frame_base.DeferredFrame] = { k: convert.to_dataframe(pc, proxies[k], str(k)) - for k, - pc in input_dict.items() + for k, pc in input_dict.items() } # noqa: F821 # Apply the function. 
From 12c4973db7b6576f7f82fe9a97a543b0d1e76c4d Mon Sep 17 00:00:00 2001 From: DKER2 Date: Wed, 20 Aug 2025 22:00:38 +0800 Subject: [PATCH 15/20] simply change test case --- sdks/python/apache_beam/dataframe/transforms_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index f776375ca2b7..2629b55f4fbe 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -319,9 +319,9 @@ def check(actual): def test_multiple_dataframes_transforms(self): expected_output = [{ - "id": 1, "name": "Bryan", "addr": "addr-common" + "name": "Bryan", "addr": "addr-common" }, { - "id": 2, "name": "DKER2", "addr": "addr-common" + "name": "DKER2", "addr": "addr-common" }] def transform_func(a, b): @@ -329,7 +329,7 @@ def transform_func(a, b): with beam.Pipeline() as p: pcol1 = p | "Create1" >> beam.Create( - [beam.Row(id=1, name="Bryan"), beam.Row(id=2, name="DKER2")]) + [beam.Row(name="Bryan"), beam.Row(name="DKER2")]) pcol2 = p | "Create2" >> beam.Create( [beam.Row(addr="addr1"), beam.Row(addr="addr2")]) From d5f052c84118af42b3fbe6aa3abc6cffd304ffae Mon Sep 17 00:00:00 2001 From: DKER2 Date: Wed, 20 Aug 2025 23:49:24 +0800 Subject: [PATCH 16/20] change test case --- .../apache_beam/dataframe/transforms_test.py | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index 2629b55f4fbe..04bab5a76384 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -318,28 +318,24 @@ def check(actual): assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut') def test_multiple_dataframes_transforms(self): - expected_output = [{ - "name": "Bryan", "addr": "addr-common" - }, { - "name": "DKER2", "addr": "addr-common" - }] + expected_output = ["Bryan", "DKER2"] def transform_func(a, b): - a["addr"] = "addr-common" + b["name"] = "DKER2" + return a[["name"]], b[["name"]] with beam.Pipeline() as p: - pcol1 = p | "Create1" >> beam.Create( - [beam.Row(name="Bryan"), beam.Row(name="DKER2")]) - pcol2 = p | "Create2" >> beam.Create( - [beam.Row(addr="addr1"), beam.Row(addr="addr2")]) - - pcol = ({ - "a": pcol1, "b": pcol2 - } - | - "TransformedDF" >> transforms.DataframeTransform(transform_func)) - - assert_that(pcol, equal_to(expected_output)) + pcol1 = p | "Create1" >> beam.Create([beam.Row(name="Bryan")]) + pcol2 = p | "Create2" >> beam.Create([beam.Row(addr="addr1")]) + + result = ( + { + "a": pcol1, "b": pcol2 + } + | "TransformedDF" >> transforms.DataframeTransform(transform_func) + | "FlattenAB" >> beam.Flatten() + | transforms.DataframeTransform(lambda df: df.name)) + assert_that(result, equal_to(expected_output)) def test_cat(self): # verify that cat works with a List[Series] since this is From 91933398caeb22c70b99d38c2db167d49b463b12 Mon Sep 17 00:00:00 2001 From: DKER2 Date: Wed, 20 Aug 2025 23:52:20 +0800 Subject: [PATCH 17/20] change test case --- sdks/python/apache_beam/dataframe/transforms_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index 04bab5a76384..c0f08ed5dc22 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ 
b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -322,11 +322,11 @@ def test_multiple_dataframes_transforms(self): def transform_func(a, b): b["name"] = "DKER2" - return a[["name"]], b[["name"]] + return a, b with beam.Pipeline() as p: pcol1 = p | "Create1" >> beam.Create([beam.Row(name="Bryan")]) - pcol2 = p | "Create2" >> beam.Create([beam.Row(addr="addr1")]) + pcol2 = p | "Create2" >> beam.Create([beam.Row(name="common")]) result = ( { From f1e5fd74ac33e56e9c6fd979ca9c28fd1f15b550 Mon Sep 17 00:00:00 2001 From: DKER2 Date: Sat, 23 Aug 2025 11:13:45 +0800 Subject: [PATCH 18/20] retrigger test --- .../apache_beam/dataframe/transforms_test.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index c0f08ed5dc22..c5ca2b9a359c 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -328,13 +328,13 @@ def transform_func(a, b): pcol1 = p | "Create1" >> beam.Create([beam.Row(name="Bryan")]) pcol2 = p | "Create2" >> beam.Create([beam.Row(name="common")]) - result = ( - { - "a": pcol1, "b": pcol2 - } - | "TransformedDF" >> transforms.DataframeTransform(transform_func) - | "FlattenAB" >> beam.Flatten() - | transforms.DataframeTransform(lambda df: df.name)) + result = ({ + "a": pcol1, "b": pcol2 + } + | + "TransformDF" >> transforms.DataframeTransform(transform_func) + | "Flatten" >> beam.Flatten() + | transforms.DataframeTransform(lambda df: df.name)) assert_that(result, equal_to(expected_output)) def test_cat(self): From d3ceae7430fef0d7c0665ffb4418850cdaf84e40 Mon Sep 17 00:00:00 2001 From: DKER2 Date: Thu, 4 Sep 2025 22:23:53 +0800 Subject: [PATCH 19/20] update change.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 0394882d8a7a..63ee60e32bc6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -142,6 +142,7 @@ ## Bugfixes * (Python) Fixed Java YAML provider fails on Windows ([#35617](https://github.com/apache/beam/issues/35617)). +* (Python) Fixed transform naming conflict when executing DataTransform on a dictionary of PColl ([#30445][https://github.com/apache/beam/issues/30445]) * Fixed BigQueryIO creating temporary datasets in wrong project when temp_dataset is specified with a different project than the pipeline project. For some jobs, temporary datasets will now be created in the correct project (Python) ([#35813](https://github.com/apache/beam/issues/35813)). * (Go) Fix duplicates due to reads after blind writes to Bag State ([#35869](https://github.com/apache/beam/issues/35869)). * Earlier Go SDK versions can avoid the issue by not reading in the same call after a blind write. From a9665055a47d6342f306163dd2fd6cbdd85f163f Mon Sep 17 00:00:00 2001 From: DKER2 Date: Thu, 4 Sep 2025 22:25:12 +0800 Subject: [PATCH 20/20] update change.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 63ee60e32bc6..5fa6c0cb4910 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -142,7 +142,7 @@ ## Bugfixes * (Python) Fixed Java YAML provider fails on Windows ([#35617](https://github.com/apache/beam/issues/35617)). 
-* (Python) Fixed transform naming conflict when executing DataTransform on a dictionary of PColl ([#30445][https://github.com/apache/beam/issues/30445]) +* (Python) Fixed transform naming conflict when executing DataframeTransform on a dictionary of PCollections ([#30445](https://github.com/apache/beam/issues/30445)). * Fixed BigQueryIO creating temporary datasets in wrong project when temp_dataset is specified with a different project than the pipeline project. For some jobs, temporary datasets will now be created in the correct project (Python) ([#35813](https://github.com/apache/beam/issues/35813)). * (Go) Fix duplicates due to reads after blind writes to Bag State ([#35869](https://github.com/apache/beam/issues/35869)). * Earlier Go SDK versions can avoid the issue by not reading in the same call after a blind write.
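Looking back at the warning introduced in patch 01, a minimal sketch (not part of the series) of the DoFn shapes involved; class names and the element handling are illustrative only.

# The first two DoFns mirror the shapes patch 01 warns about (process
# produces no iterable); the third is the usual shape and stays silent.
import apache_beam as beam

class LogsOnly(beam.DoFn):
  def process(self, element):
    print(element)  # falls off the end: implicit "return None"

class ExplicitNone(beam.DoFn):
  def process(self, element):
    return None  # an explicit None is warned about as well

class YieldsElements(beam.DoFn):
  def process(self, element):
    yield element  # yields an element, so no warning

# Constructing the ParDo is enough to trigger the check, e.g.
# beam.ParDo(LogsOnly()) logs
# "No iterator is returned by the process method in <class ...>."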