diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 9ddc4a6250e2c..7a5f9a3711f6a 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -68,6 +68,7 @@ message StreamingControlStreamRequest { message InitialPartialGraph { uint32 partial_graph_id = 1; repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2; + repeated common.ActorInfo actor_infos = 3; } message DatabaseInitialPartialGraph { diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index 4a22ff3a0ac2f..c74ed468e26e4 100644 --- a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -38,7 +38,7 @@ use crate::barrier::checkpoint::recovery::{ use crate::barrier::checkpoint::state::BarrierWorkerState; use crate::barrier::command::CommandContext; use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask}; -use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo}; +use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo}; use crate::barrier::notifier::Notifier; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{from_partial_graph_id, ControlStreamManager}; @@ -342,13 +342,18 @@ impl CheckpointControl { }); } - pub(crate) fn subscriptions( + pub(crate) fn inflight_infos( &self, - ) -> impl Iterator + '_ { + ) -> impl Iterator + '_ + { self.databases.iter().flat_map(|(database_id, database)| { - database - .checkpoint_control() - .map(|database| (*database_id, &database.state.inflight_subscription_info)) + database.checkpoint_control().map(|database| { + ( + *database_id, + &database.state.inflight_subscription_info, + &database.state.inflight_graph_info, + ) + }) }) } } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 2dbf0ed70fb1c..75cf419724a1e 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -101,7 +101,9 @@ impl ControlStreamManager { pub(super) async fn add_worker( &mut self, node: WorkerNode, - initial_subscriptions: impl Iterator, + inflight_infos: impl Iterator< + Item = (DatabaseId, &InflightSubscriptionInfo, &InflightDatabaseInfo), + >, context: &impl GlobalBarrierWorkerContext, ) { let node_id = node.id as WorkerId; @@ -113,7 +115,7 @@ impl ControlStreamManager { let mut backoff = ExponentialBackoff::from_millis(100) .max_delay(Duration::from_secs(3)) .factor(5); - let init_request = Self::collect_init_request(initial_subscriptions); + let init_request = self.collect_init_request(inflight_infos); const MAX_RETRY: usize = 5; for i in 1..=MAX_RETRY { match context.new_control_stream(&node, &init_request).await { @@ -145,11 +147,10 @@ impl ControlStreamManager { pub(super) async fn reset( &mut self, - initial_subscriptions: impl Iterator, nodes: &HashMap, context: &impl GlobalBarrierWorkerContext, ) -> MetaResult<()> { - let init_request = Self::collect_init_request(initial_subscriptions); + let init_request = PbInitRequest { databases: vec![] }; let init_request = &init_request; let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async move { let handle = context.new_control_stream(node, init_request).await?; @@ -267,17 +268,41 @@ impl ControlStreamManager { } fn collect_init_request( - initial_subscriptions: impl Iterator, + &self, + initial_inflight_infos: impl Iterator< + Item = (DatabaseId, &InflightSubscriptionInfo, &InflightDatabaseInfo), + >, ) -> PbInitRequest { PbInitRequest { - databases: initial_subscriptions - .map(|(database_id, info)| PbDatabaseInitialPartialGraph { - database_id: database_id.database_id, - graphs: vec![PbInitialPartialGraph { - partial_graph_id: to_partial_graph_id(None), - subscriptions: info.into_iter().collect_vec(), - }], - }) + databases: initial_inflight_infos + .map( + |(database_id, subscriptions, inflight_info)| PbDatabaseInitialPartialGraph { + database_id: database_id.database_id, + graphs: vec![PbInitialPartialGraph { + partial_graph_id: to_partial_graph_id(None), + subscriptions: subscriptions.into_iter().collect_vec(), + actor_infos: inflight_info + .fragment_infos() + .flat_map(|fragment| { + fragment.actors.iter().map(|(actor_id, worker_id)| { + let host_addr = self + .nodes + .get(worker_id) + .expect("worker should exist for inflight actor") + .worker + .host + .clone() + .expect("should exist"); + ActorInfo { + actor_id: *actor_id, + host: Some(host_addr), + } + }) + }) + .collect(), + }], + }, + ) .collect(), } } @@ -368,7 +393,7 @@ impl ControlStreamManager { info.fragment_infos(), info.fragment_infos(), Some(node_actors), - vec![], + (&subscription_info).into_iter().collect(), vec![], )?; debug!( diff --git a/src/meta/src/barrier/worker.rs b/src/meta/src/barrier/worker.rs index c8c4d0faa29e1..c8bb6d5296581 100644 --- a/src/meta/src/barrier/worker.rs +++ b/src/meta/src/barrier/worker.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; use std::mem::replace; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; @@ -41,7 +41,7 @@ use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; use crate::barrier::schedule::PeriodicBarriers; use crate::barrier::{ schedule, BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, - InflightSubscriptionInfo, RecoveryReason, + RecoveryReason, }; use crate::error::MetaErrorInner; use crate::hummock::HummockManagerRef; @@ -281,7 +281,7 @@ impl GlobalBarrierWorker { info!(?changed_worker, "worker changed"); if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker { - self.control_stream_manager.add_worker(node, self.checkpoint_control.subscriptions(), &*self.context).await; + self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), &*self.context).await; } } @@ -644,10 +644,8 @@ impl GlobalBarrierWorker { let mut control_stream_manager = ControlStreamManager::new(self.env.clone()); let reset_start_time = Instant::now(); - let empty_subscriptions = LazyLock::new(InflightSubscriptionInfo::default); control_stream_manager .reset( - database_fragment_infos.keys().map(|database_id| (*database_id, subscription_infos.get(database_id).unwrap_or_else(|| &*empty_subscriptions))), active_streaming_nodes.current(), &*self.context, ) @@ -661,6 +659,7 @@ impl GlobalBarrierWorker { let mut collected_databases = HashMap::new(); let mut collecting_databases = HashMap::new(); for (database_id, info) in database_fragment_infos { + control_stream_manager.add_partial_graph(database_id, None)?; let (node_to_collect, database, prev_epoch) = control_stream_manager.inject_database_initial_barrier( database_id, info, diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 90e6ef9592194..b668790a2862a 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1199,13 +1199,11 @@ mod tests { let (old_simple, new_simple) = (114, 514); // simple downstream actors // 1. Register info in context. - { - let mut actor_infos = ctx.actor_infos.write(); - - for local_actor_id in [actor_id, untouched, old, new, old_simple, new_simple] { - actor_infos.insert(local_actor_id, helper_make_local_actor(local_actor_id)); - } - } + ctx.add_actors( + [actor_id, untouched, old, new, old_simple, new_simple] + .into_iter() + .map(helper_make_local_actor), + ); // actor_id -> untouched, old, new, old_simple, new_simple let broadcast_dispatcher_id = 666; diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 960a7857f801e..3bcdffd3d2481 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -889,13 +889,11 @@ mod tests { let metrics = Arc::new(StreamingMetrics::unused()); // 1. Register info in context. - { - let mut actor_infos = ctx.actor_infos.write(); - - for local_actor_id in [actor_id, untouched, old, new] { - actor_infos.insert(local_actor_id, helper_make_local_actor(local_actor_id)); - } - } + ctx.add_actors( + [actor_id, untouched, old, new] + .into_iter() + .map(helper_make_local_actor), + ); // untouched -> actor_id // old -> actor_id // new -> actor_id diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index 1fd5e04eb6804..5bfa08df93288 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -224,13 +224,13 @@ mod tests { let metrics = Arc::new(StreamingMetrics::unused()); // 1. Register info in context. - { - let mut actor_infos = ctx.actor_infos.write(); - for local_actor_id in [actor_id, old, new] { - actor_infos.insert(local_actor_id, helper_make_local_actor(local_actor_id)); - } - } + ctx.add_actors( + [actor_id, old, new] + .into_iter() + .map(helper_make_local_actor), + ); + // old -> actor_id // new -> actor_id diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 10cbdb0090146..3d9ee02fb4a32 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -417,7 +417,7 @@ impl LocalBarrierWorker { match actor_op { LocalActorOperation::NewControlStream { handle, init_request } => { self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one")); - self.reset(init_request.databases).await; + self.reset(init_request).await; self.control_stream_handle = handle; self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {})); } @@ -470,7 +470,7 @@ impl LocalBarrierWorker { let database_id = DatabaseId::new(req.database_id); let result: StreamResult<()> = try { let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?; - self.update_actor_info(database_id, req.broadcast_info.iter().cloned())?; + self.update_actor_info(database_id, req.broadcast_info.iter().cloned()); self.send_barrier(&barrier, req)?; }; result.map_err(|e| (database_id, e))?; @@ -904,11 +904,6 @@ impl LocalBarrierWorker { ); } - /// Reset all internal states. - pub(super) fn reset_state(&mut self, initial_partial_graphs: Vec) { - *self = Self::new(self.actor_manager.clone(), initial_partial_graphs); - } - /// When some other failure happens (like failed to send barrier), the error is reported using /// this function. The control stream will be responded with a message to notify about the error, /// and the global barrier worker will later reset and rerun the database. @@ -1219,6 +1214,7 @@ pub(crate) mod barrier_test_utils { graphs: vec![PbInitialPartialGraph { partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(), subscriptions: vec![], + actor_infos: vec![], }], }], }, diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 1ca9cdf5c555c..6918a65657611 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -525,6 +525,13 @@ impl ManagedBarrierState { let database_id = DatabaseId::new(database.database_id); assert!(!databases.contains_key(&database_id)); let shared_context = Arc::new(SharedContext::new(database_id, &actor_manager.env)); + shared_context.add_actors( + database + .graphs + .iter() + .flat_map(|graph| graph.actor_infos.iter()) + .cloned(), + ); let state = DatabaseManagedBarrierState::new( database_id, actor_manager.clone(), diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs index 3f0609bc3f830..881c49843374d 100644 --- a/src/stream/src/task/mod.rs +++ b/src/stream/src/task/mod.rs @@ -88,10 +88,10 @@ pub struct SharedContext { /// /// The channel serves as a buffer because `ExchangeServiceImpl` /// is on the server-side and we will also introduce backpressure. - pub(crate) channel_map: Mutex>, + channel_map: Mutex>, /// Stores all actor information. - pub(crate) actor_infos: RwLock>, + actor_infos: RwLock>, /// Stores the local address. /// @@ -206,6 +206,26 @@ impl SharedContext { actor_infos.remove(actor_id); } } + + pub(crate) fn add_actors(&self, new_actor_infos: impl Iterator) { + let mut actor_infos = self.actor_infos.write(); + for actor in new_actor_infos { + if let Some(prev_actor) = actor_infos.get(&actor.get_actor_id()) { + if cfg!(debug_assertions) { + panic!("duplicate actor info: {:?} {:?}", actor, actor_infos); + } + if prev_actor != &actor { + warn!( + ?prev_actor, + ?actor, + "add actor again but have different actor info. ignored" + ); + } + } else { + actor_infos.insert(actor.get_actor_id(), actor); + } + } + } } /// Generate a globally unique executor id. diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index eba1646259ea4..11f92bff8a459 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -28,15 +28,13 @@ use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnId, DatabaseId, Field, Schema, TableId}; use risingwave_common::config::MetricLevel; -use risingwave_common::{bail, must_match}; +use risingwave_common::must_match; use risingwave_pb::common::ActorInfo; use risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{StreamActor, StreamNode, StreamScanNode, StreamScanType}; -use risingwave_pb::stream_service::streaming_control_stream_request::{ - DatabaseInitialPartialGraph, InitRequest, -}; +use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; use risingwave_pb::stream_service::{ StreamingControlStreamRequest, StreamingControlStreamResponse, }; @@ -256,7 +254,7 @@ impl LocalStreamManager { impl LocalBarrierWorker { /// Force stop all actors on this worker, and then drop their resources. - pub(super) async fn reset(&mut self, initial_partial_graphs: Vec) { + pub(super) async fn reset(&mut self, init_request: InitRequest) { join_all( self.state .databases @@ -275,7 +273,7 @@ impl LocalBarrierWorker { .await } self.actor_manager.env.dml_manager_ref().clear(); - self.reset_state(initial_partial_graphs); + *self = Self::new(self.actor_manager.clone(), init_request.databases); } } @@ -749,26 +747,13 @@ impl LocalBarrierWorker { &mut self, database_id: DatabaseId, new_actor_infos: impl Iterator, - ) -> StreamResult<()> { - let mut actor_infos = Self::get_or_insert_database_shared_context( + ) { + Self::get_or_insert_database_shared_context( &mut self.state.current_shared_context, database_id, &self.actor_manager, ) - .actor_infos - .write(); - for actor in new_actor_infos { - if let Some(prev_actor) = actor_infos.get(&actor.get_actor_id()) - && &actor != prev_actor - { - bail!( - "actor info mismatch when broadcasting {}", - actor.get_actor_id() - ); - } - actor_infos.insert(actor.get_actor_id(), actor); - } - Ok(()) + .add_actors(new_actor_infos); } }