Skip to content

Commit acf792e

Browse files
authored
[data] MapOperator.num_active_tasks should exclude pending actors (ray-project#46364)
`MapOperator.num_active_tasks` should exclude the pending actors. Because 1. `PhysicalOperator.completed` checks `num_active_tasks`. The operator should be considered completed if there are still pending actors. 2. The number of active tasks in the progress bar will be more accurate to reflect the actual data processing tasks. --------- Signed-off-by: Hao Chen <[email protected]>
1 parent 77660e9 commit acf792e

File tree

4 files changed

+95
-4
lines changed

4 files changed

+95
-4
lines changed

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,12 +351,25 @@ def _get_next_inner(self) -> RefBundle:
351351
raise NotImplementedError
352352

353353
def get_active_tasks(self) -> List[OpTask]:
354-
"""Get a list of the active tasks of this operator."""
354+
"""Get a list of the active tasks of this operator.
355+
356+
Subclasses should return *all* running normal/actor tasks. The
357+
StreamingExecutor will wait on these tasks and trigger callbacks.
358+
"""
355359
return []
356360

357361
def num_active_tasks(self) -> int:
358362
"""Return the number of active tasks.
359363
364+
This method is used for 2 purposes:
365+
* Determine if this operator is completed.
366+
* Displaying active task info in the progress bar.
367+
Thus, the return value can be less than `len(get_active_tasks())`,
368+
if some tasks are not needed for the above purposes. E.g., for the
369+
actor pool map operator, readiness checking tasks can be excluded
370+
from `num_active_tasks`, but they should be included in
371+
`get_active_tasks`.
372+
360373
Subclasses can override this as a performance optimization.
361374
"""
362375
return len(self.get_active_tasks())

python/ray/data/_internal/execution/operators/map_operator.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,17 @@ def implements_accurate_memory_accounting(self) -> bool:
414414
def supports_fusion(self) -> bool:
415415
return self._supports_fusion
416416

417+
def num_active_tasks(self) -> int:
418+
# Override `num_active_tasks` to only include data tasks and exclude
419+
# metadata tasks, which are used by the actor-pool map operator to
420+
# check if a newly created actor is ready.
421+
# The reasons are because:
422+
# 1. `PhysicalOperator.completed` checks `num_active_tasks`. The operator
423+
# should be considered completed if there are still pending actors.
424+
# 2. The number of active tasks in the progress bar will be more accurate
425+
# to reflect the actual data processing tasks.
426+
return len(self._data_tasks)
427+
417428

418429
def _map_task(
419430
map_transformer: MapTransformer,

python/ray/data/tests/test_executor_resource_management.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,8 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co
313313
assert op.metrics.obj_store_mem_pending_task_outputs == 0
314314

315315
# Wait for actors to start.
316-
assert op.num_active_tasks() == 2
316+
assert op.num_active_tasks() == 0
317+
assert op._actor_pool.num_pending_actors() == 2
317318
run_op_tasks_sync(op, only_existing=True)
318319

319320
# Actors have now started and the pool is actively running tasks.
@@ -411,7 +412,8 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
411412
assert op.metrics.obj_store_mem_pending_task_outputs == 0
412413

413414
# Wait for actors to start.
414-
assert op.num_active_tasks() == 2
415+
assert op.num_active_tasks() == 0
416+
assert op._actor_pool.num_pending_actors() == 2
415417
run_op_tasks_sync(op, only_existing=True)
416418

417419
# Actors have now started and the pool is actively running tasks.

python/ray/data/tests/test_operators.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from ray.data.block import Block, BlockAccessor
4242
from ray.data.context import DataContext
4343
from ray.data.tests.util import run_one_op_task, run_op_tasks_sync
44+
from ray.tests.client_test_utils import create_remote_signal_actor
4445
from ray.tests.conftest import * # noqa
4546

4647

@@ -443,7 +444,7 @@ def _sleep(block_iter: Iterable[Block]) -> Iterable[Block]:
443444

444445
# Create with inputs.
445446
input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(10)]))
446-
compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy()
447+
compute_strategy = ActorPoolStrategy(size=1) if use_actors else TaskPoolStrategy()
447448
op = MapOperator.create(
448449
create_map_transformer_from_block_fn(_sleep),
449450
input_op=input_op,
@@ -454,6 +455,9 @@ def _sleep(block_iter: Iterable[Block]) -> Iterable[Block]:
454455

455456
# Start one task and then cancel.
456457
op.start(ExecutionOptions())
458+
if use_actors:
459+
# Wait for the actor to start.
460+
run_op_tasks_sync(op)
457461
op.add_input(input_op.get_next(), 0)
458462
assert op.num_active_tasks() == 1
459463
op.shutdown()
@@ -517,6 +521,67 @@ def _sleep(block_iter: Iterable[Block]) -> Iterable[Block]:
517521
assert not op.should_add_input()
518522

519523

524+
def test_actor_pool_map_operator_num_active_tasks_and_completed(shutdown_only):
525+
"""Tests ActorPoolMapOperator's num_active_tasks and completed methods."""
526+
num_actors = 2
527+
ray.shutdown()
528+
ray.init(num_cpus=num_actors)
529+
530+
signal_actor = create_remote_signal_actor(ray).options(num_cpus=0).remote()
531+
532+
def _map_transfom_fn(block_iter: Iterable[Block], _) -> Iterable[Block]:
533+
ray.get(signal_actor.wait.remote())
534+
yield from block_iter
535+
536+
input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(num_actors)]))
537+
compute_strategy = ActorPoolStrategy(min_size=num_actors, max_size=2 * num_actors)
538+
539+
# Create an operator with [num_actors, 2 * num_actors] actors.
540+
# Resources are limited to num_actors, so the second half will be pending.
541+
op = MapOperator.create(
542+
create_map_transformer_from_block_fn(_map_transfom_fn),
543+
input_op=input_op,
544+
name="TestMapper",
545+
compute_strategy=compute_strategy,
546+
)
547+
actor_pool = op._actor_pool
548+
549+
# Wait for the op to scale up to the min size.
550+
op.start(ExecutionOptions())
551+
run_op_tasks_sync(op)
552+
assert actor_pool.num_running_actors() == num_actors
553+
assert op.num_active_tasks() == 0
554+
555+
# Scale up to the max size, the second half of the actors will be pending.
556+
actor_pool.scale_up(num_actors)
557+
assert actor_pool.num_pending_actors() == num_actors
558+
# `num_active_tasks` should exclude the metadata tasks for the pending actors.
559+
assert op.num_active_tasks() == 0
560+
561+
# Add inputs.
562+
for _ in range(num_actors):
563+
assert op.should_add_input()
564+
op.add_input(input_op.get_next(), 0)
565+
# Still `num_active_tasks` should only include data tasks.
566+
assert op.num_active_tasks() == num_actors
567+
assert actor_pool.num_pending_actors() == num_actors
568+
569+
# Let the data tasks complete.
570+
signal_actor.send.remote()
571+
while len(op._data_tasks) > 0:
572+
run_one_op_task(op)
573+
assert op.num_active_tasks() == 0
574+
assert actor_pool.num_pending_actors() == num_actors
575+
576+
# Mark the inputs done and take all outputs.
577+
# The operator should be completed, even if there are pending actors.
578+
op.all_inputs_done()
579+
while op.has_next():
580+
op.get_next()
581+
assert actor_pool.num_pending_actors() == num_actors
582+
assert op.completed()
583+
584+
520585
@pytest.mark.parametrize(
521586
"compute,expected",
522587
[

0 commit comments

Comments
 (0)