[reconfigurator] initialize clickhouse cluster db schema #7306
(first changed file)
@@ -2,7 +2,7 @@
 // License, v. 2.0. If a copy of the MPL was not distributed with this
 // file, You can obtain one at https://mozilla.org/MPL/2.0/.

-use crate::context::{ServerContext, SingleServerContext};
+use crate::context::{KeeperServerContext, ServerContext};
 use clickhouse_admin_api::*;
 use clickhouse_admin_types::{
     ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
@@ -24,13 +24,13 @@ pub fn clickhouse_admin_server_api() -> ApiDescription<Arc<ServerContext>> {
         .expect("registered entrypoints")
 }

-pub fn clickhouse_admin_keeper_api() -> ApiDescription<Arc<ServerContext>> {
+pub fn clickhouse_admin_keeper_api() -> ApiDescription<Arc<KeeperServerContext>>
+{
     clickhouse_admin_keeper_api_mod::api_description::<ClickhouseAdminKeeperImpl>()
         .expect("registered entrypoints")
 }

-pub fn clickhouse_admin_single_api() -> ApiDescription<Arc<SingleServerContext>>
-{
+pub fn clickhouse_admin_single_api() -> ApiDescription<Arc<ServerContext>> {
     clickhouse_admin_single_api_mod::api_description::<ClickhouseAdminSingleImpl>()
         .expect("registered entrypoints")
 }
@@ -78,12 +78,55 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl {
         ctx.clickhouse_cli().system_timeseries_avg(settings).await?;
         Ok(HttpResponseOk(output))
     }

+    async fn init_db(
+        rqctx: RequestContext<Self::Context>,
+    ) -> Result<HttpResponseUpdatedNoContent, HttpError> {
+        let ctx = rqctx.context();
+        let log = ctx.log();
+
+        // Database initialization is idempotent, but not concurrency-safe.
+        // Use a mutex to serialize requests.
+        let lock = ctx.initialization_lock();
+        let _guard = lock.lock().await;
+
+        // Initialize the database only if it was not previously initialized.
+        // TODO: Migrate schema to newer version without wiping data.
+        let client = ctx.oximeter_client();
+        let version = client.read_latest_version().await.map_err(|e| {
+            HttpError::for_internal_error(format!(
+                "can't read ClickHouse version: {e}",
+            ))
+        })?;
+        if version == 0 {
+            info!(
+                log,
+                "initializing replicated ClickHouse cluster to version {OXIMETER_VERSION}"
+            );
+            ctx.oximeter_client()
+                .initialize_db_with_version(true, OXIMETER_VERSION)
+                .await
+                .map_err(|e| {
+                    HttpError::for_internal_error(format!(
+                        "can't initialize replicated ClickHouse cluster \
+                         to version {OXIMETER_VERSION}: {e}",
+                    ))
+                })?;
+        } else {
+            info!(
+                log,
+                "skipping initialization of replicated ClickHouse cluster at version {version}"
+            );
+        }
+
+        Ok(HttpResponseUpdatedNoContent())
+    }
 }

 enum ClickhouseAdminKeeperImpl {}

 impl ClickhouseAdminKeeperApi for ClickhouseAdminKeeperImpl {
-    type Context = Arc<ServerContext>;
+    type Context = Arc<KeeperServerContext>;

[Review comment] Good call on the rename here ;)

     async fn generate_config_and_enable_svc(
         rqctx: RequestContext<Self::Context>,
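An aside on the guard pattern in the new `init_db` handler above: concurrent requests do not race; they queue on the context's initialization lock. A minimal sketch of the idea, assuming a tokio mutex (the `Ctx` type here is a hypothetical stand-in for the real server context):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    // Hypothetical stand-in for the admin server context. The mutex guards
    // no data directly; holding the guard is what serializes init_db calls.
    struct Ctx {
        initialization_lock: Arc<Mutex<()>>,
    }

    impl Ctx {
        async fn init_db(&self) {
            // Concurrent callers park here until the current run finishes.
            let _guard = self.initialization_lock.lock().await;
            // ... read the stored schema version; initialize only when it is 0 ...
        } // _guard drops here, releasing the lock
    }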
@@ -137,13 +180,13 @@ impl ClickhouseAdminKeeperApi for ClickhouseAdminKeeperImpl {
 enum ClickhouseAdminSingleImpl {}

 impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl {
-    type Context = Arc<SingleServerContext>;
+    type Context = Arc<ServerContext>;

     async fn init_db(
         rqctx: RequestContext<Self::Context>,
     ) -> Result<HttpResponseUpdatedNoContent, HttpError> {
-        let log = &rqctx.log;
         let ctx = rqctx.context();
+        let log = ctx.log();

         // Database initialization is idempotent, but not concurrency-safe.
         // Use a mutex to serialize requests.
(second changed file)
@@ -114,7 +114,7 @@ pub(crate) async fn deploy_nodes(
             })
         }));
     }
-    for config in server_configs {
+    for config in &server_configs {
         let admin_addr = SocketAddr::V6(SocketAddrV6::new(
             config.settings.listen_addr,
             CLICKHOUSE_ADMIN_PORT,
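Context for this one-character change: the schema-initialization code added below calls `server_configs.first()` after this loop, so the `Vec` must still be alive afterwards; iterating by value would move it into the loop. A minimal self-contained sketch with a placeholder element type:

    fn main() {
        let server_configs = vec!["replica-1", "replica-2"]; // placeholder configs

        // Borrowing iteration: the loop only borrows the Vec.
        for config in &server_configs {
            println!("deploying {config}");
        }

        // Still valid; `for config in server_configs` would have moved the Vec
        // and made this call fail to compile.
        println!("first: {:?}", server_configs.first());
    }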
@@ -160,6 +160,38 @@ pub(crate) async fn deploy_nodes(
         "Successfully deployed all clickhouse server and keeper configs"
     );

+    // We only need to initialise the database schema on one of the ClickHouse
+    // replica servers, as they are all part of the same cluster.

[Review comment] I believe this is only true for the initial cluster. If you add a new node, it will not get initialized automatically. Since the operation is idempotent I would go ahead and initialize all the servers.
[Reply] ugh, that's a bummer. yep! will change then
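A sketch of what the reviewer suggests: loop over every replica and call `init_db` on each, relying on its idempotence. `ServerConfig`, `AdminClient`, and the exact error plumbing are placeholder assumptions standing in for the real types in the surrounding module:

    use std::net::{SocketAddr, SocketAddrV6};
    use anyhow::anyhow;

    // `ServerConfig`, `AdminClient`, and `CLICKHOUSE_ADMIN_PORT` stand in for
    // the real types and constant from the surrounding module.
    async fn init_all_replicas(
        server_configs: &[ServerConfig],
        log: &slog::Logger,
    ) -> Result<(), Vec<anyhow::Error>> {
        for config in server_configs {
            let admin_addr = SocketAddr::V6(SocketAddrV6::new(
                config.settings.listen_addr,
                CLICKHOUSE_ADMIN_PORT,
                0,
                0,
            ));
            let client = AdminClient::new(&format!("http://{admin_addr}"), log.clone());
            // Safe to repeat on already-initialized replicas: the handler
            // checks the stored schema version before doing any work.
            client.init_db().await.map_err(|e| vec![anyhow!(e)])?;
        }
        Ok(())
    }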
+    let Some(first_server_config) = server_configs.first() else {
+        let e = concat!(
+            "Failed to initialise database schema on the replicated ClickHouse cluster:",
+            " no replica server configuration file found");
+        error!(opctx.log, "{e}",);
+        return Err(vec![anyhow!(e)]);
+    };
+
+    let admin_addr = SocketAddr::V6(SocketAddrV6::new(
+        first_server_config.settings.listen_addr,
+        CLICKHOUSE_ADMIN_PORT,
+        0,
+        0,
+    ));
+    let admin_url = format!("http://{admin_addr}");
+    let log = opctx.log.new(slog::o!("admin_url" => admin_url.clone()));
+    let client = ClickhouseSingleClient::new(&admin_url, log.clone());

[Review comment] Should this be
[Reply] whoops! good catch. Thanks!
+    let _ = client.init_db().await.map(|_| ()).map_err(|e| {
+        let err = format!(
+            "Failed to initialize the replicated ClickHouse cluster database: {e}"
+        );
+        error!(opctx.log, "{err}");
+        return vec![anyhow!(err)];
+    });
+
+    info!(
+        opctx.log,
+        "Successfully initialised the replicated ClickHouse cluster database schema"
+    );
+
+    Ok(())
 }
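One caveat worth flagging in the hunk above: `let _ = client.init_db().await.map(...)` discards the result, and the `return vec![anyhow!(err)]` inside the closure only returns from the closure, so a failed initialization still falls through to the success log and `Ok(())`. A sketch of propagating the failure instead, assuming the surrounding function returns `Result<(), Vec<anyhow::Error>>` as the earlier `return Err(vec![anyhow!(e)])` suggests:

    // Propagate the failure with `?` instead of discarding it via `let _ =`;
    // the closure's value becomes the function's Err, and the success log
    // below then only runs when initialization actually succeeded.
    client.init_db().await.map_err(|e| {
        let err = format!(
            "Failed to initialize the replicated ClickHouse cluster database: {e}"
        );
        error!(opctx.log, "{err}");
        vec![anyhow!(err)]
    })?;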
[Review comment] nit: APIs with bool parameters make it hard to figure out what is being passed in at the call site. I usually create a temp variable to name the flag, then pass that in. I'd suggest that here with something like:
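A sketch of that named-flag pattern applied to the `initialize_db_with_version(true, OXIMETER_VERSION)` call from the first file; the variable name is illustrative, not the reviewer's exact snippet:

    // Name the boolean so the call site documents itself; `replicated` is an
    // illustrative name for what `true` means to initialize_db_with_version.
    let replicated = true;
    ctx.oximeter_client()
        .initialize_db_with_version(replicated, OXIMETER_VERSION)
        .await
        .map_err(|e| {
            HttpError::for_internal_error(format!(
                "can't initialize replicated ClickHouse cluster \
                 to version {OXIMETER_VERSION}: {e}",
            ))
        })?;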