Skip to content

Commit

Permalink
Migrate bookmarks validator to mononoke_app and clap 4
Browse files Browse the repository at this point in the history
Summary:
## This stack
Migrate bookmarks validator to clap 4 + mononoke_app.

## This diff

Migrates bookmarks_validator binary to use the combination of mononoke_app and clap 4.

NOTE: I would normally split this into multiple diffs. But in practice, it's very complicated to do it because in order to use clap 4, I need to use mononoke_app instead of the old cmdlib library, so I did it all at once.

I would appreciate review specially from RajivTS and markbt, who have more context on shard manager process/executor and old/new versions of clap and cmdlib, respectively

Reviewed By: RajivTS

Differential Revision: D68889082

fbshipit-source-id: b3a2d06c6ba633b9ab404988442d46e04392b3cc
  • Loading branch information
gustavoavena authored and facebook-github-bot committed Jan 31, 2025
1 parent 8ccaf24 commit 7c70efc
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 128 deletions.
6 changes: 3 additions & 3 deletions eden/mononoke/commit_rewriting/bookmarks_validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ anyhow = "1.0.95"
async-trait = "0.1.71"
blobstore_factory = { version = "0.1.0", path = "../../blobstore/factory" }
bookmarks = { version = "0.1.0", path = "../../bookmarks" }
cmdlib = { version = "0.1.0", path = "../../cmdlib" }
cmdlib_x_repo = { version = "0.1.0", path = "../../cmdlib/x_repo" }
clap = { version = "4.5.20", features = ["derive", "env", "string", "unicode", "wrap_help"] }
cmdlib_cross_repo = { version = "0.1.0", path = "../../cmdlib/cross_repo" }
context = { version = "0.1.0", path = "../../server/context" }
cross_repo_sync = { version = "0.1.0", path = "../cross_repo_sync" }
environment = { version = "0.1.0", path = "../../cmdlib/environment" }
Expand All @@ -22,10 +22,10 @@ fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rus
futures = { version = "0.3.30", features = ["async-await", "compat"] }
justknobs = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
metadata = { version = "0.1.0", path = "../../server/metadata" }
mononoke_app = { version = "0.1.0", path = "../../cmdlib/mononoke_app" }
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
pushredirect = { version = "0.1.0", path = "../../megarepo_api/pushredirect" }
sapling-clientinfo = { version = "0.1.0", path = "../../../scm/lib/clientinfo" }
scuba_ext = { version = "0.1.0", path = "../../common/scuba_ext" }
sharding_ext = { version = "0.1.0", path = "../../cmdlib/sharding_ext" }
slog = { version = "2.7", features = ["max_level_trace", "nested-values"] }
stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
Expand Down
247 changes: 122 additions & 125 deletions eden/mononoke/commit_rewriting/bookmarks_validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,20 @@
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;

use anyhow::bail;
use anyhow::format_err;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use async_trait::async_trait;
use blobstore_factory::MetadataSqlFactory;
use bookmarks::BookmarkKey;
use bookmarks::Freshness;
use clap::Parser;
use clientinfo::ClientEntryPoint;
use clientinfo::ClientInfo;
use cmdlib::args;
use cmdlib::args::MononokeMatches;
use cmdlib::helpers;
use cmdlib_x_repo::create_commit_syncers_from_matches;
use cmdlib_cross_repo::create_commit_syncers_from_app;
use context::CoreContext;
use context::SessionContainer;
use cross_repo_sync::find_bookmark_diff;
Expand All @@ -34,21 +31,25 @@ use cross_repo_sync::CommitSyncer;
use cross_repo_sync::Repo as CrossRepo;
use cross_repo_sync::Syncers;
use environment::MononokeEnvironment;
use executor_lib::args::ShardedExecutorArgs;
use executor_lib::RepoShardedProcess;
use executor_lib::RepoShardedProcessExecutor;
use executor_lib::ShardedProcessExecutor;
use fbinit::FacebookInit;
use futures::future;
use futures::TryStreamExt;
use metadata::Metadata;
use mononoke_app::args::OptSourceAndTargetRepoArgs;
use mononoke_app::args::SourceAndTargetRepoArgs;
use mononoke_app::monitoring::AliveService;
use mononoke_app::monitoring::MonitoringAppExtension;
use mononoke_app::MononokeApp;
use mononoke_app::MononokeAppBuilder;
use mononoke_types::ChangesetId;
use pushredirect::PushRedirectionConfig;
use pushredirect::SqlPushRedirectionConfigBuilder;
use scuba_ext::MononokeScubaSampleBuilder;
use sharding_ext::RepoShard;
use slog::error;
use slog::info;
use slog::Logger;
use stats::prelude::*;

define_stats! {
Expand All @@ -59,38 +60,36 @@ define_stats! {
),
}

const SM_SERVICE_SCOPE: &str = "global";
const SM_CLEANUP_TIMEOUT_SECS: u64 = 120;
const APP_NAME: &str = "megarepo_bookmarks_validator";

type Repo = cross_repo_sync::ConcreteRepo;

#[derive(Debug, Parser)]
#[clap(about = "Tool to validate that small and large repo bookmarks are in sync")]
pub struct BookmarkValidatorArgs {
#[clap(flatten)]
pub sharded_executor_args: ShardedExecutorArgs,
#[clap(flatten, next_help_heading = "CROSS REPO OPTIONS")]
pub repo_args: OptSourceAndTargetRepoArgs,
}

/// Struct representing the Bookmark Validate BP.
pub struct BookmarkValidateProcess {
matches: Arc<MononokeMatches<'static>>,
fb: FacebookInit,
ctx: Arc<CoreContext>,
pub(crate) app: Arc<MononokeApp>,
}

impl BookmarkValidateProcess {
fn new(fb: FacebookInit) -> anyhow::Result<Self> {
let app_name = "Tool to validate that small and large repo bookmarks are in sync";
let app = args::MononokeAppBuilder::new(app_name)
.with_source_and_target_repos()
.with_dynamic_repos()
.with_fb303_args()
.build();
let (matches, _runtime) = app.get_matches(fb)?;
let matches = Arc::new(matches);
Ok(Self { matches, fb })
fn new(ctx: Arc<CoreContext>, app: Arc<MononokeApp>) -> Self {
Self { app, ctx }
}
}

#[async_trait]
impl RepoShardedProcess for BookmarkValidateProcess {
async fn setup(&self, repo: &RepoShard) -> anyhow::Result<Arc<dyn RepoShardedProcessExecutor>> {
let logger = self.matches.logger();
let env = self.matches.environment();
// For bookmark validator, two repos (i.e. source and target) are required as input
async fn setup(&self, repo: &RepoShard) -> Result<Arc<dyn RepoShardedProcessExecutor>> {
let logger = self.ctx.logger();

let source_repo_name = repo.repo_name.clone();
let target_repo_name = match repo.target_repo_name.clone() {
Some(repo_name) => repo_name,
Expand All @@ -109,38 +108,18 @@ impl RepoShardedProcess for BookmarkValidateProcess {
source_repo_name,
target_repo_name,
);
let ctx =
create_core_context(self.fb, logger.clone()).clone_with_repo_name(&repo.to_string());
let config_store = self.matches.config_store().clone();
let source_repo_id =
args::resolve_repo_by_name(&config_store, &self.matches, &source_repo_name)?.id;
let target_repo_id =
args::resolve_repo_by_name(&config_store, &self.matches, &target_repo_name)?.id;

let syncers = create_commit_syncers_from_matches::<Repo>(
&ctx,
&self.matches,
Some((source_repo_id, target_repo_id)),
)
.await?;
if syncers.large_to_small.get_large_repo().repo_identity().id() != source_repo_id {
let details = format!(
"Source repo must be a large repo!. Source repo: {}, Target repo: {}",
source_repo_name, target_repo_name
);
error!(logger, "{}", details);
bail!("{}", details);
}
let repo_args = SourceAndTargetRepoArgs::with_source_and_target_repo_name(
source_repo_name.clone(),
target_repo_name.clone(),
);

let executor =
BookmarkValidateProcessExecutor::new(self.ctx.clone(), self.app.clone(), repo_args)
.await?;

let details = format!(
"Completed bookmark validate command setup from repo {} to repo {}",
source_repo_name, target_repo_name
);
let executor = BookmarkValidateProcessExecutor::new(
syncers,
ctx,
env.clone(),
source_repo_name,
target_repo_name,
&source_repo_name, &target_repo_name
);
info!(logger, "{}", details,);
Ok(Arc::new(executor))
Expand All @@ -150,30 +129,55 @@ impl RepoShardedProcess for BookmarkValidateProcess {
/// Struct representing the execution of the Bookmark Validate
/// BP over the context of a provided repos.
pub struct BookmarkValidateProcessExecutor {
syncers: Syncers<Repo>,
ctx: CoreContext,
syncers: Syncers<Arc<Repo>>,
ctx: Arc<CoreContext>,
env: Arc<MononokeEnvironment>,
cancellation_requested: Arc<AtomicBool>,
source_repo_name: String,
target_repo_name: String,
}

impl BookmarkValidateProcessExecutor {
fn new(
syncers: Syncers<Repo>,
ctx: CoreContext,
env: Arc<MononokeEnvironment>,
source_repo_name: String,
target_repo_name: String,
) -> Self {
Self {
async fn new(
ctx: Arc<CoreContext>,
app: Arc<MononokeApp>,
repo_args: SourceAndTargetRepoArgs,
) -> Result<Self> {
let env = app.environment().clone();
let logger = ctx.logger();

let source_repo: Arc<Repo> = app.open_repo_unredacted(&repo_args.source_repo).await?;
let target_repo: Arc<Repo> = app.open_repo_unredacted(&repo_args.target_repo).await?;

let syncers = create_commit_syncers_from_app(
&ctx,
app.as_ref(),
source_repo.clone(),
target_repo.clone(),
)
.await?;

let source_repo_id = source_repo.repo_identity().id();
let source_repo_name = source_repo.repo_identity().name();
let target_repo_name = target_repo.repo_identity().name();

if syncers.large_to_small.get_large_repo().repo_identity().id() != source_repo_id {
let details = format!(
"Source repo must be a large repo!. Source repo: {}, Target repo: {}",
&source_repo_name, &target_repo_name
);
error!(logger, "{}", details);
bail!("{}", details);
}

Ok(Self {
syncers,
ctx,
env,
source_repo_name,
target_repo_name,
source_repo_name: source_repo_name.to_string(),
target_repo_name: target_repo_name.to_string(),
cancellation_requested: Arc::new(AtomicBool::new(false)),
}
})
}
}

Expand All @@ -187,7 +191,7 @@ impl RepoShardedProcessExecutor for BookmarkValidateProcessExecutor {
&self.target_repo_name,
);
loop_forever(
&self.ctx,
self.ctx.as_ref(),
&self.env,
self.syncers.clone(),
Arc::clone(&self.cancellation_requested),
Expand Down Expand Up @@ -220,72 +224,65 @@ impl RepoShardedProcessExecutor for BookmarkValidateProcessExecutor {
}
}

#[fbinit::main]
fn main(fb: FacebookInit) -> Result<(), Error> {
let process = BookmarkValidateProcess::new(fb)?;
let logger = process.matches.logger().clone();
let matches = process.matches.clone();
let env = matches.environment();

match process.matches.value_of("sharded-service-name") {
Some(service_name) => {
// The service name needs to be 'static to satisfy SM contract
static SM_SERVICE_NAME: OnceLock<String> = OnceLock::new();
let mut executor = ShardedProcessExecutor::new(
process.fb,
process.matches.runtime().clone(),
&logger,
SM_SERVICE_NAME.get_or_init(|| service_name.to_string()),
SM_SERVICE_SCOPE,
SM_CLEANUP_TIMEOUT_SECS,
Arc::new(process),
true, // enable shard (repo) level healing
)?;
helpers::block_execute(
executor.block_and_execute(&logger, Arc::new(AtomicBool::new(false))),
fb,
&std::env::var("TW_JOB_NAME").unwrap_or_else(|_| APP_NAME.to_string()),
matches.logger(),
&matches,
cmdlib::monitoring::AliveService,
)
}
None => {
let runtime = matches.runtime();
let ctx = create_core_context(fb, logger.clone());
let config_store = matches.config_store();
let source_repo_id =
args::not_shardmanager_compatible::get_source_repo_id(config_store, &matches)?;
let syncers = runtime.block_on(create_commit_syncers_from_matches::<Repo>(
&ctx, &matches, None,
))?;
if syncers.large_to_small.get_large_repo().repo_identity().id() != source_repo_id {
return Err(format_err!("Source repo must be a large repo!"));
}
helpers::block_execute(
loop_forever(&ctx, env, syncers, Arc::new(AtomicBool::new(false))),
fb,
APP_NAME,
&logger,
&matches,
cmdlib::monitoring::AliveService,
)
}
async fn async_main(app: MononokeApp, ctx: CoreContext) -> Result<(), Error> {
let args: BookmarkValidatorArgs = app.args()?;
let ctx = Arc::new(ctx);
let app = Arc::new(app);
let repo_args = args.repo_args.clone();
let runtime = app.runtime().clone();

if let Some(mut executor) = args.sharded_executor_args.clone().build_executor(
app.fb,
runtime.clone(),
ctx.logger(),
|| Arc::new(BookmarkValidateProcess::new(ctx.clone(), app.clone())),
true, // enable shard (repo) level healing
SM_CLEANUP_TIMEOUT_SECS,
)? {
executor
.block_and_execute(ctx.logger(), Arc::new(AtomicBool::new(false)))
.await
} else {
let repo_args = repo_args
.into_source_and_target_args()
.context("Source and Target repos must be provided when running in non-sharded mode")?;
let x_repo_process_executor =
BookmarkValidateProcessExecutor::new(ctx.clone(), app, repo_args).await?;
x_repo_process_executor.execute().await
}
}

fn create_core_context(fb: FacebookInit, logger: Logger) -> CoreContext {
#[fbinit::main]
fn main(fb: FacebookInit) -> Result<()> {
let app: MononokeApp = MononokeAppBuilder::new(fb)
.with_app_extension(MonitoringAppExtension {})
.build::<BookmarkValidatorArgs>()?;

let mut metadata = Metadata::default();
metadata.add_client_info(ClientInfo::default_with_entry_point(
ClientEntryPoint::MegarepoBookmarksValidator,
));

let mut scuba = app.environment().scuba_sample_builder.clone();
scuba.add_metadata(&metadata);

let session_container = SessionContainer::builder(fb)
.metadata(Arc::new(metadata))
.build();
let scuba_sample = MononokeScubaSampleBuilder::with_discard();

session_container.new_context(logger, scuba_sample)
let ctx = session_container.new_context(app.logger().clone(), scuba);

info!(
ctx.logger(),
"Starting session with id {}",
ctx.metadata().session_id(),
);

app.run_with_monitoring_and_logging(
|app| async_main(app, ctx.clone()),
"bookmarks_validator",
AliveService,
)
}

async fn loop_forever<R: CrossRepo>(
Expand Down

0 comments on commit 7c70efc

Please sign in to comment.