[Bug]: No "Operation ongoing ..." logs for the work threads stuck at deserializing DoFn in python pipelines #36022

@baeminbo

What happened?

_log_lull_in_bundle_processor iterates over the bundle processors in active_bundle_processors.keys() and logs "Operation ongoing in bundle ..." for work threads that appear stalled.

However, active_bundle_processors is updated only after the BundleProcessor has been created, and DoFn objects are deserialized during that creation.

Therefore, if a work thread stalls while deserializing a DoFn, it is never reported in the "Operation ongoing ..." warning logs.
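
The ordering problem described above can be reproduced in miniature without Beam. The following is only a sketch under stated assumptions: ToyProcessorCache, visible_to_lull_check, and the register_before_build flag are hypothetical names made up for illustration and are not part of sdk_worker.py. The flag shows one conceivable way to make the instruction visible earlier; it is not a fix proposed in this issue.

```python
import threading


class ToyProcessorCache:
    """Hypothetical stand-in for a cache that tracks active processors."""

    def __init__(self, register_before_build=False):
        # instruction_id -> processor (None while the processor is being built)
        self.active = {}
        self.register_before_build = register_before_build

    def get(self, instruction_id, build):
        if self.register_before_build:
            # Illustrative alternative: make the instruction visible first.
            self.active[instruction_id] = None
        processor = build()  # may block, e.g. while a DoFn is unpickled
        self.active[instruction_id] = processor
        return processor


def visible_to_lull_check(cache):
    """Mimics a periodic check that only walks the keys of the active map."""
    return sorted(cache.active.keys())


def observe_while_building(register_before_build):
    """Returns what the check sees during and after a blocked build."""
    cache = ToyProcessorCache(register_before_build)
    started = threading.Event()
    release = threading.Event()

    def slow_build():
        started.set()            # signal that construction has begun
        release.wait(timeout=5)  # simulate a stalled deserialization
        return object()

    worker = threading.Thread(target=cache.get, args=("bundle-1", slow_build))
    worker.start()
    started.wait(timeout=5)
    seen_during_build = visible_to_lull_check(cache)
    release.set()
    worker.join()
    return seen_during_build, visible_to_lull_check(cache)


print(observe_while_building(False))  # ([], ['bundle-1'])
print(observe_while_building(True))   # (['bundle-1'], ['bundle-1'])
```

With the default ordering, the check sees nothing while the build is blocked, which matches the missing log lines reported here.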


The following is an example stack trace for a work thread stalled in DoFn deserialization (a long sleep was added to __setstate__ to simulate the issue). The BundleProcessor is created at sdk_worker.py:511, and active_bundle_processors is updated at sdk_worker.py:520.

--- Thread #138116862035648 name: Thread-16 ---
  File "/usr/local/lib/python3.11/threading.py", line 1002, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in task
    self._execute(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 387, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 659, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 690, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 511, in get
    processor = bundle_processor.BundleProcessor(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1133, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1190, in create_execution_tree
    return collections.OrderedDict([(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1193, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1178, in get_operation
    return transform_factory.create_operation(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1497, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1828, in create_par_do
    return _create_pardo_operation(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1872, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/internal/pickler.py", line 57, in loads
    return desired_pickle_lib.loads(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/internal/cloudpickle_pickler.py", line 176, in loads
    unpickled = cloudpickle.loads(s)
  File "/Users/baeminbo/Documents/workspace/dataflow-pipelines/py-subprocess/pipeline.py", line 30, in __setstate__
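
The simulation mentioned above (a sleep placed in __setstate__) could look roughly like the sketch below. It uses the standard pickle module and a plain class so that it runs without Beam; the class name SlowToRestore and the delay_seconds attribute are hypothetical, and the actual pipeline.py from the trace is not shown in this issue. In a real pipeline the class would presumably be a DoFn, and the delay would be hit when the worker unpickles it.

```python
import pickle
import time


class SlowToRestore:
    """Hypothetical object whose deserialization is deliberately slow."""

    def __init__(self, delay_seconds):
        self.delay_seconds = delay_seconds

    def __getstate__(self):
        # State handed to pickle when the object is serialized.
        return {"delay_seconds": self.delay_seconds}

    def __setstate__(self, state):
        # Runs during unpickling; the sleep stands in for a stalled restore.
        time.sleep(state["delay_seconds"])
        self.__dict__.update(state)


def timed_round_trip(delay_seconds):
    """Pickles and unpickles the object, returning it and the load time."""
    payload = pickle.dumps(SlowToRestore(delay_seconds))
    start = time.monotonic()
    restored = pickle.loads(payload)  # blocks inside __setstate__
    return restored, time.monotonic() - start


restored, elapsed = timed_round_trip(0.2)
print(f"restored delay={restored.delay_seconds}, load took {elapsed:.2f}s")
```

Serialization itself is fast here; only the load blocks, which is the phase the trace above shows the work thread stuck in.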

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
