refactor(meta): do not store upstream actors in merge node
wenym1 committed Jan 22, 2025
1 parent 983dd18 commit 53af608
Showing 19 changed files with 477 additions and 276 deletions.
proto/stream_plan.proto (2 changes: 1 addition & 1 deletion)
@@ -550,7 +550,7 @@ message MergeNode {
//
// `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used.
// See `compose_fragment`.
- repeated uint32 upstream_actor_id = 1;
+ repeated uint32 upstream_actor_id = 1 [deprecated = true];
uint32 upstream_fragment_id = 2;
// Type of the upstream dispatcher. If there's always one upstream according to this
// type, the compute node may use the `ReceiverExecutor` as an optimization.
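With `upstream_actor_id` deprecated, the plan node only identifies the upstream fragment; the concrete upstream actor ids are delivered when the actor is built (via the new `BuildActorInfo` below). A minimal sketch of the lookup shape this implies, with hypothetical type aliases used purely for illustration:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical aliases for illustration only; in the protobuf these are plain u32 ids.
type FragmentId = u32;
type ActorId = u32;

/// Upstream actors of one actor, keyed by the upstream fragment id that the
/// merge node still references via `upstream_fragment_id`.
type ActorUpstreams = HashMap<FragmentId, HashSet<ActorId>>;

/// Resolve a merge executor's upstream actors from runtime-provided info
/// instead of the deprecated `MergeNode.upstream_actor_id` field.
fn resolve_upstreams(
    upstreams: &ActorUpstreams,
    upstream_fragment_id: FragmentId,
) -> Option<&HashSet<ActorId>> {
    upstreams.get(&upstream_fragment_id)
}
```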
proto/stream_service.proto (11 changes: 10 additions & 1 deletion)
@@ -17,8 +17,17 @@ message InjectBarrierRequest {
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;

+ message BuildActorInfo {
+   message UpstreamActors {
+     repeated uint32 actors = 1;
+   }
+
+   stream_plan.StreamActor actor = 1;
+   map<uint32, UpstreamActors> upstreams = 2;
+ }

repeated common.ActorInfo broadcast_info = 8;
- repeated stream_plan.StreamActor actors_to_build = 9;
+ repeated BuildActorInfo actors_to_build = 9;

Check failure on line 30 in proto/stream_service.proto (GitHub Actions / Check breaking changes in Protobuf files): Field "9" with name "actors_to_build" on message "InjectBarrierRequest" changed type from "stream_plan.StreamActor" to "stream_service.InjectBarrierRequest.BuildActorInfo".
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_add = 10;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11;
}
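As a rough sketch of how the new message is meant to be filled in, mirroring the conversion added in `src/meta/src/barrier/rpc.rs` below (the tuple shape of `StreamActorWithUpstreams` is an assumption inferred from that code, not a definition taken from this commit):

```rust
use std::collections::{HashMap, HashSet};

use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;

/// Assumed shape: the actor plus its upstream actor ids, keyed by upstream fragment id.
type StreamActorWithUpstreams = (StreamActor, HashMap<u32, HashSet<u32>>);

/// Build the per-actor info carried by `InjectBarrierRequest.actors_to_build`.
fn to_build_actor_info((actor, upstreams): StreamActorWithUpstreams) -> BuildActorInfo {
    BuildActorInfo {
        actor: Some(actor),
        upstreams: upstreams
            .into_iter()
            .map(|(fragment_id, actors)| {
                (
                    fragment_id,
                    UpstreamActors {
                        // Flatten the set of upstream actor ids into the repeated field.
                        actors: actors.into_iter().collect(),
                    },
                )
            })
            .collect(),
    }
}
```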
src/meta/src/barrier/checkpoint/creating_job/status.rs (11 changes: 7 additions & 4 deletions)
@@ -21,14 +21,14 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
- use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::barrier_complete_response::{
CreateMviewProgress, PbCreateMviewProgress,
};
use tracing::warn;

use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
+ use crate::model::StreamActorWithUpstreams;

#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
@@ -110,7 +110,7 @@ pub(super) enum CreatingStreamingJobStatus {
pending_non_checkpoint_barriers: Vec<u64>,
/// Info of the first barrier: (`actors_to_create`, `mutation`)
/// Take the mutation out when injecting the first barrier
- initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
+ initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActorWithUpstreams>>, Mutation)>,
},
/// The creating job is consuming log store.
///
@@ -126,7 +126,7 @@

pub(super) struct CreatingJobInjectBarrierInfo {
pub barrier_info: BarrierInfo,
- pub new_actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
+ pub new_actors: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
pub mutation: Option<Mutation>,
}

@@ -252,7 +252,10 @@ impl CreatingStreamingJobStatus {
pub(super) fn new_fake_barrier(
prev_epoch_fake_physical_time: &mut u64,
pending_non_checkpoint_barriers: &mut Vec<u64>,
- initial_barrier_info: &mut Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
+ initial_barrier_info: &mut Option<(
+     HashMap<WorkerId, Vec<StreamActorWithUpstreams>>,
+     Mutation,
+ )>,
is_checkpoint: bool,
) -> CreatingJobInjectBarrierInfo {
{
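For context, a small sketch (hypothetical helper, not part of this commit) of how `initial_barrier_info` is meant to be consumed: when the first barrier is injected, both the pre-built actors and the mutation are taken out so they are attached exactly once.

```rust
use std::collections::HashMap;

use risingwave_meta_model::WorkerId;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;

use crate::model::StreamActorWithUpstreams;

/// Take the initial actors and mutation out on the first barrier; later barriers get `None`.
fn take_initial_barrier_info(
    initial_barrier_info: &mut Option<(HashMap<WorkerId, Vec<StreamActorWithUpstreams>>, Mutation)>,
) -> (
    Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
    Option<Mutation>,
) {
    match initial_barrier_info.take() {
        Some((new_actors, mutation)) => (Some(new_actors), Some(mutation)),
        None => (None, None),
    }
}
```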
src/meta/src/barrier/command.rs (10 changes: 6 additions & 4 deletions)
@@ -37,7 +37,7 @@ use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers,
DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
- StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
+ StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;
@@ -49,7 +49,9 @@ use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
- use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments};
+ use crate::model::{
+     ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobFragments,
+ };
use crate::stream::{
build_actor_connector_splits, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
};
@@ -83,7 +85,7 @@ pub struct Reschedule {
/// `Source` and `SourceBackfill` are handled together here.
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

- pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
+ pub newly_created_actors: Vec<(StreamActorWithUpstreams, PbActorStatus)>,
}

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
@@ -952,7 +954,7 @@ impl Command {
mutation
}

- pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>> {
+ pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>> {
match self {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
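A brief usage sketch (hypothetical caller) of the updated `actors_to_create` signature: the per-worker map now carries each new actor together with its upstream map, which is what gets converted into `BuildActorInfo` at barrier-injection time.

```rust
use std::collections::HashMap;

use risingwave_meta_model::WorkerId;

use crate::model::StreamActorWithUpstreams;

/// Pull out the actors (with their upstreams) destined for a single worker.
fn actors_for_worker(
    actors_to_create: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
    worker_id: WorkerId,
) -> Vec<StreamActorWithUpstreams> {
    actors_to_create
        .and_then(|mut per_worker| per_worker.remove(&worker_id))
        .unwrap_or_default()
}
```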
src/meta/src/barrier/context/recovery.rs (5 changes: 2 additions & 3 deletions)
@@ -23,7 +23,6 @@ use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_meta_model::StreamingParallelism;
- use risingwave_pb::stream_plan::StreamActor;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};
@@ -34,7 +33,7 @@ use crate::barrier::info::InflightDatabaseInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::ActiveStreamingWorkerNodes;
- use crate::model::{ActorId, StreamJobFragments, TableParallelism};
+ use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments, TableParallelism};
use crate::stream::{
JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
RescheduleOptions, SourceChange,
@@ -724,7 +723,7 @@ impl GlobalBarrierWorkerContextImpl {
}

/// Update all actors in compute nodes.
- async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
+ async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActorWithUpstreams>> {
self.metadata_manager.all_active_actors().await
}
}
src/meta/src/barrier/mod.rs (9 changes: 4 additions & 5 deletions)
@@ -20,13 +20,12 @@ use risingwave_connector::source::SplitImpl;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
- use risingwave_pb::stream_plan::StreamActor;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
use crate::manager::ActiveStreamingWorkerNodes;
- use crate::model::{ActorId, StreamJobFragments};
+ use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments};
use crate::{MetaError, MetaResult};

mod checkpoint;
@@ -104,7 +103,7 @@ struct BarrierWorkerRuntimeInfoSnapshot {
database_fragment_infos: HashMap<DatabaseId, InflightDatabaseInfo>,
state_table_committed_epochs: HashMap<TableId, u64>,
subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
- stream_actors: HashMap<ActorId, StreamActor>,
+ stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
source_splits: HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
hummock_version_stats: HummockVersionStats,
@@ -115,7 +114,7 @@
database_id: DatabaseId,
database_info: &InflightDatabaseInfo,
active_streaming_nodes: &ActiveStreamingWorkerNodes,
- stream_actors: &HashMap<ActorId, StreamActor>,
+ stream_actors: &HashMap<ActorId, StreamActorWithUpstreams>,
state_table_committed_epochs: &HashMap<TableId, u64>,
) -> MetaResult<()> {
{
@@ -190,7 +189,7 @@ struct DatabaseRuntimeInfoSnapshot {
database_fragment_info: InflightDatabaseInfo,
state_table_committed_epochs: HashMap<TableId, u64>,
subscription_info: InflightSubscriptionInfo,
- stream_actors: HashMap<ActorId, StreamActor>,
+ stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
source_splits: HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}
src/meta/src/barrier/rpc.rs (30 changes: 23 additions & 7 deletions)
@@ -32,9 +32,9 @@ use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
- use risingwave_pb::stream_plan::{
-     AddMutation, Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo,
- };
+ use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
+ use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
+ use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
use risingwave_pb::stream_service::streaming_control_stream_request::{
CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
RemovePartialGraphRequest, ResetDatabaseRequest,
@@ -57,7 +57,7 @@ use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
- use crate::model::{ActorId, StreamJobFragments};
+ use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments};
use crate::stream::build_actor_connector_splits;
use crate::{MetaError, MetaResult};

@@ -320,7 +320,7 @@ impl ControlStreamManager {
database_id: DatabaseId,
info: InflightDatabaseInfo,
state_table_committed_epochs: &mut HashMap<TableId, u64>,
- stream_actors: &mut HashMap<ActorId, StreamActor>,
+ stream_actors: &mut HashMap<ActorId, StreamActorWithUpstreams>,
source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
subscription_info: InflightSubscriptionInfo,
@@ -455,7 +455,7 @@
barrier_info: &BarrierInfo,
pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
- mut new_actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
+ mut new_actors: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
) -> MetaResult<HashSet<WorkerId>> {
@@ -484,7 +484,7 @@
.flatten()
.flat_map(|(worker_id, actor_infos)| {
actor_infos.iter().map(|actor_info| ActorInfo {
- actor_id: actor_info.actor_id,
+ actor_id: actor_info.0.actor_id,
host: self
.nodes
.get(worker_id)
@@ -541,6 +541,22 @@
.into_iter()
.flatten()
.flatten()
+ .map(|(actor, upstreams)| BuildActorInfo {
+     actor: Some(actor),
+     upstreams: upstreams
+         .into_iter()
+         .map(|(fragment_id, upstreams)| {
+             (
+                 fragment_id,
+                 UpstreamActors {
+                     actors: upstreams
+                         .into_iter()
+                         .collect(),
+                 },
+             )
+         })
+         .collect(),
+ })
.collect(),
subscriptions_to_add: subscriptions_to_add.clone(),
subscriptions_to_remove: subscriptions_to_remove.clone(),
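On the receiving side, the compute node can now look up an actor's upstream actors from the request rather than from the merge node's deprecated field. A hypothetical helper (not part of this diff) showing the intended access pattern:

```rust
use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;

/// Upstream actor ids of the actor being built, for one upstream fragment.
fn upstream_actor_ids(info: &BuildActorInfo, upstream_fragment_id: u32) -> &[u32] {
    info.upstreams
        .get(&upstream_fragment_id)
        .map(|u| u.actors.as_slice())
        .unwrap_or(&[])
}
```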
(The remaining changed files are not shown here.)
