fix: Single fragment scaling is too conservative #19259

Open · wants to merge 1 commit into base: main
74 changes: 31 additions & 43 deletions src/meta/src/stream/scale.rs
@@ -968,9 +968,9 @@ impl ScaleController
return;
}

- let fragment = ctx.fragment_map.get(fragment_id).unwrap();
+ let fragment = &ctx.fragment_map[fragment_id];

- let upstream_fragment = ctx.fragment_map.get(upstream_fragment_id).unwrap();
+ let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];

// build actor group map
for upstream_actor in &upstream_fragment.actors {
@@ -993,8 +993,7 @@ impl ScaleController
(upstream_fragment.fragment_id, upstream_actor.actor_id),
);
} else {
- let root_actor_id =
-     *actor_group_map.get(&upstream_actor.actor_id).unwrap();
+ let root_actor_id = actor_group_map[&upstream_actor.actor_id];

actor_group_map.insert(downstream_actor_id, root_actor_id);
}
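Most of this diff mechanically swaps `map.get(key).unwrap()` for index syntax. A minimal standalone sketch (toy `HashMap`, not the real `fragment_map`) showing that the two forms are equivalent when the key is present and both panic when it is missing:

```rust
use std::collections::HashMap;

fn main() {
    let mut fragment_map: HashMap<u32, &str> = HashMap::new();
    fragment_map.insert(1, "source");

    // Equivalent for keys that exist: both yield a reference to the value.
    let via_get = fragment_map.get(&1).unwrap();
    let via_index = &fragment_map[&1];
    assert_eq!(via_get, via_index);

    // Both still panic on a missing key; indexing just panics through the
    // `Index` impl instead of through `Option::unwrap`.
    // let _ = &fragment_map[&2]; // would panic: no entry found for key
}
```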
@@ -1182,7 +1181,7 @@ impl ScaleController
.cloned()
.unwrap_or_default();

- let fragment = ctx.fragment_map.get(fragment_id).unwrap();
+ let fragment = &ctx.fragment_map[fragment_id];

assert!(!fragment.actors.is_empty());

@@ -1223,11 +1222,10 @@ impl ScaleController
// Because we are in the Pause state, it's no problem to reallocate
let mut fragment_actor_splits = HashMap::new();
for fragment_id in reschedules.keys() {
- let actors_after_reschedule =
-     fragment_actors_after_reschedule.get(fragment_id).unwrap();
+ let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

if ctx.stream_source_fragment_ids.contains(fragment_id) {
- let fragment = ctx.fragment_map.get(fragment_id).unwrap();
+ let fragment = &ctx.fragment_map[fragment_id];

let prev_actor_ids = fragment
.actors
@@ -1257,14 +1255,13 @@ impl ScaleController
// We use 2 iterations to make sure source actors are migrated first, and then align backfill actors
if !ctx.stream_source_backfill_fragment_ids.is_empty() {
for fragment_id in reschedules.keys() {
- let actors_after_reschedule =
-     fragment_actors_after_reschedule.get(fragment_id).unwrap();
+ let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

if ctx
.stream_source_backfill_fragment_ids
.contains(fragment_id)
{
- let fragment = ctx.fragment_map.get(fragment_id).unwrap();
+ let fragment = &ctx.fragment_map[fragment_id];

let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

@@ -1308,12 +1305,11 @@ impl ScaleController
.into_keys()
.collect();

- let actors_after_reschedule =
-     fragment_actors_after_reschedule.get(&fragment_id).unwrap();
+ let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];

assert!(!actors_after_reschedule.is_empty());

- let fragment = ctx.fragment_map.get(&fragment_id).unwrap();
+ let fragment = &ctx.fragment_map[&fragment_id];

let in_degree_types: HashSet<_> = fragment
.upstream_fragment_ids
@@ -1568,7 +1564,7 @@ impl ScaleController
no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
new_actor: &mut PbStreamActor,
) -> MetaResult<()> {
- let fragment = &ctx.fragment_map.get(&new_actor.fragment_id).unwrap();
+ let fragment = &ctx.fragment_map[&new_actor.fragment_id];
let mut applied_upstream_fragment_actor_ids = HashMap::new();

for upstream_fragment_id in &fragment.upstream_fragment_ids {
@@ -1581,7 +1577,7 @@ impl ScaleController
match upstream_dispatch_type {
DispatcherType::Unspecified => unreachable!(),
DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
- let upstream_fragment = &ctx.fragment_map.get(upstream_fragment_id).unwrap();
+ let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];
let mut upstream_actor_ids = upstream_fragment
.actors
.iter()
@@ -1939,10 +1935,9 @@ impl ScaleController

let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();

- for actor_id in fragment_actor_id_map.get(&fragment_id).unwrap() {
-     let worker_id = actor_location.get(actor_id).unwrap();
-
-     *fragment_slots.entry(*worker_id).or_default() += 1;
+ for actor_id in &fragment_actor_id_map[&fragment_id] {
+     let worker_id = actor_location[actor_id];
+     *fragment_slots.entry(worker_id).or_default() += 1;
}

let all_available_slots: usize = schedulable_worker_slots.values().cloned().sum();
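The loop rewritten above tallies how many of a fragment's actors live on each worker. A self-contained sketch of the same tallying pattern, with plain `u32` ids standing in for the real `ActorId`/`WorkerId` types:

```rust
use std::collections::{BTreeMap, HashMap};

fn fragment_slots_per_worker(
    actor_ids: &[u32],
    actor_location: &HashMap<u32, u32>, // actor id -> worker id (assumed shape)
) -> BTreeMap<u32, usize> {
    let mut fragment_slots: BTreeMap<u32, usize> = BTreeMap::new();
    for actor_id in actor_ids {
        // Indexing panics if an actor has no recorded location, mirroring the diff.
        let worker_id = actor_location[actor_id];
        *fragment_slots.entry(worker_id).or_default() += 1;
    }
    fragment_slots
}

fn main() {
    let location = HashMap::from([(1, 10), (2, 10), (3, 11)]);
    let slots = fragment_slots_per_worker(&[1, 2, 3], &location);
    assert_eq!(slots[&10], 2);
    assert_eq!(slots[&11], 1);
}
```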
@@ -1954,22 +1949,19 @@ impl ScaleController
);
}

- let &(dist, vnode_count) = fragment_distribution_map.get(&fragment_id).unwrap();
+ let (dist, vnode_count) = fragment_distribution_map[&fragment_id];
let max_parallelism = vnode_count;

match dist {
FragmentDistributionType::Unspecified => unreachable!(),
FragmentDistributionType::Single => {
- let (single_worker_id, should_be_one) =
-     fragment_slots.iter().exactly_one().unwrap();
+ let (single_worker_id, should_be_one) = fragment_slots
+     .iter()
+     .exactly_one()
+     .expect("single fragment should have only one worker slot");

assert_eq!(*should_be_one, 1);

- if schedulable_worker_slots.contains_key(single_worker_id) {
-     // NOTE: shall we continue?
-     continue;
- }

let units =
schedule_units_for_slots(&schedulable_worker_slots, 1, table_id)?;
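The same hunk swaps `unwrap()` for `expect(...)` on itertools' `exactly_one()`, so a violated single-fragment invariant now panics with a message saying what was expected. A small sketch of that pattern against a toy map (requires the `itertools` crate, which this file already uses):

```rust
use std::collections::BTreeMap;

use itertools::Itertools;

fn main() {
    // A Single-distribution fragment should occupy exactly one worker slot.
    let fragment_slots = BTreeMap::from([(10u32, 1usize)]);

    // `exactly_one()` returns Err when the iterator yields zero or more than
    // one item; `expect` turns that into a panic with a descriptive message.
    let (single_worker_id, should_be_one) = fragment_slots
        .iter()
        .exactly_one()
        .expect("single fragment should have only one worker slot");

    assert_eq!(*should_be_one, 1);
    assert_eq!(*single_worker_id, 10);
}
```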

@@ -1981,7 +1973,11 @@ impl ScaleController
})?;

assert_eq!(*should_be_one, 1);
- assert_ne!(*chosen_target_worker_id, *single_worker_id);

+ if *chosen_target_worker_id == *single_worker_id {
+     tracing::debug!("single fragment {fragment_id} already on target worker {chosen_target_worker_id}");
+     continue;
+ }

target_plan.insert(
fragment_id,
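Behaviorally, this is the core of the fix: the old code skipped rescheduling whenever the fragment's current worker was still schedulable, and otherwise asserted that the scheduler picked a different worker; the new code always consults the scheduler and simply skips the fragment when the chosen target is the worker it already sits on. A simplified sketch of that decision, using bare `u32` worker ids and a hypothetical `migration_target` helper rather than the actual `ScaleController` method:

```rust
/// Returns Some(target) when the single fragment should move, None when it can stay put.
fn migration_target(current_worker: u32, chosen_worker: u32) -> Option<u32> {
    if chosen_worker == current_worker {
        // Previously this case tripped an assert_ne!; now it is just a no-op.
        return None;
    }
    Some(chosen_worker)
}

fn main() {
    // Scheduler picked the worker the fragment already lives on: nothing to do.
    assert_eq!(migration_target(10, 10), None);
    // Scheduler picked a different worker: plan a migration from 10 to 11.
    assert_eq!(migration_target(10, 11), Some(11));
}
```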
@@ -2181,17 +2177,13 @@ impl ScaleController
}

// for upstream
- for upstream_fragment_id in &fragment_map
-     .get(&fragment_id)
-     .unwrap()
-     .upstream_fragment_ids
- {
+ for upstream_fragment_id in &fragment_map[&fragment_id].upstream_fragment_ids {
if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
continue;
}

- let table_id = fragment_to_table.get(&fragment_id).unwrap();
- let upstream_table_id = fragment_to_table.get(upstream_fragment_id).unwrap();
+ let table_id = &fragment_to_table[&fragment_id];
+ let upstream_table_id = &fragment_to_table[upstream_fragment_id];

// Only custom parallelism will be propagated to the no shuffle upstream.
if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
@@ -2248,16 +2240,12 @@ impl ScaleController
}

// for upstream
- for upstream_fragment_id in &fragment_map
-     .get(&fragment_id)
-     .unwrap()
-     .upstream_fragment_ids
- {
+ for upstream_fragment_id in &fragment_map[&fragment_id].upstream_fragment_ids {
if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
continue;
}

- let reschedule_plan = reschedule.get(&fragment_id).unwrap();
+ let reschedule_plan = &reschedule[&fragment_id];

if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
if upstream_reschedule_plan != reschedule_plan {
@@ -2719,7 +2707,7 @@ impl ConsistentHashRing
let ring_range = self.ring.range(task_hash..).chain(self.ring.iter());

for (_, &worker_id) in ring_range {
- let task_limit = *soft_limits.get(&worker_id).unwrap();
+ let task_limit = soft_limits[&worker_id];

let worker_task_count = task_distribution.entry(worker_id).or_insert(0);
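The last hunk is inside `ConsistentHashRing`: the lookup walks the ring clockwise from the task's hash, wraps around via `chain`, and (in the part trimmed from this hunk) assigns the task to the first worker still under its soft limit. A standalone sketch under those assumptions; the `BTreeMap` ring shape and the limit check at the end are inferred from the visible lines, not copied from the file:

```rust
use std::collections::{BTreeMap, HashMap};

fn assign_task(
    ring: &BTreeMap<u64, u32>,          // hash position -> worker id (assumed)
    soft_limits: &HashMap<u32, usize>,  // worker id -> max tasks
    task_distribution: &mut HashMap<u32, usize>,
    task_hash: u64,
) -> Option<u32> {
    // Walk clockwise from the task's hash, then wrap to the start of the ring.
    let ring_range = ring.range(task_hash..).chain(ring.iter());

    for (_, &worker_id) in ring_range {
        let task_limit = soft_limits[&worker_id];
        let worker_task_count = task_distribution.entry(worker_id).or_insert(0);

        // Assumed continuation: take the first worker below its soft limit.
        if *worker_task_count < task_limit {
            *worker_task_count += 1;
            return Some(worker_id);
        }
    }
    None
}

fn main() {
    let ring = BTreeMap::from([(100u64, 1u32), (200, 2)]);
    let soft_limits = HashMap::from([(1u32, 1usize), (2, 1)]);
    let mut dist = HashMap::new();

    assert_eq!(assign_task(&ring, &soft_limits, &mut dist, 150), Some(2));
    // Worker 2 is now at its limit, so the next lookup wraps to worker 1.
    assert_eq!(assign_task(&ring, &soft_limits, &mut dist, 150), Some(1));
}
```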
