Skip to content

Commit

Permalink
feat(suscription): Improving usability of subscription (#18217) (#18234)
Browse files Browse the repository at this point in the history
Co-authored-by: Xinhao Xu <[email protected]>
  • Loading branch information
github-actions[bot] and xxhZs authored Aug 26, 2024
1 parent eb80f9c commit 940ee8b
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 59 deletions.
108 changes: 67 additions & 41 deletions e2e_test/subscription/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ def test_cursor_snapshot():
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
execute_insert("declare cur subscription cursor for sub full",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row[0],1)
check_rows_data([1,2],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
Expand All @@ -72,19 +72,19 @@ def test_cursor_snapshot_log_store():
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
execute_insert("declare cur subscription cursor for sub full",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row[0],1)
check_rows_data([1,2],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row[0],1)
check_rows_data([4,4],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5],row[0],1)
check_rows_data([5,5],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
Expand All @@ -108,11 +108,11 @@ def test_cursor_since_begin():
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row[0],1)
check_rows_data([4,4],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5],row[0],1)
check_rows_data([5,5],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
check_rows_data([6,6],row[0],1)
check_rows_data([6,6],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
Expand All @@ -137,7 +137,32 @@ def test_cursor_since_now():
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([6,6],row[0],1)
check_rows_data([6,6],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()

def test_cursor_without_since():
print(f"test_cursor_since_now")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
execute_insert("flush",conn)
execute_insert("declare cur subscription cursor for sub",conn)
time.sleep(2)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([6,6],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
Expand All @@ -163,27 +188,27 @@ def test_cursor_since_rw_timestamp():
row = execute_query("fetch next from cur",conn)
valuelen = len(row[0])
rw_timestamp_1 = row[0][valuelen - 1]
check_rows_data([4,4],row[0],1)
check_rows_data([4,4],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
valuelen = len(row[0])
rw_timestamp_2 = row[0][valuelen - 1] - 1
check_rows_data([5,5],row[0],1)
check_rows_data([5,5],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
valuelen = len(row[0])
rw_timestamp_3 = row[0][valuelen - 1] + 1
check_rows_data([6,6],row[0],1)
check_rows_data([6,6],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row[0],1)
check_rows_data([4,4],row[0],"Insert")
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5],row[0],1)
check_rows_data([5,5],row[0],"Insert")
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn)
Expand All @@ -203,9 +228,9 @@ def test_cursor_op():
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
execute_insert("declare cur subscription cursor for sub full",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row[0],1)
check_rows_data([1,2],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert row == []

Expand All @@ -214,18 +239,18 @@ def test_cursor_op():
execute_insert("update t1 set v2 = 10 where v1 = 4",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row[0],1)
check_rows_data([4,4],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row[0],4)
check_rows_data([4,4],row[0],"UpdateDelete")
row = execute_query("fetch next from cur",conn)
check_rows_data([4,10],row[0],3)
check_rows_data([4,10],row[0],"UpdateInsert")
row = execute_query("fetch next from cur",conn)
assert row == []

execute_insert("delete from t1 where v1 = 4",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,10],row[0],2)
check_rows_data([4,10],row[0],"Delete")
row = execute_query("fetch next from cur",conn)
assert row == []

Expand All @@ -242,27 +267,27 @@ def test_cursor_with_table_alter():
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
execute_insert("declare cur subscription cursor for sub full",conn)
execute_insert("alter table t1 add v3 int",conn)
execute_insert("insert into t1 values(4,4,4)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row[0],1)
check_rows_data([1,2],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert(row == [])
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4,4],row[0],1)
check_rows_data([4,4,4],row[0],"Insert")
execute_insert("insert into t1 values(5,5,5)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5,5],row[0],1)
check_rows_data([5,5,5],row[0],"Insert")
execute_insert("alter table t1 drop column v2",conn)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
assert(row == [])
row = execute_query("fetch next from cur",conn)
check_rows_data([6,6],row[0],1)
check_rows_data([6,6],row[0],"Insert")
drop_table_subscription()

def test_cursor_fetch_n():
Expand All @@ -275,7 +300,7 @@ def test_cursor_fetch_n():
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
execute_insert("declare cur subscription cursor for sub full",conn)
execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
Expand All @@ -294,18 +319,18 @@ def test_cursor_fetch_n():
execute_insert("flush",conn)
row = execute_query("fetch 6 from cur",conn)
assert len(row) == 6
check_rows_data([1,2],row[0],1)
check_rows_data([4,4],row[1],1)
check_rows_data([5,5],row[2],1)
check_rows_data([6,6],row[3],1)
check_rows_data([7,7],row[4],1)
check_rows_data([8,8],row[5],1)
check_rows_data([1,2],row[0],"Insert")
check_rows_data([4,4],row[1],"Insert")
check_rows_data([5,5],row[2],"Insert")
check_rows_data([6,6],row[3],"Insert")
check_rows_data([7,7],row[4],"Insert")
check_rows_data([8,8],row[5],"Insert")
row = execute_query("fetch 6 from cur",conn)
assert len(row) == 4
check_rows_data([9,9],row[0],1)
check_rows_data([10,10],row[1],1)
check_rows_data([10,10],row[2],4)
check_rows_data([10,100],row[3],3)
check_rows_data([9,9],row[0],"Insert")
check_rows_data([10,10],row[1],"Insert")
check_rows_data([10,10],row[2],"UpdateDelete")
check_rows_data([10,100],row[3],"UpdateInsert")
drop_table_subscription()

def test_rebuild_table():
Expand All @@ -318,16 +343,16 @@ def test_rebuild_table():
database="dev"
)

execute_insert("declare cur subscription cursor for sub2",conn)
execute_insert("declare cur subscription cursor for sub2 full",conn)
execute_insert("insert into t2 values(1,1)",conn)
execute_insert("flush",conn)
execute_insert("update t2 set v2 = 100 where v1 = 1",conn)
execute_insert("flush",conn)
row = execute_query("fetch 4 from cur",conn)
assert len(row) == 3
check_rows_data([1,1],row[0],1)
check_rows_data([1,1],row[1],4)
check_rows_data([1,100],row[2],3)
check_rows_data([1,1],row[0],"Insert")
check_rows_data([1,1],row[1],"UpdateDelete")
check_rows_data([1,100],row[2],"UpdateInsert")
drop_table_subscription()

if __name__ == "__main__":
Expand All @@ -336,6 +361,7 @@ def test_rebuild_table():
test_cursor_snapshot_log_store()
test_cursor_since_rw_timestamp()
test_cursor_since_now()
test_cursor_without_since()
test_cursor_since_begin()
test_cursor_with_table_alter()
test_cursor_fetch_n()
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
) -> Self {
let mut schema = table.schema().clone();
schema.fields.push(Field::with_name(
risingwave_common::types::DataType::Int16,
risingwave_common::types::DataType::Varchar,
"op",
));
Self {
Expand Down Expand Up @@ -212,7 +212,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
match r {
Ok(change_log_row) => {
fn with_op(op: Op, row: impl Row) -> impl Row {
row.chain([Some(ScalarImpl::Int16(op.to_i16()))])
row.chain([Some(ScalarImpl::Utf8(op.to_varchar().into()))])
}
for (op, row) in change_log_row.into_op_value_iter() {
yield Ok(with_op(op, row));
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ impl Op {
Op::UpdateDelete => 4,
}
}

pub fn to_varchar(self) -> String {
match self {
Op::Insert => "Insert",
Op::Delete => "Delete",
Op::UpdateInsert => "UpdateInsert",
Op::UpdateDelete => "UpdateDelete",
}
.to_string()
}
}

pub type Ops<'a> = &'a [Op];
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/handler/declare_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn handle_declare_subscription_cursor(
handle_args: HandlerArgs,
sub_name: ObjectName,
cursor_name: ObjectName,
rw_timestamp: Option<Since>,
rw_timestamp: Since,
) -> Result<RwPgResponse> {
let session = handle_args.session.clone();
let db_name = session.database();
Expand All @@ -67,19 +67,19 @@ async fn handle_declare_subscription_cursor(
session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?;
// Start the first query of cursor, which includes querying the table and querying the subscription's logstore
let start_rw_timestamp = match rw_timestamp {
Some(risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp)) => {
risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp) => {
check_cursor_unix_millis(start_rw_timestamp, subscription.retention_seconds)?;
Some(convert_unix_millis_to_logstore_u64(start_rw_timestamp))
}
Some(risingwave_sqlparser::ast::Since::ProcessTime) => Some(Epoch::now().0),
Some(risingwave_sqlparser::ast::Since::Begin) => {
risingwave_sqlparser::ast::Since::ProcessTime => Some(Epoch::now().0),
risingwave_sqlparser::ast::Since::Begin => {
let min_unix_millis =
Epoch::now().as_unix_millis() - subscription.retention_seconds * 1000;
let subscription_build_millis = subscription.created_at_epoch.unwrap().as_unix_millis();
let min_unix_millis = std::cmp::max(min_unix_millis, subscription_build_millis);
Some(convert_unix_millis_to_logstore_u64(min_unix_millis))
}
None => None,
risingwave_sqlparser::ast::Since::Full => None,
};
// Create cursor based on the response
session
Expand Down
19 changes: 18 additions & 1 deletion src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_sqlparser::ast::{
display_comma_separated, Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter,
};

use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt};
use super::{fields_to_descriptors, PgResponseStream, RwPgResponse, RwPgResponseBuilderExt};
use crate::binder::{Binder, Relation};
use crate::catalog::{CatalogError, IndexCatalog};
use crate::error::Result;
Expand Down Expand Up @@ -481,6 +481,23 @@ pub async fn handle_show_object(
.rows(rows)
.into());
}
ShowObject::Cursor => {
let (rows, pg_descs) = session.get_cursor_manager().get_all_query_cursors().await;
return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.row_cnt_opt(Some(rows.len() as i32))
.values(PgResponseStream::from(rows), pg_descs)
.into());
}
ShowObject::SubscriptionCursor => {
let (rows, pg_descs) = session
.get_cursor_manager()
.get_all_subscription_cursors()
.await;
return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.row_cnt_opt(Some(rows.len() as i32))
.values(PgResponseStream::from(rows), pg_descs)
.into());
}
};

let rows = names
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/log_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::catalog::ColumnId;
use crate::optimizer::optimizer_context::OptimizerContextRef;

const OP_NAME: &str = "op";
const OP_TYPE: DataType = DataType::Int16;
const OP_TYPE: DataType = DataType::Varchar;

#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
Expand Down
Loading

0 comments on commit 940ee8b

Please sign in to comment.