From aa4877cbfaaeacbea7f07e53b86f6740226535c8 Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Tue, 17 Dec 2024 17:31:37 -0800 Subject: [PATCH] modify scripts --- benchmarks/requirements.txt | 3 + benchmarks/src/bin/tpcds.rs | 17 -- benchmarks/transfer_dat_parquet.py | 246 ++++++++++++++++++++++++++--- benchmarks/verify.sh | 236 +++++++++++++++++++++++++++ 4 files changed, 463 insertions(+), 39 deletions(-) create mode 100755 benchmarks/verify.sh diff --git a/benchmarks/requirements.txt b/benchmarks/requirements.txt index 20a5a2bddbf2..ebde6ece704d 100644 --- a/benchmarks/requirements.txt +++ b/benchmarks/requirements.txt @@ -16,3 +16,6 @@ # under the License. rich +pandas +pyarrow +chardet \ No newline at end of file diff --git a/benchmarks/src/bin/tpcds.rs b/benchmarks/src/bin/tpcds.rs index 1651e088957e..59183a121eb5 100644 --- a/benchmarks/src/bin/tpcds.rs +++ b/benchmarks/src/bin/tpcds.rs @@ -125,19 +125,13 @@ async fn compare_duckdb_datafusion( sql: &str, parquet_dir: &str, ) -> Result<(), DataFusionError> { - // Step 1: Execute query in DuckDB (used as the expected result) let expected_batches = execute_duckdb_query(sql, parquet_dir)?; - - // Step 2: Execute query in DataFusion (actual result) let ctx = create_tpcds_context(parquet_dir).await?; let actual_batches = execute_datafusion_query(sql, ctx).await?; - - // Step 3: Format the batches for comparison let expected_output = pretty_format_batches(&expected_batches)?.to_string(); let actual_output = pretty_format_batches(&actual_batches)?.to_string(); if expected_output != actual_output { - // Print detailed error information if outputs do not match eprintln!("❌ Query failed: Results do not match!"); eprintln!("SQL:\n{}", sql); eprintln!("Expected:\n{}", expected_output); @@ -153,19 +147,16 @@ async fn compare_duckdb_datafusion( /// Execute a query in DuckDB and return the results as RecordBatch fn execute_duckdb_query(sql: &str, parquet_dir: &str) -> Result> { - // Initialize DuckDB connection let conn = Connection::open_in_memory().map_err(|e| { DataFusionError::Execution(format!("DuckDB connection error: {}", e)) })?; - // Register all TPC-DS tables in DuckDB for table in TPCDS_TABLES { let path = format!("{}/{}.parquet", parquet_dir, table); let sql = format!( "CREATE TABLE {} AS SELECT * FROM read_parquet('{}')", table, path ); - println!("sql is {:?}", sql); conn.execute(&sql, []).map_err(|e| { DataFusionError::Execution(format!( "Error registering table '{}': {}", @@ -174,7 +165,6 @@ fn execute_duckdb_query(sql: &str, parquet_dir: &str) -> Result })?; } - // Execute the query let mut stmt = conn.prepare(sql).map_err(|e| { DataFusionError::Execution(format!("SQL preparation error: {}", e)) })?; @@ -191,14 +181,10 @@ async fn execute_datafusion_query( sql: &str, ctx: SessionContext, ) -> Result> { - // Execute the query let df = ctx.sql(sql).await?; - - // Collect the results into RecordBatch df.collect().await } -/// Load SQL query from a file fn load_query(query_number: usize) -> Result { let query_path = format!("datafusion/core/tests/tpc-ds/{}.sql", query_number); fs::read_to_string(&query_path).map_err(|e| { @@ -211,10 +197,7 @@ fn load_query(query_number: usize) -> Result { #[tokio::main] async fn main() -> Result<()> { - // Initialize logger env_logger::init(); - - // Parse command-line arguments let opt = TpcdsOpt::from_args(); match opt { TpcdsOpt::Run(opt) => opt.run().await, diff --git a/benchmarks/transfer_dat_parquet.py b/benchmarks/transfer_dat_parquet.py index 4beb8dc77fff..241cead0f2a6 100644 --- 
a/benchmarks/transfer_dat_parquet.py +++ b/benchmarks/transfer_dat_parquet.py @@ -15,12 +15,17 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + import os import pyarrow as pa import pyarrow.parquet as pq import chardet from datetime import datetime import pandas as pd +import re +from decimal import Decimal, getcontext +import argparse + # Define TPC-DS table schemas with column names TABLE_SCHEMAS = { @@ -553,13 +558,166 @@ "t_sub_shift": "string", "t_meal_time": "string" } - } + }, + "web_page": { + "columns": [ + "wp_web_page_sk", "wp_web_page_id", "wp_rec_start_date", "wp_rec_end_date", + "wp_creation_date_sk", "wp_access_date_sk", "wp_autogen_flag", + "wp_customer_sk", "wp_url", "wp_type", + "wp_char_count", "wp_link_count", "wp_image_count", "wp_max_ad_count" + ], + "dtype": { + "wp_web_page_sk": "int32", + "wp_web_page_id": "string", + "wp_rec_start_date": "date32", + "wp_rec_end_date": "date32", + "wp_creation_date_sk": "int32", + "wp_access_date_sk": "int32", + "wp_autogen_flag": "string", + "wp_customer_sk": "int32", + "wp_url": "string", + "wp_type": "string", + "wp_char_count": "int32", + "wp_link_count": "int32", + "wp_image_count": "int32", + "wp_max_ad_count": "int32" + } + }, + "web_site": { + "columns": [ + "web_site_sk", "web_site_id", "web_rec_start_date", "web_rec_end_date", + "web_name", "web_open_date_sk", "web_close_date_sk", "web_class", + "web_manager", "web_mkt_id", "web_mkt_class", "web_mkt_desc", + "web_market_manager", "web_company_id", "web_company_name", + "web_street_number", "web_street_name", "web_street_type", + "web_suite_number", "web_city", "web_county", "web_state", + "web_zip", "web_country", "web_gmt_offset", "web_tax_percentage" + ], + "dtype": { + "web_site_sk": "int32", + "web_site_id": "string", + "web_rec_start_date": "date32", + "web_rec_end_date": "date32", + "web_name": "string", + "web_open_date_sk": "int32", + "web_close_date_sk": "int32", + "web_class": "string", + "web_manager": "string", + "web_mkt_id": "int32", + "web_mkt_class": "string", + "web_mkt_desc": "string", + "web_market_manager": "string", + "web_company_id": "int32", + "web_company_name": "string", + "web_street_number": "string", + "web_street_name": "string", + "web_street_type": "string", + "web_suite_number": "string", + "web_city": "string", + "web_county": "string", + "web_state": "string", + "web_zip": "string", + "web_country": "string", + "web_gmt_offset": "decimal(5, 2)", + "web_tax_percentage": "decimal(5, 2)" + } + }, + "store_sales": { + "columns": [ + "ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", "ss_customer_sk", + "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", + "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price", + "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", + "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", + "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit" + ], + "dtype": { + "ss_sold_date_sk": "int32", + "ss_sold_time_sk": "int32", + "ss_item_sk": "int32", + "ss_customer_sk": "int32", + "ss_cdemo_sk": "int32", + "ss_hdemo_sk": "int32", + "ss_addr_sk": "int32", + "ss_store_sk": "int32", + "ss_promo_sk": "int32", + "ss_ticket_number": "int64", + "ss_quantity": "int32", + "ss_wholesale_cost": "decimal(7, 2)", + "ss_list_price": "decimal(7, 2)", + "ss_sales_price": "decimal(7, 2)", + "ss_ext_discount_amt": "decimal(7, 2)", + "ss_ext_sales_price": "decimal(7, 2)", 
+ "ss_ext_wholesale_cost": "decimal(7, 2)", + "ss_ext_list_price": "decimal(7, 2)", + "ss_ext_tax": "decimal(7, 2)", + "ss_coupon_amt": "decimal(7, 2)", + "ss_net_paid": "decimal(7, 2)", + "ss_net_paid_inc_tax": "decimal(7, 2)", + "ss_net_profit": "decimal(7, 2)" + } + }, + "store_returns": { + "columns": [ + "sr_returned_date_sk", "sr_return_time_sk", "sr_item_sk", "sr_customer_sk", + "sr_cdemo_sk", "sr_hdemo_sk", "sr_addr_sk", "sr_store_sk", "sr_reason_sk", + "sr_ticket_number", "sr_return_quantity", "sr_return_amt", "sr_return_tax", + "sr_return_amt_inc_tax", "sr_fee", "sr_return_ship_cost", "sr_refunded_cash", + "sr_reversed_charge", "sr_store_credit", "sr_net_loss" + ], + "dtype": { + "sr_returned_date_sk": "int32", + "sr_return_time_sk": "int32", + "sr_item_sk": "int32", + "sr_customer_sk": "int32", + "sr_cdemo_sk": "int32", + "sr_hdemo_sk": "int32", + "sr_addr_sk": "int32", + "sr_store_sk": "int32", + "sr_reason_sk": "int32", + "sr_ticket_number": "int64", + "sr_return_quantity": "int32", + "sr_return_amt": "decimal(7, 2)", + "sr_return_tax": "decimal(7, 2)", + "sr_return_amt_inc_tax": "decimal(7, 2)", + "sr_fee": "decimal(7, 2)", + "sr_return_ship_cost": "decimal(7, 2)", + "sr_refunded_cash": "decimal(7, 2)", + "sr_reversed_charge": "decimal(7, 2)", + "sr_store_credit": "decimal(7, 2)", + "sr_net_loss": "decimal(7, 2)" + } + }, + "warehouse": { + "columns": [ + "w_warehouse_sk", "w_warehouse_id", "w_warehouse_name", + "w_warehouse_sq_ft", "w_street_number", "w_street_name", + "w_street_type", "w_suite_number", "w_city", "w_county", + "w_state", "w_zip", "w_country", "w_gmt_offset" + ], + "dtype": { + "w_warehouse_sk": "int32", + "w_warehouse_id": "string", + "w_warehouse_name": "string", + "w_warehouse_sq_ft": "int32", + "w_street_number": "string", + "w_street_name": "string", + "w_street_type": "string", + "w_suite_number": "string", + "w_city": "string", + "w_county": "string", + "w_state": "string", + "w_zip": "string", + "w_country": "string", + "w_gmt_offset": "decimal(5, 2)" + } + }, } def detect_encoding(file_path, default_encoding="utf-8"): """ - Detect the encoding of a file using chardet. Falls back to a default encoding. + Detect the encoding of a file using chardet. Falls back to a default encoding if detection fails. """ try: with open(file_path, 'rb') as f: @@ -572,25 +730,27 @@ def detect_encoding(file_path, default_encoding="utf-8"): def convert_and_delete_dat_files(data_dir, default_value=0): """ - Convert `.dat` files in a directory to Parquet format based on the global TABLE_SCHEMAS. - If `NULL` or missing values are encountered, replace them with `default_value`. + Convert .dat files in the specified directory to .parquet format and delete the original .dat files. + + Args: + data_dir (str): The directory containing the .dat files. + default_value (int, optional): The default value to fill missing data. Defaults to 0. 
""" if not os.path.exists(data_dir): print(f"Error: The directory {data_dir} does not exist.") return - files_processed = False # Track if any files were processed - + files_processed = False for file_name in os.listdir(data_dir): if file_name.endswith(".dat"): files_processed = True - table_name, _ = os.path.splitext(file_name) # Extract table name (without extension) + base_name = os.path.basename(file_name) # Get the file name without the directory path + table_name = os.path.splitext(base_name)[0] # Extract table name by removing file extension input_file = os.path.join(data_dir, file_name) output_file = os.path.join(data_dir, f"{table_name}.parquet") print(f"Processing: {input_file} -> {output_file}") - # Check if a schema is defined for the table if table_name in TABLE_SCHEMAS: schema = TABLE_SCHEMAS[table_name] columns = schema.get("columns") @@ -600,11 +760,8 @@ def convert_and_delete_dat_files(data_dir, default_value=0): continue try: - # Detect encoding + # Detect encoding and read the .dat file encoding = detect_encoding(input_file) - print(f"Detected encoding for {file_name}: {encoding}") - - # Read .dat file using pandas df = pd.read_csv( input_file, sep="|", @@ -612,32 +769,77 @@ def convert_and_delete_dat_files(data_dir, default_value=0): header=None, engine="python", skipfooter=1, - encoding=encoding, - na_values=["NULL", ""], # Treat "NULL" and empty strings as missing values - dtype=dtype + na_values=["NULL", ""], + dtype="object", # Read as objects (strings) to avoid type issues during parsing + encoding=encoding, # Use detected encoding ) - # Replace NULL/missing values with the default value + # Replace missing values df = df.fillna(default_value) - # Convert to Parquet + # Convert column types based on schema + for col, col_type in dtype.items(): + try: + if col_type == "Int64" or col_type == "int64": + df[col] = pd.to_numeric(df[col], errors="coerce") + df[col] = df[col].fillna(default_value).astype(int).astype("Int64") + elif col_type == "int32": + df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int).astype('int32') + elif col_type == "float64": + df[col] = pd.to_numeric(df[col], errors="coerce").fillna(default_value).astype("float64") + elif col_type == "datetime64[ns]": + df[col] = pd.to_datetime(df[col], format="%Y-%m-%d", errors="coerce").fillna(pd.Timestamp("1970-01-01")) + elif col_type == "date32": + df[col] = pd.to_datetime(df[col], format="%Y-%m-%d", errors="coerce").dt.date.fillna(pd.Timestamp("1970-01-01").date()) + elif col_type == "string": + df[col] = df[col].fillna("").astype(str) + elif re.match(r"decimal\(\d+, \d+\)", col_type): + match = re.match(r"decimal\((\d+), (\d+)\)", col_type) + precision = int(match.group(1)) # Extract precision + scale = int(match.group(2)) # Extract scale + getcontext().prec = precision + df[col] = df[col].apply( + lambda x: Decimal(str(x)).quantize(Decimal(f"1.{'0' * scale}")) + if pd.notnull(x) and str(x).replace(".", "", 1).isdigit() + else Decimal(f"0.{'0' * scale}") + ) + except Exception as e: + print(f"Error converting column {col}: {e}") + continue + + # Convert the DataFrame to Parquet format table = pa.Table.from_pandas(df) pq.write_table(table, output_file) print(f"Conversion completed: {output_file}") - # Delete original .dat file + # Delete the original .dat file os.remove(input_file) print(f"Original file deleted: {input_file}") except Exception as e: - error_message = f"Error processing {table_name}: {e}" - print(error_message) + print(f"Error processing {table_name}: {e}") if not 
files_processed: print(f"No `.dat` files found in the directory {data_dir}.") + if __name__ == "__main__": - DATA_DIR = "/Users/xiangyanxin/personal/DATAFUSION/tpcds-kit/tpcds-data" + # Use argparse to handle command-line arguments + parser = argparse.ArgumentParser(description="Convert .dat files to .parquet format.") + parser.add_argument( + "--dir", + required=True, + help="Path to the directory containing .dat files." + ) + args = parser.parse_args() - convert_and_delete_dat_files(DATA_DIR) \ No newline at end of file + # Set the directory from command-line arguments + data_dir = args.dir + + # Check if the directory exists + if not os.path.exists(data_dir): + print(f"Error: The specified directory '{data_dir}' does not exist.") + else: + pd.set_option('future.no_silent_downcasting', True) + convert_and_delete_dat_files(data_dir) \ No newline at end of file diff --git a/benchmarks/verify.sh b/benchmarks/verify.sh new file mode 100755 index 000000000000..aa736742bc69 --- /dev/null +++ b/benchmarks/verify.sh @@ -0,0 +1,236 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This script is meant for developers of DataFusion -- it is runnable +# from the standard DataFusion development environment and uses cargo, +# etc and orchestrates gathering data and run the benchmark binary in +# different configurations. + +# Exit on error +set -e + +# Get the current script directory +SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd) + +# Default path configurations +TPCDS_REPO="https://github.com/databricks/tpcds-kit" # TPC-DS repository URL +TPCDS_BASE_DIR="${SCRIPT_DIR}/data" # Base directory for cloning and data +TPCDS_DIR="${TPCDS_BASE_DIR}/tpcds-kit" # Directory for TPC-DS repository +TPCDS_TOOL_DIR="${TPCDS_DIR}/tools" # Directory for TPC-DS tools +TPCDS_DATA_DIR="${TPCDS_DIR}/data" # Directory for generated TPC-DS data +QUERY_DIR="${SCRIPT_DIR}/queries" # Directory for query SQL files +RESULT_DIR="${SCRIPT_DIR}/results" # Directory for query results +CARGO_COMMAND="cargo run --release --bin tpcds" # Command to run Rust binary + +# Default arguments +SCALE_FACTOR="" +QUERY_NUMBER="" +DIR="" + +# Usage information +usage() { + echo " +Usage: +verify.sh --scale [--query ] [--dir ] + +Options: + --scale Specify TPC-DS data scale factor (e.g., 1, 10, 100, etc.) + --query Specify the query number to execute (e.g., 1, 2, 3, etc.) + If not provided, all queries will be executed. + --dir Specify a folder containing existing TPC-DS data (.dat files) + If provided, the script will skip data generation and convert .dat files to .parquet. 
+
+Examples:
+# Clone the TPC-DS repository, build tools, generate 10GB data, convert to parquet, and execute all queries
+./verify.sh --scale 10
+
+# Use existing data folder, convert to parquet, and execute all queries
+./verify.sh --dir /path/to/existing/data --scale 10
+
+# Generate 10GB data, convert to parquet, and execute query 1
+./verify.sh --scale 10 --query 1
+"
+    exit 1
+}
+
+# Parse command-line arguments
+parse_args() {
+    while [[ $# -gt 0 ]]; do
+        case $1 in
+            --scale)
+                SCALE_FACTOR=$2
+                shift 2
+                ;;
+            --query)
+                QUERY_NUMBER=$2
+                shift 2
+                ;;
+            --dir)
+                DIR=$2
+                shift 2
+                ;;
+            *)
+                usage
+                ;;
+        esac
+    done
+
+    if [[ -z "${SCALE_FACTOR}" && -z "${DIR}" ]]; then
+        echo "Error: Either --scale or --dir is required."
+        usage
+    fi
+}
+
+# Clone the TPC-DS repository
+clone_tpcds_repo() {
+    if [ ! -d "${TPCDS_DIR}" ]; then
+        echo "Cloning TPC-DS repository from ${TPCDS_REPO} into ${TPCDS_DIR}..."
+        mkdir -p "${TPCDS_BASE_DIR}"
+        git clone "${TPCDS_REPO}" "${TPCDS_DIR}"
+        echo "TPC-DS repository cloned successfully."
+    else
+        echo "TPC-DS repository already exists at ${TPCDS_DIR}."
+    fi
+}
+
+# Build TPC-DS tools (the flags below target macOS/clang)
+build_tpcds_tools() {
+    echo "Building TPC-DS tools in ${TPCDS_TOOL_DIR}..."
+    pushd "${TPCDS_TOOL_DIR}" > /dev/null
+    make OS=MACOS CFLAGS="-D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE -DYYDEBUG -DMACOS -g -Wall -fcommon -std=c99 -Wno-implicit-int -Wno-error"
+    popd > /dev/null
+    echo "TPC-DS tools built successfully."
+}
+
+# Generate TPC-DS data
+generate_data() {
+    echo "Preparing TPC-DS data directory at ${TPCDS_DATA_DIR}..."
+
+    # If the TPC-DS data directory already exists, remove it so it can be recreated
+    if [ -d "${TPCDS_DATA_DIR}" ]; then
+        echo "TPC-DS data directory already exists. Removing it..."
+        rm -rf "${TPCDS_DATA_DIR}"
+    fi
+
+    mkdir -p "${TPCDS_DATA_DIR}"
+
+    echo "Generating TPC-DS data at scale factor ${SCALE_FACTOR} in ${TPCDS_DATA_DIR}..."
+
+    # Compute the data directory path relative to the tools directory
+    local relative_data_dir="../data"
+
+    # Print TPCDS_TOOL_DIR
+    echo "TPCDS_TOOL_DIR: ${TPCDS_TOOL_DIR}"
+
+    # Build and print the dsdgen command
+    local dsdgen_command="./dsdgen -dir ${relative_data_dir} -scale ${SCALE_FACTOR} -FORCE"
+    echo "Running dsdgen command: ${dsdgen_command}"
+
+    # Switch to the tools directory and run the command
+    pushd "${TPCDS_TOOL_DIR}" > /dev/null
+    ./dsdgen -dir "${relative_data_dir}" \
+        -scale ${SCALE_FACTOR} \
+        -FORCE
+    popd > /dev/null
+
+    echo "TPC-DS data generation completed."
+
+    DIR="${TPCDS_DATA_DIR}"
+}
+
+# Convert .dat files to .parquet using the Python script
+convert_dat_to_parquet() {
+    echo "Converting .dat files in ${DIR} to .parquet format using transfer_dat_parquet.py..."
+    python "${SCRIPT_DIR}/transfer_dat_parquet.py" --dir "${DIR}"
+    echo "Conversion completed for .dat files in ${DIR}."
+}
+
+# Execute a single query
+run_single_query() {
+    QUERY_NUMBER=$1
+    QUERY_FILE="${QUERY_DIR}/query${QUERY_NUMBER}.sql"
+
+    if [ ! -f "${QUERY_FILE}" ]; then
+        echo "Error: Query file '${QUERY_FILE}' does not exist."
+        exit 1
+    fi
+
+    RESULT_FILE="${RESULT_DIR}/query${QUERY_NUMBER}_sf${SCALE_FACTOR}.out"
+
+    echo "Running query: query${QUERY_NUMBER} on scale factor ${SCALE_FACTOR}..."
+    mkdir -p "${RESULT_DIR}" # Ensure the result directory exists
+
+    # Execute the query and save the result
+    ${CARGO_COMMAND} -- query --query "${QUERY_FILE}" --path "${DIR}" --format parquet > "${RESULT_FILE}"
+
+    echo "Query query${QUERY_NUMBER} completed. Result saved to ${RESULT_FILE}."
+}
+
+# Execute all queries
+run_all_queries() {
+    echo "Running all TPC-DS queries on scale factor ${SCALE_FACTOR}..."
+
+    mkdir -p "${RESULT_DIR}" # Ensure the result directory exists
+
+    # Iterate through all query files and execute them
+    for query_file in "${QUERY_DIR}"/*.sql; do
+        if [ -f "${query_file}" ]; then
+            QUERY_NAME=$(basename "${query_file}" .sql)
+            QUERY_NUMBER=${QUERY_NAME#query}
+            run_single_query "${QUERY_NUMBER}"
+        fi
+    done
+
+    echo "All queries completed. Results are in ${RESULT_DIR}."
+}
+
+setup_venv() {
+    python3 -m venv "${VIRTUAL_ENV:=${SCRIPT_DIR}/venv}"
+    PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r "${SCRIPT_DIR}/requirements.txt"
+}
+
+
+# Main function
+main() {
+    # Parse command-line arguments
+    parse_args "$@"
+
+    # If DIR is not specified, generate data
+    if [ -z "${DIR}" ]; then
+        # Clone TPC-DS repository
+        clone_tpcds_repo
+
+        # Build TPC-DS tools
+        build_tpcds_tools
+
+        # Generate data
+        generate_data
+    fi
+
+    # Convert .dat files to .parquet
+    convert_dat_to_parquet
+
+    # Execute queries
+    if [ -n "${QUERY_NUMBER}" ]; then
+        run_single_query "${QUERY_NUMBER}"
+    else
+        run_all_queries
+    fi
+}
+
+main "$@"
\ No newline at end of file
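
Reviewer note: for anyone who wants to exercise the pieces of this patch by hand rather than through `verify.sh`, the sketch below walks the same steps manually. It is only a sketch under assumptions: it presumes `databricks/tpcds-kit` is already cloned and its tools built under a placeholder path (`~/tpcds-kit`), uses scale factor 1 as an example, and the final `cargo run --bin tpcds -- query ...` flags simply mirror the `CARGO_COMMAND` invocation in `run_single_query` above.

```bash
#!/usr/bin/env bash
# Manual walkthrough of what verify.sh automates -- a sketch, assuming
# tpcds-kit is already cloned and built under ~/tpcds-kit (adjust paths as needed).
set -e

TPCDS_KIT=~/tpcds-kit            # assumed checkout of databricks/tpcds-kit with tools built
DATA_DIR="${TPCDS_KIT}/data"     # where dsdgen will write the .dat files
BENCH_DIR="$(pwd)/benchmarks"    # run from the DataFusion repository root

# 1. Generate scale-factor-1 data (dsdgen resolves -dir relative to the tools directory)
mkdir -p "${DATA_DIR}"
pushd "${TPCDS_KIT}/tools" > /dev/null
./dsdgen -dir ../data -scale 1 -FORCE
popd > /dev/null

# 2. Convert the .dat files to Parquet (the script deletes the .dat files afterwards)
python3 -m pip install -r "${BENCH_DIR}/requirements.txt"
python3 "${BENCH_DIR}/transfer_dat_parquet.py" --dir "${DATA_DIR}"

# 3. Run a single query through the tpcds binary
#    (flags mirror run_single_query in verify.sh)
cargo run --release --bin tpcds -- query \
    --query "${BENCH_DIR}/queries/query1.sql" \
    --path "${DATA_DIR}" \
    --format parquet
```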