Skip to content

Commit

Permalink
fixes for parquet encoders
Browse files (browse the repository at this point in the history)
  • Loading branch information
fabianmurariu committed Jan 27, 2025
1 parent 6d229b1 commit b7b79b0
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 21 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number | Diff line number | Diff line change
Expand Up @@ -13,7 +13,6 @@ members = [
]
default-members = [
"raphtory",
"raphtory-graphql"
]
resolver = "2"

Expand Down
1 change: 1 addition & 0 deletions raphtory/src/io/arrow/df_loaders.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -122,6 +122,7 @@ pub(crate) fn load_nodes_from_df<
let node_col = df.node_col(node_id_index)?;

node_col_resolved.resize_with(df.len(), Default::default);

node_col
.par_iter()
.zip(node_col_resolved.par_iter_mut())
Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/lib.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -199,7 +199,7 @@ mod test_utils {
PropType::F64 => any::<f64>().prop_map(Prop::F64).boxed(),
PropType::U8 => any::<u8>().prop_map(Prop::U8).boxed(),
PropType::Bool => any::<bool>().prop_map(Prop::Bool).boxed(),
PropType::DTime => (1970..2024, 1..=12, 1..28, 0..24, 0..60, 0..60)
PropType::DTime => (1900..2024, 1..=12, 1..28, 0..24, 0..60, 0..60)
.prop_map(|(year, month, day, h, m, s)| {
Prop::DTime(
format!(
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/python/graph/io/pandas_loaders.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -98,7 +98,7 @@ pub(crate) fn load_edges_from_pandas<

pub(crate) fn load_node_props_from_pandas<
'py,
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps,
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache,
>(
graph: &G,
df: &Bound<'py, PyAny>,
Expand Down Expand Up @@ -128,7 +128,7 @@ pub(crate) fn load_node_props_from_pandas<

pub(crate) fn load_edge_props_from_pandas<
'py,
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps,
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache,
>(
graph: &G,
df: &Bound<'py, PyAny>,
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/serialise/parquet/edges.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -41,7 +41,7 @@ pub(crate) fn encode_edge_tprop(
]
},
|edges, g, decoder, writer| {
let row_group_size = 100_000.min(edges.len());
let row_group_size = 100_000;
let all_layers = LayerIds::All;

for edge_rows in edges
Expand Down Expand Up @@ -86,7 +86,7 @@ pub(crate) fn encode_edge_deletions(
]
},
|edges, g, decoder, writer| {
let row_group_size = 100_000.min(edges.len());
let row_group_size = 100_000;
let g = g.lock();
let g = &g;
let g_edges = g.edges();
Expand Down
13 changes: 8 additions & 5 deletions raphtory/src/serialise/parquet/mod.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -126,18 +126,21 @@ pub(crate) fn run_encode(
std::fs::create_dir_all(&root_dir)?;

if size > 0 {
let chunk_size = (size / rayon::current_num_threads()).max(size);
(0..size)
.step_by(chunk_size)
.enumerate()
let chunk_size = (size / rayon::current_num_threads()).max(128);
let iter = (0..size).step_by(chunk_size);

let num_digits = iter.len().to_string().len();

iter.enumerate()
.par_bridge()
.try_for_each(|(chunk, first)| {
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let items = first..(first + chunk_size).min(size);

let node_file = File::create(root_dir.join(format!("{}.parquet", chunk)))?;
let node_file =
File::create(root_dir.join(format!("{chunk:0num_digits$}.parquet")))?;
let mut writer = ArrowWriter::try_new(node_file, schema.clone(), Some(props))?;

let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?;
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/serialise/parquet/nodes.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(crate) fn encode_nodes_tprop(
]
},
|nodes, g, decoder, writer| {
let row_group_size = 100_000.min(nodes.len());
let row_group_size = 100_000;

let cols = g
.node_meta()
Expand Down Expand Up @@ -93,7 +93,7 @@ pub(crate) fn encode_nodes_cprop(
]
},
|nodes, g, decoder, writer| {
let row_group_size = 100_000.min(nodes.len());
let row_group_size = 100_000;

for node_rows in nodes
.into_iter()
Expand Down
8 changes: 0 additions & 8 deletions raphtory/src/serialise/serialise.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -82,14 +82,6 @@ pub trait CacheOps: Sized {

impl StableEncode for GraphStorage {
fn encode_to_proto(&self) -> proto::Graph {
#[cfg(feature = "storage")]
if let GraphStorage::Disk(storage) = self {
assert!(
storage.inner.layers().len() <= 1,
"Disk based storage not supported right now because it doesn't have aligned edges"
);
}

let storage = self.lock();
let mut graph = proto::Graph::default();

Expand Down

0 comments on commit b7b79b0

Please sign in to comment.