Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graph to parquet #1932

Merged
merged 10 commits into from
Jan 30, 2025
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ members = [
]
default-members = [
"raphtory",
"raphtory-graphql"
]
resolver = "2"

Expand Down Expand Up @@ -149,6 +148,8 @@ datafusion = { version = "43.0.0" }
sqlparser = "0.51.0"
futures = "0.3"
arrow = { version = "53.2.0" }
parquet = { version = "53.2.0" }
arrow-json = { version = "53.2.0" }
arrow-buffer = { version = "53.2.0" }
arrow-schema = { version = "53.2.0" }
arrow-array = { version = "53.2.0" }
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ keywords = ["graph", "temporal-graph", "temporal", "examples"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
raphtory = { path = "../../raphtory", features = ["io"] }
raphtory = { path = "../../raphtory", features = ["io", "proto"] }
chrono = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion python/tests/test_graphdb/test_disk_graph.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from raphtory import DiskGraphStorage
from raphtory import algorithms
import pandas as pd
import tempfile
Expand Down Expand Up @@ -40,6 +39,7 @@


def test_counts():
from raphtory import DiskGraphStorage
graph_dir = tempfile.TemporaryDirectory()
graph = DiskGraphStorage.load_from_pandas(
graph_dir.name, edges, "time", "src", "dst"
Expand All @@ -50,6 +50,7 @@ def test_counts():


def test_disk_graph():
from raphtory import DiskGraphStorage
curr_dir = os.path.dirname(os.path.abspath(__file__))
rsc_dir = os.path.join(
curr_dir, "..", "..", "..", "pometry-storage-private", "resources"
Expand Down Expand Up @@ -139,6 +140,7 @@ def test_disk_graph():


def test_disk_graph_type_filter():
from raphtory import DiskGraphStorage
curr_dir = os.path.dirname(os.path.abspath(__file__))
rsc_dir = os.path.join(
curr_dir, "..", "..", "..", "pometry-storage-private", "resources"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def test_graph_node_property_filter_equal_type_error(graph):
}
}
"""
expected_error_message = "PropertyType Error: Wrong type for property prop5: expected List but actual type is I64"
expected_error_message = "PropertyType Error: Wrong type for property prop5: expected List(I64) but actual type is I64"
run_graphql_error_test(query, expected_error_message, graph())


Expand Down Expand Up @@ -936,7 +936,7 @@ def test_graph_edge_property_filter_equal_type_error(graph):
}
}
"""
expected_error_message = "PropertyType Error: Wrong type for property eprop5: expected List but actual type is I64"
expected_error_message = "PropertyType Error: Wrong type for property eprop5: expected List(I64) but actual type is I64"
run_graphql_error_test(query, expected_error_message, graph())


Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_graphql/test_nodes_property_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def test_node_property_filter_equal_type_error(graph):
}
}
"""
expected_error_message = "PropertyType Error: Wrong type for property prop5: expected List but actual type is I64"
expected_error_message = "PropertyType Error: Wrong type for property prop5: expected List(I64) but actual type is I64"
run_graphql_error_test(query, expected_error_message, graph())


Expand Down
6 changes: 3 additions & 3 deletions python/tests/test_graphql/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def test_node_edge_properties_schema():
},
{
"key": "prop6",
"propertyType": "Map",
"propertyType": "Map{ data: Str }",
"variants": ['{"data": "map"}'],
},
],
Expand All @@ -172,7 +172,7 @@ def test_node_edge_properties_schema():
},
{
"key": "propArray",
"propertyType": "List",
"propertyType": "List<I64>",
"variants": ["[1, 2, 3]"],
},
{
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_node_edge_properties_schema():
},
{
"key": "list_prop",
"propertyType": "List",
"propertyType": "List<F64>",
"variants": ["[1.1, 2.2, 3.3]"],
},
{
Expand Down
2 changes: 1 addition & 1 deletion raphtory-api/src/core/entities/properties/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::core::PropType;

pub mod props;

#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, PartialEq)]
pub enum PropError {
#[error("Wrong type for property {name}: expected {expected:?} but actual type is {actual:?}")]
PropertyTypeError {
Expand Down
130 changes: 105 additions & 25 deletions raphtory-api/src/core/entities/properties/props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::core::{
dict_mapper::{DictMapper, MaybeNew},
locked_vec::ArcReadLockedVec,
},
PropType,
unify_types, PropType,
};

use super::PropError;
Expand Down Expand Up @@ -194,7 +194,7 @@ impl PropMapper {
Some(id) => {
let existing_dtype = self
.get_dtype(id)
.expect("Existing id should always have a dtype");
.expect("Existing id should always have a dtype"); // TODO: change this to return an error
fabianmurariu marked this conversation as resolved.
Show resolved Hide resolved
if existing_dtype == dtype {
Ok(Some(id))
} else {
Expand All @@ -217,37 +217,33 @@ impl PropMapper {
let id = wrapped_id.inner();
let dtype_read = self.dtypes.read_recursive();
if let Some(old_type) = dtype_read.get(id) {
if !matches!(old_type, PropType::Empty) {
return if *old_type == dtype {
Ok(wrapped_id)
} else {
Err(PropError::PropertyTypeError {
name: prop.to_owned(),
expected: old_type.clone(),
actual: dtype,
})
};
let mut unified = false;
if let Ok(_) = unify_types(&dtype, old_type, &mut unified) {
if !unified {
// means the types were equal, no change needed
return Ok(wrapped_id);
}
} else {
return Err(PropError::PropertyTypeError {
name: prop.to_owned(),
expected: old_type.clone(),
actual: dtype,
});
}
}
drop(dtype_read); // drop the read lock and wait for write lock as type did not exist yet
let mut dtype_write = self.dtypes.write();
match dtype_write.get(id).cloned() {
Some(old_type) => {
if matches!(old_type, PropType::Empty) {
// vector already resized but this id is not filled yet, set the dtype and return id
dtype_write[id] = dtype;
if let Ok(tpe) = unify_types(&dtype, &old_type, &mut false) {
dtype_write[id] = tpe;
Ok(wrapped_id)
} else {
// already filled because a different thread won the race for this id, check the type matches
if old_type == dtype {
Ok(wrapped_id)
} else {
Err(PropError::PropertyTypeError {
name: prop.to_owned(),
expected: old_type,
actual: dtype,
})
}
Err(PropError::PropertyTypeError {
name: prop.to_owned(),
expected: old_type,
actual: dtype,
})
}
}
None => {
Expand Down Expand Up @@ -276,3 +272,87 @@ impl PropMapper {
self.dtypes.read_recursive()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::core::PropType;

#[test]
fn test_get_or_create_and_validate_new_property() {
let prop_mapper = PropMapper::default();
let result = prop_mapper.get_or_create_and_validate("new_prop", PropType::U8);
assert!(result.is_ok());
assert_eq!(result.unwrap().inner(), 0);
assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8));
}

#[test]
fn test_get_or_create_and_validate_existing_property_same_type() {
let prop_mapper = PropMapper::default();
prop_mapper
.get_or_create_and_validate("existing_prop", PropType::U8)
.unwrap();
let result = prop_mapper.get_or_create_and_validate("existing_prop", PropType::U8);
assert!(result.is_ok());
assert_eq!(result.unwrap().inner(), 0);
assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8));
}

#[test]
fn test_get_or_create_and_validate_existing_property_different_type() {
let prop_mapper = PropMapper::default();
prop_mapper
.get_or_create_and_validate("existing_prop", PropType::U8)
.unwrap();
let result = prop_mapper.get_or_create_and_validate("existing_prop", PropType::U16);
assert!(result.is_err());
if let Err(PropError::PropertyTypeError {
name,
expected,
actual,
}) = result
{
assert_eq!(name, "existing_prop");
assert_eq!(expected, PropType::U8);
assert_eq!(actual, PropType::U16);
} else {
panic!("Expected PropertyTypeError");
}
}

#[test]
fn test_get_or_create_and_validate_unify_types() {
let prop_mapper = PropMapper::default();
prop_mapper
.get_or_create_and_validate("prop", PropType::Empty)
.unwrap();
let result = prop_mapper.get_or_create_and_validate("prop", PropType::U8);
assert!(result.is_ok());
assert_eq!(result.unwrap().inner(), 0);
assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8));
}

#[test]
fn test_get_or_create_and_validate_resize_vector() {
let prop_mapper = PropMapper::default();
prop_mapper.set_id_and_dtype("existing_prop", 5, PropType::U8);
let result = prop_mapper.get_or_create_and_validate("new_prop", PropType::U16);
assert!(result.is_ok());
assert_eq!(result.unwrap().inner(), 6);
assert_eq!(prop_mapper.get_dtype(6), Some(PropType::U16));
}

#[test]
fn test_get_or_create_and_validate_two_independent_properties() {
let prop_mapper = PropMapper::default();
let result1 = prop_mapper.get_or_create_and_validate("prop1", PropType::U8);
let result2 = prop_mapper.get_or_create_and_validate("prop2", PropType::U16);
assert!(result1.is_ok());
assert!(result2.is_ok());
assert_eq!(result1.unwrap().inner(), 0);
assert_eq!(result2.unwrap().inner(), 1);
assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8));
assert_eq!(prop_mapper.get_dtype(1), Some(PropType::U16));
}
}
Loading
Loading