From f777002c1fe75fa09f64be0784adc46aa0d25443 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 19 Aug 2025 10:19:13 -0400 Subject: [PATCH 01/14] implement lambda name pickling in cloudpickle --- .../internal/cloudpickle/cloudpickle.py | 20 +++++++++++-- .../internal/cloudpickle_pickler.py | 4 ++- .../internal/code_object_pickler.py | 29 ++++++++++++++----- .../apache_beam/internal/dill_pickler.py | 7 ++++- sdks/python/apache_beam/internal/pickler.py | 6 ++-- .../apache_beam/internal/pickler_test.py | 16 ++++++++++ .../apache_beam/runners/pipeline_context.py | 1 + .../apache_beam/transforms/ptransform.py | 2 ++ 8 files changed, 70 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index 5a9d89430fd3..46d2cf01f6ae 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -526,6 +526,11 @@ def _make_function(code, globals, name, argdefs, closure): return types.FunctionType(code, globals, name, argdefs, closure) +def _make_function_from_identifier(code_path, globals, name, argdefs, closure): + fcode = get_code_from_identifier(code_path) + return _make_function(fcode, globals, name, argdefs, closure) + + def _make_empty_cell(): if False: # trick the compiler into creating an empty cell in our lambda @@ -1266,7 +1271,11 @@ def _dynamic_function_reduce(self, func): """Reduce a function that is not pickleable via attribute lookup.""" newargs = self._function_getnewargs(func) state = _function_getstate(func) - return (_make_function, newargs, state, None, None, _function_setstate) + if type(newargs[0]) == str: + make_function - _make_function_from_identifier + else: + make_function = _make_function + return (make_function, newargs, state, None, None, _function_setstate) def _function_reduce(self, obj): """Reducer for function objects. @@ -1283,6 +1292,7 @@ def _function_reduce(self, obj): return self._dynamic_function_reduce(obj) def _function_getnewargs(self, func): + code_path = get_code_object_indentifier(func) if self.enable_lambda_name else None code = func.__code__ # base_globals represents the future global namespace of func at @@ -1313,7 +1323,10 @@ def _function_getnewargs(self, func): else: closure = tuple(_make_empty_cell() for _ in range(len(code.co_freevars))) - return code, base_globals, None, None, closure + if code_path: + return code_path, base_globals, None, None, closure + else: + return code, base_globals, None, None, closure def dump(self, obj): try: @@ -1326,7 +1339,8 @@ def dump(self, obj): raise def __init__( - self, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG): + self, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG, + enable_lambda_name=False): if protocol is None: protocol = DEFAULT_PROTOCOL super().__init__(file, protocol=protocol, buffer_callback=buffer_callback) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 63038e770f27..757d6a19682c 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -117,6 +117,7 @@ def dumps( enable_trace=True, use_zlib=False, enable_best_effort_determinism=False, + enable_lambda_name=False, config: cloudpickle.CloudPickleConfig = DEFAULT_CONFIG) -> bytes: """For internal use only; no backwards-compatibility guarantees.""" if enable_best_effort_determinism: @@ -127,7 +128,8 @@ def dumps( 'This has only been implemented for dill.') with _pickle_lock: with io.BytesIO() as file: - pickler = cloudpickle.CloudPickler(file, config=config) + pickler = cloudpickle.CloudPickler( + file, config=config, enable_lambda_name=enable_lambda_name) try: pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags except NameError: diff --git a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index b6ea015cc06f..0d0f4a3005fe 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -315,10 +315,10 @@ def _search_lambda( _SINGLE_NAME_PATTERN = re.compile(r'co_consts\[([a-zA-Z0-9\<\>_-]+)]') # Matches a path like: co_consts[, ('x',)] _LAMBDA_WITH_ARGS_PATTERN = re.compile( - r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\))\]") + r"co_consts\[(<.*?>),\s(\('[^']+'(?:,\s*'[^']+')*,?\))\]") # Matches a path like: co_consts[, ('x',), 1234567890] _LAMBDA_WITH_HASH_PATTERN = re.compile( - r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\)),\s*(.+)\]") + r"co_consts\[(<[^>]+>),\s*(\([^\)]*\)),?\s*(.*)\]") # Matches a path like: __defaults__[0] _DEFAULT_PATTERN = re.compile(r'(__defaults__)\[(\d+)\]') # Matches an argument like: 'x' @@ -375,7 +375,7 @@ def _get_code_object_from_lambda_with_args_pattern( for obj_ in objects: args = tuple( re.findall(_ARGUMENT_PATTERN, lambda_with_args_result.group(2))) - if obj_.co_varnames == args: + if obj_.co_varnames[:_get_arg_count(obj_)] == args: return obj_ raise AttributeError(f'Could not find code object with path: {path}') @@ -404,7 +404,7 @@ def _get_code_object_from_lambda_with_hash_pattern( for obj_ in objects: args = tuple( re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2))) - if obj_.co_varnames == args: + if obj_.co_varnames[:_get_arg_count(obj_)] == args: hash_value = lambda_with_hash_result.group(3) if hash_value == str(_create_bytecode_hash(obj_)): return obj_ @@ -462,12 +462,25 @@ def _signature(obj: types.CodeType): Returns: A tuple of the names of the arguments of the code object. """ - arg_count = ( - obj.co_argcount + obj.co_kwonlyargcount + - (obj.co_flags & 4 == 4) # PyCF_VARARGS + return obj.co_varnames[:_get_arg_count(obj)] + + +def _get_arg_count(obj: types.CodeType): + """Returns the number of arguments of a code object. + + Args: + obj: A code object, function, method, or cell. + + Returns: + The number of arguments of the code object, or None if the object is not a + code object. + """ + return ( + obj.co_argcount + + obj.co_kwonlyargcount + + (obj.co_flags & 4 == 4) # PyCF_VARARGS + (obj.co_flags & 8 == 8) # PyCF_VARKEYWORDS ) - return obj.co_varnames[:arg_count] def _create_bytecode_hash(code_object: types.CodeType): diff --git a/sdks/python/apache_beam/internal/dill_pickler.py b/sdks/python/apache_beam/internal/dill_pickler.py index 9a3d43826610..bbfc50546640 100644 --- a/sdks/python/apache_beam/internal/dill_pickler.py +++ b/sdks/python/apache_beam/internal/dill_pickler.py @@ -379,8 +379,13 @@ def dumps( o, enable_trace=True, use_zlib=False, - enable_best_effort_determinism=False) -> bytes: + enable_best_effort_determinism=False, + enable_lambda_name=False) -> bytes: """For internal use only; no backwards-compatibility guarantees.""" + if enable_lambda_name: + logging.info( + 'Ignoring unsupported option: enable_lambda_name. ' + 'This has only been implemented for CloudPickle.') with _pickle_lock: if enable_best_effort_determinism: old_save_set = dill.dill.Pickler.dispatch[set] diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 256f88c5453f..d43659df1b54 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -42,13 +42,15 @@ def dumps( o, enable_trace=True, use_zlib=False, - enable_best_effort_determinism=False) -> bytes: + enable_best_effort_determinism=False, + enable_lambda_name=False) -> bytes: return desired_pickle_lib.dumps( o, enable_trace=enable_trace, use_zlib=use_zlib, - enable_best_effort_determinism=enable_best_effort_determinism) + enable_best_effort_determinism=enable_best_effort_determinism, + enable_lambda_name=enable_lambda_name) def loads(encoded, enable_trace=True, use_zlib=False): diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py index 7048f680de87..1f4d15696338 100644 --- a/sdks/python/apache_beam/internal/pickler_test.py +++ b/sdks/python/apache_beam/internal/pickler_test.py @@ -34,6 +34,10 @@ from apache_beam.internal.pickler import loads +def pickle_depickle(obj, enable_lambda_name): + return loads(dumps(obj, enable_lambda_name=enable_lambda_name)) + + class PicklerTest(unittest.TestCase): NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType") @@ -279,5 +283,17 @@ def test_disable_best_effort_determinism(self): dumps(set2, enable_best_effort_determinism=False)) + def test_enable_lambda_name_pickling(self): + pickler.set_library('cloudpickle') + pickled = pickle_depickle(lambda x: x, enable_lambda_name=True) + pickled_type = type(pickled) + self.assertIsInstance(pickled, pickled_type) + + def test_disable_lambda_name_pickling(self): + pickler.set_library('cloudpickle') + pickled = pickle_depickle(lambda x: x, enable_lambda_name=False) + pickled_type = type(pickled) + self.assertIsInstance(pickled, pickled_type) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index 132a1aedca33..459c0066c3e9 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -227,6 +227,7 @@ def __init__( self.iterable_state_write = iterable_state_write self._requirements = set(requirements) self.enable_best_effort_deterministic_pickling = False + self.enable_lambda_name_pickling = False def add_requirement(self, requirement: str) -> None: self._requirements.add(requirement) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index d2cf836713fb..d61cb02a419c 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -792,6 +792,8 @@ def to_runner_api_pickled(self, context): self, enable_best_effort_determinism=context. enable_best_effort_deterministic_pickling, + enable_lambda_name=context. + enable_lambda_name_pickling, ), ) From d1722992ce75b92849af716028a38b76ddc97a62 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 19 Aug 2025 10:33:13 -0400 Subject: [PATCH 02/14] add enable_lambda_name to __init__ --- sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index 46d2cf01f6ae..dd0a732cca32 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -1350,6 +1350,7 @@ def __init__( self.globals_ref = {} self.proto = int(protocol) self.config = config + self.enable_lambda_name=enable_lambda_name if not PYPY: # pickle.Pickler is the C implementation of the CPython pickler and From 1f9725e17cb35eab91d85d7a1b7d61fb45317d2c Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 19 Aug 2025 11:43:32 -0400 Subject: [PATCH 03/14] fix formatting and lint --- .../internal/cloudpickle/cloudpickle.py | 16 ++++++++++++---- .../apache_beam/internal/cloudpickle_pickler.py | 2 +- .../apache_beam/internal/code_object_pickler.py | 7 +++---- sdks/python/apache_beam/internal/pickler_test.py | 2 +- sdks/python/apache_beam/transforms/ptransform.py | 3 +-- 5 files changed, 18 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index dd0a732cca32..61c319b5fcf5 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -78,6 +78,9 @@ import warnings import weakref +from apache_beam.internal.code_object_pickler import get_code_from_identifier +from apache_beam.internal.code_object_pickler import get_code_object_identifier + # The following import is required to be imported in the cloudpickle # namespace to be able to load pickle files generated with older versions of # cloudpickle. See: tests/test_backward_compat.py @@ -1272,7 +1275,7 @@ def _dynamic_function_reduce(self, func): newargs = self._function_getnewargs(func) state = _function_getstate(func) if type(newargs[0]) == str: - make_function - _make_function_from_identifier + make_function = _make_function_from_identifier else: make_function = _make_function return (make_function, newargs, state, None, None, _function_setstate) @@ -1292,7 +1295,8 @@ def _function_reduce(self, obj): return self._dynamic_function_reduce(obj) def _function_getnewargs(self, func): - code_path = get_code_object_indentifier(func) if self.enable_lambda_name else None + code_path = get_code_object_indentifier( + func) if self.enable_lambda_name else None code = func.__code__ # base_globals represents the future global namespace of func at @@ -1339,7 +1343,11 @@ def dump(self, obj): raise def __init__( - self, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG, + self, + file, + protocol=None, + buffer_callback=None, + config=DEFAULT_CONFIG, enable_lambda_name=False): if protocol is None: protocol = DEFAULT_PROTOCOL @@ -1350,7 +1358,7 @@ def __init__( self.globals_ref = {} self.proto = int(protocol) self.config = config - self.enable_lambda_name=enable_lambda_name + self.enable_lambda_name = enable_lambda_name if not PYPY: # pickle.Pickler is the C implementation of the CPython pickler and diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 757d6a19682c..b9affffdc5c3 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -129,7 +129,7 @@ def dumps( with _pickle_lock: with io.BytesIO() as file: pickler = cloudpickle.CloudPickler( - file, config=config, enable_lambda_name=enable_lambda_name) + file, config=config, enable_lambda_name=enable_lambda_name) try: pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags except NameError: diff --git a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index 0d0f4a3005fe..f1e035c938e0 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -19,7 +19,7 @@ This module provides helper functions to improve pickling code objects, especially lambdas, in a consistent way by using code object identifiers. These -helper functions will be used to patch pickler implementations used by Beam +helper functions are used to patch pickler implementations used by Beam (e.g. Cloudpickle). A code object identifier is a unique identifier for a code object that provides @@ -476,9 +476,8 @@ def _get_arg_count(obj: types.CodeType): code object. """ return ( - obj.co_argcount - + obj.co_kwonlyargcount - + (obj.co_flags & 4 == 4) # PyCF_VARARGS + obj.co_argcount + obj.co_kwonlyargcount + + (obj.co_flags & 4 == 4) # PyCF_VARARGS + (obj.co_flags & 8 == 8) # PyCF_VARKEYWORDS ) diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py index 1f4d15696338..33408f9a39dc 100644 --- a/sdks/python/apache_beam/internal/pickler_test.py +++ b/sdks/python/apache_beam/internal/pickler_test.py @@ -282,7 +282,6 @@ def test_disable_best_effort_determinism(self): dumps(set1, enable_best_effort_determinism=False), dumps(set2, enable_best_effort_determinism=False)) - def test_enable_lambda_name_pickling(self): pickler.set_library('cloudpickle') pickled = pickle_depickle(lambda x: x, enable_lambda_name=True) @@ -295,5 +294,6 @@ def test_disable_lambda_name_pickling(self): pickled_type = type(pickled) self.assertIsInstance(pickled, pickled_type) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index d61cb02a419c..8dd5641f41c9 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -792,8 +792,7 @@ def to_runner_api_pickled(self, context): self, enable_best_effort_determinism=context. enable_best_effort_deterministic_pickling, - enable_lambda_name=context. - enable_lambda_name_pickling, + enable_lambda_name=context.enable_lambda_name_pickling, ), ) From a9e6652cc06ee2cd10a1b98f5ab064e217659160 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 12:03:27 -0400 Subject: [PATCH 04/14] fix typo --- sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index 61c319b5fcf5..3cdae6105767 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -1295,7 +1295,7 @@ def _function_reduce(self, obj): return self._dynamic_function_reduce(obj) def _function_getnewargs(self, func): - code_path = get_code_object_indentifier( + code_path = get_code_object_identifier( func) if self.enable_lambda_name else None code = func.__code__ From c5fa8318772c4d10c281b7f732b7e765dfcfaab8 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 13:46:12 -0400 Subject: [PATCH 05/14] fix code paths in test --- .../internal/code_object_pickler_test.py | 57 ++++++++++--------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 2060533e9328..df2dfff1d889 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -126,30 +126,33 @@ def get_lambda_from_dictionary(): return get_lambda_from_dictionary() +prefix = ("__main__" if __name__ == "__main__" else + "apache_beam.internal.code_object_pickler_test") + test_cases = [ ( top_level_function, - "apache_beam.internal.code_object_pickler_test.top_level_function" + f"{prefix}.top_level_function" ".__code__"), ( top_level_lambda, - "apache_beam.internal.code_object_pickler_test.top_level_lambda" + f"{prefix}.top_level_lambda" ".__code__"), ( get_nested_function(), ( - "apache_beam.internal.code_object_pickler_test.get_nested_function" + f"{prefix}.get_nested_function" ".__code__.co_consts[nested_function]")), ( get_lambda_from_dictionary(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".get_lambda_from_dictionary.__code__.co_consts[, ('x',)]") ), ( get_lambda_from_dictionary_same_args(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".get_lambda_from_dictionary_same_args.__code__.co_consts" "[, ('x',), " + hashlib.md5( get_lambda_from_dictionary_same_args().__code__.co_code). @@ -157,51 +160,51 @@ def get_lambda_from_dictionary(): ( function_with_lambda_default_argument(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".function_with_lambda_default_argument.__defaults__[0].__code__")), ( function_with_function_default_argument(), - "apache_beam.internal.code_object_pickler_test.top_level_function" + f"{prefix}.top_level_function" ".__code__"), ( add_one, - "apache_beam.internal.code_object_pickler_test.function_decorator" + f"{prefix}.function_decorator" ".__code__.co_consts[]"), ( ClassWithFunction.process, - "apache_beam.internal.code_object_pickler_test.ClassWithFunction" + f"{prefix}.ClassWithFunction" ".process.__code__"), ( ClassWithStaticMethod.static_method, - "apache_beam.internal.code_object_pickler_test.ClassWithStaticMethod" + f"{prefix}.ClassWithStaticMethod" ".static_method.__code__"), ( ClassWithClassMethod.class_method, - "apache_beam.internal.code_object_pickler_test.ClassWithClassMethod" + f"{prefix}.ClassWithClassMethod" ".class_method.__code__"), ( ClassWithNestedFunction().process(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".ClassWithNestedFunction.process.__code__.co_consts" "[nested_function]")), ( ClassWithLambda().process(), - "apache_beam.internal.code_object_pickler_test.ClassWithLambda.process" + f"{prefix}.ClassWithLambda.process" ".__code__.co_consts[]"), ( ClassWithNestedClass.InnerClass().process, - "apache_beam.internal.code_object_pickler_test.ClassWithNestedClass" + f"{prefix}ClassWithNestedClass" ".InnerClass.process.__code__"), ( ClassWithNestedLambda().process(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".ClassWithNestedLambda.process.__code__.co_consts" "[get_lambda_from_dictionary].co_consts[, ('x',)]")), ( ClassWithNestedLambda.process, - "apache_beam.internal.code_object_pickler_test.ClassWithNestedLambda" + f"{prefix}.ClassWithNestedLambda" ".process.__code__"), ] @@ -225,35 +228,35 @@ def test_roundtrip(self, callable, unused_path): class GetCodeFromCodeObjectIdentifierTest(unittest.TestCase): - def empty_path_raises_exception(self): + def test_empty_path_raises_exception(self): with self.assertRaisesRegex(ValueError, "Path must not be empty"): - code_object_pickler.test_get_code_from_identifier("") + code_object_pickler.get_code_from_identifier("") - def invalid_default_index_raises_exception(self): + def test_invalid_default_index_raises_exception(self): with self.assertRaisesRegex(ValueError, "out of bounds"): - code_object_pickler.test_get_code_from_identifier( - "apache_beam.internal.test_cases.module_with_default_argument." + code_object_pickler.get_code_from_identifier( + "apache_beam.internal.test_data.module_with_default_argument." "function_with_lambda_default_argument.__defaults__[1]") - def invalid_single_name_path_raises_exception(self): + def test_invalid_single_name_path_raises_exception(self): with self.assertRaisesRegex(AttributeError, "Could not find code object with path"): code_object_pickler.get_code_from_identifier( - "apache_beam.internal.test_cases.module_3." + "apache_beam.internal.test_data.module_3." "my_function.__code__.co_consts[something]") - def invalid_lambda_with_args_path_raises_exception(self): + def test_invalid_lambda_with_args_path_raises_exception(self): with self.assertRaisesRegex(AttributeError, "Could not find code object with path"): code_object_pickler.get_code_from_identifier( - "apache_beam.internal.test_cases.module_3." + "apache_beam.internal.test_data.module_3." "my_function.__code__.co_consts[, ('x',)]") - def invalid_lambda_with_hash_path_raises_exception(self): + def test_invalid_lambda_with_hash_path_raises_exception(self): with self.assertRaisesRegex(AttributeError, "Could not find code object with path"): code_object_pickler.get_code_from_identifier( - "apache_beam.internal.test_cases.module_3." + "apache_beam.internal.test_data.module_3." "my_function.__code__.co_consts[, ('',), 1234567890]") def test_adding_local_variable_in_class_preserves_object(self): From c32c32f7bb6cf1e06a3f173af4268493d1c1ea5d Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 14:49:27 -0400 Subject: [PATCH 06/14] fix tests --- .../internal/code_object_pickler_test.py | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index df2dfff1d889..5857e1911f37 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -286,12 +286,14 @@ def test_adding_lambda_variable_in_class_preserves_object(self): module_2_modified.AddLambdaVariable.my_method(self).__code__, ) - def test_removing_lambda_variable_in_class_changes_object(self): - with self.assertRaisesRegex(AttributeError, "object has no attribute"): - code_object_pickler.get_code_from_identifier( - code_object_pickler.get_code_object_identifier( - module_2.RemoveLambdaVariable.my_method(self)).replace( - "module_2", "module_2_modified")) + def test_removing_lambda_variable_in_class_preserves_object(self): + self.assertEqual( + code_object_pickler.get_code_from_identifier( + code_object_pickler.get_code_object_identifier( + module_2.RemoveLambdaVariable.my_method(self)).replace( + "module_2", "module_2_modified")), + module_2_modified.RemoveLambdaVariable.my_method(self).__code__, + ) def test_adding_nested_function_in_class_preserves_object(self): self.assertEqual( @@ -403,11 +405,15 @@ def test_adding_lambda_variable_in_function_preserves_object(self): module_1_lambda_variable_added.my_function().__code__, ) - def test_removing_lambda_variable_in_function_raises_exception(self): - with self.assertRaisesRegex(AttributeError, "object has no attribute"): - code_object_pickler.get_code_from_identifier( - code_object_pickler.get_code_object_identifier( - module_3.my_function()).replace("module_3", "module_3_modified")) + def test_removing_lambda_variable_in_function_preserves_object(self): + self.assertEqual( + code_object_pickler.get_code_from_identifier( + code_object_pickler.get_code_object_identifier( + module_3.my_function() + ).replace("module_3", "module_3_modified") + ), + module_3_modified.my_function().__code__, + ) class CodePathStabilityTest(unittest.TestCase): From df614e0985846f607e3e78ad73c06f2df6405253 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 15:49:45 -0400 Subject: [PATCH 07/14] fix lint --- .../internal/code_object_pickler_test.py | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 5857e1911f37..80f53e20c1a9 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -146,13 +146,13 @@ def get_lambda_from_dictionary(): ( get_lambda_from_dictionary(), ( - f"{prefix} + f"{prefix}" ".get_lambda_from_dictionary.__code__.co_consts[, ('x',)]") ), ( get_lambda_from_dictionary_same_args(), ( - f"{prefix} + f"{prefix}" ".get_lambda_from_dictionary_same_args.__code__.co_consts" "[, ('x',), " + hashlib.md5( get_lambda_from_dictionary_same_args().__code__.co_code). @@ -160,7 +160,7 @@ def get_lambda_from_dictionary(): ( function_with_lambda_default_argument(), ( - f"{prefix} + f"{prefix}" ".function_with_lambda_default_argument.__defaults__[0].__code__")), ( function_with_function_default_argument(), @@ -185,27 +185,23 @@ def get_lambda_from_dictionary(): ( ClassWithNestedFunction().process(), ( - f"{prefix} - ".ClassWithNestedFunction.process.__code__.co_consts" + f"{prefix}.ClassWithNestedFunction.process.__code__.co_consts" "[nested_function]")), ( ClassWithLambda().process(), - f"{prefix}.ClassWithLambda.process" - ".__code__.co_consts[]"), + f"{prefix}.ClassWithLambda.process.__code__.co_consts[]"), ( ClassWithNestedClass.InnerClass().process, - f"{prefix}ClassWithNestedClass" - ".InnerClass.process.__code__"), + f"{prefix}ClassWithNestedClass.InnerClass.process.__code__"), ( ClassWithNestedLambda().process(), ( - f"{prefix} + f"{prefix}" ".ClassWithNestedLambda.process.__code__.co_consts" "[get_lambda_from_dictionary].co_consts[, ('x',)]")), ( ClassWithNestedLambda.process, - f"{prefix}.ClassWithNestedLambda" - ".process.__code__"), + f"{prefix}.ClassWithNestedLambda.process.__code__"), ] From 261104a223a3eb91c152248686e0f4b3608028e0 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 16:44:54 -0400 Subject: [PATCH 08/14] fix formatting and failing test --- .../internal/code_object_pickler_test.py | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 80f53e20c1a9..ce42407205ed 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -126,21 +126,17 @@ def get_lambda_from_dictionary(): return get_lambda_from_dictionary() -prefix = ("__main__" if __name__ == "__main__" else - "apache_beam.internal.code_object_pickler_test") +prefix = ( + "__main__" if __name__ == "__main__" else + "apache_beam.internal.code_object_pickler_test") test_cases = [ - ( - top_level_function, - f"{prefix}.top_level_function" + (top_level_function, f"{prefix}.top_level_function" ".__code__"), - ( - top_level_lambda, - f"{prefix}.top_level_lambda" + (top_level_lambda, f"{prefix}.top_level_lambda" ".__code__"), ( - get_nested_function(), - ( + get_nested_function(), ( f"{prefix}.get_nested_function" ".__code__.co_consts[nested_function]")), ( @@ -166,9 +162,7 @@ def get_lambda_from_dictionary(): function_with_function_default_argument(), f"{prefix}.top_level_function" ".__code__"), - ( - add_one, - f"{prefix}.function_decorator" + (add_one, f"{prefix}.function_decorator" ".__code__.co_consts[]"), ( ClassWithFunction.process, @@ -192,7 +186,7 @@ def get_lambda_from_dictionary(): f"{prefix}.ClassWithLambda.process.__code__.co_consts[]"), ( ClassWithNestedClass.InnerClass().process, - f"{prefix}ClassWithNestedClass.InnerClass.process.__code__"), + f"{prefix}.ClassWithNestedClass.InnerClass.process.__code__"), ( ClassWithNestedLambda().process(), ( @@ -405,9 +399,8 @@ def test_removing_lambda_variable_in_function_preserves_object(self): self.assertEqual( code_object_pickler.get_code_from_identifier( code_object_pickler.get_code_object_identifier( - module_3.my_function() - ).replace("module_3", "module_3_modified") - ), + module_3.my_function()).replace( + "module_3", "module_3_modified")), module_3_modified.my_function().__code__, ) From d1618f4bbad2da81bfa1f26231973c6fe27e0117 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Thu, 21 Aug 2025 13:22:38 -0400 Subject: [PATCH 09/14] fix formatting again --- .../apache_beam/internal/code_object_pickler_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index ce42407205ed..25b41b2b9393 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -132,9 +132,9 @@ def get_lambda_from_dictionary(): test_cases = [ (top_level_function, f"{prefix}.top_level_function" - ".__code__"), + ".__code__"), (top_level_lambda, f"{prefix}.top_level_lambda" - ".__code__"), + ".__code__"), ( get_nested_function(), ( f"{prefix}.get_nested_function" @@ -163,7 +163,7 @@ def get_lambda_from_dictionary(): f"{prefix}.top_level_function" ".__code__"), (add_one, f"{prefix}.function_decorator" - ".__code__.co_consts[]"), + ".__code__.co_consts[]"), ( ClassWithFunction.process, f"{prefix}.ClassWithFunction" @@ -400,7 +400,7 @@ def test_removing_lambda_variable_in_function_preserves_object(self): code_object_pickler.get_code_from_identifier( code_object_pickler.get_code_object_identifier( module_3.my_function()).replace( - "module_3", "module_3_modified")), + "module_3", "module_3_modified")), module_3_modified.my_function().__code__, ) From 2eb780a5af2b88d1a67d707234ddaf9f2733ccd4 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 27 Aug 2025 13:09:04 -0400 Subject: [PATCH 10/14] fix formatting --- sdks/python/apache_beam/internal/code_object_pickler_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 3fb7bdc94f8a..abe404ff02c5 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -126,7 +126,6 @@ def get_lambda_from_dictionary(): return get_lambda_from_dictionary() - prefix = __name__ test_cases = [ From 577513efd23806d69131daad47062c025566c990 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 3 Sep 2025 15:12:35 -0400 Subject: [PATCH 11/14] add conditionals for error handling --- .../internal/code_object_pickler.py | 73 ++++++++++++------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index f1e035c938e0..c69971e562b7 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -81,8 +81,12 @@ def get_code_object_identifier(callable: types.FunctionType): - __main__.ClassWithNestedLambda.process.__code__.co_consts[ , ('x',), 1234567890] """ - if not hasattr(callable, '__module__') or not hasattr(callable, - '__qualname__'): + if ( + not hasattr(callable, '__module__') + or not hasattr(callable, '__qualname__') + or not callable.__module__ + or callable.__module__ not in sys.modules + ): return None code_path: str = _extend_path( callable.__module__, @@ -100,7 +104,7 @@ def _extend_path(prefix: str, current_path: Optional[str]): Args: prefix: The prefix of the path. - suffix: The rest of the path. + current_path: The rest of the path. Returns: The extended path. @@ -189,6 +193,8 @@ def _search_module_or_class( if path is not None: return _extend_path(name, _extend_path(f'__defaults__[{i}]', path)) else: + if not hasattr(node, first_part): + return None return _extend_path( first_part, _search(callable, getattr(node, first_part), rest)) @@ -281,6 +287,8 @@ def _search_lambda( lambda_code_objects_by_name = collections.defaultdict(list) name = qual_name_parts[0] code_objects = code_objects_by_name[name] + if not code_objects: + return None if name == '': for code_object in code_objects: lambda_name = f', {_signature(code_object)}' @@ -345,9 +353,10 @@ def _get_code_object_from_single_name_pattern( raise ValueError(f'Invalid pattern for single name: {name_result.group(0)}') # Groups are indexed starting at 1, group(0) is the entire match. name = name_result.group(1) - for co_const in obj.co_consts: - if inspect.iscode(co_const) and co_const.co_name == name: - return co_const + if hasattr(obj, 'co_consts'): + for co_const in obj.co_consts: + if inspect.iscode(co_const) and co_const.co_name == name: + return co_const raise AttributeError(f'Could not find code object with path: {path}') @@ -368,15 +377,16 @@ def _get_code_object_from_lambda_with_args_pattern( """ name = lambda_with_args_result.group(1) code_objects = collections.defaultdict(list) - for co_const in obj.co_consts: - if inspect.iscode(co_const) and co_const.co_name == name: - code_objects[co_const.co_name].append(co_const) - for name, objects in code_objects.items(): - for obj_ in objects: - args = tuple( - re.findall(_ARGUMENT_PATTERN, lambda_with_args_result.group(2))) - if obj_.co_varnames[:_get_arg_count(obj_)] == args: - return obj_ + if hasattr(obj, 'co_consts'): + for co_const in obj.co_consts: + if inspect.iscode(co_const) and co_const.co_name == name: + code_objects[co_const.co_name].append(co_const) + for name, objects in code_objects.items(): + for obj_ in objects: + args = tuple( + re.findall(_ARGUMENT_PATTERN, lambda_with_args_result.group(2))) + if obj_.co_varnames[:_get_arg_count(obj_)] == args: + return obj_ raise AttributeError(f'Could not find code object with path: {path}') @@ -397,17 +407,19 @@ def _get_code_object_from_lambda_with_hash_pattern( """ name = lambda_with_hash_result.group(1) code_objects = collections.defaultdict(list) - for co_const in obj.co_consts: - if inspect.iscode(co_const) and co_const.co_name == name: - code_objects[co_const.co_name].append(co_const) - for name, objects in code_objects.items(): - for obj_ in objects: - args = tuple( - re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2))) - if obj_.co_varnames[:_get_arg_count(obj_)] == args: - hash_value = lambda_with_hash_result.group(3) - if hash_value == str(_create_bytecode_hash(obj_)): - return obj_ + if hasattr(obj, 'co_consts'): + for co_const in obj.co_consts: + if inspect.iscode(co_const) and co_const.co_name == name: + code_objects[co_const.co_name].append(co_const) + for name, objects in code_objects.items(): + for obj_ in objects: + args = tuple( + re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2)) + ) + if obj_.co_varnames[:_get_arg_count(obj_)] == args: + hash_value = lambda_with_hash_result.group(3) + if hash_value == str(_create_bytecode_hash(obj_)): + return obj_ raise AttributeError(f'Could not find code object with path: {path}') @@ -427,6 +439,9 @@ def get_code_from_identifier(code_object_identifier: str): if not code_object_identifier: raise ValueError('Path must not be empty.') parts = code_object_identifier.split('.') + if parts[0] not in sys.modules: + raise AttributeError( + f'Module {parts[0]} not found in sys.modules') obj = sys.modules[parts[0]] for part in parts[1:]: if name_result := _SINGLE_NAME_PATTERN.fullmatch(part): @@ -447,7 +462,11 @@ def get_code_from_identifier(code_object_identifier: str): obj = getattr(obj, '__defaults__')[index] else: obj = getattr(obj, part) - return obj + if isinstance(obj, types.CodeType): + return obj + else: + raise AttributeError( + f'Could not find code object with path: {code_object_identifier}') def _signature(obj: types.CodeType): From 42ce261bd22ea1164bb1ad86da03888d2068cc7d Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 3 Sep 2025 16:05:54 -0400 Subject: [PATCH 12/14] formatting --- .../apache_beam/internal/code_object_pickler.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index c69971e562b7..698d6ca194ea 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -82,10 +82,9 @@ def get_code_object_identifier(callable: types.FunctionType): , ('x',), 1234567890] """ if ( - not hasattr(callable, '__module__') - or not hasattr(callable, '__qualname__') - or not callable.__module__ - or callable.__module__ not in sys.modules + not hasattr(callable, '__module__') or + not hasattr(callable, '__qualname__')or not callable.__module__ or + callable.__module__ not in sys.modules ): return None code_path: str = _extend_path( @@ -414,8 +413,7 @@ def _get_code_object_from_lambda_with_hash_pattern( for name, objects in code_objects.items(): for obj_ in objects: args = tuple( - re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2)) - ) + re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2))) if obj_.co_varnames[:_get_arg_count(obj_)] == args: hash_value = lambda_with_hash_result.group(3) if hash_value == str(_create_bytecode_hash(obj_)): @@ -440,8 +438,7 @@ def get_code_from_identifier(code_object_identifier: str): raise ValueError('Path must not be empty.') parts = code_object_identifier.split('.') if parts[0] not in sys.modules: - raise AttributeError( - f'Module {parts[0]} not found in sys.modules') + raise AttributeError(f'Module {parts[0]} not found in sys.modules') obj = sys.modules[parts[0]] for part in parts[1:]: if name_result := _SINGLE_NAME_PATTERN.fullmatch(part): From e5632c1eb57a9cc2bb29da0e30c953840be101a2 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 3 Sep 2025 16:19:09 -0400 Subject: [PATCH 13/14] fix typo --- sdks/python/apache_beam/internal/code_object_pickler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index 698d6ca194ea..8237c347973b 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -83,7 +83,7 @@ def get_code_object_identifier(callable: types.FunctionType): """ if ( not hasattr(callable, '__module__') or - not hasattr(callable, '__qualname__')or not callable.__module__ or + not hasattr(callable, '__qualname__') or not callable.__module__ or callable.__module__ not in sys.modules ): return None From c8e864f6ea71f2b7d83cda3dc4d95db28253ee38 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Thu, 4 Sep 2025 10:05:18 -0400 Subject: [PATCH 14/14] formatting --- sdks/python/apache_beam/internal/code_object_pickler.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index 8237c347973b..269bccb6b461 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -81,11 +81,9 @@ def get_code_object_identifier(callable: types.FunctionType): - __main__.ClassWithNestedLambda.process.__code__.co_consts[ , ('x',), 1234567890] """ - if ( - not hasattr(callable, '__module__') or + if (not hasattr(callable, '__module__') or not hasattr(callable, '__qualname__') or not callable.__module__ or - callable.__module__ not in sys.modules - ): + callable.__module__ not in sys.modules): return None code_path: str = _extend_path( callable.__module__,