Skip to content

Commit

Permalink
bellman-ford inspired shortest path for distributed graph. works bett…
Browse files Browse the repository at this point in the history
…er than approach in approx centrality when graph is sharded. still need to implement some low-hanging fruits for optimisation
  • Loading branch information
mikkeldenker committed Dec 9, 2024
1 parent 7633b61 commit 49abc74
Show file tree
Hide file tree
Showing 13 changed files with 746 additions and 0 deletions.
7 changes: 7 additions & 0 deletions configs/shortest_paths/coordinator.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
source = "https://www.cdc.gov/healthywater/swimming/"
host = "0.0.0.0:5000"
output_path = "data/shortest_paths"

[gossip]
addr = "0.0.0.0:5001"
seed_nodes = ["0.0.0.0:3102"]
7 changes: 7 additions & 0 deletions configs/shortest_paths/dht.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
node_id = 0
shard = 0
host = "0.0.0.0:3101"

[gossip]
addr = "0.0.0.0:3102"
seed_nodes = []
7 changes: 7 additions & 0 deletions configs/shortest_paths/worker.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
shard = 0
graph_path = "data/webgraph"
host = "0.0.0.0:5002"

[gossip]
addr = "0.0.0.0:5003"
seed_nodes = ["0.0.0.0:3102"]
19 changes: 19 additions & 0 deletions crates/core/src/ampc/dht/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ pub enum UpsertAction {
Inserted,
}

impl UpsertAction {
pub fn is_changed(&self) -> bool {
matches!(self, UpsertAction::Merged | UpsertAction::Inserted)
}
}

#[enum_dispatch]
pub trait UpsertFn {
fn upsert(&self, old: Value, new: Value) -> Value;
Expand All @@ -41,6 +47,7 @@ pub enum UpsertEnum {
HyperLogLog64Upsert,
HyperLogLog128Upsert,
U64Add,
U64Min,
F32Add,
F64Add,
KahanSumAdd,
Expand Down Expand Up @@ -94,6 +101,18 @@ impl UpsertFn for U64Add {
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
pub struct U64Min;

impl UpsertFn for U64Min {
fn upsert(&self, old: Value, new: Value) -> Value {
let old = unwrap_value!(old, U64);
let new = unwrap_value!(new, U64);

Value::U64(old.min(new))
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
pub struct F32Add;

Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/ampc/dht/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ impl ValueTrait for HyperLogLog128 {}
type HarmonicMeta = crate::entrypoint::ampc::harmonic_centrality::Meta;
impl ValueTrait for HarmonicMeta {}

type ShortestPathMeta = crate::entrypoint::ampc::shortest_path::Meta;
impl ValueTrait for ShortestPathMeta {}

impl ValueTrait for U64BloomFilter {}

type Unit = ();
Expand All @@ -71,6 +74,7 @@ pub enum Value {
HyperLogLog64(HyperLogLog64),
HyperLogLog128(HyperLogLog128),
HarmonicMeta(HarmonicMeta),
ShortestPathMeta(ShortestPathMeta),
U64BloomFilter(U64BloomFilter),
Unit(Unit),
}
Expand Down Expand Up @@ -108,5 +112,6 @@ impl_from_to_value!(HyperLogLog32, HyperLogLog32);
impl_from_to_value!(HyperLogLog64, HyperLogLog64);
impl_from_to_value!(HyperLogLog128, HyperLogLog128);
impl_from_to_value!(HarmonicMeta, HarmonicMeta);
impl_from_to_value!(ShortestPathMeta, ShortestPathMeta);
impl_from_to_value!(U64BloomFilter, U64BloomFilter);
impl_from_to_value!(Unit, Unit);
16 changes: 16 additions & 0 deletions crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,14 @@ pub struct HarmonicCoordinatorConfig {
pub output_path: String,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct ShortestPathCoordinatorConfig {
pub source: String,
pub gossip: GossipConfig,
pub host: SocketAddr,
pub output_path: String,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct HarmonicWorkerConfig {
pub gossip: GossipConfig,
Expand All @@ -688,6 +696,14 @@ pub struct HarmonicWorkerConfig {
pub host: SocketAddr,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct ShortestPathWorkerConfig {
pub gossip: GossipConfig,
pub shard: ShardId,
pub graph_path: String,
pub host: SocketAddr,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct ApproxHarmonicCoordinatorConfig {
pub gossip: GossipConfig,
Expand Down
13 changes: 13 additions & 0 deletions crates/core/src/distributed/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ pub enum Service {
ApproxHarmonicCoordinator {
host: SocketAddr,
},
ShortestPathWorker {
host: SocketAddr,
shard: ShardId,
},
ShortestPathCoordinator {
host: SocketAddr,
},
}

impl std::fmt::Display for Service {
Expand Down Expand Up @@ -163,6 +170,12 @@ impl std::fmt::Display for Service {
Self::ApproxHarmonicCoordinator { host } => {
write!(f, "ApproxHarmonicCoordinator {}", host)
}
Self::ShortestPathWorker { host, shard } => {
write!(f, "ShortestPathWorker {} {}", host, shard)
}
Self::ShortestPathCoordinator { host } => {
write!(f, "ShortestPathCoordinator {}", host)
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/entrypoint/ampc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
pub mod approximated_harmonic_centrality;
pub mod dht;
pub mod harmonic_centrality;
pub mod shortest_path;
230 changes: 230 additions & 0 deletions crates/core/src/entrypoint/ampc/shortest_path/coordinator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
// Stract is an open source web search engine.
// Copyright (C) 2024 Stract ApS
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>

use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::path::Path;

use itertools::Itertools;
use url::Url;

use super::mapper::ShortestPathMapper;
use super::{worker::RemoteShortestPathWorker, ShortestPathTables};
use super::{DhtTable as _, Finisher, Meta, Setup, ShortestPathJob};
use crate::ampc::{Coordinator, DefaultDhtTable, DhtConn};
use crate::config::ShortestPathCoordinatorConfig;
use crate::distributed::cluster::Cluster;
use crate::distributed::member::{Member, Service, ShardId};
use crate::webpage::url_ext::UrlExt;
use crate::{webgraph, Result};

pub struct ShortestPathSetup {
dht: DhtConn<ShortestPathTables>,
source: webgraph::NodeID,
}

impl ShortestPathSetup {
pub async fn new(cluster: &Cluster, source: webgraph::NodeID) -> Result<Self> {
let dht_members: Vec<_> = cluster
.members()
.await
.into_iter()
.filter_map(|member| {
if let Service::Dht { host, shard } = member.service {
Some((shard, host))
} else {
None
}
})
.collect();

Ok(Self::new_for_dht_members(&dht_members, source))
}
pub fn new_for_dht_members(
dht_members: &[(ShardId, SocketAddr)],
source: webgraph::NodeID,
) -> Self {
let initial = ShortestPathTables {
distances: DefaultDhtTable::new(dht_members, "distances"),
meta: DefaultDhtTable::new(dht_members, "meta"),
changed_nodes: DefaultDhtTable::new(dht_members, "changed_nodes"),
};

let dht = DhtConn::new(initial);

Self { dht, source }
}
}

impl Setup for ShortestPathSetup {
type DhtTables = ShortestPathTables;

fn init_dht(&self) -> DhtConn<Self::DhtTables> {
self.dht.clone()
}

fn setup_round(&self, dht: &Self::DhtTables) {
dht.meta.set(
(),
Meta {
round_had_changes: false,
},
);
}

fn setup_first_round(&self, dht: &Self::DhtTables) {
dht.distances.set(self.source, 0);
dht.meta.set(
(),
Meta {
round_had_changes: true,
},
);
}
}

pub struct ShortestPathFinish;

impl Finisher for ShortestPathFinish {
type Job = ShortestPathJob;

fn is_finished(&self, dht: &ShortestPathTables) -> bool {
!dht.meta.get(()).unwrap().round_had_changes
}
}

pub fn build(
dht: &[(ShardId, SocketAddr)],
workers: Vec<RemoteShortestPathWorker>,
source: webgraph::NodeID,
) -> Coordinator<ShortestPathJob> {
let setup = ShortestPathSetup::new_for_dht_members(dht, source);

Coordinator::new(setup, workers.clone())
.with_mapper(ShortestPathMapper::RelaxEdges)
.with_mapper(ShortestPathMapper::UpdateChangedNodes)
}

struct ClusterInfo {
// dropping the handle will leave the cluster
_handle: Cluster,
dht: Vec<(ShardId, SocketAddr)>,
workers: Vec<RemoteShortestPathWorker>,
}

async fn setup_gossip(config: ShortestPathCoordinatorConfig) -> Result<ClusterInfo> {
let handle = Cluster::join(
Member::new(Service::ShortestPathCoordinator { host: config.host }),
config.gossip.addr,
config.gossip.seed_nodes.unwrap_or_default(),
)
.await?;

tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

let members = handle.members().await;

let dht = members
.iter()
.filter_map(|member| {
if let Service::Dht { host, shard } = member.service {
Some((shard, host))
} else {
None
}
})
.collect();

let workers = members
.iter()
.filter_map(|member| {
if let Service::ShortestPathWorker { host, shard } = member.service {
Some(RemoteShortestPathWorker::new(shard, host))
} else {
None
}
})
.collect::<Result<Vec<RemoteShortestPathWorker>>>()?;

Ok(ClusterInfo {
_handle: handle,
dht,
workers,
})
}

pub fn run(config: ShortestPathCoordinatorConfig) -> Result<()> {
let source = webgraph::Node::from(Url::robust_parse(&config.source)?).id();
let tokio_conf = config.clone();
let cluster = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
.block_on(setup_gossip(tokio_conf))?;

let jobs: Vec<_> = cluster
.workers
.iter()
.map(|worker| ShortestPathJob {
shard: worker.shard(),
source,
})
.collect();

tracing::info!("starting {} jobs", jobs.len());

let coordinator = build(&cluster.dht, cluster.workers.clone(), source);
let res = coordinator.run(jobs, ShortestPathFinish)?;

let output_path = Path::new(&config.output_path);

if !output_path.exists() {
std::fs::create_dir_all(output_path)?;
}

let mut writer = csv::Writer::from_writer(
std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(output_path.join("distances.csv"))?,
);
let mut distances = res.distances.iter().collect::<Vec<_>>();
distances.sort_by_key(|(_id, distance)| *distance);

let id2node: BTreeMap<_, _> = cluster
.workers
.iter()
.flat_map(|w| {
distances
.iter()
.chunks(10_000)
.into_iter()
.flat_map(move |c| {
let ids = c.map(|(id, _)| *id).collect::<Vec<_>>();
w.batch_id2node(ids)
})
.collect::<Vec<_>>()
})
.collect();

for (id, distance) in distances {
if let Some(node) = id2node.get(&id) {
writer.write_record(&[node.as_str().to_string(), distance.to_string()])?;
}
}

Ok(())
}
Loading

0 comments on commit 49abc74

Please sign in to comment.