Skip to content

Commit

Permalink
Implement simple networking layer for RaftMetadataStore
Browse files Browse the repository at this point in the history
This fixes restatedev#1803.
  • Loading branch information
tillrohrmann committed Aug 20, 2024
1 parent a6d99f7 commit cc49b41
Show file tree
Hide file tree
Showing 15 changed files with 551 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ where
}
}
Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
scheduler.on_cluster_state_update(cluster_state).await?;
if let Err(err) = scheduler.on_cluster_state_update(cluster_state).await {
warn!("Could not perform scheduling operation: {err}");
}
}
Some(cmd) = self.command_rx.recv() => {
self.on_cluster_cmd(cmd, bifrost_admin).await;
Expand Down
1 change: 1 addition & 0 deletions crates/metadata-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
humantime = { workspace = true }
Expand Down
6 changes: 6 additions & 0 deletions crates/metadata-store/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.protoc_arg("--experimental_allow_proto3_optional")
.compile(&["./proto/metadata_store_svc.proto"], &["proto"])?;

tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("raft_metadata_store_svc.bin"))
.protoc_arg("--experimental_allow_proto3_optional")
.compile(&["./proto/raft_metadata_store_svc.proto"], &["proto"])?;

Ok(())
}
24 changes: 24 additions & 0 deletions crates/metadata-store/proto/raft_metadata_store_svc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/proto/blob/main/LICENSE

syntax = "proto3";

import "google/protobuf/empty.proto";

package dev.restate.raft_metadata_store_svc;

// Grpc service definition for the RaftMetadataStore implementation.
service RaftMetadataStoreSvc {
rpc Raft(stream RaftMessage) returns (stream RaftMessage);
}

message RaftMessage {
bytes message = 1;
}

192 changes: 192 additions & 0 deletions crates/metadata-store/src/raft/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::raft::grpc_svc::RaftMessage;
use futures::StreamExt;
use protobuf::Message as ProtobufMessage;
use raft::prelude::Message;
use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::BoxStream;
use tracing::{debug, instrument};

#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
#[error("internal error: {0}")]
Internal(String),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
}

#[derive(Clone, derive_more::Debug)]
pub struct ConnectionManager {
inner: Arc<ConnectionManagerInner>,
#[debug(skip)]
task_center: TaskCenter,
}

impl ConnectionManager {
pub fn new(task_center: TaskCenter, identity: u64, router: mpsc::Sender<Message>) -> Self {
ConnectionManager {
inner: Arc::new(ConnectionManagerInner::new(identity, router)),
task_center,
}
}

pub fn identity(&self) -> u64 {
self.inner.identity
}

pub fn accept_connection(
&self,
raft_peer: u64,
incoming_rx: tonic::Streaming<RaftMessage>,
) -> Result<BoxStream<RaftMessage>, ConnectionError> {
let (outgoing_tx, outgoing_rx) = mpsc::channel(128);
self.run_connection(raft_peer, outgoing_tx, incoming_rx)?;

let outgoing_stream = ReceiverStream::new(outgoing_rx)
.map(Result::<_, tonic::Status>::Ok)
.boxed();
Ok(outgoing_stream)
}

pub fn run_connection(
&self,
remote_peer: u64,
outgoing_tx: mpsc::Sender<RaftMessage>,
incoming_rx: tonic::Streaming<RaftMessage>,
) -> Result<(), ConnectionError> {
let mut guard = self.inner.connections.lock().unwrap();

if guard.contains_key(&remote_peer) {
// we already have a connection established to remote peer
return Ok(());
}

let connection = Connection::new(outgoing_tx);
guard.insert(remote_peer, connection);

let reactor = ConnectionReactor {
remote_peer,
connection_manager: Arc::clone(&self.inner),
};

let _task_id = self.task_center.spawn_child(
TaskKind::ConnectionReactor,
"raft-connection-reactor",
None,
reactor.run(incoming_rx),
)?;

Ok(())
}

pub fn get_connection(&self, target: u64) -> Option<Connection> {
self.inner.connections.lock().unwrap().get(&target).cloned()
}
}

struct ConnectionReactor {
remote_peer: u64,
connection_manager: Arc<ConnectionManagerInner>,
}

impl ConnectionReactor {
#[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))]
async fn run(self, mut incoming_rx: tonic::Streaming<RaftMessage>) -> anyhow::Result<()> {
let mut shutdown = std::pin::pin!(cancellation_watcher());
debug!("Run connection reactor");

loop {
tokio::select! {
_ = &mut shutdown => {
break;
},
message = incoming_rx.next() => {
match message {
Some(message) => {
match message {
Ok(message) => {
let message = Message::parse_from_carllerche_bytes(&message.message)?;

assert_eq!(message.to, self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity);

if self.connection_manager.router.send(message).await.is_err() {
// system is shutting down
debug!("System is shutting down; closing connection");
break;
}
}
Err(err) => {
debug!("Closing connection because received error: {err}");
break;
}
}
}
None => {
debug!("Remote peer closed connection");
break
},
}
}
}
}

Ok(())
}
}

impl Drop for ConnectionReactor {
fn drop(&mut self) {
debug!(remote_peer = %self.remote_peer, "Close connection");
self.connection_manager
.connections
.lock()
.expect("shouldn't be poisoned")
.remove(&self.remote_peer);
}
}

#[derive(Debug)]
struct ConnectionManagerInner {
identity: u64,
connections: Mutex<HashMap<u64, Connection>>,
router: mpsc::Sender<Message>,
}

impl ConnectionManagerInner {
pub fn new(identity: u64, router: mpsc::Sender<Message>) -> Self {
ConnectionManagerInner {
identity,
router,
connections: Mutex::default(),
}
}
}

#[derive(Debug, Clone)]
pub struct Connection {
tx: mpsc::Sender<RaftMessage>,
}

impl Connection {
pub fn new(tx: mpsc::Sender<RaftMessage>) -> Self {
Connection { tx }
}

pub fn try_send(&self, message: RaftMessage) -> Result<(), TrySendError<RaftMessage>> {
self.tx.try_send(message)
}
}
14 changes: 14 additions & 0 deletions crates/metadata-store/src/raft/grpc_svc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

tonic::include_proto!("dev.restate.raft_metadata_store_svc");

pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("raft_metadata_store_svc");
67 changes: 67 additions & 0 deletions crates/metadata-store/src/raft/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::raft::connection_manager::{ConnectionError, ConnectionManager};
use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvc;
use crate::raft::grpc_svc::RaftMessage;
use std::str::FromStr;
use tonic::codegen::BoxStream;
use tonic::{Request, Response, Status, Streaming};

pub const RAFT_PEER_METADATA_KEY: &str = "x-restate-raft-peer";

#[derive(Debug)]
pub struct RaftMetadataStoreHandler {
connection_manager: ConnectionManager,
}

impl RaftMetadataStoreHandler {
pub fn new(connection_manager: ConnectionManager) -> Self {
Self { connection_manager }
}
}

#[async_trait::async_trait]
impl RaftMetadataStoreSvc for RaftMetadataStoreHandler {
type RaftStream = BoxStream<RaftMessage>;

async fn raft(
&self,
request: Request<Streaming<RaftMessage>>,
) -> Result<Response<Self::RaftStream>, Status> {
let raft_peer_metadata =
request
.metadata()
.get(RAFT_PEER_METADATA_KEY)
.ok_or(Status::invalid_argument(format!(
"'{}' is missing",
RAFT_PEER_METADATA_KEY
)))?;
let raft_peer = u64::from_str(
raft_peer_metadata
.to_str()
.map_err(|err| Status::invalid_argument(err.to_string()))?,
)
.map_err(|err| Status::invalid_argument(err.to_string()))?;
let outgoing_rx = self
.connection_manager
.accept_connection(raft_peer, request.into_inner())?;
Ok(Response::new(outgoing_rx))
}
}

impl From<ConnectionError> for Status {
fn from(value: ConnectionError) -> Self {
match value {
ConnectionError::Internal(err) => Status::internal(err),
ConnectionError::Shutdown(err) => Status::aborted(err.to_string()),
}
}
}
4 changes: 4 additions & 0 deletions crates/metadata-store/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod connection_manager;
pub mod grpc_svc;
mod handler;
mod networking;
pub mod service;
mod storage;
mod store;
Loading

0 comments on commit cc49b41

Please sign in to comment.