diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index e225d1e47ea7..0cd59aa1d98c 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -1047,6 +1047,57 @@ mod tests { check_invalid_request(&err, r#"unknown columns: ["k1"]"#); } + #[test] + fn test_fill_impure_columns_err() { + let rows = Rows { + schema: vec![new_column_schema( + "k0", + ColumnDataType::Int64, + SemanticType::Tag, + )], + rows: vec![Row { + values: vec![i64_value(1)], + }], + }; + let metadata = { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_default_constraint(Some(ColumnDefaultConstraint::Function( + "now()".to_string(), + ))) + .unwrap(), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "k0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }) + .primary_key(vec![2]); + builder.build().unwrap() + }; + + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + assert!(err.is_fill_default()); + assert!(request + .fill_missing_columns(&metadata) + .unwrap_err() + .to_string() + .contains("Unexpected impure default value with region_id")); + } + #[test] fn test_fill_missing_columns() { let rows = Rows { diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index ad118dbd441b..33eb7e040823 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -218,10 +218,7 @@ impl Inserter { .convert(requests) .await?; - // Fill impure default values in the request - let inserts = fill_reqs_with_impure_default(&table_infos, inserts)?; - - self.do_request(inserts, &ctx).await + self.do_request(inserts, &table_infos, &ctx).await } /// Handles row inserts request with metric engine. @@ -266,9 +263,7 @@ impl Inserter { .convert(requests) .await?; - // Fill impure default values in the request - let inserts = fill_reqs_with_impure_default(&table_infos, inserts)?; - self.do_request(inserts, &ctx).await + self.do_request(inserts, &table_infos, &ctx).await } pub async fn handle_table_insert( @@ -291,9 +286,8 @@ impl Inserter { let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter()); - // Fill impure default values in the request - let inserts = fill_reqs_with_impure_default(&table_infos, inserts)?; - self.do_request(inserts, &ctx).await + + self.do_request(inserts, &table_infos, &ctx).await } pub async fn handle_statement_insert( @@ -308,10 +302,8 @@ impl Inserter { let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter()); - // Fill impure default values in the request - let inserts = fill_reqs_with_impure_default(&table_infos, inserts)?; - self.do_request(inserts, ctx).await + self.do_request(inserts, &table_infos, ctx).await } } @@ -319,8 +311,12 @@ impl Inserter { async fn do_request( &self, requests: InstantAndNormalInsertRequests, + table_infos: &HashMap>, ctx: &QueryContextRef, ) -> Result { + // Fill impure default values in the request + let requests = fill_reqs_with_impure_default(table_infos, requests)?; + let write_cost = write_meter!( ctx.current_catalog(), ctx.current_schema(), diff --git a/src/operator/src/req_convert/insert/fill_impure_default.rs b/src/operator/src/req_convert/insert/fill_impure_default.rs index c3ed4700b081..d473d55dfb7f 100644 --- a/src/operator/src/req_convert/insert/fill_impure_default.rs +++ b/src/operator/src/req_convert/insert/fill_impure_default.rs @@ -44,7 +44,7 @@ pub struct ImpureDefaultFiller { impl ImpureDefaultFiller { pub fn new(table_info: TableInfoRef) -> Result { let impure_column_list = find_all_impure_columns(&table_info); - let pks = table_info.meta.primary_key_indices.clone(); + let pks = &table_info.meta.primary_key_indices; let pk_names = pks .iter() .map(|&i| table_info.meta.schema.column_name_by_index(i).to_string()) @@ -93,6 +93,10 @@ impl ImpureDefaultFiller { }) .collect(); + if self.impure_columns.len() == impure_columns_in_reqs.len() { + return; + } + let (schema_append, row_append): (Vec<_>, Vec<_>) = self .impure_columns .iter() @@ -107,7 +111,7 @@ impl ImpureDefaultFiller { rows.schema.extend(schema_append); for row in rows.rows.iter_mut() { - row.values.extend(row_append.clone()); + row.values.extend_from_slice(row_append.as_slice()); } } }