Skip to content

Commit

Permalink
fix test after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianmurariu committed Jan 27, 2025
1 parent c14ca6b commit 3f53187
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 9 deletions.
7 changes: 3 additions & 4 deletions raphtory-api/src/core/storage/dict_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,9 @@ impl DictMapper {

pub fn get_name(&self, id: usize) -> ArcStr {
let guard = self.reverse_map.read();
guard
.get(id)
.cloned()
.expect("internal ids should always be mapped to a name")
guard.get(id).cloned().expect(&format!(
"internal ids should always be mapped to a name {id}"
))
}

pub fn get_keys(&self) -> ArcReadLockedVec<ArcStr> {
Expand Down
9 changes: 9 additions & 0 deletions raphtory/src/db/api/storage/graph/locked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ impl<'a> WriteLockedGraph<'a> {
.get_or_init(gid, || self.graph.storage.nodes.next_id())
}

pub fn resolve_node_type(
&self,
node_type: Option<&str>,
) -> Result<MaybeNew<usize>, GraphError> {
node_type
.map(|node_type| Ok(self.graph.node_meta.get_or_create_node_type_id(node_type)))
.unwrap_or_else(|| Ok(MaybeNew::Existing(0)))
}

pub fn num_shards(&self) -> usize {
self.nodes.num_shards().max(self.edges.num_shards())
}
Expand Down
9 changes: 6 additions & 3 deletions raphtory/src/db/api/storage/graph/storage_ops/additions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,12 @@ impl InternalAdditionOps for TemporalGraph {
dtype: PropType,
is_static: bool,
) -> Result<MaybeNew<usize>, GraphError> {
self.node_meta
.resolve_prop_id(prop, dtype, is_static)
.map_err(|e| e.into())
let out = self
.node_meta
.resolve_prop_id(prop, dtype.clone(), is_static)
.map_err(|e| e.into());
println!("resolve_node_property: const:{is_static} {prop}:{dtype:?} -> {out:?}");
out
}

fn resolve_edge_property(
Expand Down
13 changes: 11 additions & 2 deletions raphtory/src/io/arrow/df_loaders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub(crate) fn load_nodes_from_df<
let mut pb = build_progress_bar("Loading nodes".to_string(), df_view.num_rows)?;

let mut node_col_resolved = vec![];
let mut node_type_col_resolved = vec![];

let cache = graph.get_cache();
let mut write_locked_graph = graph.write_lock()?;
Expand All @@ -117,20 +118,28 @@ pub(crate) fn load_nodes_from_df<
|key, dtype| graph.resolve_node_property(key, dtype, true),
)?;
let node_type_col = lift_node_type_col(node_type, node_type_index, &df)?;
let node_type_col_resolved = node_type_col.resolve(graph)?;

let time_col = df.time_col(time_index)?;
let node_col = df.node_col(node_id_index)?;

node_col_resolved.resize_with(df.len(), Default::default);
node_type_col_resolved.resize_with(df.len(), Default::default);

node_col
.par_iter()
.zip(node_col_resolved.par_iter_mut())
.try_for_each(|(gid, resolved)| {
.zip(node_type_col.par_iter())
.zip(node_type_col_resolved.par_iter_mut())
.try_for_each(|(((gid, resolved), node_type), node_type_resolved)| {
let gid = gid.ok_or(LoadError::FatalError)?;
let vid = write_locked_graph
.resolve_node(gid)
.map_err(|_| LoadError::FatalError)?;
let node_type_res = write_locked_graph
.resolve_node_type(node_type)
.map_err(|_| LoadError::FatalError)?
.inner();
*node_type_resolved = node_type_res;
if let Some(cache) = cache {
cache.resolve_node(vid, gid);
}
Expand Down
1 change: 1 addition & 0 deletions raphtory/src/io/arrow/layer_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
io::arrow::dataframe::DFChunk,
};
use polars_arrow::array::Utf8Array;
use raphtory_api::core::storage::dict_mapper::MaybeNew;
use rayon::{
iter::{
plumbing::{Consumer, ProducerCallback, UnindexedConsumer},
Expand Down

0 comments on commit 3f53187

Please sign in to comment.