Skip to content

Commit

Permalink
Use AbortSignal to interrupt jobs after shutdown timeout
Browse files Browse the repository at this point in the history
- private AbortController created upon .work()
- After stop() and shutdown timeout, in-progress jobs are sent an abort
  signal and subsequently FAILed on the server.
- 3 seconds delay is added after abort signal is sent to allow cleanup
- exit code when jobs are aborted at end of grace period is 1

A job may be interrupted when a worker shuts down. In this case there
are two mechanisms to ensure graceful interruption: the shutdown timeout
and the execution context `AbortSignal`. The shutdown timeout is
configured in `WorkerOptions.timeout`. When a worker is instructed to
stop (via process signal or server message), it will stop accepting new
work (e.g. `quiet`) and wait the configured duration for any in-progress
jobs to complete uninterrupted. If this duration elapses and jobs are
still in progress, these jobs will receive an AbortSignal via
`Context.signal`. All jobs will be `FAIL`ed on the Faktory server,
allowing them to retry later. The abort signal can be used to interrupt
asynchronous processes and perform some cleanup tasks before an abrupt
exit (`process.exit`). After the abort signal is sent, a job will have 3
seconds to perform cleanup before the process is abruptly exited.
  • Loading branch information
jbielick committed Dec 19, 2024
1 parent 7467256 commit 78866d6
Show file tree
Hide file tree
Showing 9 changed files with 1,324 additions and 908 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 4.7.0 | 2024-12-17

Features:

- Job context now includes property `signal: AbortSignal` for job functions to use for more graceful shutdowns during a hard shutdown. Thanks, @knpwrs (#409)

## 4.6.0 | 2024-07-24

Features:
Expand Down
59 changes: 44 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

A node.js client and worker library for the [Faktory](https://github.com/contribsys/faktory) job server. The client allows you to push jobs and communicate with the Faktory server and the worker fetches background jobs from the Faktory server and processes them.

Faktory server compatibility: `~v1.6.1`
Faktory server compatibility: `>v1.6.1`

## Installation

Expand Down Expand Up @@ -127,6 +127,26 @@ worker.on("fail", ({ job, error }) => {
});
```

### Shutdown and AbortSignal

A job may be interrupted when a worker shuts down. In this case there are two mechanisms to ensure graceful interruption: the shutdown timeout and the execution context `AbortSignal`. The shutdown timeout is configured in `WorkerOptions.timeout`. When a worker is instructed to stop (via process signal or server message), it will stop accepting new work (e.g. `quiet`) and wait the configured duration for any in-progress jobs to complete uninterrupted. If this duration elapses and jobs are still in progress, these jobs will receive an AbortSignal via `Context.signal`. All jobs will be `FAIL`ed on the Faktory server, allowing them to retry later. The abort signal can be used to interrupt asynchronous processes and perform some cleanup tasks before an abrupt exit (`process.exit`). After the abort signal is sent, a job will have 3 seconds to perform cleanup before the process is abruptly exited.

Example - A long-running subprocess:

```js
faktory.register("JobWithAbort", (...args) => async ({ signal }) => {
try {
await execa("ffmpeg", [/* arg1, arg2, ..., argN */], { cancelSignal: signal });
} catch (e) {
if (e.code === "ABORT_ERR") {
// Remove some tempfiles or other type of cleanup...
// Propagating the ABORT_ERR is not necessary, the job will be FAILed if it was in-progress
// at the end of the shutdown timeout
}
}
});
```

### Middleware

Faktory middleware works just like [`koa`](https://github.com/koajs/koa) middleware. You can register a middleware function (async or sync) with `.use`. Middleware is called for every job that is performed. Always return a promise, `await next()`, or `return next();` to allow execution to continue down the middleware chain.
Expand Down Expand Up @@ -164,31 +184,40 @@ Here are the defaults:

```js
await faktory.work({
// The Faktory server host.
// Will be extracted from FAKTORY_URL ENV variable if set.
host: process.env.FAKTORY_URL || "127.0.0.1",

// default: 7419 -- can extracted from FAKTORY_URL env var
// The port the Faktory server is listening on (default: 7419).
// Will be extracted from FAKTORY_URL ENV variable if set.
port: 7419,

// can extracted from FAKTORY_URL env var
// The Faktory server password.
// Will be extracted from FAKTORY_URL ENV variable if set.
password: undefined,

// this is a max number of jobs the worker will have
// in progress at any time
// This sets the maximum number of jobs the worker may have in
// progress at any time.
concurrency: 20,

// the queues the worker will process—remember to preserve default if overriding this
// default fetching behavior is **Strictly Ordered**
// The list of queues the worker will fetch and process jobs from.
// Remember to preserve `default` in this list if overriding.
// Queues can be specified in two ways: **strictly ordered** and
// **weighted random**. See documentation above.
queues: ["default"],

// the number of milliseconds jobs have to complete after
// receiving a graceful shutdown signal. After this timeout, in-progress jobs may be abruptly stopped.
timeout: 8 * 1000,
// When the worker is asked to stop, in-progress jobs have this many
// seconds to finish their work before an AbortSignal is sent.
// Jobs have 3 seconds after the AbortSignal to clean up before an
// abrupt process exit occurs.
timeout: 8,

// the worker id to use in the faktory-server connection
// for this process. must be unique per process.
// The worker id to use in the faktory-server connection
// for this worker. Omit for a pseudo-randomly generated id.
// Must be unique per process.
wid: uuid().first(8),

// labels for the faktory worker process to see in the UI
// Labels for this worker as can be seen in the UI.
labels: [],
});
```
Expand All @@ -212,9 +241,8 @@ The function passed to `register` can be a thunk. The registered function will r
```js
faktory.register("JobWithHeaders", (...args) => async ({ job }) => {
const [email] = args;
I18n.locale = job.custom.locale;
log(job.custom.txid);
await sendEmail(email);
await sendEmail(email, { locale: job.custom.locale });
});
```

Expand Down Expand Up @@ -271,6 +299,7 @@ faktory.register("TouchRecord", (id) => async ({ db }) => {
- [x] Fail jobs
- [x] Add'l client commands API
- [x] Labels
- [x] AbortController

## Development

Expand Down
Loading

0 comments on commit 78866d6

Please sign in to comment.