Skip to content

Commit b776dd3

Browse files
authored
Support for Dynamic Table Deletion roapi#340 (roapi#343)
closes roapi#340
1 parent 08ec089 commit b776dd3

File tree

8 files changed

+115
-2
lines changed

8 files changed

+115
-2
lines changed

columnq/src/columnq.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ impl ColumnQ {
110110
Ok(())
111111
}
112112

113+
pub async fn drop_table(&mut self, t: &TableSource) -> Result<(), ColumnQError> {
114+
self.schema_map.remove(&t.name);
115+
self.dfctx.deregister_table(t.name.as_str())?;
116+
Ok(())
117+
}
118+
113119
pub fn register_object_storage(
114120
&mut self,
115121
url: &Url,

roapi/src/api/drop.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use axum::extract::{Extension, Json};
4+
use columnq::error::ColumnQError;
5+
use columnq::table::TableSource;
6+
use log::info;
7+
use serde::Deserialize;
8+
use tokio::sync::Mutex;
9+
10+
use crate::context::RoapiContext;
11+
use crate::error::ApiErrResp;
12+
13+
#[derive(Debug, Deserialize)]
14+
pub struct SourceConfig {
15+
#[serde(rename = "tableName")]
16+
pub table_name: String,
17+
}
18+
19+
pub async fn drop_table<H: RoapiContext>(
20+
Extension(ctx): Extension<Arc<H>>,
21+
Extension(tables): Extension<Arc<Mutex<HashMap<String, TableSource>>>>,
22+
Json(body): Json<Vec<SourceConfig>>,
23+
) -> Result<(), ApiErrResp> {
24+
let mut tables = tables.lock().await;
25+
for config in body {
26+
if let Some(t) = tables.get(&config.table_name) {
27+
info!("dropping table `{}`", t.name);
28+
ctx.drop_table(t)
29+
.await
30+
.map_err(ColumnQError::from)
31+
.map_err(ApiErrResp::drop_table)?;
32+
tables.remove(&config.table_name);
33+
info!("dropped table `{}`", config.table_name);
34+
} else {
35+
return Err(ApiErrResp::not_found(format!(
36+
"Table `{}` source does not exist",
37+
config.table_name
38+
)));
39+
}
40+
}
41+
Ok(())
42+
}
43+
44+
pub async fn drop_table_read_only() -> Result<(), ApiErrResp> {
45+
Err(ApiErrResp::read_only_mode())
46+
}

roapi/src/api/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pub fn encode_record_batches(
5656
Ok(bytes_to_resp(payload, content_type.to_str()))
5757
}
5858

59+
pub mod drop;
5960
pub mod graphql;
6061
pub mod health;
6162
pub mod kv;

roapi/src/api/routes.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@ pub fn register_app_routes<H: RoapiContext>() -> Router {
2020
);
2121

2222
if H::read_only_mode() {
23-
router = router.route("/api/table", post(api::register::register_table_read_only));
23+
router = router
24+
.route("/api/table", post(api::register::register_table_read_only))
25+
.route("/api/tables/drop", post(api::drop::drop_table_read_only));
2426
} else {
25-
router = router.route("/api/table", post(api::register::register_table::<H>));
27+
router = router
28+
.route("/api/table", post(api::register::register_table::<H>))
29+
.route("/api/tables/drop", post(api::drop::drop_table::<H>));
2630
}
2731

2832
router

roapi/src/context.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub trait RoapiContext: Send + Sync + 'static {
6262

6363
async fn load_table(&self, table: &TableSource) -> Result<(), ColumnQError>;
6464

65+
async fn drop_table(&self, table: &TableSource) -> Result<(), ColumnQError>;
66+
6567
async fn schemas(&self) -> Result<Vec<(String, arrow::datatypes::SchemaRef)>, ApiErrResp>;
6668

6769
async fn schemas_json_bytes(&self) -> Result<Vec<u8>, ApiErrResp>;
@@ -108,6 +110,13 @@ impl RoapiContext for RawRoapiContext {
108110
))
109111
}
110112

113+
#[inline]
114+
async fn drop_table(&self, _table: &TableSource) -> Result<(), ColumnQError> {
115+
Err(ColumnQError::Generic(
116+
"Table update not supported in read only mode".to_string(),
117+
))
118+
}
119+
111120
#[inline]
112121
async fn schemas(&self) -> Result<Vec<(String, arrow::datatypes::SchemaRef)>, ApiErrResp> {
113122
Ok(self
@@ -209,6 +218,12 @@ impl RoapiContext for ConcurrentRoapiContext {
209218
ctx.cq.load_table(table).await
210219
}
211220

221+
#[inline]
222+
async fn drop_table(&self, table: &TableSource) -> Result<(), ColumnQError> {
223+
let mut ctx = self.write().await;
224+
ctx.cq.drop_table(table).await
225+
}
226+
212227
#[inline]
213228
async fn table_names(&self) -> Vec<String> {
214229
let ctx = self.read().await;

roapi/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ impl ApiErrResp {
9797
}
9898
}
9999

100+
pub fn drop_table(error: ColumnQError) -> Self {
101+
Self {
102+
code: http::StatusCode::INTERNAL_SERVER_ERROR,
103+
error: "drop_table".to_string(),
104+
message: error.to_string(),
105+
}
106+
}
107+
100108
pub fn load_table(error: ColumnQError) -> Self {
101109
Self {
102110
code: http::StatusCode::INTERNAL_SERVER_ERROR,

roapi/tests/api_test.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::collections::HashMap;
44

55
use async_process::Command;
66
use columnq::arrow::datatypes::Schema;
7+
use serde_json::json;
78

89
#[tokio::test]
910
async fn test_schema() {
@@ -18,6 +19,26 @@ async fn test_schema() {
1819
assert!(body.contains_key("spacex_launches"));
1920
}
2021

22+
#[tokio::test]
23+
async fn test_drop_table() {
24+
let json_table = helpers::get_spacex_table();
25+
let (app, address) = helpers::test_api_app_with_tables(vec![json_table]).await;
26+
tokio::spawn(app.run_until_stopped());
27+
28+
let get_response =
29+
helpers::http_get(&format!("{address}/api/tables/spacex_launches"), None).await;
30+
assert_eq!(get_response.status(), 200);
31+
let drop_response = helpers::http_post_json(
32+
&format!("{address}/api/tables/drop"),
33+
json!([{"tableName": "spacex_launches"}]).to_string(),
34+
)
35+
.await;
36+
assert_eq!(drop_response.status(), 200);
37+
let failed_get_response =
38+
helpers::http_get(&format!("{address}/api/tables/spaces_launches"), None).await;
39+
assert_eq!(failed_get_response.status(), 400);
40+
}
41+
2142
#[tokio::test]
2243
async fn test_uk_cities_sql_post() {
2344
let table = helpers::get_uk_cities_table();

roapi/tests/helpers.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub async fn test_api_app(
3535
},
3636
tables,
3737
reload_interval: Some(Duration::from_secs(1000)),
38+
disable_read_only: true,
3839
kvstores,
3940
..Default::default()
4041
};
@@ -68,6 +69,17 @@ pub async fn http_post(url: &str, payload: impl Into<reqwest::Body>) -> reqwest:
6869
.expect("Unable to execute POST request")
6970
}
7071

72+
#[allow(dead_code)]
73+
pub async fn http_post_json(url: &str, payload: impl Into<reqwest::Body>) -> reqwest::Response {
74+
reqwest::Client::new()
75+
.post(url)
76+
.header("Content-Type", "application/json")
77+
.body(payload)
78+
.send()
79+
.await
80+
.expect("Unable to execute POST request")
81+
}
82+
7183
#[allow(dead_code)]
7284
pub fn get_spacex_table() -> TableSource {
7385
let json_source_path = test_data_path("spacex_launches.json");

0 commit comments

Comments
 (0)