Skip to content

Commit

Permalink
add descriptive message on message broker errors
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt committed Dec 24, 2024
1 parent 33ba14a commit 5ba94d7
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 33 deletions.
57 changes: 57 additions & 0 deletions docs/Distributed.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,63 @@ may request compilation from Linux, Windows or macOS. Linux compilations will
attempt to automatically package the compiler in use, while Windows and macOS
users will need to specify a toolchain for cross-compilation ahead of time.

## Message Brokers

The sccache-dist scheduler and servers communicate via an external message
broker, either an AMQP v0.9.1 implementation (like RabbitMQ) or Redis.

The message broker is a third-party service, and is responsible for reliable
message delivery, acknowledgement, retries, and reporting failures.

All major CSPs provide managed AMQP or Redis services, or you can deploy
RabbitMQ or Redis as part of your infrastructure.

### Managed Message Broker Services

Here is a (non-exhaustive) list of managed message broker services available
in popular cloud service providers.

RabbitMQ:
* [AWS AMQ](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/welcome.html)
* [GCP RabbitMQ Connector](https://cloud.google.com/integration-connectors/docs/connectors/rabbitmq/configure)
* [CloudAMQP](https://www.cloudamqp.com/) and [CloudAMQP on GCP](https://www.cloudamqp.com/docs/google-GCP-marketplace-rabbitmq.html)

Valkey/Redis:
* [AWS ElastiCache for Redis](https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/WhatIs.html)
* [GCP Memorystore](https://cloud.google.com/memorystore/?hl=en)
* [Azure Managed Redis](https://learn.microsoft.com/en-us/azure/azure-cache-for-redis/managed-redis/managed-redis-overview)

### Running locally

For local development or custom deployments, you can run the message broker locally.

#### RabbitMQ
To run `sccache-dist` locally with RabbitMQ as the message broker, either:
1. Run the `rabbitmq` docker container:
```shell
docker run --rm --name sccache-dist-rabbitmq -p 5672:5672 rabbitmq:latest
```
2. Install and run the RabbitMQ service (instructions [here](https://www.rabbitmq.com/docs/platforms))

Then configure `sccache-dist` to use your `rabbitmq` instance via either:
* Setting the `AMQP_ADDR=amqp://127.0.0.1:5672//` environment variable
* Adding `message_broker.amqp = "amqp://127.0.0.1:5672//"` to your scheduler config file

*Note:* The two slashes at the end of `amqp` address above is not a typo.

#### Redis

To run `sccache-dist` locally with Redis as the message broker, either:
1. Run the `redis` docker container:
```shell
docker run --rm --name sccache-dist-redis -p 6379:6379 redis:latest
```
2. Install and run the RabbitMQ service (instructions [here](https://redis.io/docs/latest/operate/oss_and_stack/install/install-redis/))

Then configure `sccache-dist` to use your `rabbitmq` instance via either:
* Setting the `REDIS_ADDR=redis://127.0.0.1:6379/` environment variable
* Adding `message_broker.redis = "redis://127.0.0.1:6379"` to your scheduler config file

## Communication

The HTTP implementation of sccache has the following API, where all HTTP body content is encoded using [`bincode`](http://docs.rs/bincode):
Expand Down
97 changes: 64 additions & 33 deletions src/bin/sccache-dist/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,33 @@ fn main() {
});
}

fn message_broker_uri(message_broker: Option<MessageBroker>) -> Result<String> {
match message_broker {
Some(MessageBroker::AMQP(uri)) => Ok(uri),
Some(MessageBroker::Redis(uri)) => Ok(uri),
None => bail!(
"Missing required message broker configuration!\n\n{}",
message_broker_info_text()
),
}
}

fn message_broker_info_text() -> String {
"\
The sccache-dist scheduler and servers communicate via an external message
broker, either an AMQP v0.9.1 implementation (like RabbitMQ) or Redis.
All major CSPs provide managed AMQP or Redis services, or you can deploy
RabbitMQ or Redis as part of your infrastructure.
For local development, you can install RabbitMQ/Redis services locally or
run their containers.
More details can be found in in the sccache-dist documentation at:
https://github.com/mozilla/sccache/blob/main/docs/Distributed.md#message-brokers"
.into()
}

fn run(command: Command) -> Result<()> {
let num_cpus = std::thread::available_parallelism()?.get();

Expand All @@ -102,24 +129,20 @@ fn run(command: Command) -> Result<()> {
toolchains_fallback,
toolchains,
}) => {
let broker_uri =
match message_broker.expect("Missing required message broker configuration") {
MessageBroker::AMQP(uri) => uri,
MessageBroker::Redis(uri) => uri,
};
let broker_uri = message_broker_uri(message_broker)?;

let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;

runtime.block_on(async move {
let toolchain_storage = sccache::cache::cache::storage_from_config(
&toolchains,
&toolchains_fallback,
&tokio::runtime::Handle::current(),
)
.context("Failed to initialize toolchain storage")?;
let toolchain_storage = sccache::cache::cache::storage_from_config(
&toolchains,
&toolchains_fallback,
runtime.handle(),
)
.context("Failed to initialize toolchain storage")?;

runtime.block_on(async move {
// Verify read/write access to toolchain storage
match toolchain_storage.check().await {
Ok(sccache::cache::CacheMode::ReadWrite) => {}
Expand Down Expand Up @@ -148,7 +171,13 @@ fn run(command: Command) -> Result<()> {
.nacks_enabled(true)
.build()
.await
.unwrap(),
.map_err(|err| {
let err_message = match err {
CeleryError::BrokerError(err) => err.to_string(),
err => err.to_string(),
};
anyhow!("{}\n\n{}", err_message, message_broker_info_text())
})?,
);

task_queue
Expand Down Expand Up @@ -235,32 +264,22 @@ fn run(command: Command) -> Result<()> {
toolchains,
toolchains_fallback,
}) => {
let num_cpus = (num_cpus - num_cpus_to_ignore).max(1) as f64;
let prefetch_count = (num_cpus * max_per_core_load).floor().max(1f64) as u16;

let broker_uri =
match message_broker.expect("Missing required message broker configuration") {
MessageBroker::AMQP(uri) => uri,
MessageBroker::Redis(uri) => uri,
};

let server_id =
env::var("SCCACHE_DIST_SERVER_ID").unwrap_or(uuid::Uuid::new_v4().to_string());
let broker_uri = message_broker_uri(message_broker)?;

let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;

runtime.block_on(async move {
let toolchain_storage = sccache::cache::cache::storage_from_config(
&toolchains,
&toolchains_fallback,
&tokio::runtime::Handle::current(),
)
.context("Failed to initialize toolchain storage")?;
let toolchain_storage = sccache::cache::cache::storage_from_config(
&toolchains,
&toolchains_fallback,
runtime.handle(),
)
.context("Failed to initialize toolchain storage")?;

runtime.block_on(async move {
// Verify toolchain storage
let _ = toolchain_storage
toolchain_storage
.check()
.await
.context("Failed to initialize toolchain storage")?;
Expand All @@ -271,6 +290,11 @@ fn run(command: Command) -> Result<()> {
toolchain_storage,
)));

let num_cpus = (num_cpus - num_cpus_to_ignore).max(1) as f64;
let prefetch_count = (num_cpus * max_per_core_load).floor().max(1f64) as u16;
let server_id =
env::var("SCCACHE_DIST_SERVER_ID").unwrap_or(uuid::Uuid::new_v4().to_string());

let builder: Box<dyn dist::BuilderIncoming> = match builder {
#[cfg(not(target_os = "freebsd"))]
sccache::config::server::BuilderType::Docker => Box::new(
Expand Down Expand Up @@ -318,7 +342,14 @@ fn run(command: Command) -> Result<()> {
.acks_on_failure_or_timeout(false)
.nacks_enabled(true)
.build()
.await?,
.await
.map_err(|err| {
let err_message = match err {
CeleryError::BrokerError(err) => err.to_string(),
err => err.to_string(),
};
anyhow!("{}\n\n{}", err_message, message_broker_info_text())
})?,
);

task_queue.register_task::<server_run_build>().await?;
Expand Down

0 comments on commit 5ba94d7

Please sign in to comment.