Skip to content

Commit

Permalink
Graph to parquet (#1932)
Browse files Browse the repository at this point in the history
* Write all props except Arrays to parquet for edges, nodes and graph, need to add deletions

* encode and decode Graph as parquet

* fix test after rebase

* fix issues with edge layers

* fixing issues from review

* fix test failures in df_loaders.rs

* fix properties in df_loaders.rs

* fix compilation issues

* fix the columns for loading props to graph too
  • Loading branch information
fabianmurariu authored Jan 30, 2025
1 parent fc30a06 commit 0a9ba26
Show file tree
Hide file tree
Showing 51 changed files with 3,120 additions and 627 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ members = [
]
default-members = [
"raphtory",
"raphtory-graphql"
]
resolver = "2"

Expand Down Expand Up @@ -149,6 +148,8 @@ datafusion = { version = "43.0.0" }
sqlparser = "0.51.0"
futures = "0.3"
arrow = { version = "53.2.0" }
parquet = { version = "53.2.0" }
arrow-json = { version = "53.2.0" }
arrow-buffer = { version = "53.2.0" }
arrow-schema = { version = "53.2.0" }
arrow-array = { version = "53.2.0" }
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ keywords = ["graph", "temporal-graph", "temporal", "examples"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
raphtory = { path = "../../raphtory", features = ["io"] }
raphtory = { path = "../../raphtory", features = ["io", "proto"] }
chrono = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion python/tests/test_graphdb/test_disk_graph.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from raphtory import DiskGraphStorage
from raphtory import algorithms
import pandas as pd
import tempfile
Expand Down Expand Up @@ -40,6 +39,7 @@


def test_counts():
from raphtory import DiskGraphStorage
graph_dir = tempfile.TemporaryDirectory()
graph = DiskGraphStorage.load_from_pandas(
graph_dir.name, edges, "time", "src", "dst"
Expand All @@ -50,6 +50,7 @@ def test_counts():


def test_disk_graph():
from raphtory import DiskGraphStorage
curr_dir = os.path.dirname(os.path.abspath(__file__))
rsc_dir = os.path.join(
curr_dir, "..", "..", "..", "pometry-storage-private", "resources"
Expand Down Expand Up @@ -139,6 +140,7 @@ def test_disk_graph():


def test_disk_graph_type_filter():
from raphtory import DiskGraphStorage
curr_dir = os.path.dirname(os.path.abspath(__file__))
rsc_dir = os.path.join(
curr_dir, "..", "..", "..", "pometry-storage-private", "resources"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def test_graph_node_property_filter_equal_type_error(graph):
}
}
"""
expected_error_message = "PropertyType Error: Wrong type for property prop5: expected List but actual type is I64"
expected_error_message = "PropertyType Error: Wrong type for property prop5: expected List(I64) but actual type is I64"
run_graphql_error_test(query, expected_error_message, graph())


Expand Down Expand Up @@ -936,7 +936,7 @@ def test_graph_edge_property_filter_equal_type_error(graph):
}
}
"""
expected_error_message = "PropertyType Error: Wrong type for property eprop5: expected List but actual type is I64"
expected_error_message = "PropertyType Error: Wrong type for property eprop5: expected List(I64) but actual type is I64"
run_graphql_error_test(query, expected_error_message, graph())


Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_graphql/test_nodes_property_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def test_node_property_filter_equal_type_error(graph):
}
}
"""
expected_error_message = "PropertyType Error: Wrong type for property prop5: expected List but actual type is I64"
expected_error_message = "PropertyType Error: Wrong type for property prop5: expected List(I64) but actual type is I64"
run_graphql_error_test(query, expected_error_message, graph())


Expand Down
6 changes: 3 additions & 3 deletions python/tests/test_graphql/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def test_node_edge_properties_schema():
},
{
"key": "prop6",
"propertyType": "Map",
"propertyType": "Map{ data: Str }",
"variants": ['{"data": "map"}'],
},
],
Expand All @@ -172,7 +172,7 @@ def test_node_edge_properties_schema():
},
{
"key": "propArray",
"propertyType": "List",
"propertyType": "List<I64>",
"variants": ["[1, 2, 3]"],
},
{
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_node_edge_properties_schema():
},
{
"key": "list_prop",
"propertyType": "List",
"propertyType": "List<F64>",
"variants": ["[1.1, 2.2, 3.3]"],
},
{
Expand Down
2 changes: 1 addition & 1 deletion raphtory-api/src/core/entities/properties/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::core::PropType;

pub mod props;

#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, PartialEq)]
pub enum PropError {
#[error("Wrong type for property {name}: expected {expected:?} but actual type is {actual:?}")]
PropertyTypeError {
Expand Down
128 changes: 104 additions & 24 deletions raphtory-api/src/core/entities/properties/props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::core::{
dict_mapper::{DictMapper, MaybeNew},
locked_vec::ArcReadLockedVec,
},
PropType,
unify_types, PropType,
};

use super::PropError;
Expand Down Expand Up @@ -217,37 +217,33 @@ impl PropMapper {
let id = wrapped_id.inner();
let dtype_read = self.dtypes.read_recursive();
if let Some(old_type) = dtype_read.get(id) {
if !matches!(old_type, PropType::Empty) {
return if *old_type == dtype {
Ok(wrapped_id)
} else {
Err(PropError::PropertyTypeError {
name: prop.to_owned(),
expected: old_type.clone(),
actual: dtype,
})
};
let mut unified = false;
if let Ok(_) = unify_types(&dtype, old_type, &mut unified) {
if !unified {
// means the types were equal, no change needed
return Ok(wrapped_id);
}
} else {
return Err(PropError::PropertyTypeError {
name: prop.to_owned(),
expected: old_type.clone(),
actual: dtype,
});
}
}
drop(dtype_read); // drop the read lock and wait for write lock as type did not exist yet
let mut dtype_write = self.dtypes.write();
match dtype_write.get(id).cloned() {
Some(old_type) => {
if matches!(old_type, PropType::Empty) {
// vector already resized but this id is not filled yet, set the dtype and return id
dtype_write[id] = dtype;
if let Ok(tpe) = unify_types(&dtype, &old_type, &mut false) {
dtype_write[id] = tpe;
Ok(wrapped_id)
} else {
// already filled because a different thread won the race for this id, check the type matches
if old_type == dtype {
Ok(wrapped_id)
} else {
Err(PropError::PropertyTypeError {
name: prop.to_owned(),
expected: old_type,
actual: dtype,
})
}
Err(PropError::PropertyTypeError {
name: prop.to_owned(),
expected: old_type,
actual: dtype,
})
}
}
None => {
Expand Down Expand Up @@ -276,3 +272,87 @@ impl PropMapper {
self.dtypes.read_recursive()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::core::PropType;

#[test]
fn test_get_or_create_and_validate_new_property() {
let prop_mapper = PropMapper::default();
let result = prop_mapper.get_or_create_and_validate("new_prop", PropType::U8);
assert!(result.is_ok());
assert_eq!(result.unwrap().inner(), 0);
assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8));
}

#[test]
fn test_get_or_create_and_validate_existing_property_same_type() {
let prop_mapper = PropMapper::default();
prop_mapper
.get_or_create_and_validate("existing_prop", PropType::U8)
.unwrap();
let result = prop_mapper.get_or_create_and_validate("existing_prop", PropType::U8);
assert!(result.is_ok());
assert_eq!(result.unwrap().inner(), 0);
assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8));
}

#[test]
fn test_get_or_create_and_validate_existing_property_different_type() {
let prop_mapper = PropMapper::default();
prop_mapper
.get_or_create_and_validate("existing_prop", PropType::U8)
.unwrap();
let result = prop_mapper.get_or_create_and_validate("existing_prop", PropType::U16);
assert!(result.is_err());
if let Err(PropError::PropertyTypeError {
name,
expected,
actual,
}) = result
{
assert_eq!(name, "existing_prop");
assert_eq!(expected, PropType::U8);
assert_eq!(actual, PropType::U16);
} else {
panic!("Expected PropertyTypeError");
}
}

#[test]
fn test_get_or_create_and_validate_unify_types() {
let prop_mapper = PropMapper::default();
prop_mapper
.get_or_create_and_validate("prop", PropType::Empty)
.unwrap();
let result = prop_mapper.get_or_create_and_validate("prop", PropType::U8);
assert!(result.is_ok());
assert_eq!(result.unwrap().inner(), 0);
assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8));
}

#[test]
fn test_get_or_create_and_validate_resize_vector() {
let prop_mapper = PropMapper::default();
prop_mapper.set_id_and_dtype("existing_prop", 5, PropType::U8);
let result = prop_mapper.get_or_create_and_validate("new_prop", PropType::U16);
assert!(result.is_ok());
assert_eq!(result.unwrap().inner(), 6);
assert_eq!(prop_mapper.get_dtype(6), Some(PropType::U16));
}

#[test]
fn test_get_or_create_and_validate_two_independent_properties() {
let prop_mapper = PropMapper::default();
let result1 = prop_mapper.get_or_create_and_validate("prop1", PropType::U8);
let result2 = prop_mapper.get_or_create_and_validate("prop2", PropType::U16);
assert!(result1.is_ok());
assert!(result2.is_ok());
assert_eq!(result1.unwrap().inner(), 0);
assert_eq!(result2.unwrap().inner(), 1);
assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8));
assert_eq!(prop_mapper.get_dtype(1), Some(PropType::U16));
}
}
Loading

0 comments on commit 0a9ba26

Please sign in to comment.