Skip to content

Commit

Permalink
Merge branch 'main' into lz/limit_offset
Browse files Browse the repository at this point in the history
  • Loading branch information
lmatz authored Dec 27, 2024
2 parents 6d6b4a4 + fdd2493 commit 3de56f6
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 76 deletions.
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ message StreamingControlStreamRequest {
message InitialPartialGraph {
uint32 partial_graph_id = 1;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2;
repeated common.ActorInfo actor_infos = 3;
}

message DatabaseInitialPartialGraph {
Expand Down
17 changes: 11 additions & 6 deletions src/meta/src/barrier/checkpoint/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::barrier::checkpoint::recovery::{
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{from_partial_graph_id, ControlStreamManager};
Expand Down Expand Up @@ -342,13 +342,18 @@ impl CheckpointControl {
});
}

pub(crate) fn subscriptions(
pub(crate) fn inflight_infos(
&self,
) -> impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)> + '_ {
) -> impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo, &InflightDatabaseInfo)> + '_
{
self.databases.iter().flat_map(|(database_id, database)| {
database
.checkpoint_control()
.map(|database| (*database_id, &database.state.inflight_subscription_info))
database.checkpoint_control().map(|database| {
(
*database_id,
&database.state.inflight_subscription_info,
&database.state.inflight_graph_info,
)
})
})
}
}
Expand Down
53 changes: 39 additions & 14 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ impl ControlStreamManager {
pub(super) async fn add_worker(
&mut self,
node: WorkerNode,
initial_subscriptions: impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)>,
inflight_infos: impl Iterator<
Item = (DatabaseId, &InflightSubscriptionInfo, &InflightDatabaseInfo),
>,
context: &impl GlobalBarrierWorkerContext,
) {
let node_id = node.id as WorkerId;
Expand All @@ -113,7 +115,7 @@ impl ControlStreamManager {
let mut backoff = ExponentialBackoff::from_millis(100)
.max_delay(Duration::from_secs(3))
.factor(5);
let init_request = Self::collect_init_request(initial_subscriptions);
let init_request = self.collect_init_request(inflight_infos);
const MAX_RETRY: usize = 5;
for i in 1..=MAX_RETRY {
match context.new_control_stream(&node, &init_request).await {
Expand Down Expand Up @@ -145,11 +147,10 @@ impl ControlStreamManager {

pub(super) async fn reset(
&mut self,
initial_subscriptions: impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)>,
nodes: &HashMap<WorkerId, WorkerNode>,
context: &impl GlobalBarrierWorkerContext,
) -> MetaResult<()> {
let init_request = Self::collect_init_request(initial_subscriptions);
let init_request = PbInitRequest { databases: vec![] };
let init_request = &init_request;
let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async move {
let handle = context.new_control_stream(node, init_request).await?;
Expand Down Expand Up @@ -267,17 +268,41 @@ impl ControlStreamManager {
}

fn collect_init_request(
initial_subscriptions: impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)>,
&self,
initial_inflight_infos: impl Iterator<
Item = (DatabaseId, &InflightSubscriptionInfo, &InflightDatabaseInfo),
>,
) -> PbInitRequest {
PbInitRequest {
databases: initial_subscriptions
.map(|(database_id, info)| PbDatabaseInitialPartialGraph {
database_id: database_id.database_id,
graphs: vec![PbInitialPartialGraph {
partial_graph_id: to_partial_graph_id(None),
subscriptions: info.into_iter().collect_vec(),
}],
})
databases: initial_inflight_infos
.map(
|(database_id, subscriptions, inflight_info)| PbDatabaseInitialPartialGraph {
database_id: database_id.database_id,
graphs: vec![PbInitialPartialGraph {
partial_graph_id: to_partial_graph_id(None),
subscriptions: subscriptions.into_iter().collect_vec(),
actor_infos: inflight_info
.fragment_infos()
.flat_map(|fragment| {
fragment.actors.iter().map(|(actor_id, worker_id)| {
let host_addr = self
.nodes
.get(worker_id)
.expect("worker should exist for inflight actor")
.worker
.host
.clone()
.expect("should exist");
ActorInfo {
actor_id: *actor_id,
host: Some(host_addr),
}
})
})
.collect(),
}],
},
)
.collect(),
}
}
Expand Down Expand Up @@ -368,7 +393,7 @@ impl ControlStreamManager {
info.fragment_infos(),
info.fragment_infos(),
Some(node_actors),
vec![],
(&subscription_info).into_iter().collect(),
vec![],
)?;
debug!(
Expand Down
9 changes: 4 additions & 5 deletions src/meta/src/barrier/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;
use std::mem::replace;
use std::sync::{Arc, LazyLock};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
Expand All @@ -41,7 +41,7 @@ use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager};
use crate::barrier::schedule::PeriodicBarriers;
use crate::barrier::{
schedule, BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot,
InflightSubscriptionInfo, RecoveryReason,
RecoveryReason,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
Expand Down Expand Up @@ -281,7 +281,7 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
info!(?changed_worker, "worker changed");

if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
self.control_stream_manager.add_worker(node, self.checkpoint_control.subscriptions(), &*self.context).await;
self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), &*self.context).await;
}
}

Expand Down Expand Up @@ -644,10 +644,8 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {

let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
let reset_start_time = Instant::now();
let empty_subscriptions = LazyLock::new(InflightSubscriptionInfo::default);
control_stream_manager
.reset(
database_fragment_infos.keys().map(|database_id| (*database_id, subscription_infos.get(database_id).unwrap_or_else(|| &*empty_subscriptions))),
active_streaming_nodes.current(),
&*self.context,
)
Expand All @@ -661,6 +659,7 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
let mut collected_databases = HashMap::new();
let mut collecting_databases = HashMap::new();
for (database_id, info) in database_fragment_infos {
control_stream_manager.add_partial_graph(database_id, None)?;
let (node_to_collect, database, prev_epoch) = control_stream_manager.inject_database_initial_barrier(
database_id,
info,
Expand Down
12 changes: 5 additions & 7 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,13 +1199,11 @@ mod tests {
let (old_simple, new_simple) = (114, 514); // simple downstream actors

// 1. Register info in context.
{
let mut actor_infos = ctx.actor_infos.write();

for local_actor_id in [actor_id, untouched, old, new, old_simple, new_simple] {
actor_infos.insert(local_actor_id, helper_make_local_actor(local_actor_id));
}
}
ctx.add_actors(
[actor_id, untouched, old, new, old_simple, new_simple]
.into_iter()
.map(helper_make_local_actor),
);
// actor_id -> untouched, old, new, old_simple, new_simple

let broadcast_dispatcher_id = 666;
Expand Down
12 changes: 5 additions & 7 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,13 +889,11 @@ mod tests {
let metrics = Arc::new(StreamingMetrics::unused());

// 1. Register info in context.
{
let mut actor_infos = ctx.actor_infos.write();

for local_actor_id in [actor_id, untouched, old, new] {
actor_infos.insert(local_actor_id, helper_make_local_actor(local_actor_id));
}
}
ctx.add_actors(
[actor_id, untouched, old, new]
.into_iter()
.map(helper_make_local_actor),
);
// untouched -> actor_id
// old -> actor_id
// new -> actor_id
Expand Down
12 changes: 6 additions & 6 deletions src/stream/src/executor/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,13 @@ mod tests {
let metrics = Arc::new(StreamingMetrics::unused());

// 1. Register info in context.
{
let mut actor_infos = ctx.actor_infos.write();

for local_actor_id in [actor_id, old, new] {
actor_infos.insert(local_actor_id, helper_make_local_actor(local_actor_id));
}
}
ctx.add_actors(
[actor_id, old, new]
.into_iter()
.map(helper_make_local_actor),
);

// old -> actor_id
// new -> actor_id

Expand Down
10 changes: 3 additions & 7 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ impl LocalBarrierWorker {
match actor_op {
LocalActorOperation::NewControlStream { handle, init_request } => {
self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
self.reset(init_request.databases).await;
self.reset(init_request).await;
self.control_stream_handle = handle;
self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
}
Expand Down Expand Up @@ -470,7 +470,7 @@ impl LocalBarrierWorker {
let database_id = DatabaseId::new(req.database_id);
let result: StreamResult<()> = try {
let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
self.update_actor_info(database_id, req.broadcast_info.iter().cloned())?;
self.update_actor_info(database_id, req.broadcast_info.iter().cloned());
self.send_barrier(&barrier, req)?;
};
result.map_err(|e| (database_id, e))?;
Expand Down Expand Up @@ -904,11 +904,6 @@ impl LocalBarrierWorker {
);
}

/// Reset all internal states.
pub(super) fn reset_state(&mut self, initial_partial_graphs: Vec<DatabaseInitialPartialGraph>) {
*self = Self::new(self.actor_manager.clone(), initial_partial_graphs);
}

/// When some other failure happens (like failed to send barrier), the error is reported using
/// this function. The control stream will be responded with a message to notify about the error,
/// and the global barrier worker will later reset and rerun the database.
Expand Down Expand Up @@ -1219,6 +1214,7 @@ pub(crate) mod barrier_test_utils {
graphs: vec![PbInitialPartialGraph {
partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
subscriptions: vec![],
actor_infos: vec![],
}],
}],
},
Expand Down
7 changes: 7 additions & 0 deletions src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,13 @@ impl ManagedBarrierState {
let database_id = DatabaseId::new(database.database_id);
assert!(!databases.contains_key(&database_id));
let shared_context = Arc::new(SharedContext::new(database_id, &actor_manager.env));
shared_context.add_actors(
database
.graphs
.iter()
.flat_map(|graph| graph.actor_infos.iter())
.cloned(),
);
let state = DatabaseManagedBarrierState::new(
database_id,
actor_manager.clone(),
Expand Down
24 changes: 22 additions & 2 deletions src/stream/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ pub struct SharedContext {
///
/// The channel serves as a buffer because `ExchangeServiceImpl`
/// is on the server-side and we will also introduce backpressure.
pub(crate) channel_map: Mutex<HashMap<UpDownActorIds, ConsumableChannelPair>>,
channel_map: Mutex<HashMap<UpDownActorIds, ConsumableChannelPair>>,

/// Stores all actor information.
pub(crate) actor_infos: RwLock<HashMap<ActorId, ActorInfo>>,
actor_infos: RwLock<HashMap<ActorId, ActorInfo>>,

/// Stores the local address.
///
Expand Down Expand Up @@ -206,6 +206,26 @@ impl SharedContext {
actor_infos.remove(actor_id);
}
}

pub(crate) fn add_actors(&self, new_actor_infos: impl Iterator<Item = ActorInfo>) {
let mut actor_infos = self.actor_infos.write();
for actor in new_actor_infos {
if let Some(prev_actor) = actor_infos.get(&actor.get_actor_id()) {
if cfg!(debug_assertions) {
panic!("duplicate actor info: {:?} {:?}", actor, actor_infos);
}
if prev_actor != &actor {
warn!(
?prev_actor,
?actor,
"add actor again but have different actor info. ignored"
);
}
} else {
actor_infos.insert(actor.get_actor_id(), actor);
}
}
}
}

/// Generate a globally unique executor id.
Expand Down
29 changes: 7 additions & 22 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@ use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, DatabaseId, Field, Schema, TableId};
use risingwave_common::config::MetricLevel;
use risingwave_common::{bail, must_match};
use risingwave_common::must_match;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{StreamActor, StreamNode, StreamScanNode, StreamScanType};
use risingwave_pb::stream_service::streaming_control_stream_request::{
DatabaseInitialPartialGraph, InitRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
use risingwave_pb::stream_service::{
StreamingControlStreamRequest, StreamingControlStreamResponse,
};
Expand Down Expand Up @@ -256,7 +254,7 @@ impl LocalStreamManager {

impl LocalBarrierWorker {
/// Force stop all actors on this worker, and then drop their resources.
pub(super) async fn reset(&mut self, initial_partial_graphs: Vec<DatabaseInitialPartialGraph>) {
pub(super) async fn reset(&mut self, init_request: InitRequest) {
join_all(
self.state
.databases
Expand All @@ -275,7 +273,7 @@ impl LocalBarrierWorker {
.await
}
self.actor_manager.env.dml_manager_ref().clear();
self.reset_state(initial_partial_graphs);
*self = Self::new(self.actor_manager.clone(), init_request.databases);
}
}

Expand Down Expand Up @@ -749,26 +747,13 @@ impl LocalBarrierWorker {
&mut self,
database_id: DatabaseId,
new_actor_infos: impl Iterator<Item = ActorInfo>,
) -> StreamResult<()> {
let mut actor_infos = Self::get_or_insert_database_shared_context(
) {
Self::get_or_insert_database_shared_context(
&mut self.state.current_shared_context,
database_id,
&self.actor_manager,
)
.actor_infos
.write();
for actor in new_actor_infos {
if let Some(prev_actor) = actor_infos.get(&actor.get_actor_id())
&& &actor != prev_actor
{
bail!(
"actor info mismatch when broadcasting {}",
actor.get_actor_id()
);
}
actor_infos.insert(actor.get_actor_id(), actor);
}
Ok(())
.add_actors(new_actor_infos);
}
}

Expand Down

0 comments on commit 3de56f6

Please sign in to comment.