diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index d707a96f5..9925d8958 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,6 +1,7 @@ { "dockerComposeFile": "docker-compose.yml", "service": "devcontainer", + "runServices": ["nginx", "postgres", "minio", "fake-gcs"], "workspaceFolder": "/workspaces/moonlink", "features": { "ghcr.io/devcontainers/features/docker-in-docker:2": {}, diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index a5922c6bb..8378b925b 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -5,12 +5,14 @@ services: dockerfile: Dockerfile volumes: - ..:/workspaces/moonlink:cached + - shared-nginx:/tmp/moonlink_service_test networks: - shared_network depends_on: - minio - postgres - fake-gcs + - nginx command: sleep infinity postgres: @@ -53,9 +55,23 @@ services: volumes: - fake-gcs-data:/data + nginx: + image: nginx:latest + hostname: nginx + ports: + - "8080:80" + networks: + shared_network: + aliases: + - nginx.local + volumes: + - shared-nginx:/usr/share/nginx/html:ro + volumes: minio-data: fake-gcs-data: + shared-nginx: + name: moonlink-shared-nginx networks: shared_network: diff --git a/Cargo.lock b/Cargo.lock index 248a9e8e8..0d581d015 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4397,17 +4397,22 @@ dependencies = [ name = "moonlink_service" version = "0.0.1" dependencies = [ + "arrow", + "arrow-array", "arrow-ipc", "arrow-schema", "axum 0.8.4", + "bytes", "clap", "moonlink", "moonlink_backend", "moonlink_metadata_store", "moonlink_rpc", + "parquet", "reqwest", "serde", "serde_json", + "tempfile", "thiserror 2.0.14", "tokio", "tower-http", diff --git a/src/moonlink/src/lib.rs b/src/moonlink/src/lib.rs index 0a99417d1..0f5889410 100644 --- a/src/moonlink/src/lib.rs +++ b/src/moonlink/src/lib.rs @@ -26,7 +26,7 @@ pub use table_notify::TableEvent; pub use union_read::{ReadState, ReadStateFilepathRemap, ReadStateManager}; #[cfg(any(test, feature = "test-utils"))] -pub use union_read::decode_read_state_for_testing; +pub use union_read::{decode_read_state_for_testing, decode_serialized_read_state_for_testing}; #[cfg(feature = "bench")] pub use storage::GlobalIndex; diff --git a/src/moonlink/src/union_read.rs b/src/moonlink/src/union_read.rs index 831bc3f30..8c7844764 100644 --- a/src/moonlink/src/union_read.rs +++ b/src/moonlink/src/union_read.rs @@ -7,4 +7,4 @@ pub use read_state::ReadStateFilepathRemap; pub use read_state_manager::ReadStateManager; #[cfg(any(test, feature = "test-utils"))] -pub use read_state::decode_read_state_for_testing; +pub use read_state::{decode_read_state_for_testing, decode_serialized_read_state_for_testing}; diff --git a/src/moonlink/src/union_read/read_state.rs b/src/moonlink/src/union_read/read_state.rs index 1c2dca3c6..4df17a8b1 100644 --- a/src/moonlink/src/union_read/read_state.rs +++ b/src/moonlink/src/union_read/read_state.rs @@ -94,10 +94,16 @@ impl ReadState { .collect::>(); // Map from local filepath to remote file path if needed and if possible. + + println!("data files = {:?}", data_files); + let remapped_data_files = data_files .into_iter() .map(|path| read_state_filepath_remap(path)) .collect::>(); + + println!("remapped data files = {:?}", remapped_data_files); + let remapped_puffin_files = puffin_files .into_iter() .map(|path| read_state_filepath_remap(path)) @@ -139,6 +145,24 @@ pub fn decode_read_state_for_testing( ) } +#[cfg(any(test, feature = "test-utils"))] +#[allow(clippy::type_complexity)] +pub fn decode_serialized_read_state_for_testing( + data: Vec, +) -> ( + Vec, /*data_file_paths*/ + Vec, /*puffin_file_paths*/ + Vec, + Vec<(u32, u32)>, +) { + let read_state = ReadState { + data, + associated_files: vec![], + cache_handles: vec![], + }; + decode_read_state_for_testing(&read_state) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/moonlink_service/Cargo.toml b/src/moonlink_service/Cargo.toml index 39a2352d1..9cff58ec1 100644 --- a/src/moonlink_service/Cargo.toml +++ b/src/moonlink_service/Cargo.toml @@ -25,6 +25,15 @@ tokio = { workspace = true } tower-http = { version = "0.6", features = ["cors", "trace"] } tracing = { workspace = true } +[dev-dependencies] +reqwest = { version = "0.12", features = ["json"] } +arrow-array = { workspace = true } +bytes = "1" +tokio = { workspace = true } +parquet = { workspace = true } +arrow = { workspace = true } +tempfile = { workspace = true } + [[example]] name = "rest_api_demo" required-features = ["rest_api_demo"] diff --git a/src/moonlink_service/examples/rest_api_demo.rs b/src/moonlink_service/examples/rest_api_demo.rs index c74324eda..53738ee5e 100644 --- a/src/moonlink_service/examples/rest_api_demo.rs +++ b/src/moonlink_service/examples/rest_api_demo.rs @@ -306,7 +306,7 @@ async fn read_table_via_rpc() -> Result<(), Box> { // Scan table data println!(" 🔍 Scanning table data..."); - let data_bytes = moonlink_rpc::scan_table_begin( + let data_bytes: Vec = moonlink_rpc::scan_table_begin( &mut stream, table.mooncake_database.clone(), table.mooncake_table.clone(), diff --git a/src/moonlink_service/src/lib.rs b/src/moonlink_service/src/lib.rs index a282fb981..d307c6dd7 100644 --- a/src/moonlink_service/src/lib.rs +++ b/src/moonlink_service/src/lib.rs @@ -16,7 +16,7 @@ use tokio::{ use tracing::{error, info}; /// Default readiness probe port number. -const READINESS_PROBE_PORT: u16 = 5050; +pub(crate) const READINESS_PROBE_PORT: u16 = 5050; /// Service initiation and execution status. struct ServiceStatus { @@ -24,6 +24,7 @@ struct ServiceStatus { ready: AtomicBool, } +#[derive(Debug)] pub struct ServiceConfig { /// Base location for moonlink storage (including cache files, iceberg tables, etc). pub base_path: String, @@ -79,6 +80,11 @@ pub async fn start_with_config(config: ServiceConfig) -> Result<()> { // Initialize moonlink backend. let mut sigterm = signal(SignalKind::terminate()).unwrap(); + + println!("start with config = {:?}", config); + + println!("sqlite metadata store = {}", config.base_path); + let sqlite_metadata_accessor = SqliteMetadataStore::new_with_directory(&config.base_path) .await .unwrap(); @@ -162,3 +168,6 @@ pub async fn start_with_config(config: ServiceConfig) -> Result<()> { info!("Moonlink service shut down complete"); Ok(()) } + +#[cfg(test)] +mod test; diff --git a/src/moonlink_service/src/test.rs b/src/moonlink_service/src/test.rs new file mode 100644 index 000000000..549ac98a7 --- /dev/null +++ b/src/moonlink_service/src/test.rs @@ -0,0 +1,190 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use moonlink::decode_serialized_read_state_for_testing; +use serde_json::json; +use bytes::Bytes; +use tokio::net::TcpStream; +use arrow_array::{Int32Array, RecordBatch, StringArray}; +use arrow::datatypes::Schema as ArrowSchema; +use arrow::datatypes::{DataType, Field}; +use tempfile::TempDir; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use std::io::Cursor; +use reqwest; + +use moonlink_rpc::{scan_table_begin, scan_table_end}; +use crate::{start_with_config, ServiceConfig, READINESS_PROBE_PORT}; + +/// Moonlink backend directory. +const MOONLINK_BACKEND_DIR: &str = "/tmp/moonlink_service_test"; +/// Local nginx server IP/port address. +const NGINX_ADDR: &str = "http://nginx.local:80"; +/// Local moonlink REST API IP/port address. +const REST_ADDR: &str = "http://127.0.0.1:3030"; +/// Local moonlink server IP/port address. +const MOONLINK_ADDR: &str = "127.0.0.1:3031"; +/// Test database name. +const DATABASE: &str = "test-database"; +/// Test table name. +const TABLE: &str = "test-table"; + +/// Util function to delete and re-create the given directory. +fn recreate_directory(dir: &str) { + // Clean up directory to place moonlink temporary files. + match std::fs::remove_dir_all(dir) { + Ok(()) => {} + Err(e) => { + if e.kind() != std::io::ErrorKind::NotFound { + return; + } + } + } + std::fs::create_dir_all(dir).unwrap(); +} + +/// Send request to readiness endpoint. +async fn test_readiness_probe() { + let url = format!("http://127.0.0.1:{}/ready", READINESS_PROBE_PORT); + loop { + if let Ok(resp) = reqwest::get(&url).await { + if resp.status() == reqwest::StatusCode::OK { + return; + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } +} + +/// Util function to load all record batches inside of the given [`path`]. +pub async fn read_all_batches(url: &str) -> Vec { + let resp = reqwest::get(url).await.unwrap(); + println!("status = {}", resp.status()); + + assert!(resp.status().is_success()); + + println!("content = {}", resp.text().await.unwrap()); + + vec![] + + // let data: Bytes = resp.bytes().await.unwrap(); + // println!("read len = {}", data.len()); + // let mut reader = ParquetRecordBatchReaderBuilder::try_new(data).unwrap() + // .build().unwrap(); + + // // Collect all record batches + // let batches = reader + // .into_iter() + // .map(|b| b.unwrap()) // handle Err properly in production + // .collect(); + + // batches +} + +/// Util function to create test arrow schema. +fn create_test_arrow_schema() -> Arc { + Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, /*nullable=*/false).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "0".to_string(), + )])), + Field::new("name", DataType::Utf8, /*nullable=*/false).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "1".to_string(), + )])), + Field::new("email", DataType::Utf8, /*nullable=*/true).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "2".to_string(), + )])), + Field::new("age", DataType::Int32, /*nullable=*/true).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "3".to_string(), + )])), + ])) +} + +#[tokio::test] +async fn test_moonlink_standalone() { + recreate_directory(MOONLINK_BACKEND_DIR); + let config = ServiceConfig { + base_path: MOONLINK_BACKEND_DIR.to_string(), + data_server_uri: Some(NGINX_ADDR.to_string()), + rest_api_port: Some(3030), + tcp_port: Some(3031), + }; + tokio::spawn(async move { + start_with_config(config).await.unwrap(); + }); + test_readiness_probe().await; + + // Create test table. + let client = reqwest::Client::new(); + let create_table_payload = json!({ + "mooncake_database": DATABASE, + "mooncake_table": TABLE, + "schema": [ + {"name": "id", "data_type": "int32", "nullable": false}, + {"name": "name", "data_type": "string", "nullable": false}, + {"name": "email", "data_type": "string", "nullable": true}, + {"name": "age", "data_type": "int32", "nullable": true} + ] + }); + let response = client + .post(format!("{REST_ADDR}/tables/{TABLE}")) + .header("content-type", "application/json") + .json(&create_table_payload) + .send() + .await.unwrap(); + assert!(response.status().is_success()); + + // Ingest some data. + let insert_payload = json!({ + "operation": "insert", + "data": { + "id": 1, + "name": "Alice Johnson", + "email": "alice@example.com", + "age": 30 + } + }); + let response = client + .post(format!("{REST_ADDR}/ingest/{TABLE}")) + .header("content-type", "application/json") + .json(&insert_payload) + .send() + .await.unwrap(); + assert!(response.status().is_success()); + + // Scan table and get data file and puffin files back. + let mut moonlink_stream = TcpStream::connect(MOONLINK_ADDR).await.unwrap(); + let bytes = scan_table_begin( + &mut moonlink_stream, + DATABASE.to_string(), + TABLE.to_string(), + 0, + ).await.unwrap(); + let (data_file_paths, puffin_file_paths, puffin_deletion, positional_deletion) = decode_serialized_read_state_for_testing(bytes); + assert_eq!(data_file_paths.len(), 1); + let record_batches = read_all_batches(&data_file_paths[0]).await; + let expected_arrow_batch = RecordBatch::try_new( + create_test_arrow_schema(), + vec![ + Arc::new(Int32Array::from(vec![1])), + Arc::new(StringArray::from(vec!["Alice Johnson".to_string()])), + Arc::new(StringArray::from(vec!["alice@example.com".to_string()])), + Arc::new(Int32Array::from(vec![30])), + ], + ) + .unwrap(); + assert_eq!(record_batches, vec![expected_arrow_batch]); + + assert!(puffin_file_paths.is_empty()); + assert!(puffin_deletion.is_empty()); + assert!(positional_deletion.is_empty()); + + scan_table_end( + &mut moonlink_stream, + DATABASE.to_string(), + TABLE.to_string(), + ).await.unwrap(); +} diff --git a/src/moonlink_service/src/test_requests.json b/src/moonlink_service/src/test_requests.json new file mode 100644 index 000000000..e69de29bb diff --git a/src/moonlink_service/src/test_service_request.rs b/src/moonlink_service/src/test_service_request.rs new file mode 100644 index 000000000..e69de29bb