From b7b79b0ec8bc6b1c977941009660fc15225c7d26 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Mon, 27 Jan 2025 12:46:04 +0000 Subject: [PATCH] fixes for parquet encoders --- Cargo.toml | 1 - raphtory/src/io/arrow/df_loaders.rs | 1 + raphtory/src/lib.rs | 2 +- raphtory/src/python/graph/io/pandas_loaders.rs | 4 ++-- raphtory/src/serialise/parquet/edges.rs | 4 ++-- raphtory/src/serialise/parquet/mod.rs | 13 ++++++++----- raphtory/src/serialise/parquet/nodes.rs | 4 ++-- raphtory/src/serialise/serialise.rs | 8 -------- 8 files changed, 16 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3b8d568e41..5b37ded3be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,6 @@ members = [ ] default-members = [ "raphtory", - "raphtory-graphql" ] resolver = "2" diff --git a/raphtory/src/io/arrow/df_loaders.rs b/raphtory/src/io/arrow/df_loaders.rs index aa92f95a77..8f74e3ca2b 100644 --- a/raphtory/src/io/arrow/df_loaders.rs +++ b/raphtory/src/io/arrow/df_loaders.rs @@ -122,6 +122,7 @@ pub(crate) fn load_nodes_from_df< let node_col = df.node_col(node_id_index)?; node_col_resolved.resize_with(df.len(), Default::default); + node_col .par_iter() .zip(node_col_resolved.par_iter_mut()) diff --git a/raphtory/src/lib.rs b/raphtory/src/lib.rs index ca9e4b4d96..9601035ef2 100644 --- a/raphtory/src/lib.rs +++ b/raphtory/src/lib.rs @@ -199,7 +199,7 @@ mod test_utils { PropType::F64 => any::().prop_map(Prop::F64).boxed(), PropType::U8 => any::().prop_map(Prop::U8).boxed(), PropType::Bool => any::().prop_map(Prop::Bool).boxed(), - PropType::DTime => (1970..2024, 1..=12, 1..28, 0..24, 0..60, 0..60) + PropType::DTime => (1900..2024, 1..=12, 1..28, 0..24, 0..60, 0..60) .prop_map(|(year, month, day, h, m, s)| { Prop::DTime( format!( diff --git a/raphtory/src/python/graph/io/pandas_loaders.rs b/raphtory/src/python/graph/io/pandas_loaders.rs index 97995073f4..da1bf0a23f 100644 --- a/raphtory/src/python/graph/io/pandas_loaders.rs +++ b/raphtory/src/python/graph/io/pandas_loaders.rs @@ -98,7 +98,7 @@ pub(crate) fn load_edges_from_pandas< pub(crate) fn load_node_props_from_pandas< 'py, - G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache, >( graph: &G, df: &Bound<'py, PyAny>, @@ -128,7 +128,7 @@ pub(crate) fn load_node_props_from_pandas< pub(crate) fn load_edge_props_from_pandas< 'py, - G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache, >( graph: &G, df: &Bound<'py, PyAny>, diff --git a/raphtory/src/serialise/parquet/edges.rs b/raphtory/src/serialise/parquet/edges.rs index f3bdf4e26e..af405f70d0 100644 --- a/raphtory/src/serialise/parquet/edges.rs +++ b/raphtory/src/serialise/parquet/edges.rs @@ -41,7 +41,7 @@ pub(crate) fn encode_edge_tprop( ] }, |edges, g, decoder, writer| { - let row_group_size = 100_000.min(edges.len()); + let row_group_size = 100_000; let all_layers = LayerIds::All; for edge_rows in edges @@ -86,7 +86,7 @@ pub(crate) fn encode_edge_deletions( ] }, |edges, g, decoder, writer| { - let row_group_size = 100_000.min(edges.len()); + let row_group_size = 100_000; let g = g.lock(); let g = &g; let g_edges = g.edges(); diff --git a/raphtory/src/serialise/parquet/mod.rs b/raphtory/src/serialise/parquet/mod.rs index 77fc98aa5f..d20709328f 100644 --- a/raphtory/src/serialise/parquet/mod.rs +++ b/raphtory/src/serialise/parquet/mod.rs @@ -126,10 +126,12 @@ 
pub(crate) fn run_encode( std::fs::create_dir_all(&root_dir)?; if size > 0 { - let chunk_size = (size / rayon::current_num_threads()).max(size); - (0..size) - .step_by(chunk_size) - .enumerate() + let chunk_size = (size / rayon::current_num_threads()).max(128); + let iter = (0..size).step_by(chunk_size); + + let num_digits = iter.len().to_string().len(); + + iter.enumerate() .par_bridge() .try_for_each(|(chunk, first)| { let props = WriterProperties::builder() @@ -137,7 +139,8 @@ pub(crate) fn run_encode( .build(); let items = first..(first + chunk_size).min(size); - let node_file = File::create(root_dir.join(format!("{}.parquet", chunk)))?; + let node_file = + File::create(root_dir.join(format!("{chunk:0num_digits$}.parquet")))?; let mut writer = ArrowWriter::try_new(node_file, schema.clone(), Some(props))?; let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?; diff --git a/raphtory/src/serialise/parquet/nodes.rs b/raphtory/src/serialise/parquet/nodes.rs index 40486e585b..5bf1535928 100644 --- a/raphtory/src/serialise/parquet/nodes.rs +++ b/raphtory/src/serialise/parquet/nodes.rs @@ -35,7 +35,7 @@ pub(crate) fn encode_nodes_tprop( ] }, |nodes, g, decoder, writer| { - let row_group_size = 100_000.min(nodes.len()); + let row_group_size = 100_000; let cols = g .node_meta() @@ -93,7 +93,7 @@ pub(crate) fn encode_nodes_cprop( ] }, |nodes, g, decoder, writer| { - let row_group_size = 100_000.min(nodes.len()); + let row_group_size = 100_000; for node_rows in nodes .into_iter() diff --git a/raphtory/src/serialise/serialise.rs b/raphtory/src/serialise/serialise.rs index ad391de9f6..118f120741 100644 --- a/raphtory/src/serialise/serialise.rs +++ b/raphtory/src/serialise/serialise.rs @@ -82,14 +82,6 @@ pub trait CacheOps: Sized { impl StableEncode for GraphStorage { fn encode_to_proto(&self) -> proto::Graph { - #[cfg(feature = "storage")] - if let GraphStorage::Disk(storage) = self { - assert!( - storage.inner.layers().len() <= 1, - "Disk based storage not supported right now because it doesn't have aligned edges" - ); - } - let storage = self.lock(); let mut graph = proto::Graph::default();
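
Note on the `run_encode` hunk above: the old `.max(size)` collapsed everything into a single chunk (and a single writer), while `.max(128)` only floors the chunk size, and the zero-padded chunk index keeps the output files in numeric order when listed lexicographically. Below is a minimal standalone sketch of that pattern, not the patch's actual code: `encode_chunks`, the demo path, and the plain-file output standing in for the parquet `ArrowWriter` are all illustrative assumptions.

```rust
// Sketch of the run_encode chunking scheme (assumed simplification:
// plain files stand in for the parquet ArrowWriter output).
use rayon::iter::{ParallelBridge, ParallelIterator};
use std::{fs::File, io::Write, path::Path};

fn encode_chunks(root_dir: &Path, size: usize) -> std::io::Result<()> {
    std::fs::create_dir_all(root_dir)?;
    if size == 0 {
        return Ok(());
    }
    // Floor of 128 stops tiny inputs from being split into near-empty
    // per-thread chunks; the previous `.max(size)` forced exactly one chunk.
    let chunk_size = (size / rayon::current_num_threads()).max(128);
    let iter = (0..size).step_by(chunk_size);

    // Zero-pad the chunk index so `09.parquet` sorts before `10.parquet`
    // lexicographically, matching numeric order when the files are re-read.
    let num_digits = iter.len().to_string().len();

    iter.enumerate().par_bridge().try_for_each(|(chunk, first)| {
        // Clamp the last chunk so it does not run past `size`.
        let items = first..(first + chunk_size).min(size);
        let path = root_dir.join(format!("{chunk:0num_digits$}.parquet"));
        let mut file = File::create(path)?;
        // The real code hands `items` to an ArrowWriter here; this sketch
        // only records the row range to stay dependency-light.
        writeln!(file, "rows {}..{}", items.start, items.end)
    })
}

fn main() -> std::io::Result<()> {
    encode_chunks(Path::new("/tmp/encode_demo"), 1_000)
}
```

One detail worth flagging: `num_digits` is computed from the chunk count rather than the largest chunk index, so the padding can come out one character wider than strictly necessary; that is harmless for the ordering the patch is after.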