diff --git a/src/ctl/src/cmd_impl/meta.rs b/src/ctl/src/cmd_impl/meta.rs index 8920839c7b0de..2afeb1560a817 100644 --- a/src/ctl/src/cmd_impl/meta.rs +++ b/src/ctl/src/cmd_impl/meta.rs @@ -13,6 +13,7 @@ // limitations under the License. mod backup_meta; +mod check; mod cluster_info; mod connection; mod migration; @@ -21,6 +22,7 @@ mod reschedule; mod serving; pub use backup_meta::*; +pub use check::*; pub use cluster_info::*; pub use connection::*; pub use migration::*; diff --git a/src/ctl/src/cmd_impl/meta/check.rs b/src/ctl/src/cmd_impl/meta/check.rs new file mode 100644 index 0000000000000..5f62fb9450f50 --- /dev/null +++ b/src/ctl/src/cmd_impl/meta/check.rs @@ -0,0 +1,33 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::process::exit; + +use risingwave_meta::controller::catalog::CatalogController; +use sea_orm::TransactionTrait; + +pub async fn graph_check(endpoint: String) -> anyhow::Result<()> { + let conn = sea_orm::Database::connect(sea_orm::ConnectOptions::new(endpoint)).await?; + let txn = conn.begin().await?; + match CatalogController::graph_check(&txn).await { + Ok(_) => { + println!("integrity check passed!"); + exit(0); + } + Err(_) => { + println!("integrity check failed!"); + exit(1); + } + } +} diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 5cc0765e16c81..1a7951cfbbab0 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -514,6 +514,14 @@ enum MetaCommands { #[clap(short = 'f', long, default_value_t = false)] force_clean: bool, }, + + /// Performing graph check for scaling. + #[clap(verbatim_doc_comment)] + GraphCheck { + /// SQL endpoint + #[clap(long, required = true)] + endpoint: String, + }, } #[derive(Subcommand, Clone, Debug)] @@ -850,6 +858,9 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Meta(MetaCommands::ValidateSource { props }) => { cmd_impl::meta::validate_source(context, props).await? } + Commands::Meta(MetaCommands::GraphCheck { endpoint }) => { + cmd_impl::meta::graph_check(endpoint).await? + } Commands::Meta(MetaCommands::Migration { etcd_endpoints, etcd_user_password, @@ -881,6 +892,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Profile(ProfileCommands::Heap { dir }) => { cmd_impl::profile::heap_profile(context, dir).await? } + Commands::Scale(ScaleCommands::Cordon { workers }) => { cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable) .await? diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index f2048707eda61..d0bf79ceb83f0 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -1291,39 +1291,36 @@ create table t (v1 int); select stddev_samp(v1), stddev_pop(v1) from t; logical_plan: |- - LogicalProject { exprs: [Case((count(t.v1) <= 1:Int64), null:Decimal, Sqrt(((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1)::Decimal)) / (count(t.v1) - 1:Int64)::Decimal))) as $expr2, Sqrt(((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1)::Decimal)) / count(t.v1)::Decimal)) as $expr3] } + LogicalProject { exprs: [Case((count(t.v1) <= 1:Int32), null:Decimal, Sqrt((Greatest((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1)::Decimal)), 0:Int32::Decimal) / (count(t.v1) - 1:Int32)::Decimal))) as $expr2, Case((count(t.v1) = 0:Int32), null:Decimal, Sqrt((Greatest((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1)::Decimal)), 0:Int32::Decimal) / count(t.v1)::Decimal))) as $expr3] } └─LogicalAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1)] } └─LogicalProject { exprs: [(t.v1 * t.v1) as $expr1, t.v1] } └─LogicalScan { table: t, columns: [t.v1, t._row_id] } batch_plan: |- - BatchProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Decimal, Sqrt(((sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) / (sum0(count(t.v1)) - 1:Int64)::Decimal))) as $expr4, Sqrt(((sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) / $expr3)) as $expr5] } - └─BatchProject { exprs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1))::Decimal as $expr2, sum0(count(t.v1))::Decimal as $expr3] } - └─BatchSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1))] } - └─BatchExchange { order: [], dist: Single } - └─BatchSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1)] } - └─BatchProject { exprs: [(t.v1 * t.v1) as $expr1, t.v1] } - └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } - batch_local_plan: |- - BatchProject { exprs: [Case((count(t.v1) <= 1:Int64), null:Decimal, Sqrt(((sum($expr1)::Decimal - (($expr2 * $expr2) / $expr3)) / (count(t.v1) - 1:Int64)::Decimal))) as $expr4, Sqrt(((sum($expr1)::Decimal - (($expr2 * $expr2) / $expr3)) / $expr3)) as $expr5] } - └─BatchProject { exprs: [sum($expr1), sum(t.v1), count(t.v1), sum(t.v1)::Decimal as $expr2, count(t.v1)::Decimal as $expr3] } - └─BatchSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1)] } - └─BatchExchange { order: [], dist: Single } + BatchProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int32), null:Decimal, Sqrt((Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1))::Decimal)), 0:Decimal) / (sum0(count(t.v1)) - 1:Int32)::Decimal))) as $expr2, Case((sum0(count(t.v1)) = 0:Int32), null:Decimal, Sqrt((Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1))::Decimal)), 0:Decimal) / sum0(count(t.v1))::Decimal))) as $expr3] } + └─BatchSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1))] } + └─BatchExchange { order: [], dist: Single } + └─BatchSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1)] } └─BatchProject { exprs: [(t.v1 * t.v1) as $expr1, t.v1] } └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } + batch_local_plan: |- + BatchProject { exprs: [Case((count(t.v1) <= 1:Int32), null:Decimal, Sqrt((Greatest((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1)::Decimal)), 0:Decimal) / (count(t.v1) - 1:Int32)::Decimal))) as $expr2, Case((count(t.v1) = 0:Int32), null:Decimal, Sqrt((Greatest((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1)::Decimal)), 0:Decimal) / count(t.v1)::Decimal))) as $expr3] } + └─BatchSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1)] } + └─BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [(t.v1 * t.v1) as $expr1, t.v1] } + └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [stddev_samp, stddev_pop], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Decimal, Sqrt(((sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) / (sum0(count(t.v1)) - 1:Int64)::Decimal))) as $expr4, Sqrt(((sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) / $expr3)) as $expr5] } - └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1))::Decimal as $expr2, sum0(count(t.v1))::Decimal as $expr3] } - └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1)] } - └─StreamProject { exprs: [(t.v1 * t.v1) as $expr1, t.v1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int32), null:Decimal, Sqrt((Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1))::Decimal)), 0:Decimal) / (sum0(count(t.v1)) - 1:Int32)::Decimal))) as $expr2, Case((sum0(count(t.v1)) = 0:Int32), null:Decimal, Sqrt((Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1))::Decimal)), 0:Decimal) / sum0(count(t.v1))::Decimal))) as $expr3] } + └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1)] } + └─StreamProject { exprs: [(t.v1 * t.v1) as $expr1, t.v1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: stddev_samp with other columns sql: | select count(''), stddev_samp(1); logical_plan: |- - LogicalProject { exprs: [count('':Varchar), Case((count(1:Int32) <= 1:Int64), null:Decimal, Sqrt(((sum($expr1)::Decimal - ((sum(1:Int32)::Decimal * sum(1:Int32)::Decimal) / count(1:Int32)::Decimal)) / (count(1:Int32) - 1:Int64)::Decimal))) as $expr2] } + LogicalProject { exprs: [count('':Varchar), Case((count(1:Int32) <= 1:Int32), null:Decimal, Sqrt((Greatest((sum($expr1)::Decimal - ((sum(1:Int32)::Decimal * sum(1:Int32)::Decimal) / count(1:Int32)::Decimal)), 0:Int32::Decimal) / (count(1:Int32) - 1:Int32)::Decimal))) as $expr2] } └─LogicalAgg { aggs: [count('':Varchar), sum($expr1), sum(1:Int32), count(1:Int32)] } └─LogicalProject { exprs: ['':Varchar, (1:Int32 * 1:Int32) as $expr1, 1:Int32] } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } @@ -1332,7 +1329,7 @@ create table t(v int, w float); select stddev_samp(v) from t group by w; logical_plan: |- - LogicalProject { exprs: [Case((count(t.v) <= 1:Int64), null:Decimal, Sqrt(((sum($expr1)::Decimal - ((sum(t.v)::Decimal * sum(t.v)::Decimal) / count(t.v)::Decimal)) / (count(t.v) - 1:Int64)::Decimal))) as $expr2] } + LogicalProject { exprs: [Case((count(t.v) <= 1:Int32), null:Decimal, Sqrt((Greatest((sum($expr1)::Decimal - ((sum(t.v)::Decimal * sum(t.v)::Decimal) / count(t.v)::Decimal)), 0:Int32::Decimal) / (count(t.v) - 1:Int32)::Decimal))) as $expr2] } └─LogicalAgg { group_key: [t.w], aggs: [sum($expr1), sum(t.v), count(t.v)] } └─LogicalProject { exprs: [t.w, (t.v * t.v) as $expr1, t.v] } └─LogicalScan { table: t, columns: [t.v, t.w, t._row_id] } diff --git a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml index abbc0aae184e0..cb22396ac6fe5 100644 --- a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml @@ -60,26 +60,20 @@ create table t(v int); select stddev_pop(v), stddev_samp(v), var_pop(v), var_samp(v) from t; batch_plan: |- - BatchProject { exprs: [Sqrt($expr5) as $expr6, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, Sqrt(($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal))) as $expr7, $expr5, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, ($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal)) as $expr8] } - └─BatchProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), ($expr4 / $expr3) as $expr5, $expr4] } - └─BatchProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), (sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) as $expr4, $expr3] } - └─BatchProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), sum(sum(t.v))::Decimal as $expr2, sum0(count(t.v))::Decimal as $expr3] } - └─BatchSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v))] } - └─BatchExchange { order: [], dist: Single } - └─BatchSimpleAgg { aggs: [sum($expr1), sum(t.v), count(t.v)] } - └─BatchProject { exprs: [(t.v * t.v) as $expr1, t.v] } - └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } + BatchProject { exprs: [Case((sum0(count(t.v)) = 0:Int32), null:Decimal, Sqrt((Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v))::Decimal * sum(sum(t.v))::Decimal) / sum0(count(t.v))::Decimal)), 0:Decimal) / sum0(count(t.v))::Decimal))) as $expr2, Case((sum0(count(t.v)) <= 1:Int32), null:Decimal, Sqrt((Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v))::Decimal * sum(sum(t.v))::Decimal) / sum0(count(t.v))::Decimal)), 0:Decimal) / (sum0(count(t.v)) - 1:Int32)::Decimal))) as $expr3, Case((sum0(count(t.v)) = 0:Int32), null:Decimal, (Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v))::Decimal * sum(sum(t.v))::Decimal) / sum0(count(t.v))::Decimal)), 0:Decimal) / sum0(count(t.v))::Decimal)) as $expr4, Case((sum0(count(t.v)) <= 1:Int32), null:Decimal, (Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v))::Decimal * sum(sum(t.v))::Decimal) / sum0(count(t.v))::Decimal)), 0:Decimal) / (sum0(count(t.v)) - 1:Int32)::Decimal)) as $expr5] } + └─BatchSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v))] } + └─BatchExchange { order: [], dist: Single } + └─BatchSimpleAgg { aggs: [sum($expr1), sum(t.v), count(t.v)] } + └─BatchProject { exprs: [(t.v * t.v) as $expr1, t.v] } + └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [stddev_pop, stddev_samp, var_pop, var_samp], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [Sqrt($expr5) as $expr6, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, Sqrt(($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal))) as $expr7, $expr5, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, ($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal)) as $expr8] } - └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), ($expr4 / $expr3) as $expr5, $expr4] } - └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), (sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) as $expr4, $expr3] } - └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), sum(sum(t.v))::Decimal as $expr2, sum0(count(t.v))::Decimal as $expr3] } - └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(t.v), count(t.v)] } - └─StreamProject { exprs: [(t.v * t.v) as $expr1, t.v, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [Case((sum0(count(t.v)) = 0:Int32), null:Decimal, Sqrt((Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v))::Decimal * sum(sum(t.v))::Decimal) / sum0(count(t.v))::Decimal)), 0:Decimal) / sum0(count(t.v))::Decimal))) as $expr2, Case((sum0(count(t.v)) <= 1:Int32), null:Decimal, Sqrt((Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v))::Decimal * sum(sum(t.v))::Decimal) / sum0(count(t.v))::Decimal)), 0:Decimal) / (sum0(count(t.v)) - 1:Int32)::Decimal))) as $expr3, Case((sum0(count(t.v)) = 0:Int32), null:Decimal, (Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v))::Decimal * sum(sum(t.v))::Decimal) / sum0(count(t.v))::Decimal)), 0:Decimal) / sum0(count(t.v))::Decimal)) as $expr4, Case((sum0(count(t.v)) <= 1:Int32), null:Decimal, (Greatest((sum(sum($expr1))::Decimal - ((sum(sum(t.v))::Decimal * sum(sum(t.v))::Decimal) / sum0(count(t.v))::Decimal)), 0:Decimal) / (sum0(count(t.v)) - 1:Int32)::Decimal)) as $expr5] } + └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(t.v), count(t.v)] } + └─StreamProject { exprs: [(t.v * t.v) as $expr1, t.v, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: Common sub expression shouldn't extract partial expression of `some`/`all`. See 11766 sql: | with t(v, arr) as (select 1, array[2, 3]) select v < all(arr), v < some(arr) from t; diff --git a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml index 3449606ad97ba..81c2797061f88 100644 --- a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml +++ b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml @@ -325,33 +325,31 @@ logical_plan: |- LogicalProject { exprs: [t.x, t.y, t.z, $expr4, $expr5] } └─LogicalFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) AND ($expr4 <= 3.0:Decimal) AND ($expr5 > 1.0:Decimal) } - └─LogicalProject { exprs: [t.x, t.y, t.z, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) as $expr4, Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) as $expr5] } + └─LogicalProject { exprs: [t.x, t.y, t.z, Case((count = 0:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Int32::Decimal) / count::Decimal))) as $expr4, Case((count <= 1:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Int32::Decimal) / (count - 1:Int32)::Decimal))) as $expr5] } └─LogicalOverWindow { window_functions: [sum($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr3) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─LogicalProject { exprs: [t.x, t.y, t.z, t.w, t._row_id, ((t.x - t.y) * (t.x - t.y)) as $expr1, (t.x - t.y) as $expr2, (t.x * t.x) as $expr3] } └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t.w, t._row_id] } batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchProject { exprs: [t.x, t.y, t.z, Sqrt(((sum::Decimal - (($expr4 * $expr4) / $expr5)) / $expr5)) as $expr6, Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) as $expr7] } - └─BatchProject { exprs: [t.x, t.y, t.z, $expr2, $expr1, $expr3, sum, sum, count, sum, sum, count, sum::Decimal as $expr4, count::Decimal as $expr5] } - └─BatchFilter { predicate: (t.y > 0:Int32) AND (t.x > 0:Int32) AND (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } - └─BatchOverWindow { window_functions: [sum($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr3) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─BatchExchange { order: [t.z ASC, t.x ASC], dist: HashShard(t.z) } - └─BatchSort { order: [t.z ASC, t.x ASC] } - └─BatchProject { exprs: [t.x, t.y, t.z, ($expr1 * $expr1) as $expr2, $expr1, (t.x * t.x) as $expr3] } - └─BatchProject { exprs: [t.x, t.y, t.z, (t.x - t.y) as $expr1] } - └─BatchFilter { predicate: (t.z > 0:Int32) } - └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } + └─BatchProject { exprs: [t.x, t.y, t.z, Case((count = 0:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Decimal) / count::Decimal))) as $expr4, Case((count <= 1:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Decimal) / (count - 1:Int32)::Decimal))) as $expr5] } + └─BatchFilter { predicate: (t.y > 0:Int32) AND (t.x > 0:Int32) AND (Case((count = 0:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Decimal) / count::Decimal))) <= 3.0:Decimal) AND (Case((count <= 1:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Decimal) / (count - 1:Int32)::Decimal))) > 1.0:Decimal) } + └─BatchOverWindow { window_functions: [sum($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr3) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─BatchExchange { order: [t.z ASC, t.x ASC], dist: HashShard(t.z) } + └─BatchSort { order: [t.z ASC, t.x ASC] } + └─BatchProject { exprs: [t.x, t.y, t.z, ($expr1 * $expr1) as $expr2, $expr1, (t.x * t.x) as $expr3] } + └─BatchProject { exprs: [t.x, t.y, t.z, (t.x - t.y) as $expr1] } + └─BatchFilter { predicate: (t.z > 0:Int32) } + └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y, z, res0, res1, t._row_id(hidden)], stream_key: [t._row_id, z], pk_columns: [t._row_id, z], pk_conflict: NoCheck } - └─StreamProject { exprs: [t.x, t.y, t.z, Sqrt(((sum::Decimal - (($expr4 * $expr4) / $expr5)) / $expr5)) as $expr6, Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) as $expr7, t._row_id] } - └─StreamProject { exprs: [t.x, t.y, t.z, $expr2, $expr1, $expr3, sum, sum, count, sum, sum, count, sum::Decimal as $expr4, count::Decimal as $expr5, t._row_id] } - └─StreamFilter { predicate: (t.y > 0:Int32) AND (t.x > 0:Int32) AND (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } - └─StreamOverWindow { window_functions: [sum($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr3) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─StreamExchange { dist: HashShard(t.z) } - └─StreamProject { exprs: [t.x, t.y, t.z, ($expr1 * $expr1) as $expr2, $expr1, (t.x * t.x) as $expr3, t._row_id] } - └─StreamProject { exprs: [t.x, t.y, t.z, (t.x - t.y) as $expr1, t._row_id] } - └─StreamFilter { predicate: (t.z > 0:Int32) } - └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [t.x, t.y, t.z, Case((count = 0:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Decimal) / count::Decimal))) as $expr4, Case((count <= 1:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Decimal) / (count - 1:Int32)::Decimal))) as $expr5, t._row_id] } + └─StreamFilter { predicate: (t.y > 0:Int32) AND (t.x > 0:Int32) AND (Case((count = 0:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Decimal) / count::Decimal))) <= 3.0:Decimal) AND (Case((count <= 1:Int32), null:Decimal, Sqrt((Greatest((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)), 0:Decimal) / (count - 1:Int32)::Decimal))) > 1.0:Decimal) } + └─StreamOverWindow { window_functions: [sum($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr3) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─StreamExchange { dist: HashShard(t.z) } + └─StreamProject { exprs: [t.x, t.y, t.z, ($expr1 * $expr1) as $expr2, $expr1, (t.x * t.x) as $expr3, t._row_id] } + └─StreamProject { exprs: [t.x, t.y, t.z, (t.x - t.y) as $expr1, t._row_id] } + └─StreamFilter { predicate: (t.z > 0:Int32) } + └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - id: aggregate with expression in func arguments and over clause sql: | create table t(x int, y int, z int, w int); diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index b0ad102ee693c..1e9e4b8102c3f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -14,7 +14,7 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; -use risingwave_common::types::{DataType, Datum, ScalarImpl}; +use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::{bail, bail_not_implemented, not_implemented}; use risingwave_expr::aggregate::{agg_kinds, AggKind, PbAggKind}; @@ -685,17 +685,15 @@ impl LogicalAggBuilder { agg_call.direct_args.clone(), )?)?); - let one = ExprImpl::from(Literal::new( - Datum::from(ScalarImpl::Int64(1)), - DataType::Int64, - )); + let zero = ExprImpl::literal_int(0); + let one = ExprImpl::literal_int(1); let squared_sum = ExprImpl::from(FunctionCall::new( ExprType::Multiply, vec![sum.clone(), sum], )?); - let numerator = ExprImpl::from(FunctionCall::new( + let raw_numerator = ExprImpl::from(FunctionCall::new( ExprType::Subtract, vec![ sum_of_sq, @@ -706,6 +704,13 @@ impl LogicalAggBuilder { ], )?); + // We need to check for potential accuracy issues that may occasionally lead to results less than 0. + let numerator_type = raw_numerator.return_type(); + let numerator = ExprImpl::from(FunctionCall::new( + ExprType::Greatest, + vec![raw_numerator, zero.clone().cast_explicit(numerator_type)?], + )?); + let denominator = match kind { PbAggKind::VarPop | PbAggKind::StddevPop => count.clone(), PbAggKind::VarSamp | PbAggKind::StddevSamp => ExprImpl::from( @@ -723,22 +728,21 @@ impl LogicalAggBuilder { target = ExprImpl::from(FunctionCall::new(ExprType::Sqrt, vec![target])?); } - match kind { - PbAggKind::VarPop | PbAggKind::StddevPop => Ok(target), - PbAggKind::StddevSamp | PbAggKind::VarSamp => { - let case_cond = ExprImpl::from(FunctionCall::new( - ExprType::LessThanOrEqual, - vec![count, one], - )?); - let null = ExprImpl::from(Literal::new(None, agg_call.return_type())); - - Ok(ExprImpl::from(FunctionCall::new( - ExprType::Case, - vec![case_cond, null, target], - )?)) + let null = ExprImpl::from(Literal::new(None, agg_call.return_type())); + let case_cond = match kind { + PbAggKind::VarPop | PbAggKind::StddevPop => { + ExprImpl::from(FunctionCall::new(ExprType::Equal, vec![count, zero])?) } + PbAggKind::VarSamp | PbAggKind::StddevSamp => ExprImpl::from( + FunctionCall::new(ExprType::LessThanOrEqual, vec![count, one])?, + ), _ => unreachable!(), - } + }; + + Ok(ExprImpl::from(FunctionCall::new( + ExprType::Case, + vec![case_cond, null, target], + )?)) } AggKind::Builtin(PbAggKind::ApproxPercentile) => { if agg_call.order_by.sort_exprs[0].order_type == OrderType::descending() { diff --git a/src/meta/src/controller/scale.rs b/src/meta/src/controller/scale.rs index ec329061237c9..a14ad3bc67377 100644 --- a/src/meta/src/controller/scale.rs +++ b/src/meta/src/controller/scale.rs @@ -13,20 +13,28 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; +use std::ops::{BitAnd, BitOrAssign}; use itertools::Itertools; +use risingwave_common::bitmap::Bitmap; +use risingwave_common::hash; +use risingwave_common::hash::VirtualNode; +use risingwave_connector::source::{SplitImpl, SplitMetaData}; use risingwave_meta_model_migration::{ Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement, UnionType, WithClause, WithQuery, }; +use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; +use risingwave_meta_model_v2::fragment::DistributionType; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob}; use risingwave_meta_model_v2::{ - actor, actor_dispatcher, fragment, streaming_job, ActorId, FragmentId, ObjectId, + actor, actor_dispatcher, fragment, streaming_job, ActorId, ActorMapping, ActorUpstreamActors, + ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap, }; use sea_orm::{ - ColumnTrait, ConnectionTrait, DbErr, EntityTrait, JoinType, QueryFilter, QuerySelect, - RelationTrait, Statement, TransactionTrait, + ColumnTrait, ConnectionTrait, DbErr, DerivePartialModel, EntityTrait, FromQueryResult, + JoinType, QueryFilter, QuerySelect, RelationTrait, Statement, TransactionTrait, }; use crate::controller::catalog::CatalogController; @@ -428,3 +436,484 @@ impl CatalogController { }) } } + +macro_rules! crit_check_in_loop { + ($flag:expr, $condition:expr, $message:expr) => { + if !$condition { + tracing::error!("Integrity check failed: {}", $message); + $flag = true; + continue; + } + }; +} + +impl CatalogController { + pub async fn graph_check(txn: &C) -> MetaResult<()> + where + C: ConnectionTrait, + { + #[derive(Clone, DerivePartialModel, FromQueryResult)] + #[sea_orm(entity = "ActorDispatcher")] + pub struct PartialActorDispatcher { + pub id: i32, + pub actor_id: ActorId, + pub dispatcher_type: DispatcherType, + pub hash_mapping: Option, + pub dispatcher_id: FragmentId, + pub downstream_actor_ids: I32Array, + } + + #[derive(Clone, DerivePartialModel, FromQueryResult)] + #[sea_orm(entity = "Fragment")] + pub struct PartialFragment { + pub fragment_id: FragmentId, + pub distribution_type: DistributionType, + pub upstream_fragment_id: I32Array, + } + + #[derive(Clone, DerivePartialModel, FromQueryResult)] + #[sea_orm(entity = "Actor")] + pub struct PartialActor { + pub actor_id: ActorId, + pub fragment_id: FragmentId, + pub status: ActorStatus, + pub splits: Option, + pub upstream_actor_ids: ActorUpstreamActors, + pub vnode_bitmap: Option, + } + + let mut flag = false; + + let fragments: Vec = + Fragment::find().into_partial_model().all(txn).await?; + + let fragment_map: HashMap<_, _> = fragments + .into_iter() + .map(|fragment| (fragment.fragment_id, fragment)) + .collect(); + + let actors: Vec = Actor::find().into_partial_model().all(txn).await?; + + let mut fragment_actors = HashMap::new(); + for actor in &actors { + fragment_actors + .entry(actor.fragment_id) + .or_insert(HashSet::new()) + .insert(actor.actor_id); + } + + let actor_map: HashMap<_, _> = actors + .into_iter() + .map(|actor| (actor.actor_id, actor)) + .collect(); + + let actor_dispatchers: Vec = ActorDispatcher::find() + .into_partial_model() + .all(txn) + .await?; + + let mut discovered_upstream_fragments = HashMap::new(); + let mut discovered_upstream_actors = HashMap::new(); + + for (fragment_id, actor_ids) in &fragment_actors { + crit_check_in_loop!( + flag, + fragment_map.contains_key(fragment_id), + format!("Fragment {fragment_id} has actors {actor_ids:?} which does not exist",) + ); + + let mut split_map = HashMap::new(); + for actor_id in actor_ids { + let actor = &actor_map[actor_id]; + + if let Some(splits) = &actor.splits { + for split in splits.to_protobuf().splits { + let Ok(split_impl) = SplitImpl::try_from(&split) else { + continue; + }; + + let dup_split_actor = split_map.insert(split_impl.id(), actor_id); + crit_check_in_loop!( + flag, + dup_split_actor.is_none(), + format!( + "Fragment {fragment_id} actor {actor_id} has duplicate split {split:?} from actor {dup_split_actor:?}", + ) + ); + } + } + } + + let fragment = &fragment_map[fragment_id]; + + match fragment.distribution_type { + DistributionType::Single => { + crit_check_in_loop!( + flag, + actor_ids.len() == 1, + format!( + "Fragment {fragment_id} has more than one actors {actor_ids:?} for single distribution type", + ) + ); + + let actor_id = actor_ids.iter().exactly_one().unwrap(); + let actor = &actor_map[actor_id]; + + crit_check_in_loop!( + flag, + actor.vnode_bitmap.is_none(), + format!( + "Fragment {fragment_id} actor {actor_id} has vnode_bitmap set for single distribution type", + ) + ); + } + DistributionType::Hash => { + crit_check_in_loop!( + flag, + !actor_ids.is_empty(), + format!( + "Fragment {fragment_id} has less than one actors {actor_ids:?} for hash distribution type", + ) + ); + + let fragment_vnode_count = VirtualNode::COUNT; + + let mut result_bitmap = Bitmap::zeros(fragment_vnode_count); + + for actor_id in actor_ids { + let actor = &actor_map[actor_id]; + + crit_check_in_loop!( + flag, + actor.vnode_bitmap.is_some(), + format!( + "Fragment {fragment_id} actor {actor_id} has no vnode_bitmap set for hash distribution type", + ) + ); + + let buffer = actor.vnode_bitmap.as_ref().unwrap().to_protobuf(); + let bitmap = Bitmap::from(&buffer); + + crit_check_in_loop!( + flag, + result_bitmap.clone().bitand(&bitmap).count_ones() == 0, + format!( + "Fragment {fragment_id} actor {actor_id} has duplicate vnode_bitmap with other actor for hash distribution type, actor bitmap {bitmap:?}, other all bitmap {result_bitmap:?}", + ) + ); + + result_bitmap.bitor_assign(&bitmap); + } + + crit_check_in_loop!( + flag, + result_bitmap.all(), + format!( + "Fragment {fragment_id} has incomplete vnode_bitmap for hash distribution type", + ) + ); + + let discovered_vnode_count = result_bitmap.count_ones(); + + crit_check_in_loop!( + flag, + discovered_vnode_count == fragment_vnode_count, + format!( + "Fragment {fragment_id} has different vnode_count {fragment_vnode_count} with discovered vnode count {discovered_vnode_count} for hash distribution type", + ) + ); + } + } + } + + for PartialActorDispatcher { + id, + actor_id, + dispatcher_type, + hash_mapping, + dispatcher_id, + downstream_actor_ids, + } in &actor_dispatchers + { + crit_check_in_loop!( + flag, + actor_map.contains_key(actor_id), + format!( + "ActorDispatcher {} has actor_id {} which does not exist", + id, actor_id + ) + ); + + let actor = &actor_map[actor_id]; + + crit_check_in_loop!( + flag, + fragment_map.contains_key(dispatcher_id), + format!( + "ActorDispatcher {} has dispatcher_id {} which does not exist", + id, dispatcher_id + ) + ); + + discovered_upstream_fragments + .entry(*dispatcher_id) + .or_insert(HashSet::new()) + .insert(actor.fragment_id); + + let downstream_fragment = &fragment_map[dispatcher_id]; + + crit_check_in_loop!( + flag, + downstream_fragment.upstream_fragment_id.inner_ref().contains(&actor.fragment_id), + format!( + "ActorDispatcher {} has downstream fragment {} which does not have upstream fragment {}", + id, dispatcher_id, actor.fragment_id + ) + ); + + crit_check_in_loop!( + flag, + fragment_actors.contains_key(dispatcher_id), + format!( + "ActorDispatcher {id} has downstream fragment {dispatcher_id} which has no actors", + ) + ); + + let dispatcher_downstream_actor_ids: HashSet<_> = + downstream_actor_ids.inner_ref().iter().cloned().collect(); + + let target_fragment_actor_ids = &fragment_actors[dispatcher_id]; + + for dispatcher_downstream_actor_id in &dispatcher_downstream_actor_ids { + crit_check_in_loop!( + flag, + actor_map.contains_key(dispatcher_downstream_actor_id), + format!( + "ActorDispatcher {id} has downstream_actor_id {dispatcher_downstream_actor_id} which does not exist", + ) + ); + + let actor_fragment_id = actor.fragment_id; + + crit_check_in_loop!( + flag, + actor_map[dispatcher_downstream_actor_id].upstream_actor_ids.inner_ref().contains_key(&actor.fragment_id), + format!( + "ActorDispatcher {id} has downstream_actor_id {dispatcher_downstream_actor_id} which does not have fragment_id {actor_fragment_id} in upstream_actor_id", + ) + ); + + discovered_upstream_actors + .entry(*dispatcher_downstream_actor_id) + .or_insert(HashSet::new()) + .insert(actor.actor_id); + } + + match dispatcher_type { + DispatcherType::NoShuffle => {} + _ => { + crit_check_in_loop!( + flag, + &dispatcher_downstream_actor_ids == target_fragment_actor_ids, + format!( + "ActorDispatcher {id} has downstream fragment {dispatcher_id} which has different actors: {dispatcher_downstream_actor_ids:?} != target_fragment_actor_ids: {target_fragment_actor_ids:?}", + ) + ); + } + } + + match dispatcher_type { + DispatcherType::Hash => { + crit_check_in_loop!( + flag, + hash_mapping.is_some(), + format!( + "ActorDispatcher {id} has no hash_mapping set for {dispatcher_type:?}", + ) + ); + } + _ => { + crit_check_in_loop!( + flag, + hash_mapping.is_none(), + format!( + "ActorDispatcher {id} has hash_mapping set for {dispatcher_type:?}" + ) + ); + } + } + + match dispatcher_type { + DispatcherType::Simple | DispatcherType::NoShuffle => { + crit_check_in_loop!( + flag, + dispatcher_downstream_actor_ids.len() == 1, + format!( + "ActorDispatcher {id} has more than one downstream_actor_ids for {dispatcher_type:?}", + ) + ); + } + _ => {} + } + + match dispatcher_type { + DispatcherType::Hash => { + let mapping = hash::ActorMapping::from_protobuf( + &hash_mapping.as_ref().unwrap().to_protobuf(), + ); + + let mapping_actors: HashSet<_> = + mapping.iter().map(|actor_id| actor_id as ActorId).collect(); + + crit_check_in_loop!( + flag, + &mapping_actors == target_fragment_actor_ids, + format!( + "ActorDispatcher {id} has downstream fragment {dispatcher_id} which has different actors: {mapping_actors:?} != target_fragment_actor_ids: {target_fragment_actor_ids:?}", + ) + ); + + // actors only from hash distribution fragment can have hash mapping + match downstream_fragment.distribution_type { + DistributionType::Hash => { + let mut downstream_bitmaps = HashMap::new(); + + for downstream_actor in target_fragment_actor_ids { + let bitmap = Bitmap::from( + &actor_map[downstream_actor] + .vnode_bitmap + .as_ref() + .unwrap() + .to_protobuf(), + ); + + downstream_bitmaps + .insert(*downstream_actor as hash::ActorId, bitmap); + } + + crit_check_in_loop!( + flag, + mapping.to_bitmaps() == downstream_bitmaps, + format!( + "ActorDispatcher {id} has hash downstream fragment {dispatcher_id} which has different bitmaps: {mapping:?} != downstream bitmaps: {downstream_bitmaps:?}" + ) + ); + } + DistributionType::Single => { + tracing::warn!( + "ActorDispatcher {id} has hash downstream fragment {dispatcher_id} which has single distribution type" + ); + } + } + } + + DispatcherType::Simple => { + crit_check_in_loop!( + flag, + target_fragment_actor_ids.len() == 1, + format!( + "ActorDispatcher {id} has more than one actors in downstream fragment {dispatcher_id} for {dispatcher_type:?}", + ) + ); + + crit_check_in_loop!( + flag, + downstream_fragment.distribution_type != DistributionType::Hash, + format!( + "ActorDispatcher {id} has downstream fragment {dispatcher_id} which has hash distribution type for {dispatcher_type:?}", + ) + ); + } + + DispatcherType::NoShuffle => { + let downstream_actor_id = + dispatcher_downstream_actor_ids.iter().next().unwrap(); + let downstream_actor = &actor_map[downstream_actor_id]; + + crit_check_in_loop!( + flag, + actor.vnode_bitmap == downstream_actor.vnode_bitmap, + format!( + "ActorDispatcher {id} has different vnode_bitmap with downstream_actor_id {downstream_actor_id} for {dispatcher_type:?}", + ) + ); + } + + DispatcherType::Broadcast => { + if let DistributionType::Single = downstream_fragment.distribution_type { + tracing::warn!( + "ActorDispatcher {id} has broadcast downstream fragment {dispatcher_id} which has single distribution type" + ); + } + } + } + } + + for (fragment_id, fragment) in &fragment_map { + let discovered_upstream_fragment_ids = discovered_upstream_fragments + .get(&fragment.fragment_id) + .cloned() + .unwrap_or_default(); + + let upstream_fragment_ids: HashSet<_> = fragment + .upstream_fragment_id + .inner_ref() + .iter() + .copied() + .collect(); + + crit_check_in_loop!( + flag, + discovered_upstream_fragment_ids == upstream_fragment_ids, + format!( + "Fragment {fragment_id} has different upstream_fragment_ids from discovered: {discovered_upstream_fragment_ids:?} != upstream_fragment_ids: {upstream_fragment_ids:?}", + ) + ); + } + + for PartialActor { + actor_id, + status, + upstream_actor_ids, + .. + } in actor_map.values() + { + crit_check_in_loop!( + flag, + *status == ActorStatus::Running, + format!("Actor {actor_id} has status {status:?} which is not Running",) + ); + + let discovered_upstream_actor_ids = discovered_upstream_actors + .get(actor_id) + .cloned() + .unwrap_or_default(); + + let upstream_actor_ids: HashSet<_> = upstream_actor_ids + .inner_ref() + .iter() + .flat_map(|(_, v)| v.iter().copied()) + .collect(); + + crit_check_in_loop!( + flag, + discovered_upstream_actor_ids == upstream_actor_ids, + format!( + "Actor {actor_id} has different upstream_actor_ids from dispatcher discovered: {discovered_upstream_actor_ids:?} != upstream_actor_ids: {upstream_actor_ids:?}", + ) + ) + } + + if flag { + return Err(MetaError::integrity_check_failed()); + } + + Ok(()) + } + + pub async fn integrity_check(&self) -> MetaResult<()> { + let inner = self.inner.read().await; + let txn = inner.db.begin().await?; + Self::graph_check(&txn).await + } +} diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8aeaed2f9c5a8..5a1ad1988786e 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -125,6 +125,9 @@ pub enum MetaErrorInner { // Indicates that recovery was triggered manually. #[error("adhoc recovery triggered")] AdhocRecovery, + + #[error("Integrity check failed")] + IntegrityCheckFailed, } impl MetaError { diff --git a/src/utils/runtime/src/lib.rs b/src/utils/runtime/src/lib.rs index 6eff2932ae8a6..23abe6dd9e52c 100644 --- a/src/utils/runtime/src/lib.rs +++ b/src/utils/runtime/src/lib.rs @@ -72,7 +72,7 @@ where { set_panic_hook(); - rustls::crypto::aws_lc_rs::default_provider() + rustls::crypto::ring::default_provider() .install_default() .inspect_err(|e| { tracing::error!(?e, "Failed to install default crypto provider.");