Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split up candidate registration and keep alive #5320

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
pub(crate) const CANDIDATE_KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

/// Messages sent when the leader changes.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -125,6 +125,9 @@ pub trait Election: Send + Sync {
/// Registers a candidate for the election.
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;

/// Keeps the candidate lease alive.
async fn candidate_keep_alive(&self, node_info: &MetasrvNodeInfo) -> Result<()>;

/// Gets all candidates in the election.
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;

Expand Down
8 changes: 6 additions & 2 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::time::{timeout, MissedTickBehavior};

use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT,
CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
CANDIDATE_KEEP_ALIVE_INTERVAL_SECS, CANDIDATE_LEASE_SECS, ELECTION_KEY,
};
use crate::error;
use crate::error::Result;
Expand Down Expand Up @@ -153,7 +153,7 @@ impl Election for EtcdElection {
.context(error::EtcdFailedSnafu)?;

let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));
tokio::time::interval(Duration::from_secs(CANDIDATE_KEEP_ALIVE_INTERVAL_SECS));

loop {
let _ = keep_alive_interval.tick().await;
Expand All @@ -170,6 +170,10 @@ impl Election for EtcdElection {
Ok(())
}

/// Not supported by the etcd-backed election.
///
/// # Panics
///
/// Always panics via `unimplemented!`. Per the panic message, the etcd implementation
/// keeps the candidate alive inside `register_candidate`, so this method is not meant
/// to be called on an `EtcdElection`.
///
/// NOTE(review): the metasrv startup task appears to call `candidate_keep_alive` on
/// whichever election is configured once `register_candidate` returns `Ok` — confirm
/// that path cannot be reached with the etcd implementation.
async fn candidate_keep_alive(&self, _node_info: &MetasrvNodeInfo) -> Result<()> {
unimplemented!("Etcd keeps the candidate alive in register_candidate, so we don't need to call candidate_keep_alive.")
}

async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
let key = self.candidate_root().into_bytes();
let res = self
Expand Down
61 changes: 36 additions & 25 deletions src/meta-srv/src/election/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ impl Election for PgElection {
.is_ok()
}

/// TODO(CookiePie): Split the candidate registration and keep alive logic into separate methods, so that upper layers can call them separately.
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
let key = self.candidate_key();
let node_info =
Expand All @@ -297,37 +296,42 @@ impl Election for PgElection {
.await?;
// The candidate may have been registered before; in that case just refresh the lease.
if !res {
warn!("Candidate lease exists. Now remove previous lease and register again.");
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
}

// Check if the current lease has expired and renew the lease.
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
loop {
let _ = keep_alive_interval.tick().await;
Ok(())
}

let (_, prev_expire_time, current_time, origin) = self
.get_value_with_lease(&key, true)
.await?
.unwrap_or_default();

ensure!(
prev_expire_time > current_time,
UnexpectedSnafu {
violated: format!(
"Candidate lease expired, key: {:?}",
String::from_utf8_lossy(&key.into_bytes())
),
}
);
async fn candidate_keep_alive(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
let key = self.candidate_key();
let node_info =
serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
input: format!("{node_info:?}"),
})?;
let (_, prev_expire_time, current_time, origin) = self
.get_value_with_lease(&key, true)
.await?
.unwrap_or_default();

// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
let origin = origin.unwrap();
self.update_value_with_lease(&key, &origin, &node_info)
.await?;
}
ensure!(
prev_expire_time > current_time,
UnexpectedSnafu {
violated: format!(
"Candidate lease expired, key: {:?}",
String::from_utf8_lossy(&key.into_bytes())
),
}
);

// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
let origin = origin.unwrap();
self.update_value_with_lease(&key, &origin, &node_info)
.await?;

Ok(())
}

async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
Expand Down Expand Up @@ -729,6 +733,7 @@ mod tests {
use tokio_postgres::{Client, NoTls};

use super::*;
use crate::election::CANDIDATE_KEEP_ALIVE_INTERVAL_SECS;
use crate::error::PostgresExecutionSnafu;

async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
Expand Down Expand Up @@ -864,7 +869,13 @@ mod tests {
git_commit: "test_git_commit".to_string(),
start_time_ms: 0,
};

pg_election.register_candidate(&node_info).await.unwrap();

loop {
pg_election.candidate_keep_alive(&node_info).await.unwrap();
tokio::time::sleep(Duration::from_secs(CANDIDATE_KEEP_ALIVE_INTERVAL_SECS)).await;
}
}

#[tokio::test]
Expand Down
17 changes: 14 additions & 3 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use table::metadata::TableId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::election::{Election, LeaderChangeMessage};
use crate::election::{Election, LeaderChangeMessage, CANDIDATE_KEEP_ALIVE_INTERVAL_SECS};
use crate::error::{
self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
Expand Down Expand Up @@ -494,9 +494,20 @@ impl Metasrv {
let node_info = self.node_info();
let _handle = common_runtime::spawn_global(async move {
while started.load(Ordering::Relaxed) {
let res = election.register_candidate(&node_info).await;
if let Err(e) = res {
if let Err(e) = election.register_candidate(&node_info).await {
warn!(e; "Metasrv register candidate error");
} else {
break;
}
}
while started.load(Ordering::Relaxed) {
let mut keep_alive_interval = tokio::time::interval(Duration::from_secs(
CANDIDATE_KEEP_ALIVE_INTERVAL_SECS,
));
keep_alive_interval.tick().await;
let res = election.candidate_keep_alive(&node_info).await;
if let Err(e) = res {
warn!(e; "Metasrv keep lease error");
}
}
});
Expand Down
Loading