Skip to content

Commit

Permalink
Load edge properties with full lock in df_loaders.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianmurariu committed Jan 24, 2025
1 parent 857bb91 commit 6d229b1
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 51 deletions.
2 changes: 1 addition & 1 deletion examples/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ keywords = ["graph", "temporal-graph", "temporal", "examples"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
raphtory = { path = "../../raphtory", features = ["io"] }
raphtory = { path = "../../raphtory", features = ["io", "proto"] }
chrono = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion raphtory/src/core/utils/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use pyo3::PyErr;
#[cfg(feature = "arrow")]
use raphtory_api::core::entities::GidType;
use raphtory_api::core::{
entities::{properties::PropError, GID},
entities::{properties::PropError, GID, VID},
storage::arc_str::ArcStr,
PropType,
};
Expand Down Expand Up @@ -63,6 +63,8 @@ pub enum LoadError {
MissingNodeError,
#[error("Missing value for timestamp")]
MissingTimeError,
#[error("Missing value for edge id {0:?} -> {1:?}")]
MissingEdgeError(VID, VID),
#[error("Node IDs have the wrong type, expected {existing}, got {new}")]
NodeIdTypeError { existing: GidType, new: GidType },
#[error("Fatal load error, graph may be in a dirty state.")]
Expand Down
251 changes: 205 additions & 46 deletions raphtory/src/io/arrow/df_loaders.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::{
core::{
entities::LayerIds,
entities::{nodes::node_ref::AsNodeRef, LayerIds},
utils::errors::{GraphError, LoadError},
PropType,
},
db::api::{mutation::internal::*, view::StaticGraphViewOps},
io::arrow::{
dataframe::{DFChunk, DFView},
layer_col::{lift_layer_col, lift_node_type_col},
node_col::lift_node_col,
prop_handler::*,
},
prelude::*,
Expand Down Expand Up @@ -635,7 +634,7 @@ pub(crate) fn load_node_props_from_df<
}

pub(crate) fn load_edges_props_from_df<
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps,
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache,
S: AsRef<str>,
>(
df_view: DFView<impl Iterator<Item = Result<DFChunk, GraphError>>>,
Expand All @@ -649,78 +648,238 @@ pub(crate) fn load_edges_props_from_df<
) -> Result<(), GraphError> {
let constant_properties = constant_properties
.into_iter()
.flat_map(|s| s.into_iter().map(|s| s.as_ref()))
.flatten()
.map(|s| s.as_ref())
.collect::<Vec<_>>();

let constant_properties_indices = constant_properties
.iter()
.map(|name| df_view.get_index(name.as_ref()))
.map(|name| df_view.get_index(name))
.collect::<Result<Vec<_>, GraphError>>()?;

let src_index = df_view.get_index(src)?;
let dst_index = df_view.get_index(dst)?;
let layer_index = if let Some(layer_col) = layer_col {
Some(df_view.get_index(layer_col.as_ref()))
Some(df_view.get_index(layer_col.as_ref())?)
} else {
None
};
let layer_index = layer_index.transpose()?;
let shared_constant_properties =
process_shared_properties(shared_constant_properties, |key, dtype| {
graph.resolve_edge_property(key, dtype, true)
})?;

#[cfg(feature = "python")]
let mut pb = build_progress_bar("Loading edge properties".to_string(), df_view.num_rows)?;
let shared_constant_properties = match shared_constant_properties {
None => {
vec![]
}
Some(props) => props
.iter()
.map(|(key, prop)| {
Ok((
graph
.resolve_edge_property(key, prop.dtype(), true)?
.inner(),
prop.clone(),
))
})
.collect::<Result<Vec<_>, GraphError>>()?,
};
#[cfg(feature = "python")]
let _ = pb.update(0);

let mut src_col_resolved = vec![];
let mut dst_col_resolved = vec![];
let mut eid_col_resolved = vec![];

let cache = graph.get_cache();
let mut write_locked_graph = graph.write_lock()?;
let cache_shards = cache.map(|cache| {
(0..write_locked_graph.num_shards())
.map(|_| cache.fork())
.collect::<Vec<_>>()
});

let g = write_locked_graph.graph;

for chunk in df_view.chunks {
let df = chunk?;
let const_prop_iter = combine_properties(
let const_prop_cols = combine_properties(
&constant_properties,
&constant_properties_indices,
&df,
|name, dtype| graph.resolve_edge_property(name, dtype, true),
|key, dtype| graph.resolve_edge_property(key, dtype, true),
)?;

let layer = lift_layer_col(layer, layer_index, &df)?;
let src_col = lift_node_col(src_index, &df)?;
let dst_col = lift_node_col(dst_index, &df)?;
let layer_col_resolved = layer.resolve(graph)?;

let src_col = df.node_col(src_index)?;
src_col.validate(graph, LoadError::MissingSrcError)?;

let dst_col = df.node_col(dst_index)?;
dst_col.validate(graph, LoadError::MissingDstError)?;

// It's our graph, no one else can change it
src_col_resolved.resize_with(df.len(), Default::default);
src_col
.par_iter()
.zip(dst_col.par_iter())
.zip(layer.par_iter())
.zip(const_prop_iter.par_rows())
.try_for_each(|(((src, dst), layer), cprops)| {
let src = src.ok_or(LoadError::MissingSrcError)?;
let dst = dst.ok_or(LoadError::MissingDstError)?;
let e = graph
.edge(src, dst)
.ok_or_else(|| GraphError::EdgeMissingError {
src: src.to_owned(),
dst: dst.to_owned(),
})?;
let layer_id = graph.resolve_layer(layer)?.inner();
let props = cprops
.chain(shared_constant_properties.iter().cloned())
.collect::<Vec<_>>();
if !props.is_empty() {
graph.internal_add_constant_edge_properties(e.edge.pid(), layer_id, &props)?;
.zip(src_col_resolved.par_iter_mut())
.try_for_each(|(gid, resolved)| {
let gid = gid.ok_or(LoadError::FatalError)?;
let vid = g
.resolve_node_ref(gid.as_node_ref())
.ok_or(LoadError::MissingNodeError)?;
*resolved = vid;
Ok::<(), LoadError>(())
})?;

dst_col_resolved.resize_with(df.len(), Default::default);
dst_col
.par_iter()
.zip(dst_col_resolved.par_iter_mut())
.try_for_each(|(gid, resolved)| {
let gid = gid.ok_or(LoadError::FatalError)?;
let vid = g
.resolve_node_ref(gid.as_node_ref())
.ok_or(LoadError::MissingNodeError)?;
*resolved = vid;
Ok::<(), LoadError>(())
})?;

write_locked_graph
.nodes
.resize(write_locked_graph.num_nodes());

// resolve all the edges
eid_col_resolved.resize_with(df.len(), Default::default);
let eid_col_shared = atomic_usize_from_mut_slice(cast_slice_mut(&mut eid_col_resolved));
write_locked_graph
.nodes
.par_iter_mut()
.try_for_each(|shard| {
for (row, (src, dst)) in src_col_resolved
.iter()
.zip(dst_col_resolved.iter())
.enumerate()
{
if let Some(src_node) = shard.get(*src) {
// we know this is here
let EID(eid) = src_node
.find_edge_eid(*dst, &LayerIds::All)
.ok_or(LoadError::MissingEdgeError(*src, *dst))?;
eid_col_shared[row].store(eid, Ordering::Relaxed);
}
}
Ok::<_, LoadError>(())
})?;

write_locked_graph
.edges
.par_iter_mut()
.try_for_each(|mut shard| {
let mut c_props = vec![];
for (idx, (eid, layer)) in eid_col_resolved
.iter()
.zip(layer_col_resolved.iter())
.enumerate()
{
let shard_id = shard.shard_id();
if let Some(mut edge) = shard.get_mut(*eid) {
c_props.clear();
c_props.extend(const_prop_cols.iter_row(idx));
c_props.extend_from_slice(&shared_constant_properties);

if let Some(caches) = cache_shards.as_ref() {
let cache = &caches[shard_id];
cache.add_edge_cprops(*eid, *layer, &c_props);
}

if !c_props.is_empty() {
let edge_layer = edge.layer_mut(*layer);

for (id, prop) in c_props.drain(..) {
edge_layer.update_constant_prop(id, prop)?;
}
}
}
}
Ok::<(), GraphError>(())
})?;

if let Some(cache) = cache {
cache.write()?;
}
if let Some(cache_shards) = cache_shards.as_ref() {
for cache in cache_shards {
cache.write()?;
}
}

#[cfg(feature = "python")]
let _ = pb.update(df.len());
}
Ok(())

// let constant_properties = constant_properties
// .into_iter()
// .flat_map(|s| s.into_iter().map(|s| s.as_ref()))
// .collect::<Vec<_>>();
// let constant_properties_indices = constant_properties
// .iter()
// .map(|name| df_view.get_index(name.as_ref()))
// .collect::<Result<Vec<_>, GraphError>>()?;
// let src_index = df_view.get_index(src)?;
// let dst_index = df_view.get_index(dst)?;
// let layer_index = if let Some(layer_col) = layer_col {
// Some(df_view.get_index(layer_col.as_ref()))
// } else {
// None
// };
// let layer_index = layer_index.transpose()?;
// #[cfg(feature = "python")]
// let mut pb = build_progress_bar("Loading edge properties".to_string(), df_view.num_rows)?;
// let shared_constant_properties = match shared_constant_properties {
// None => {
// vec![]
// }
// Some(props) => props
// .iter()
// .map(|(key, prop)| {
// Ok((
// graph
// .resolve_edge_property(key, prop.dtype(), true)?
// .inner(),
// prop.clone(),
// ))
// })
// .collect::<Result<Vec<_>, GraphError>>()?,
// };

// for chunk in df_view.chunks {
// let df = chunk?;
// let const_prop_iter = combine_properties(
// &constant_properties,
// &constant_properties_indices,
// &df,
// |name, dtype| graph.resolve_edge_property(name, dtype, true),
// )?;

// let layer = lift_layer_col(layer, layer_index, &df)?;
// let src_col = lift_node_col(src_index, &df)?;
// let dst_col = lift_node_col(dst_index, &df)?;
// src_col
// .par_iter()
// .zip(dst_col.par_iter())
// .zip(layer.par_iter())
// .zip(const_prop_iter.par_rows())
// .try_for_each(|(((src, dst), layer), cprops)| {
// let src = src.ok_or(LoadError::MissingSrcError)?;
// let dst = dst.ok_or(LoadError::MissingDstError)?;
// let e = graph
// .edge(src, dst)
// .ok_or_else(|| GraphError::EdgeMissingError {
// src: src.to_owned(),
// dst: dst.to_owned(),
// })?;
// let layer_id = graph.resolve_layer(layer)?.inner();
// let props = cprops
// .chain(shared_constant_properties.iter().cloned())
// .collect::<Vec<_>>();
// if !props.is_empty() {
// graph.internal_add_constant_edge_properties(e.edge.pid(), layer_id, &props)?;
// }
// Ok::<(), GraphError>(())
// })?;
// #[cfg(feature = "python")]
// let _ = pb.update(df.len());
// }
// Ok(())
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/io/parquet_loaders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub fn load_node_props_from_parquet<
}

pub fn load_edge_props_from_parquet<
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps,
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache,
>(
graph: &G,
parquet_path: &Path,
Expand Down Expand Up @@ -210,7 +210,7 @@ pub fn load_edge_props_from_parquet<
shared_const_properties,
layer,
layer_col,
graph.core_graph(),
graph,
)
.map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?;
}
Expand Down
5 changes: 4 additions & 1 deletion raphtory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ pub mod prelude {
pub use raphtory_api::core::{entities::GID, input::input_node::InputNode};

#[cfg(feature = "proto")]
pub use crate::serialise::{CacheOps, StableDecode, StableEncode, parquet::{ParquetEncoder, ParquetDecoder}};
pub use crate::serialise::{
parquet::{ParquetDecoder, ParquetEncoder},
CacheOps, StableDecode, StableEncode,
};
}

#[cfg(feature = "storage")]
Expand Down

0 comments on commit 6d229b1

Please sign in to comment.