Skip to content

Commit

Permalink
Speed up node properties loading from Parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianmurariu committed Jan 24, 2025
1 parent 71360b7 commit 857bb91
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 38 deletions.
115 changes: 79 additions & 36 deletions raphtory/src/io/arrow/df_loaders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ pub(crate) fn load_nodes_from_df<
for (idx, (((vid, time), node_type), gid)) in node_col_resolved
.iter()
.zip(time_col.iter())
.zip(node_type_col_resolved.iter()).zip(node_col.iter())
.zip(node_type_col_resolved.iter())
.zip(node_col.iter())
.enumerate()
{
let shard_id = shard.shard_id();
Expand Down Expand Up @@ -517,7 +518,7 @@ pub(crate) fn load_edge_deletions_from_df<

pub(crate) fn load_node_props_from_df<
'a,
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps,
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache,
>(
df_view: DFView<impl Iterator<Item = Result<DFChunk, GraphError>>>,
node_id: &str,
Expand All @@ -528,63 +529,105 @@ pub(crate) fn load_node_props_from_df<
graph: &G,
) -> Result<(), GraphError> {
let constant_properties = constant_properties.unwrap_or(&[]);

let constant_properties_indices = constant_properties
.iter()
.map(|name| df_view.get_index(name))
.collect::<Result<Vec<_>, GraphError>>()?;
let node_id_index = df_view.get_index(node_id)?;

let node_type_index = if let Some(node_type_col) = node_type_col {
Some(df_view.get_index(node_type_col.as_ref())?)
Some(df_view.get_index(node_type_col.as_ref()))
} else {
None
};
let shared_constant_properties = match shared_constant_properties {
Some(props) => props
.iter()
.map(|(name, prop)| {
Ok((
graph
.resolve_node_property(name, prop.dtype(), true)?
.inner(),
prop.clone(),
))
})
.collect::<Result<Vec<_>, GraphError>>()?,
None => vec![],
};
let node_type_index = node_type_index.transpose()?;

let node_id_index = df_view.get_index(node_id)?;

let shared_constant_properties =
process_shared_properties(shared_constant_properties, |key, dtype| {
graph.resolve_node_property(key, dtype, true)
})?;

#[cfg(feature = "python")]
let mut pb = build_progress_bar("Loading node properties".to_string(), df_view.num_rows)?;

let mut node_col_resolved = vec![];

let cache = graph.get_cache();
let mut write_locked_graph = graph.write_lock()?;
let cache_shards = cache.map(|cache| {
(0..write_locked_graph.num_shards())
.map(|_| cache.fork())
.collect::<Vec<_>>()
});

for chunk in df_view.chunks {
let df = chunk?;
let const_props = combine_properties(
let const_prop_cols = combine_properties(
constant_properties,
&constant_properties_indices,
&df,
|name, dtype| graph.resolve_node_property(name, dtype, true),
|key, dtype| graph.resolve_node_property(key, dtype, true),
)?;
let node_col = df.node_col(node_id_index)?;
let node_type_col = lift_node_type_col(node_type, node_type_index, &df)?;
let node_type_col_resolved = node_type_col.resolve(graph)?;
let node_col = df.node_col(node_id_index)?;

node_col_resolved.resize_with(df.len(), Default::default);
node_col
.par_iter()
.zip(node_type_col.par_iter())
.zip(const_props.par_rows())
.try_for_each(|((node_id, node_type), cprops)| {
let node_id = node_id.ok_or(LoadError::MissingNodeError)?;
let node = graph
.node(node_id)
.ok_or_else(|| GraphError::NodeMissingError(node_id.to_owned()))?;
if let Some(node_type) = node_type {
node.set_node_type(node_type)?;
.zip(node_col_resolved.par_iter_mut())
.try_for_each(|(gid, resolved)| {
let gid = gid.ok_or(LoadError::FatalError)?;
let vid = write_locked_graph
.resolve_node(gid)
.map_err(|_| LoadError::FatalError)?;
if let Some(cache) = cache {
cache.resolve_node(vid, gid);
}
let props = cprops
.chain(shared_constant_properties.iter().cloned())
.collect::<Vec<_>>();
if !props.is_empty() {
graph.internal_add_constant_node_properties(node.node, &props)?;
*resolved = vid.inner();
Ok::<(), LoadError>(())
})?;

write_locked_graph
.nodes
.resize(write_locked_graph.num_nodes());

write_locked_graph
.nodes
.par_iter_mut()
.try_for_each(|mut shard| {
let mut c_props = vec![];

for (idx, ((vid, node_type), gid)) in node_col_resolved
.iter()
.zip(node_type_col_resolved.iter())
.zip(node_col.iter())
.enumerate()
{
let shard_id = shard.shard_id();
if let Some(mut_node) = shard.get_mut(*vid) {
mut_node.init(*vid, gid);
mut_node.node_type = *node_type;

c_props.clear();
c_props.extend(const_prop_cols.iter_row(idx));
c_props.extend_from_slice(&shared_constant_properties);

if let Some(caches) = cache_shards.as_ref() {
let cache = &caches[shard_id];
cache.add_node_cprops(*vid, &c_props);
}

for (id, prop) in c_props.drain(..) {
mut_node.add_constant_prop(id, prop)?;
}
};
}
Ok::<(), GraphError>(())
Ok::<_, GraphError>(())
})?;

#[cfg(feature = "python")]
let _ = pb.update(df.len());
}
Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/io/parquet_loaders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub fn load_edges_from_parquet<
}

pub fn load_node_props_from_parquet<
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps,
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache,
>(
graph: &G,
parquet_path: &Path,
Expand Down
1 change: 0 additions & 1 deletion raphtory/src/serialise/serialise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,6 @@ mod proto_test {
use chrono::{DateTime, NaiveDateTime};
use proptest::proptest;
use raphtory_api::core::storage::arc_str::ArcStr;
use rustc_hash::FxHashMap;

#[test]
fn prev_proto_str() {
Expand Down

0 comments on commit 857bb91

Please sign in to comment.