Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"dockerComposeFile": "docker-compose.yml",
"service": "devcontainer",
"runServices": ["nginx", "postgres", "minio", "fake-gcs"],
"workspaceFolder": "/workspaces/moonlink",
"features": {
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
Expand Down
16 changes: 16 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ services:
dockerfile: Dockerfile
volumes:
- ..:/workspaces/moonlink:cached
- shared-nginx:/tmp/moonlink_service_test
networks:
- shared_network
depends_on:
- minio
- postgres
- fake-gcs
- nginx
command: sleep infinity

postgres:
Expand Down Expand Up @@ -53,9 +55,23 @@ services:
volumes:
- fake-gcs-data:/data

nginx:
image: nginx:latest
hostname: nginx
ports:
- "8080:80"
networks:
shared_network:
aliases:
- nginx.local
volumes:
- shared-nginx:/usr/share/nginx/html:ro

volumes:
minio-data:
fake-gcs-data:
shared-nginx:
name: moonlink-shared-nginx

networks:
shared_network:
Expand Down
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/moonlink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use table_notify::TableEvent;
pub use union_read::{ReadState, ReadStateFilepathRemap, ReadStateManager};

#[cfg(any(test, feature = "test-utils"))]
pub use union_read::decode_read_state_for_testing;
pub use union_read::{decode_read_state_for_testing, decode_serialized_read_state_for_testing};

#[cfg(feature = "bench")]
pub use storage::GlobalIndex;
Expand Down
2 changes: 1 addition & 1 deletion src/moonlink/src/union_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ pub use read_state::ReadStateFilepathRemap;
pub use read_state_manager::ReadStateManager;

#[cfg(any(test, feature = "test-utils"))]
pub use read_state::decode_read_state_for_testing;
pub use read_state::{decode_read_state_for_testing, decode_serialized_read_state_for_testing};
24 changes: 24 additions & 0 deletions src/moonlink/src/union_read/read_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,16 @@ impl ReadState {
.collect::<Vec<_>>();

// Map from local filepath to remote file path if needed and if possible.

println!("data files = {:?}", data_files);

let remapped_data_files = data_files
.into_iter()
.map(|path| read_state_filepath_remap(path))
.collect::<Vec<_>>();

println!("remapped data files = {:?}", remapped_data_files);

let remapped_puffin_files = puffin_files
.into_iter()
.map(|path| read_state_filepath_remap(path))
Comment on lines +97 to 109

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

These println! statements appear to be for debugging purposes and should be removed before merging.

Suggested change
println!("data files = {:?}", data_files);
let remapped_data_files = data_files
.into_iter()
.map(|path| read_state_filepath_remap(path))
.collect::<Vec<_>>();
println!("remapped data files = {:?}", remapped_data_files);
let remapped_puffin_files = puffin_files
.into_iter()
.map(|path| read_state_filepath_remap(path))
// Map from local filepath to remote file path if needed and if possible.
let remapped_data_files = data_files
.into_iter()
.map(|path| read_state_filepath_remap(path))
.collect::<Vec<_>>();
let remapped_puffin_files = puffin_files
.into_iter()
.map(|path| read_state_filepath_remap(path))

Expand Down Expand Up @@ -139,6 +145,24 @@ pub fn decode_read_state_for_testing(
)
}

/// Test-only helper: wrap raw serialized read-state bytes in a [`ReadState`]
/// (with no associated files and no cache handles) and decode it into its
/// component parts.
#[cfg(any(test, feature = "test-utils"))]
#[allow(clippy::type_complexity)]
pub fn decode_serialized_read_state_for_testing(
    data: Vec<u8>,
) -> (
    Vec<String>, /*data_file_paths*/
    Vec<String>, /*puffin_file_paths*/
    Vec<PuffinDeletionBlobAtRead>,
    Vec<(u32, u32)>,
) {
    // Delegate to the reference-based decoder; the wrapper exists so callers
    // holding only the serialized bytes (e.g. RPC integration tests) can decode.
    let wrapper = ReadState {
        data,
        associated_files: Vec::new(),
        cache_handles: Vec::new(),
    };
    decode_read_state_for_testing(&wrapper)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
9 changes: 9 additions & 0 deletions src/moonlink_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ tokio = { workspace = true }
tower-http = { version = "0.6", features = ["cors", "trace"] }
tracing = { workspace = true }

[dev-dependencies]
reqwest = { version = "0.12", features = ["json"] }
arrow-array = { workspace = true }
bytes = "1"
tokio = { workspace = true }
parquet = { workspace = true }
arrow = { workspace = true }
tempfile = { workspace = true }

[[example]]
name = "rest_api_demo"
required-features = ["rest_api_demo"]
2 changes: 1 addition & 1 deletion src/moonlink_service/examples/rest_api_demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ async fn read_table_via_rpc() -> Result<(), Box<dyn std::error::Error>> {

// Scan table data
println!(" 🔍 Scanning table data...");
let data_bytes = moonlink_rpc::scan_table_begin(
let data_bytes: Vec<u8> = moonlink_rpc::scan_table_begin(
&mut stream,
table.mooncake_database.clone(),
table.mooncake_table.clone(),
Expand Down
11 changes: 10 additions & 1 deletion src/moonlink_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ use tokio::{
use tracing::{error, info};

/// Default readiness probe port number.
const READINESS_PROBE_PORT: u16 = 5050;
pub(crate) const READINESS_PROBE_PORT: u16 = 5050;

/// Service initiation and execution status.
struct ServiceStatus {
/// Whether the service starts up successfully.
ready: AtomicBool,
}

#[derive(Debug)]
pub struct ServiceConfig {
/// Base location for moonlink storage (including cache files, iceberg tables, etc).
pub base_path: String,
Expand Down Expand Up @@ -79,6 +80,11 @@ pub async fn start_with_config(config: ServiceConfig) -> Result<()> {

// Initialize moonlink backend.
let mut sigterm = signal(SignalKind::terminate()).unwrap();

println!("start with config = {:?}", config);

println!("sqlite metadata store = {}", config.base_path);

Comment on lines +83 to +87

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

These println! statements appear to be for debugging and should be removed.

let sqlite_metadata_accessor = SqliteMetadataStore::new_with_directory(&config.base_path)
.await
.unwrap();
Expand Down Expand Up @@ -162,3 +168,6 @@ pub async fn start_with_config(config: ServiceConfig) -> Result<()> {
info!("Moonlink service shut down complete");
Ok(())
}

#[cfg(test)]
mod test;
190 changes: 190 additions & 0 deletions src/moonlink_service/src/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use std::collections::HashMap;
use std::sync::Arc;

use moonlink::decode_serialized_read_state_for_testing;
use serde_json::json;
use bytes::Bytes;
use tokio::net::TcpStream;
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow::datatypes::Schema as ArrowSchema;
use arrow::datatypes::{DataType, Field};
use tempfile::TempDir;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::io::Cursor;
use reqwest;

use moonlink_rpc::{scan_table_begin, scan_table_end};
use crate::{start_with_config, ServiceConfig, READINESS_PROBE_PORT};

/// Moonlink backend directory.
const MOONLINK_BACKEND_DIR: &str = "/tmp/moonlink_service_test";
/// Local nginx server IP/port address.
const NGINX_ADDR: &str = "http://nginx.local:80";
/// Local moonlink REST API IP/port address.
const REST_ADDR: &str = "http://127.0.0.1:3030";
/// Local moonlink server IP/port address.
const MOONLINK_ADDR: &str = "127.0.0.1:3031";
/// Test database name.
const DATABASE: &str = "test-database";
/// Test table name.
const TABLE: &str = "test-table";

/// Util function to delete and re-create the given directory, leaving it empty.
///
/// A missing directory is fine (nothing to delete); any other filesystem error
/// panics with context. The original silently `return`ed on real removal
/// errors, leaving stale files behind and making later test failures confusing.
fn recreate_directory(dir: &str) {
    // Clean up directory to place moonlink temporary files.
    if let Err(e) = std::fs::remove_dir_all(dir) {
        if e.kind() != std::io::ErrorKind::NotFound {
            panic!("failed to remove directory {dir}: {e}");
        }
    }
    std::fs::create_dir_all(dir)
        .unwrap_or_else(|e| panic!("failed to create directory {dir}: {e}"));
}

/// Poll the readiness endpoint until the service reports ready.
///
/// Retries every 100ms for up to ~10 seconds, then panics with a clear
/// message. The original looped forever and swallowed connection errors,
/// so a service that never came up produced a silent test hang instead of
/// a diagnosable failure.
async fn test_readiness_probe() {
    let url = format!("http://127.0.0.1:{}/ready", READINESS_PROBE_PORT);
    const MAX_ATTEMPTS: u32 = 100; // 100 * 100ms = ~10s budget.
    for _ in 0..MAX_ATTEMPTS {
        if let Ok(resp) = reqwest::get(&url).await {
            if resp.status() == reqwest::StatusCode::OK {
                return;
            }
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    panic!("readiness probe at {url} did not return OK within ~10 seconds");
}
Comment on lines +47 to +57

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This readiness probe loop can run indefinitely and will silently ignore connection errors, making test failures hard to debug. It's better to have a timeout and to panic with a clear error message if the service doesn't become ready within a reasonable time.

Suggested change
async fn test_readiness_probe() {
let url = format!("http://127.0.0.1:{}/ready", READINESS_PROBE_PORT);
loop {
if let Ok(resp) = reqwest::get(&url).await {
if resp.status() == reqwest::StatusCode::OK {
return;
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
async fn test_readiness_probe() {
let url = format!("http://127.0.0.1:{}/ready", READINESS_PROBE_PORT);
for _ in 0..30 { // ~3 seconds timeout
if let Ok(resp) = reqwest::get(&url).await {
if resp.status() == reqwest::StatusCode::OK {
return;
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
panic!("Readiness probe failed to respond after 3 seconds.");
}


/// Util function to load all record batches inside of the given [`path`].
pub async fn read_all_batches(url: &str) -> Vec<RecordBatch> {
let resp = reqwest::get(url).await.unwrap();
println!("status = {}", resp.status());

assert!(resp.status().is_success());

println!("content = {}", resp.text().await.unwrap());

vec![]

// let data: Bytes = resp.bytes().await.unwrap();
// println!("read len = {}", data.len());
// let mut reader = ParquetRecordBatchReaderBuilder::try_new(data).unwrap()
// .build().unwrap();

// // Collect all record batches
// let batches = reader
// .into_iter()
// .map(|b| b.unwrap()) // handle Err properly in production
// .collect();

// batches
}
Comment on lines +60 to +82

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This function is incomplete and returns an empty vector, which causes test_moonlink_standalone to fail. The intended implementation appears to be commented out. The println! statements should also be removed, and unwrap() calls should be replaced with expect() to provide more context on failure.

pub async fn read_all_batches(url: &str) -> Vec<RecordBatch> {
    let resp = reqwest::get(url)
        .await
        .unwrap_or_else(|e| panic!("Failed to GET from url: {}: {}", url, e));

    assert!(resp.status().is_success(), "Request to {} failed with status {}", url, resp.status());

    let data: Bytes = resp.bytes().await.expect("Failed to read response bytes");
    let reader = ParquetRecordBatchReaderBuilder::try_new(data)
        .expect("Failed to create parquet reader builder")
        .build()
        .expect("Failed to build parquet reader");

    // Collect all record batches
    reader
        .map(|b| b.expect("Failed to read record batch"))
        .collect()
}


/// Util function to create test arrow schema.
fn create_test_arrow_schema() -> Arc<ArrowSchema> {
Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, /*nullable=*/false).with_metadata(HashMap::from([(
"PARQUET:field_id".to_string(),
"0".to_string(),
)])),
Field::new("name", DataType::Utf8, /*nullable=*/false).with_metadata(HashMap::from([(
"PARQUET:field_id".to_string(),
"1".to_string(),
)])),
Field::new("email", DataType::Utf8, /*nullable=*/true).with_metadata(HashMap::from([(
"PARQUET:field_id".to_string(),
"2".to_string(),
)])),
Field::new("age", DataType::Int32, /*nullable=*/true).with_metadata(HashMap::from([(
"PARQUET:field_id".to_string(),
"3".to_string(),
)])),
]))
}

#[tokio::test]
async fn test_moonlink_standalone() {
recreate_directory(MOONLINK_BACKEND_DIR);
let config = ServiceConfig {
base_path: MOONLINK_BACKEND_DIR.to_string(),
Comment on lines +108 to +110

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of using a hardcoded directory and a manual cleanup function, it's better to use tempfile::TempDir for creating a temporary directory for the test. This avoids hardcoded paths, potential conflicts, and ensures the directory is cleaned up automatically. Please also remove the MOONLINK_BACKEND_DIR constant and the recreate_directory function.

Suggested change
recreate_directory(MOONLINK_BACKEND_DIR);
let config = ServiceConfig {
base_path: MOONLINK_BACKEND_DIR.to_string(),
let temp_dir = TempDir::new().expect("Failed to create temp dir");
let config = ServiceConfig {
base_path: temp_dir.path().to_str().expect("temp dir path is not valid utf8").to_string(),

data_server_uri: Some(NGINX_ADDR.to_string()),
rest_api_port: Some(3030),
tcp_port: Some(3031),
};
tokio::spawn(async move {
start_with_config(config).await.unwrap();
});
test_readiness_probe().await;

// Create test table.
let client = reqwest::Client::new();
let create_table_payload = json!({
"mooncake_database": DATABASE,
"mooncake_table": TABLE,
"schema": [
{"name": "id", "data_type": "int32", "nullable": false},
{"name": "name", "data_type": "string", "nullable": false},
{"name": "email", "data_type": "string", "nullable": true},
{"name": "age", "data_type": "int32", "nullable": true}
]
});
let response = client
.post(format!("{REST_ADDR}/tables/{TABLE}"))
.header("content-type", "application/json")
.json(&create_table_payload)
.send()
.await.unwrap();
assert!(response.status().is_success());

// Ingest some data.
let insert_payload = json!({
"operation": "insert",
"data": {
"id": 1,
"name": "Alice Johnson",
"email": "alice@example.com",
"age": 30
}
});
let response = client
.post(format!("{REST_ADDR}/ingest/{TABLE}"))
.header("content-type", "application/json")
.json(&insert_payload)
.send()
.await.unwrap();
assert!(response.status().is_success());

// Scan table and get data file and puffin files back.
let mut moonlink_stream = TcpStream::connect(MOONLINK_ADDR).await.unwrap();
let bytes = scan_table_begin(
&mut moonlink_stream,
DATABASE.to_string(),
TABLE.to_string(),
0,
).await.unwrap();
let (data_file_paths, puffin_file_paths, puffin_deletion, positional_deletion) = decode_serialized_read_state_for_testing(bytes);
assert_eq!(data_file_paths.len(), 1);
let record_batches = read_all_batches(&data_file_paths[0]).await;
let expected_arrow_batch = RecordBatch::try_new(
create_test_arrow_schema(),
vec![
Arc::new(Int32Array::from(vec![1])),
Arc::new(StringArray::from(vec!["Alice Johnson".to_string()])),
Arc::new(StringArray::from(vec!["alice@example.com".to_string()])),
Arc::new(Int32Array::from(vec![30])),
],
)
.unwrap();
assert_eq!(record_batches, vec![expected_arrow_batch]);

assert!(puffin_file_paths.is_empty());
assert!(puffin_deletion.is_empty());
assert!(positional_deletion.is_empty());

scan_table_end(
&mut moonlink_stream,
DATABASE.to_string(),
TABLE.to_string(),
).await.unwrap();
}
Empty file.
Empty file.