From 857bb91faafee7982114aaabfc16cafd4ea7b262 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Fri, 24 Jan 2025 16:46:06 +0000 Subject: [PATCH] speed-up node properties loading from parquet --- raphtory/src/io/arrow/df_loaders.rs | 115 +++++++++++++++++++--------- raphtory/src/io/parquet_loaders.rs | 2 +- raphtory/src/serialise/serialise.rs | 1 - 3 files changed, 80 insertions(+), 38 deletions(-) diff --git a/raphtory/src/io/arrow/df_loaders.rs b/raphtory/src/io/arrow/df_loaders.rs index 4eb7a20c27..e0491ea67b 100644 --- a/raphtory/src/io/arrow/df_loaders.rs +++ b/raphtory/src/io/arrow/df_loaders.rs @@ -155,7 +155,8 @@ pub(crate) fn load_nodes_from_df< for (idx, (((vid, time), node_type), gid)) in node_col_resolved .iter() .zip(time_col.iter()) - .zip(node_type_col_resolved.iter()).zip(node_col.iter()) + .zip(node_type_col_resolved.iter()) + .zip(node_col.iter()) .enumerate() { let shard_id = shard.shard_id(); @@ -517,7 +518,7 @@ pub(crate) fn load_edge_deletions_from_df< pub(crate) fn load_node_props_from_df< 'a, - G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache, >( df_view: DFView>>, node_id: &str, @@ -528,63 +529,105 @@ pub(crate) fn load_node_props_from_df< graph: &G, ) -> Result<(), GraphError> { let constant_properties = constant_properties.unwrap_or(&[]); + let constant_properties_indices = constant_properties .iter() .map(|name| df_view.get_index(name)) .collect::, GraphError>>()?; - let node_id_index = df_view.get_index(node_id)?; + let node_type_index = if let Some(node_type_col) = node_type_col { - Some(df_view.get_index(node_type_col.as_ref())?) + Some(df_view.get_index(node_type_col.as_ref())) } else { None }; - let shared_constant_properties = match shared_constant_properties { - Some(props) => props - .iter() - .map(|(name, prop)| { - Ok(( - graph - .resolve_node_property(name, prop.dtype(), true)? - .inner(), - prop.clone(), - )) - }) - .collect::, GraphError>>()?, - None => vec![], - }; + let node_type_index = node_type_index.transpose()?; + + let node_id_index = df_view.get_index(node_id)?; + + let shared_constant_properties = + process_shared_properties(shared_constant_properties, |key, dtype| { + graph.resolve_node_property(key, dtype, true) + })?; + #[cfg(feature = "python")] let mut pb = build_progress_bar("Loading node properties".to_string(), df_view.num_rows)?; + + let mut node_col_resolved = vec![]; + + let cache = graph.get_cache(); + let mut write_locked_graph = graph.write_lock()?; + let cache_shards = cache.map(|cache| { + (0..write_locked_graph.num_shards()) + .map(|_| cache.fork()) + .collect::>() + }); + for chunk in df_view.chunks { let df = chunk?; - let const_props = combine_properties( + let const_prop_cols = combine_properties( constant_properties, &constant_properties_indices, &df, - |name, dtype| graph.resolve_node_property(name, dtype, true), + |key, dtype| graph.resolve_node_property(key, dtype, true), )?; - let node_col = df.node_col(node_id_index)?; let node_type_col = lift_node_type_col(node_type, node_type_index, &df)?; + let node_type_col_resolved = node_type_col.resolve(graph)?; + let node_col = df.node_col(node_id_index)?; + node_col_resolved.resize_with(df.len(), Default::default); node_col .par_iter() - .zip(node_type_col.par_iter()) - .zip(const_props.par_rows()) - .try_for_each(|((node_id, node_type), cprops)| { - let node_id = node_id.ok_or(LoadError::MissingNodeError)?; - let node = graph - .node(node_id) - .ok_or_else(|| GraphError::NodeMissingError(node_id.to_owned()))?; - if let Some(node_type) = node_type { - node.set_node_type(node_type)?; + .zip(node_col_resolved.par_iter_mut()) + .try_for_each(|(gid, resolved)| { + let gid = gid.ok_or(LoadError::FatalError)?; + let vid = write_locked_graph + .resolve_node(gid) + .map_err(|_| LoadError::FatalError)?; + if let Some(cache) = cache { + cache.resolve_node(vid, gid); } - let props = cprops - .chain(shared_constant_properties.iter().cloned()) - .collect::>(); - if !props.is_empty() { - graph.internal_add_constant_node_properties(node.node, &props)?; + *resolved = vid.inner(); + Ok::<(), LoadError>(()) + })?; + + write_locked_graph + .nodes + .resize(write_locked_graph.num_nodes()); + + write_locked_graph + .nodes + .par_iter_mut() + .try_for_each(|mut shard| { + let mut c_props = vec![]; + + for (idx, ((vid, node_type), gid)) in node_col_resolved + .iter() + .zip(node_type_col_resolved.iter()) + .zip(node_col.iter()) + .enumerate() + { + let shard_id = shard.shard_id(); + if let Some(mut_node) = shard.get_mut(*vid) { + mut_node.init(*vid, gid); + mut_node.node_type = *node_type; + + c_props.clear(); + c_props.extend(const_prop_cols.iter_row(idx)); + c_props.extend_from_slice(&shared_constant_properties); + + if let Some(caches) = cache_shards.as_ref() { + let cache = &caches[shard_id]; + cache.add_node_cprops(*vid, &c_props); + } + + for (id, prop) in c_props.drain(..) { + mut_node.add_constant_prop(id, prop)?; + } + }; } - Ok::<(), GraphError>(()) + Ok::<_, GraphError>(()) })?; + #[cfg(feature = "python")] let _ = pb.update(df.len()); } diff --git a/raphtory/src/io/parquet_loaders.rs b/raphtory/src/io/parquet_loaders.rs index 148f5cfe43..d1570f3a55 100644 --- a/raphtory/src/io/parquet_loaders.rs +++ b/raphtory/src/io/parquet_loaders.rs @@ -137,7 +137,7 @@ pub fn load_edges_from_parquet< } pub fn load_node_props_from_parquet< - G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache, >( graph: &G, parquet_path: &Path, diff --git a/raphtory/src/serialise/serialise.rs b/raphtory/src/serialise/serialise.rs index d7e31260cb..ad391de9f6 100644 --- a/raphtory/src/serialise/serialise.rs +++ b/raphtory/src/serialise/serialise.rs @@ -688,7 +688,6 @@ mod proto_test { use chrono::{DateTime, NaiveDateTime}; use proptest::proptest; use raphtory_api::core::storage::arc_str::ArcStr; - use rustc_hash::FxHashMap; #[test] fn prev_proto_str() {