Skip to content

Commit 930c2ce

Browse files
authored
feat(service): schema mismatch disables service (#198)
* feat: refactor State to hide inner vars, expose functionality via methods Adds `State::event_db()` which returns an optional handle to the DB connection pool Adds `State::schema_version_check()` which retuns the result of the schema version check query Adds `State::is_schema_version_status(svs)` which compares the inner value with the argument Adds `State::set_schema_version_status(svs)` which sets the inner value with the argument * feat: add SchemaVersionValidation middleware Add middleware type to check the State's schema version status variable, if a mismatch is detected, a `503 Service unavailable` response is returned. * fix: update GET /health/ready endpoint * feat: update endpoint to use 'State::event_db()' method * feat: add SchemaVersionValidation middleware to existing API endpoints * fix: spelling
1 parent 63df50d commit 930c2ce

File tree

9 files changed

+193
-56
lines changed

9 files changed

+193
-56
lines changed

catalyst-gateway/bin/src/service/api/health/ready_get.rs

+14-13
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use poem_extensions::{
99
};
1010

1111
use crate::{
12+
cli::Error,
13+
event_db::error::Error as DBError,
1214
service::common::responses::{
1315
resp_2xx::NoContent,
1416
resp_5xx::{server_error, ServerError, ServiceUnavailable},
@@ -48,23 +50,22 @@ pub(crate) type AllResponses = response! {
4850
/// but unlikely)
4951
/// * 503 Service Unavailable - Service is not ready, do not send other requests.
5052
pub(crate) async fn endpoint(state: Data<&Arc<State>>) -> AllResponses {
51-
match state.event_db.schema_version_check().await {
53+
match state.schema_version_check().await {
5254
Ok(_) => {
5355
tracing::debug!("DB schema version status ok");
54-
if let Ok(mut g) = state.schema_version_status.lock() {
55-
*g = SchemaVersionStatus::Ok;
56-
}
56+
state.set_schema_version_status(SchemaVersionStatus::Ok);
5757
T204(NoContent)
5858
},
59-
Err(crate::event_db::error::Error::TimedOut) => T503(ServiceUnavailable),
60-
Err(err) => {
61-
tracing::error!("DB schema version status mismatch");
62-
if let Ok(mut g) = state.schema_version_status.lock() {
63-
*g = SchemaVersionStatus::Mismatch;
64-
T503(ServiceUnavailable)
65-
} else {
66-
T500(server_error!("{}", err.to_string()))
67-
}
59+
Err(Error::EventDb(DBError::MismatchedSchema { was, expected })) => {
60+
tracing::error!(
61+
expected = expected,
62+
current = was,
63+
"DB schema version status mismatch"
64+
);
65+
state.set_schema_version_status(SchemaVersionStatus::Mismatch);
66+
T503(ServiceUnavailable)
6867
},
68+
Err(Error::EventDb(DBError::TimedOut)) => T503(ServiceUnavailable),
69+
Err(err) => T500(server_error!("{}", err.to_string())),
6970
}
7071
}

catalyst-gateway/bin/src/service/api/registration/mod.rs

+37-29
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::sync::Arc;
44
use poem::web::Data;
55
use poem_extensions::{
66
response,
7-
UniResponse::{T200, T404, T500},
7+
UniResponse::{T200, T404, T500, T503},
88
};
99
use poem_openapi::{
1010
param::{Path, Query},
@@ -13,17 +13,20 @@ use poem_openapi::{
1313
};
1414

1515
use crate::{
16-
service::common::{
17-
objects::{
18-
event_id::EventId, voter_registration::VoterRegistration,
19-
voting_public_key::VotingPublicKey,
20-
},
21-
responses::{
22-
resp_2xx::OK,
23-
resp_4xx::NotFound,
24-
resp_5xx::{server_error, ServerError},
16+
service::{
17+
common::{
18+
objects::{
19+
event_id::EventId, voter_registration::VoterRegistration,
20+
voting_public_key::VotingPublicKey,
21+
},
22+
responses::{
23+
resp_2xx::OK,
24+
resp_4xx::NotFound,
25+
resp_5xx::{server_error, ServerError, ServiceUnavailable},
26+
},
27+
tags::ApiTags,
2528
},
26-
tags::ApiTags,
29+
utilities::middleware::schema_validation::schema_version_validation,
2730
},
2831
state::State,
2932
};
@@ -36,7 +39,8 @@ impl RegistrationApi {
3639
#[oai(
3740
path = "/voter/:voting_key",
3841
method = "get",
39-
operation_id = "getVoterInfo"
42+
operation_id = "getVoterInfo",
43+
transform = "schema_version_validation"
4044
)]
4145
/// Voter's info
4246
///
@@ -65,24 +69,28 @@ impl RegistrationApi {
6569
200: OK<Json<VoterRegistration>>,
6670
404: NotFound,
6771
500: ServerError,
72+
503: ServiceUnavailable,
6873
} {
69-
let voter = pool
70-
.event_db
71-
.get_voter(
72-
&event_id.0.map(Into::into),
73-
voting_key.0 .0,
74-
*with_delegators,
75-
)
76-
.await;
77-
match voter {
78-
Ok(voter) => {
79-
match voter.try_into() {
80-
Ok(voter) => T200(OK(Json(voter))),
81-
Err(err) => T500(server_error!("{}", err.to_string())),
82-
}
83-
},
84-
Err(crate::event_db::error::Error::NotFound(_)) => T404(NotFound),
85-
Err(err) => T500(server_error!("{}", err.to_string())),
74+
if let Ok(event_db) = pool.event_db() {
75+
let voter = event_db
76+
.get_voter(
77+
&event_id.0.map(Into::into),
78+
voting_key.0 .0,
79+
*with_delegators,
80+
)
81+
.await;
82+
match voter {
83+
Ok(voter) => {
84+
match voter.try_into() {
85+
Ok(voter) => T200(OK(Json(voter))),
86+
Err(err) => T500(server_error!("{}", err.to_string())),
87+
}
88+
},
89+
Err(crate::event_db::error::Error::NotFound(_)) => T404(NotFound),
90+
Err(err) => T500(server_error!("{}", err.to_string())),
91+
}
92+
} else {
93+
T503(ServiceUnavailable)
8694
}
8795
}
8896
}

catalyst-gateway/bin/src/service/api/v0/mod.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ use std::sync::Arc;
55
use poem::web::Data;
66
use poem_openapi::{payload::Binary, OpenApi};
77

8-
use crate::{service::common::tags::ApiTags, state::State};
8+
use crate::{
9+
service::{
10+
common::tags::ApiTags, utilities::middleware::schema_validation::schema_version_validation,
11+
},
12+
state::State,
13+
};
914

1015
mod message_post;
1116
mod plans_get;
@@ -25,7 +30,8 @@ impl V0Api {
2530
#[oai(
2631
path = "/vote/active/plans",
2732
method = "get",
28-
operation_id = "GetActivePlans"
33+
operation_id = "GetActivePlans",
34+
transform = "schema_version_validation"
2935
)]
3036
async fn plans_get(&self, state: Data<&Arc<State>>) -> plans_get::AllResponses {
3137
plans_get::endpoint(state).await

catalyst-gateway/bin/src/service/api/v1/mod.rs

+10-4
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ use poem::web::{Data, Path};
66
use poem_openapi::{param::Query, payload::Json, OpenApi};
77

88
use crate::{
9-
service::common::{
10-
objects::{account_votes::AccountId, fragments_batch::FragmentsBatch},
11-
tags::ApiTags,
9+
service::{
10+
common::{
11+
objects::{account_votes::AccountId, fragments_batch::FragmentsBatch},
12+
tags::ApiTags,
13+
},
14+
utilities::middleware::schema_validation::schema_version_validation,
1215
},
1316
state::State,
1417
};
@@ -25,7 +28,8 @@ impl V1Api {
2528
#[oai(
2629
path = "/votes/plan/account-votes/:account_id",
2730
method = "get",
28-
operation_id = "AccountVotes"
31+
operation_id = "AccountVotes",
32+
transform = "schema_version_validation"
2933
)]
3034
/// Get from all active vote plans, the index of the voted proposals
3135
/// by th given account ID.
@@ -43,6 +47,7 @@ impl V1Api {
4347
method = "post",
4448
operation_id = "fragments",
4549
tag = "ApiTags::Fragments",
50+
transform = "schema_version_validation",
4651
deprecated = true
4752
)]
4853
async fn fragments_post(
@@ -61,6 +66,7 @@ impl V1Api {
6166
method = "get",
6267
operation_id = "fragmentsStatuses",
6368
tag = "ApiTags::Fragments",
69+
transform = "schema_version_validation",
6470
deprecated = true
6571
)]
6672
async fn fragments_statuses(

catalyst-gateway/bin/src/service/common/responses/resp_5xx.rs

+6
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,9 @@ impl ResponseError for ServerError {
9393
///
9494
/// #### NO DATA BODY IS RETURNED FOR THIS RESPONSE
9595
pub(crate) struct ServiceUnavailable;
96+
97+
impl ResponseError for ServiceUnavailable {
98+
fn status(&self) -> StatusCode {
99+
StatusCode::SERVICE_UNAVAILABLE
100+
}
101+
}

catalyst-gateway/bin/src/service/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub(crate) enum Error {
2323
/// An IO error has occurred
2424
#[error(transparent)]
2525
Io(#[from] std::io::Error),
26+
/// A mismatch in the expected EventDB schema version
27+
#[error("expected schema version mismatch")]
28+
SchemaVersionMismatch,
2629
}
2730

2831
/// # Run Catalyst Gateway Service.
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
//! Custom POEM Middleware for this service.
22
3+
pub(crate) mod schema_validation;
34
pub(crate) mod tracing_mw;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
//! Middleware to verify the status of the last DB schema version validation.
2+
//!
3+
//! If a mismatch is detected, the middleware returns an error with `ServiceUnavailable`
4+
//! status code (503). Otherwise, the middleware calls and returns the wrapped endpoint's
5+
//! response.
6+
//!
7+
//! This middleware checks the `State.schema_version_status` value, if it is Ok,
8+
//! the wrapped endpoint is called and its response is returned.
9+
10+
use std::sync::Arc;
11+
12+
use poem::{web::Data, Endpoint, EndpointExt, Middleware, Request, Result};
13+
14+
use crate::{
15+
service::common::responses::resp_5xx::ServiceUnavailable,
16+
state::{SchemaVersionStatus, State},
17+
};
18+
19+
/// A middleware that raises an error with `ServiceUnavailable` and 503 status code
20+
/// if a DB schema version mismatch is found the existing `State`.
21+
pub(crate) struct SchemaVersionValidation;
22+
23+
impl<E: Endpoint> Middleware<E> for SchemaVersionValidation {
24+
type Output = SchemaVersionValidationImpl<E>;
25+
26+
fn transform(&self, ep: E) -> Self::Output {
27+
SchemaVersionValidationImpl { ep }
28+
}
29+
}
30+
31+
/// The new endpoint type generated by the `SchemaVersionValidation`.
32+
pub(crate) struct SchemaVersionValidationImpl<E> {
33+
/// Endpoint wrapped by the middleware.
34+
ep: E,
35+
}
36+
37+
#[poem::async_trait]
38+
impl<E: Endpoint> Endpoint for SchemaVersionValidationImpl<E> {
39+
type Output = E::Output;
40+
41+
async fn call(&self, req: Request) -> Result<Self::Output> {
42+
if let Some(state) = req.data::<Data<&Arc<State>>>() {
43+
// Check if the inner schema version status is set to `Mismatch`,
44+
// if so, return the `ServiceUnavailable` error, which implements
45+
// `ResponseError`, with status code `503`.
46+
// Otherwise, return the endpoint as usual.
47+
if state.is_schema_version_status(&SchemaVersionStatus::Mismatch) {
48+
return Err(ServiceUnavailable.into());
49+
}
50+
}
51+
// Calls the endpoint with the request, and returns the response.
52+
self.ep.call(req).await
53+
}
54+
}
55+
56+
/// A function that wraps an endpoint with the `SchemaVersionValidation`.
57+
///
58+
/// This function is convenient to use with `poem-openapi` [operation parameters](https://docs.rs/poem-openapi/latest/poem_openapi/attr.OpenApi.html#operation-parameters) via the
59+
/// `transform` attribute.
60+
pub(crate) fn schema_version_validation(ep: impl Endpoint) -> impl Endpoint {
61+
ep.with(SchemaVersionValidation)
62+
}

catalyst-gateway/bin/src/state/mod.rs

+52-8
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
//! Shared state used by all endpoints.
2-
use std::sync::{Arc, Mutex};
2+
use std::sync::{Arc, Mutex, MutexGuard};
33

44
use crate::{
55
cli::Error,
66
event_db::{establish_connection, queries::EventDbQueries},
7+
service::Error as ServiceError,
78
};
89

910
/// The status of the expected DB schema version.
11+
#[derive(Debug, PartialEq, Eq)]
1012
pub(crate) enum SchemaVersionStatus {
1113
/// The current DB schema version matches what is expected.
1214
Ok,
@@ -23,10 +25,10 @@ pub(crate) struct State {
2325
/// This is Private, it needs to be accessed with a function.
2426
// event_db_handle: Arc<ArcSwap<Option<dyn EventDbQueries>>>,
2527
// Private need to get it with a function.
26-
pub(crate) event_db: Arc<dyn EventDbQueries>, /* This needs to be obsoleted, we want the DB
27-
* to be able to be down. */
28+
event_db: Arc<dyn EventDbQueries>, /* This needs to be obsoleted, we want the DB
29+
* to be able to be down. */
2830
/// Status of the last DB schema version check.
29-
pub(crate) schema_version_status: Mutex<SchemaVersionStatus>,
31+
schema_version_status: Mutex<SchemaVersionStatus>,
3032
}
3133

3234
impl State {
@@ -49,8 +51,50 @@ impl State {
4951
Ok(state)
5052
}
5153

52-
// pub(crate) async fn event_db(&self) -> Option<Arc<dyn EventDbQueries>> {
53-
//
54-
//
55-
// }
54+
/// Get the reference to the database connection pool for `EventDB`.
55+
pub(crate) fn event_db(&self) -> Result<Arc<dyn EventDbQueries>, Error> {
56+
let guard = self.schema_version_status_lock();
57+
match *guard {
58+
SchemaVersionStatus::Ok => Ok(self.event_db.clone()),
59+
SchemaVersionStatus::Mismatch => Err(ServiceError::SchemaVersionMismatch.into()),
60+
}
61+
}
62+
63+
/// Check the DB schema version matches the one expected by the service.
64+
pub(crate) async fn schema_version_check(&self) -> Result<i32, Error> {
65+
Ok(self.event_db.schema_version_check().await?)
66+
}
67+
68+
/// Compare the `State`'s inner value with a given `&SchemaVersionStatus`, returns
69+
/// `bool`.
70+
pub(crate) fn is_schema_version_status(&self, svs: &SchemaVersionStatus) -> bool {
71+
let guard = self.schema_version_status_lock();
72+
&*guard == svs
73+
}
74+
75+
/// Set the state's `SchemaVersionStatus`.
76+
pub(crate) fn set_schema_version_status(&self, svs: SchemaVersionStatus) {
77+
let mut guard = self.schema_version_status_lock();
78+
tracing::debug!(
79+
status = format!("{:?}", svs),
80+
"db schema version status was set"
81+
);
82+
*guard = svs;
83+
}
84+
85+
/// Get the `MutexGuard<SchemaVersionStatus>` from inner the variable.
86+
///
87+
/// Handle poisoned mutex by recovering the guard, and tracing the error.
88+
fn schema_version_status_lock(&self) -> MutexGuard<SchemaVersionStatus> {
89+
match self.schema_version_status.lock() {
90+
Ok(guard) => guard,
91+
Err(poisoned) => {
92+
tracing::error!(
93+
error = format!("{:?}", poisoned),
94+
"recovering DB schema version status fom poisoned mutex"
95+
);
96+
poisoned.into_inner()
97+
},
98+
}
99+
}
56100
}

0 commit comments

Comments
 (0)