refactor: replace OnceLock with LazyLock (round 2) (apache#13674)
* refactor: replace `OnceLock` with `LazyLock`

* Update UDWF macros

* Fix comment format
jonahgao authored Dec 6, 2024
1 parent 2464703 commit ce330ec
Showing 17 changed files with 161 additions and 214 deletions.
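
For context, the core change repeated in every file below is the same: a `OnceLock` static plus a helper function that threads the initializer through `get_or_init` collapses into a single `LazyLock` whose initializer lives at the declaration. A minimal before/after sketch of the pattern (names here are illustrative, not taken from the commit):

    use std::collections::HashMap;
    use std::sync::{LazyLock, OnceLock};

    // Before: OnceLock needs an accessor that supplies the initializer
    // through `get_or_init` at a call site.
    static SIZES_ONCE: OnceLock<HashMap<&'static str, u64>> = OnceLock::new();

    fn sizes() -> &'static HashMap<&'static str, u64> {
        SIZES_ONCE.get_or_init(|| HashMap::from([("kb", 1 << 10), ("mb", 1 << 20)]))
    }

    // After: LazyLock couples the initializer with the declaration,
    // so call sites just dereference the static.
    static SIZES: LazyLock<HashMap<&'static str, u64>> =
        LazyLock::new(|| HashMap::from([("kb", 1 << 10), ("mb", 1 << 20)]));

    fn main() {
        assert_eq!(sizes().get("kb"), Some(&1024));
        assert_eq!(SIZES.get("kb"), Some(&1024)); // derefs to &HashMap
    }

Both forms initialize at most once and are thread-safe; `LazyLock` (stable since Rust 1.80) simply removes the accessor boilerplate, which is what this commit deletes across the changed files.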
23 changes: 9 additions & 14 deletions benchmarks/src/bin/external_aggr.rs
@@ -20,7 +20,7 @@
 use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
-use std::sync::OnceLock;
+use std::sync::LazyLock;
 use structopt::StructOpt;

 use arrow::record_batch::RecordBatch;
@@ -91,7 +91,13 @@ struct QueryResult {
 /// Memory limits to run: 64MiB, 32MiB, 16MiB
 /// Q2 requires 250MiB for aggregation
 /// Memory limits to run: 512MiB, 256MiB, 128MiB, 64MiB, 32MiB
-static QUERY_MEMORY_LIMITS: OnceLock<HashMap<usize, Vec<u64>>> = OnceLock::new();
+static QUERY_MEMORY_LIMITS: LazyLock<HashMap<usize, Vec<u64>>> = LazyLock::new(|| {
+    use units::*;
+    let mut map = HashMap::new();
+    map.insert(1, vec![64 * MB, 32 * MB, 16 * MB]);
+    map.insert(2, vec![512 * MB, 256 * MB, 128 * MB, 64 * MB, 32 * MB]);
+    map
+});

 impl ExternalAggrConfig {
     const AGGR_TABLES: [&'static str; 1] = ["lineitem"];
@@ -114,16 +120,6 @@ impl ExternalAggrConfig {
 "#,
     ];

-    fn init_query_memory_limits() -> &'static HashMap<usize, Vec<u64>> {
-        use units::*;
-        QUERY_MEMORY_LIMITS.get_or_init(|| {
-            let mut map = HashMap::new();
-            map.insert(1, vec![64 * MB, 32 * MB, 16 * MB]);
-            map.insert(2, vec![512 * MB, 256 * MB, 128 * MB, 64 * MB, 32 * MB]);
-            map
-        })
-    }
-
     /// If `--query` and `--memory-limit` are not specified, run all queries
     /// with pre-configured memory limits
     /// If only `--query` is specified, run the query with all memory limits
@@ -161,8 +157,7 @@ impl ExternalAggrConfig {
                     query_executions.push((query_id, limit));
                 }
                 None => {
-                    let memory_limits_table = Self::init_query_memory_limits();
-                    let memory_limits = memory_limits_table.get(&query_id).unwrap();
+                    let memory_limits = QUERY_MEMORY_LIMITS.get(&query_id).unwrap();
                     for limit in memory_limits {
                         query_executions.push((query_id, *limit));
                     }
20 changes: 8 additions & 12 deletions datafusion-cli/src/main.rs
@@ -19,7 +19,7 @@ use std::collections::HashMap;
 use std::env;
 use std::path::Path;
 use std::process::ExitCode;
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};

 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
@@ -279,9 +279,8 @@ impl ByteUnit {
 }

 fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
-    fn byte_suffixes() -> &'static HashMap<&'static str, ByteUnit> {
-        static BYTE_SUFFIXES: OnceLock<HashMap<&'static str, ByteUnit>> = OnceLock::new();
-        BYTE_SUFFIXES.get_or_init(|| {
+    static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
+        LazyLock::new(|| {
             let mut m = HashMap::new();
             m.insert("b", ByteUnit::Byte);
             m.insert("k", ByteUnit::KiB);
@@ -293,23 +292,20 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
             m.insert("t", ByteUnit::TiB);
             m.insert("tb", ByteUnit::TiB);
             m
-        })
-    }
+        });

-    fn suffix_re() -> &'static regex::Regex {
-        static SUFFIX_REGEX: OnceLock<regex::Regex> = OnceLock::new();
-        SUFFIX_REGEX.get_or_init(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap())
-    }
+    static SUFFIX_REGEX: LazyLock<regex::Regex> =
+        LazyLock::new(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap());

     let lower = size.to_lowercase();
-    if let Some(caps) = suffix_re().captures(&lower) {
+    if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
         let num_str = caps.get(1).unwrap().as_str();
         let num = num_str.parse::<usize>().map_err(|_| {
             format!("Invalid numeric value in memory pool size '{}'", size)
         })?;

         let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
-        let unit = byte_suffixes()
+        let unit = &BYTE_SUFFIXES
             .get(suffix)
             .ok_or_else(|| format!("Invalid memory pool size '{}'", size))?;
         let memory_pool_size = usize::try_from(unit.multiplier())
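
The `SUFFIX_REGEX` change above is the textbook use of a lazy static: compile a regex once on first use and reuse the compiled value on every later call, without wrapping it in an accessor function. A small sketch, assuming the `regex` crate is available as it is in datafusion-cli (the pattern string below is illustrative):

    use std::sync::LazyLock;

    use regex::Regex;

    static SIZE_RE: LazyLock<Regex> =
        LazyLock::new(|| Regex::new(r"^([0-9]+)([a-z]+)?$").unwrap());

    fn main() {
        // Compiled on the first deref, cached for every later call.
        let caps = SIZE_RE.captures("512mb").unwrap();
        assert_eq!(&caps[1], "512");
        assert_eq!(caps.get(2).map(|m| m.as_str()), Some("mb"));
    }
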
58 changes: 27 additions & 31 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -345,7 +345,7 @@ mod test {
     use parquet::basic::LogicalType;
     use parquet::file::metadata::ColumnChunkMetaData;
     use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
-    use std::sync::{Arc, OnceLock};
+    use std::sync::{Arc, LazyLock};

     #[test]
     fn test_only_scans() {
@@ -358,7 +358,7 @@

         let row_group_indexes = access_plan.row_group_indexes();
         let row_selection = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap();

         // scan all row groups, no selection
@@ -377,7 +377,7 @@

         let row_group_indexes = access_plan.row_group_indexes();
         let row_selection = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap();

         // skip all row groups, no selection
@@ -403,7 +403,7 @@

         let row_group_indexes = access_plan.row_group_indexes();
         let row_selection = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap();

         assert_eq!(row_group_indexes, vec![0, 1]);
@@ -442,7 +442,7 @@

         let row_group_indexes = access_plan.row_group_indexes();
         let row_selection = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap();

         assert_eq!(row_group_indexes, vec![1, 2, 3]);
@@ -478,7 +478,7 @@

         let row_group_indexes = access_plan.row_group_indexes();
         let err = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap_err()
             .to_string();
         assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
@@ -504,39 +504,35 @@

         let row_group_indexes = access_plan.row_group_indexes();
         let err = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap_err()
             .to_string();
         assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
         assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
     }

-    static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();
-
     /// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
     /// respectively
-    fn row_group_metadata() -> &'static [RowGroupMetaData] {
-        ROW_GROUP_METADATA.get_or_init(|| {
-            let schema_descr = get_test_schema_descr();
-            let row_counts = [10, 20, 30, 40];
-
-            row_counts
-                .into_iter()
-                .map(|num_rows| {
-                    let column = ColumnChunkMetaData::builder(schema_descr.column(0))
-                        .set_num_values(num_rows)
-                        .build()
-                        .unwrap();
-
-                    RowGroupMetaData::builder(schema_descr.clone())
-                        .set_num_rows(num_rows)
-                        .set_column_metadata(vec![column])
-                        .build()
-                        .unwrap()
-                })
-                .collect()
-        })
-    }
+    static ROW_GROUP_METADATA: LazyLock<Vec<RowGroupMetaData>> = LazyLock::new(|| {
+        let schema_descr = get_test_schema_descr();
+        let row_counts = [10, 20, 30, 40];
+
+        row_counts
+            .into_iter()
+            .map(|num_rows| {
+                let column = ColumnChunkMetaData::builder(schema_descr.column(0))
+                    .set_num_values(num_rows)
+                    .build()
+                    .unwrap();
+
+                RowGroupMetaData::builder(schema_descr.clone())
+                    .set_num_rows(num_rows)
+                    .set_column_metadata(vec![column])
+                    .build()
+                    .unwrap()
+            })
+            .collect()
+    });

     /// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
     fn get_test_schema_descr() -> SchemaDescPtr {
76 changes: 34 additions & 42 deletions datafusion/core/tests/expr_api/mod.rs
@@ -28,7 +28,7 @@ use datafusion_functions_aggregate::sum::sum_udaf;
 use datafusion_functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
 use sqlparser::ast::NullTreatment;
 /// Tests of using and evaluating `Expr`s outside the context of a LogicalPlan
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};

 mod parse_sql_expr;
 mod simplification;
@@ -305,13 +305,11 @@ async fn test_aggregate_ext_null_treatment() {
 /// Evaluates the specified expr as an aggregate and compares the result to the
 /// expected result.
 async fn evaluate_agg_test(expr: Expr, expected_lines: Vec<&str>) {
-    let batch = test_batch();
-
     let ctx = SessionContext::new();
     let group_expr = vec![];
     let agg_expr = vec![expr];
     let result = ctx
-        .read_batch(batch)
+        .read_batch(TEST_BATCH.clone())
         .unwrap()
         .aggregate(group_expr, agg_expr)
         .unwrap()
@@ -332,13 +330,13 @@
 /// Converts the `Expr` to a `PhysicalExpr`, evaluates it against the provided
 /// `RecordBatch` and compares the result to the expected result.
 fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
-    let batch = test_batch();
+    let batch = &TEST_BATCH;
     let df_schema = DFSchema::try_from(batch.schema()).unwrap();
     let physical_expr = SessionContext::new()
         .create_physical_expr(expr, &df_schema)
         .unwrap();

-    let result = physical_expr.evaluate(&batch).unwrap();
+    let result = physical_expr.evaluate(batch).unwrap();
     let array = result.into_array(1).unwrap();
     let result = pretty_format_columns("expr", &[array]).unwrap().to_string();
     let actual_lines = result.lines().collect::<Vec<_>>();
@@ -350,39 +348,33 @@
     );
 }

-static TEST_BATCH: OnceLock<RecordBatch> = OnceLock::new();
-
-fn test_batch() -> RecordBatch {
-    TEST_BATCH
-        .get_or_init(|| {
-            let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
-            let int_array: ArrayRef =
-                Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
-
-            // { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
-            let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
-                Arc::new(Field::new("a", DataType::Utf8, false)),
-                Arc::new(StringArray::from(vec![
-                    "2021-02-01",
-                    "2021-02-02",
-                    "2021-02-03",
-                ])) as _,
-            )]));
-
-            // ["one"] ["two", "three", "four"] ["five"]
-            let mut builder = ListBuilder::new(StringBuilder::new());
-            builder.append_value([Some("one")]);
-            builder.append_value([Some("two"), Some("three"), Some("four")]);
-            builder.append_value([Some("five")]);
-            let list_array: ArrayRef = Arc::new(builder.finish());
-
-            RecordBatch::try_from_iter(vec![
-                ("id", string_array),
-                ("i", int_array),
-                ("props", struct_array),
-                ("list", list_array),
-            ])
-            .unwrap()
-        })
-        .clone()
-}
+static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+    let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
+    let int_array: ArrayRef =
+        Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
+
+    // { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
+    let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
+        Arc::new(Field::new("a", DataType::Utf8, false)),
+        Arc::new(StringArray::from(vec![
+            "2021-02-01",
+            "2021-02-02",
+            "2021-02-03",
+        ])) as _,
+    )]));
+
+    // ["one"] ["two", "three", "four"] ["five"]
+    let mut builder = ListBuilder::new(StringBuilder::new());
+    builder.append_value([Some("one")]);
+    builder.append_value([Some("two"), Some("three"), Some("four")]);
+    builder.append_value([Some("five")]);
+    let list_array: ArrayRef = Arc::new(builder.finish());
+
+    RecordBatch::try_from_iter(vec![
+        ("id", string_array),
+        ("i", int_array),
+        ("props", struct_array),
+        ("list", list_array),
+    ])
+    .unwrap()
+});
7 changes: 3 additions & 4 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -35,7 +35,7 @@ use datafusion_physical_plan::spill::get_record_batch_memory_size;
 use futures::StreamExt;
 use std::any::Any;
 use std::num::NonZeroUsize;
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};
 use tokio::fs::File;

 use datafusion::datasource::streaming::StreamingTable;
@@ -730,15 +730,14 @@ fn maybe_split_batches(
         .collect()
 }

-static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();
-
 /// Returns 5 sorted string dictionary batches each with 50 rows with
 /// this schema.
 ///
 /// a: Dictionary<Utf8, Int32>,
 /// b: Dictionary<Utf8, Int32>,
 fn dict_batches() -> Vec<RecordBatch> {
-    DICT_BATCHES.get_or_init(make_dict_batches).clone()
+    static DICT_BATCHES: LazyLock<Vec<RecordBatch>> = LazyLock::new(make_dict_batches);
+    DICT_BATCHES.clone()
 }

 fn make_dict_batches() -> Vec<RecordBatch> {
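
Note that `dict_batches` above keeps its `LazyLock` inside the function body: the fixture is still initialized at most once process-wide, but the static is only nameable where it is used, and callers receive an owned `Vec` via `clone()`. A sketch of that scoping, with illustrative names:

    use std::sync::LazyLock;

    fn expensive_rows() -> Vec<u64> {
        // Initialized on the first call only; later calls reuse the cached value.
        static ROWS: LazyLock<Vec<u64>> = LazyLock::new(|| (0..5).map(|i| i * 10).collect());
        // `clone()` resolves through Deref to Vec::clone, handing out an owned copy.
        ROWS.clone()
    }

    fn main() {
        assert_eq!(expensive_rows(), vec![0, 10, 20, 30, 40]);
    }
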
10 changes: 5 additions & 5 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -21,7 +21,7 @@ use std::cmp::Ordering;
 use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Debug, Display, Formatter};
 use std::hash::{Hash, Hasher};
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};

 use super::dml::CopyTo;
 use super::DdlStatement;
@@ -3067,12 +3067,12 @@ impl Aggregate {

     /// Get the output expressions.
     fn output_expressions(&self) -> Result<Vec<&Expr>> {
-        static INTERNAL_ID_EXPR: OnceLock<Expr> = OnceLock::new();
+        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
+            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
+        });
         let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
         if self.is_grouping_set() {
-            exprs.push(INTERNAL_ID_EXPR.get_or_init(|| {
-                Expr::Column(Column::from_name(Self::INTERNAL_GROUPING_ID))
-            }));
+            exprs.push(&INTERNAL_ID_EXPR);
         }
         exprs.extend(self.aggr_expr.iter());
         debug_assert!(exprs.len() == self.schema.fields().len());
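
One subtlety in the `plan.rs` hunk: `exprs` is a `Vec<&Expr>`, and `&INTERNAL_ID_EXPR` deref-coerces from `&LazyLock<Expr>` to `&'static Expr`, so the lazily built expression mixes freely with borrows of `self`. A sketch of the same coercion under illustrative types:

    use std::sync::LazyLock;

    static DEFAULT_NAME: LazyLock<String> = LazyLock::new(|| "grouping_id".to_owned());

    fn collect_names(extra: &[String]) -> Vec<&str> {
        let mut names: Vec<&str> = extra.iter().map(String::as_str).collect();
        // &LazyLock<String> coerces through LazyLock and String down to &str.
        names.push(&DEFAULT_NAME);
        names
    }

    fn main() {
        let extra = vec!["a".to_owned()];
        assert_eq!(collect_names(&extra), vec!["a", "grouping_id"]);
    }
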