Skip to content

Commit

Permalink
ampc shortest path test case
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkeldenker committed Dec 11, 2024
1 parent e24a389 commit 0912e15
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub fn build(
) -> Coordinator<ShortestPathJob> {
let setup = ShortestPathSetup::new_for_dht_members(dht, source);

Coordinator::new(setup, workers.clone())
Coordinator::new(setup, workers)
.with_mapper(ShortestPathMapper::RelaxEdges)
.with_mapper(ShortestPathMapper::UpdateChangedNodes)
}
Expand Down
8 changes: 6 additions & 2 deletions crates/core/src/entrypoint/ampc/shortest_path/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ impl ShortestPathMapper {
for node in exact_changed_nodes {
for edge in worker
.graph()
.search(&query::ForwardlinksQuery::new(*node))
.unwrap_or_default()
.search(
&query::ForwardlinksQuery::new(*node)
.skip_self_links(false)
.deduplicate(false),
)
.unwrap()
{
if edge.rel_flags.intersects(*SKIPPED_REL) {
continue;
Expand Down
104 changes: 104 additions & 0 deletions crates/core/src/entrypoint/ampc/shortest_path/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,107 @@ impl Job for ShortestPathJob {
self.shard == worker.shard()
}
}

#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, sync::Arc};

use tracing_test::traced_test;
use webgraph::{Edge, ShortestPaths, Webgraph};

use crate::{config::WebgraphGranularity, free_socket_addr};

use super::*;

#[test]
#[traced_test]
fn test_simple_graph() {
let temp_dir = crate::gen_temp_dir().unwrap();
let mut combined = Webgraph::builder(temp_dir.as_ref().join("combined"), 0u64.into())
.open()
.unwrap();
let mut a = Webgraph::builder(temp_dir.as_ref().join("a"), 0u64.into())
.open()
.unwrap();
let mut b = Webgraph::builder(temp_dir.as_ref().join("b"), 0u64.into())
.open()
.unwrap();

let edges = crate::webgraph::tests::test_edges();

for (i, (from, to)) in edges.into_iter().enumerate() {
let e = Edge::new_test(from.clone(), to.clone());
combined.insert(e.clone()).unwrap();

if i % 2 == 0 {
a.insert(e).unwrap();
} else {
b.insert(e).unwrap();
}
}

combined.commit().unwrap();
a.commit().unwrap();
b.commit().unwrap();

let a = Arc::new(a);
let b = Arc::new(b);

let node = webgraph::Node::from("C");

let expected = combined
.raw_distances(node.id(), WebgraphGranularity::Page)
.into_iter()
.map(|(node, dist)| (node, dist as u64))
.collect::<BTreeMap<_, _>>();

let worker = ShortestPathWorker::new(a, 1.into());

let worker_addr = free_socket_addr();

std::thread::spawn(move || {
worker.run(worker_addr).unwrap();
});

std::thread::sleep(std::time::Duration::from_secs(2)); // Wait for worker to start
let a = RemoteShortestPathWorker::new(1.into(), worker_addr).unwrap();

let worker = ShortestPathWorker::new(b, 2.into());
let worker_addr = free_socket_addr();
std::thread::spawn(move || {
worker.run(worker_addr).unwrap();
});

std::thread::sleep(std::time::Duration::from_secs(2)); // Wait for worker to start

let b = RemoteShortestPathWorker::new(2.into(), worker_addr).unwrap();

let (dht_shard, dht_addr) = crate::entrypoint::ampc::dht::tests::setup();

let res = coordinator::build(
&[(dht_shard, dht_addr)],
vec![a.clone(), b.clone()],
node.id(),
)
.run(
vec![
ShortestPathJob {
shard: a.shard(),
source: node.id(),
},
ShortestPathJob {
shard: b.shard(),
source: node.id(),
},
],
coordinator::ShortestPathFinish {
max_distance: Some(128),
},
)
.unwrap();

let actual = res.distances.iter().collect::<BTreeMap<_, _>>();

assert_eq!(expected, actual);
}
}

0 comments on commit 0912e15

Please sign in to comment.