From 940ee8bfc540da467a26315c1d65bad9751b0fb7 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 26 Aug 2024 09:31:08 +0000 Subject: [PATCH] feat(subscription): Improving usability of subscription (#18217) (#18234) Co-authored-by: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> --- e2e_test/subscription/main.py | 108 +++++++++++------- src/batch/src/executor/log_row_seq_scan.rs | 4 +- src/common/src/array/stream_chunk.rs | 10 ++ src/frontend/src/handler/declare_cursor.rs | 10 +- src/frontend/src/handler/show.rs | 19 ++- .../optimizer/plan_node/generic/log_scan.rs | 2 +- src/frontend/src/session/cursor_manager.rs | 66 ++++++++++- src/sqlparser/src/ast/mod.rs | 4 + src/sqlparser/src/ast/statement.rs | 4 +- src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 25 +++- 11 files changed, 194 insertions(+), 59 deletions(-) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index fa89c9697d40c..60db0df82c33a 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -53,9 +53,9 @@ def test_cursor_snapshot(): database="dev" ) - execute_insert("declare cur subscription cursor for sub",conn) + execute_insert("declare cur subscription cursor for sub full",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row[0],1) + check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -72,9 +72,9 @@ def test_cursor_snapshot_log_store(): database="dev" ) - execute_insert("declare cur subscription cursor for sub",conn) + execute_insert("declare cur subscription cursor for sub full",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row[0],1) + check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("insert into t1 values(4,4)",conn) @@ -82,9 +82,9 @@ def 
test_cursor_snapshot_log_store(): execute_insert("insert into t1 values(5,5)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row[0],1) + check_rows_data([4,4],row[0],"Insert") row = execute_query("fetch next from cur",conn) - check_rows_data([5,5],row[0],1) + check_rows_data([5,5],row[0],"Insert") row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -108,11 +108,11 @@ def test_cursor_since_begin(): execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row[0],1) + check_rows_data([4,4],row[0],"Insert") row = execute_query("fetch next from cur",conn) - check_rows_data([5,5],row[0],1) + check_rows_data([5,5],row[0],"Insert") row = execute_query("fetch next from cur",conn) - check_rows_data([6,6],row[0],1) + check_rows_data([6,6],row[0],"Insert") row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -137,7 +137,32 @@ def test_cursor_since_now(): execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([6,6],row[0],1) + check_rows_data([6,6],row[0],"Insert") + row = execute_query("fetch next from cur",conn) + assert row == [] + execute_insert("close cur",conn) + drop_table_subscription() + +def test_cursor_without_since(): + print(f"test_cursor_since_now") + create_table_subscription() + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + execute_insert("insert into t1 values(4,4)",conn) + execute_insert("flush",conn) + execute_insert("insert into t1 values(5,5)",conn) + execute_insert("flush",conn) + execute_insert("declare cur subscription cursor for sub",conn) + time.sleep(2) + execute_insert("insert into t1 values(6,6)",conn) + execute_insert("flush",conn) + row = execute_query("fetch 
next from cur",conn) + check_rows_data([6,6],row[0],"Insert") row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -163,27 +188,27 @@ def test_cursor_since_rw_timestamp(): row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_1 = row[0][valuelen - 1] - check_rows_data([4,4],row[0],1) + check_rows_data([4,4],row[0],"Insert") row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_2 = row[0][valuelen - 1] - 1 - check_rows_data([5,5],row[0],1) + check_rows_data([5,5],row[0],"Insert") row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_3 = row[0][valuelen - 1] + 1 - check_rows_data([6,6],row[0],1) + check_rows_data([6,6],row[0],"Insert") row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row[0],1) + check_rows_data([4,4],row[0],"Insert") execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([5,5],row[0],1) + check_rows_data([5,5],row[0],"Insert") execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn) @@ -203,9 +228,9 @@ def test_cursor_op(): database="dev" ) - execute_insert("declare cur subscription cursor for sub",conn) + execute_insert("declare cur subscription cursor for sub full",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row[0],1) + check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur",conn) assert row == [] @@ -214,18 +239,18 @@ def test_cursor_op(): execute_insert("update t1 set v2 = 10 where v1 = 4",conn) execute_insert("flush",conn) row = execute_query("fetch next 
from cur",conn) - check_rows_data([4,4],row[0],1) + check_rows_data([4,4],row[0],"Insert") row = execute_query("fetch next from cur",conn) - check_rows_data([4,4],row[0],4) + check_rows_data([4,4],row[0],"UpdateDelete") row = execute_query("fetch next from cur",conn) - check_rows_data([4,10],row[0],3) + check_rows_data([4,10],row[0],"UpdateInsert") row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("delete from t1 where v1 = 4",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([4,10],row[0],2) + check_rows_data([4,10],row[0],"Delete") row = execute_query("fetch next from cur",conn) assert row == [] @@ -242,27 +267,27 @@ def test_cursor_with_table_alter(): database="dev" ) - execute_insert("declare cur subscription cursor for sub",conn) + execute_insert("declare cur subscription cursor for sub full",conn) execute_insert("alter table t1 add v3 int",conn) execute_insert("insert into t1 values(4,4,4)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([1,2],row[0],1) + check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur",conn) assert(row == []) row = execute_query("fetch next from cur",conn) - check_rows_data([4,4,4],row[0],1) + check_rows_data([4,4,4],row[0],"Insert") execute_insert("insert into t1 values(5,5,5)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) - check_rows_data([5,5,5],row[0],1) + check_rows_data([5,5,5],row[0],"Insert") execute_insert("alter table t1 drop column v2",conn) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) assert(row == []) row = execute_query("fetch next from cur",conn) - check_rows_data([6,6],row[0],1) + check_rows_data([6,6],row[0],"Insert") drop_table_subscription() def test_cursor_fetch_n(): @@ -275,7 +300,7 @@ def test_cursor_fetch_n(): database="dev" ) - 
execute_insert("declare cur subscription cursor for sub",conn) + execute_insert("declare cur subscription cursor for sub full",conn) execute_insert("insert into t1 values(4,4)",conn) execute_insert("flush",conn) execute_insert("insert into t1 values(5,5)",conn) @@ -294,18 +319,18 @@ def test_cursor_fetch_n(): execute_insert("flush",conn) row = execute_query("fetch 6 from cur",conn) assert len(row) == 6 - check_rows_data([1,2],row[0],1) - check_rows_data([4,4],row[1],1) - check_rows_data([5,5],row[2],1) - check_rows_data([6,6],row[3],1) - check_rows_data([7,7],row[4],1) - check_rows_data([8,8],row[5],1) + check_rows_data([1,2],row[0],"Insert") + check_rows_data([4,4],row[1],"Insert") + check_rows_data([5,5],row[2],"Insert") + check_rows_data([6,6],row[3],"Insert") + check_rows_data([7,7],row[4],"Insert") + check_rows_data([8,8],row[5],"Insert") row = execute_query("fetch 6 from cur",conn) assert len(row) == 4 - check_rows_data([9,9],row[0],1) - check_rows_data([10,10],row[1],1) - check_rows_data([10,10],row[2],4) - check_rows_data([10,100],row[3],3) + check_rows_data([9,9],row[0],"Insert") + check_rows_data([10,10],row[1],"Insert") + check_rows_data([10,10],row[2],"UpdateDelete") + check_rows_data([10,100],row[3],"UpdateInsert") drop_table_subscription() def test_rebuild_table(): @@ -318,16 +343,16 @@ def test_rebuild_table(): database="dev" ) - execute_insert("declare cur subscription cursor for sub2",conn) + execute_insert("declare cur subscription cursor for sub2 full",conn) execute_insert("insert into t2 values(1,1)",conn) execute_insert("flush",conn) execute_insert("update t2 set v2 = 100 where v1 = 1",conn) execute_insert("flush",conn) row = execute_query("fetch 4 from cur",conn) assert len(row) == 3 - check_rows_data([1,1],row[0],1) - check_rows_data([1,1],row[1],4) - check_rows_data([1,100],row[2],3) + check_rows_data([1,1],row[0],"Insert") + check_rows_data([1,1],row[1],"UpdateDelete") + check_rows_data([1,100],row[2],"UpdateInsert") 
drop_table_subscription() if __name__ == "__main__": @@ -336,6 +361,7 @@ def test_rebuild_table(): test_cursor_snapshot_log_store() test_cursor_since_rw_timestamp() test_cursor_since_now() + test_cursor_without_since() test_cursor_since_begin() test_cursor_with_table_alter() test_cursor_fetch_n() diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index 3614673ee941e..7106eaec1b760 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -62,7 +62,7 @@ impl LogRowSeqScanExecutor { ) -> Self { let mut schema = table.schema().clone(); schema.fields.push(Field::with_name( - risingwave_common::types::DataType::Int16, + risingwave_common::types::DataType::Varchar, "op", )); Self { @@ -212,7 +212,7 @@ impl LogRowSeqScanExecutor { match r { Ok(change_log_row) => { fn with_op(op: Op, row: impl Row) -> impl Row { - row.chain([Some(ScalarImpl::Int16(op.to_i16()))]) + row.chain([Some(ScalarImpl::Utf8(op.to_varchar().into()))]) } for (op, row) in change_log_row.into_op_value_iter() { yield Ok(with_op(op, row)); diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index cfee990561aa1..2827f07493fd2 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -89,6 +89,16 @@ impl Op { Op::UpdateDelete => 4, } } + + pub fn to_varchar(self) -> String { + match self { + Op::Insert => "Insert", + Op::Delete => "Delete", + Op::UpdateInsert => "UpdateInsert", + Op::UpdateDelete => "UpdateDelete", + } + .to_string() + } } pub type Ops<'a> = &'a [Op]; diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index a4974530cfe50..4c13886581706 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -55,7 +55,7 @@ async fn handle_declare_subscription_cursor( handle_args: HandlerArgs, sub_name: ObjectName, cursor_name: 
ObjectName, - rw_timestamp: Option, + rw_timestamp: Since, ) -> Result { let session = handle_args.session.clone(); let db_name = session.database(); @@ -67,19 +67,19 @@ async fn handle_declare_subscription_cursor( session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?; // Start the first query of cursor, which includes querying the table and querying the subscription's logstore let start_rw_timestamp = match rw_timestamp { - Some(risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp)) => { + risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp) => { check_cursor_unix_millis(start_rw_timestamp, subscription.retention_seconds)?; Some(convert_unix_millis_to_logstore_u64(start_rw_timestamp)) } - Some(risingwave_sqlparser::ast::Since::ProcessTime) => Some(Epoch::now().0), - Some(risingwave_sqlparser::ast::Since::Begin) => { + risingwave_sqlparser::ast::Since::ProcessTime => Some(Epoch::now().0), + risingwave_sqlparser::ast::Since::Begin => { let min_unix_millis = Epoch::now().as_unix_millis() - subscription.retention_seconds * 1000; let subscription_build_millis = subscription.created_at_epoch.unwrap().as_unix_millis(); let min_unix_millis = std::cmp::max(min_unix_millis, subscription_build_millis); Some(convert_unix_millis_to_logstore_u64(min_unix_millis)) } - None => None, + risingwave_sqlparser::ast::Since::Full => None, }; // Create cursor based on the response session diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 6c0bc2c7e61e8..b5090537bc8c0 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -31,7 +31,7 @@ use risingwave_sqlparser::ast::{ display_comma_separated, Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, }; -use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt}; +use super::{fields_to_descriptors, PgResponseStream, RwPgResponse, RwPgResponseBuilderExt}; use crate::binder::{Binder, Relation}; 
use crate::catalog::{CatalogError, IndexCatalog}; use crate::error::Result; @@ -481,6 +481,23 @@ pub async fn handle_show_object( .rows(rows) .into()); } + ShowObject::Cursor => { + let (rows, pg_descs) = session.get_cursor_manager().get_all_query_cursors().await; + return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) + .row_cnt_opt(Some(rows.len() as i32)) + .values(PgResponseStream::from(rows), pg_descs) + .into()); + } + ShowObject::SubscriptionCursor => { + let (rows, pg_descs) = session + .get_cursor_manager() + .get_all_subscription_cursors() + .await; + return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) + .row_cnt_opt(Some(rows.len() as i32)) + .values(PgResponseStream::from(rows), pg_descs) + .into()); + } }; let rows = names diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs index 498d4a44b0fcc..9a1e53aeab758 100644 --- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs @@ -25,7 +25,7 @@ use crate::catalog::ColumnId; use crate::optimizer::optimizer_context::OptimizerContextRef; const OP_NAME: &str = "op"; -const OP_TYPE: DataType = DataType::Int16; +const OP_TYPE: DataType = DataType::Varchar; #[derive(Debug, Clone, Educe)] #[educe(PartialEq, Eq, Hash)] diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 390428f09bea3..dd14e8023fbc4 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -555,8 +555,8 @@ impl SubscriptionCursor { } else { let op_formats = formats.get(row_len).unwrap_or(&Format::Text); let op = pg_value_format( - &DataType::Int16, - risingwave_common::types::ScalarRefImpl::Int16(1_i16), + &DataType::Varchar, + risingwave_common::types::ScalarRefImpl::Utf8("Insert"), *op_formats, session_data, )?; @@ -568,7 +568,7 @@ impl SubscriptionCursor { pub fn build_desc(mut descs: Vec, 
from_snapshot: bool) -> Vec { if from_snapshot { - descs.push(Field::with_name(DataType::Int16, "op")); + descs.push(Field::with_name(DataType::Varchar, "op")); } descs.push(Field::with_name(DataType::Int64, "rw_timestamp")); descs @@ -750,4 +750,64 @@ impl CursorManager { Err(ErrorCode::ItemNotFound(format!("Cannot find cursor `{}`", cursor_name)).into()) } } + + pub async fn get_all_query_cursors(&self) -> (Vec, Vec) { + let cursor_names = self + .cursor_map + .lock() + .await + .iter() + .filter_map(|(currsor_name, cursor)| { + if let Cursor::Query(_cursor) = cursor { + let cursor_name = vec![Some(Bytes::from(currsor_name.clone().into_bytes()))]; + Some(Row::new(cursor_name)) + } else { + None + } + }) + .collect(); + ( + cursor_names, + vec![PgFieldDescriptor::new( + "Name".to_string(), + DataType::Varchar.to_oid(), + DataType::Varchar.type_len(), + )], + ) + } + + pub async fn get_all_subscription_cursors(&self) -> (Vec, Vec) { + let cursors = self + .cursor_map + .lock() + .await + .iter() + .filter_map(|(cursor_name, cursor)| { + if let Cursor::Subscription(cursor) = cursor { + let cursors = vec![ + Some(Bytes::from(cursor_name.clone().into_bytes())), + Some(Bytes::from(cursor.subscription.name.clone().into_bytes())), + ]; + Some(Row::new(cursors)) + } else { + None + } + }) + .collect(); + ( + cursors, + vec![ + PgFieldDescriptor::new( + "Name".to_string(), + DataType::Varchar.to_oid(), + DataType::Varchar.type_len(), + ), + PgFieldDescriptor::new( + "SubscriptionName".to_string(), + DataType::Varchar.to_oid(), + DataType::Varchar.type_len(), + ), + ], + ) + } } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index eeb9859679717..f12f7705d587b 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1020,6 +1020,8 @@ pub enum ShowObject { Cluster, Jobs, ProcessList, + Cursor, + SubscriptionCursor, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -1064,6 +1066,8 @@ impl fmt::Display for ShowObject { 
ShowObject::ProcessList => write!(f, "PROCESSLIST"), ShowObject::Subscription { schema } => write!(f, "SUBSCRIPTIONS{}", fmt_schema(schema)), ShowObject::Secret { schema } => write!(f, "SECRETS{}", fmt_schema(schema)), + ShowObject::Cursor => write!(f, "CURSORS"), + ShowObject::SubscriptionCursor => write!(f, "SUBSCRIPTION CURSORS"), } } } diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index db314fc8d6f6e..4bde036521b07 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -671,7 +671,7 @@ impl fmt::Display for CreateSubscriptionStatement { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum DeclareCursor { Query(Box), - Subscription(ObjectName, Option), + Subscription(ObjectName, Since), } impl fmt::Display for DeclareCursor { @@ -927,6 +927,7 @@ pub enum Since { TimestampMsNum(u64), ProcessTime, Begin, + Full, } impl fmt::Display for Since { @@ -936,6 +937,7 @@ impl fmt::Display for Since { TimestampMsNum(ts) => write!(f, " SINCE {}", ts), ProcessTime => write!(f, " SINCE PROCTIME()"), Begin => write!(f, " SINCE BEGIN()"), + Full => write!(f, " FULL"), } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 79036ea849a33..014d100b1f954 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -174,6 +174,7 @@ define_keywords!( CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, + CURSORS, CYCLE, DATA, DATABASE, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index e239bfb3b0a6f..011c6d84e10b7 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2909,7 +2909,7 @@ impl Parser<'_> { Ok(SqlOption { name, value }) } - pub fn parse_since(&mut self) -> PResult> { + pub fn parse_since(&mut self) -> PResult { if self.parse_keyword(Keyword::SINCE) { let checkpoint = *self; let token = self.next_token(); @@ -2920,11 +2920,11 @@ impl Parser<'_> { if ident.real_value() == 
"proctime" || ident.real_value() == "now" { self.expect_token(&Token::LParen)?; self.expect_token(&Token::RParen)?; - Ok(Some(Since::ProcessTime)) + Ok(Since::ProcessTime) } else if ident.real_value() == "begin" { self.expect_token(&Token::LParen)?; self.expect_token(&Token::RParen)?; - Ok(Some(Since::Begin)) + Ok(Since::Begin) } else { parser_err!( "Expected proctime(), begin() or now(), found: {}", @@ -2936,12 +2936,14 @@ impl Parser<'_> { let num = s .parse::() .map_err(|e| StrError(format!("Could not parse '{}' as u64: {}", s, e)))?; - Ok(Some(Since::TimestampMsNum(num))) + Ok(Since::TimestampMsNum(num)) } _ => self.expected_at(checkpoint, "proctime(), begin() , now(), Number"), } + } else if self.parse_word("FULL") { + Ok(Since::Full) } else { - Ok(None) + Ok(Since::ProcessTime) } } @@ -4447,6 +4449,19 @@ impl Parser<'_> { self.expect_keywords(&[Keyword::ISOLATION, Keyword::LEVEL])?; return Ok(Statement::ShowTransactionIsolationLevel); } + Keyword::CURSORS => { + return Ok(Statement::ShowObjects { + object: ShowObject::Cursor, + filter: None, + }); + } + Keyword::SUBSCRIPTION => { + self.expect_keyword(Keyword::CURSORS)?; + return Ok(Statement::ShowObjects { + object: ShowObject::SubscriptionCursor, + filter: None, + }); + } _ => {} } }