Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run until no more work is pending #134

Merged
merged 2 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions Sources/Queues/QueueWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,31 @@ extension Queue {
public struct QueueWorker: Sendable {
let queue: any Queue

/// Actually run the queue. This is a thin wrapper for ELF-style callers.
/// Run the queue until there is no more work to be done.
/// This is a thin wrapper for ELF-style callers.
public func run() -> EventLoopFuture<Void> {
self.queue.eventLoop.makeFutureWithTask {
try await self.run()
try await run()
}
}

/// Pop a job off the queue and try to run it. If no jobs are available, do nothing.

/// Run the queue until there is no more work to be done.
/// This is the main async entrypoint for a queue worker.
public func run() async throws {
while try await self.runOneJob() {}
}

/// Pop a job off the queue and try to run it. If no jobs are available, do
/// nothing. Returns whether a job was run.
private func runOneJob() async throws -> Bool {
var logger = self.queue.logger
logger[metadataKey: "queue"] = "\(self.queue.queueName.string)"
logger.trace("Popping job from queue")

guard let id = try await self.queue.pop().get() else {
// No job found, go around again.
return logger.trace("No pending jobs")
logger.trace("No pending jobs")
return false
}

logger[metadataKey: "job-id"] = "\(id.string)"
Expand All @@ -39,23 +48,26 @@ public struct QueueWorker: Sendable {

guard let job = self.queue.configuration.jobs[data.jobName] else {
logger.warning("No job with the desired name is registered, discarding")
return try await self.queue.clear(id).get()
try await self.queue.clear(id).get()
return false
}

// If the job has a delay that isn't up yet, requeue it.
guard (data.delayUntil ?? .distantPast) < Date() else {
logger.trace("Job is delayed, requeueing for later execution", metadata: ["delayed-until": "\(data.delayUntil ?? .distantPast)"])
return try await self.queue.push(id).get()
try await self.queue.push(id).get()
return false
}

await self.queue.sendNotification(of: "dequeue", logger: logger) {
try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get()
}

try await self.run(id: id, job: job, jobData: data, logger: logger)
try await self.runOneJob(id: id, job: job, jobData: data, logger: logger)
return true
}

private func run(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws {
private func runOneJob(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws {
logger.info("Dequeing and running job", metadata: ["attempt": "\(jobData.currentAttempt)", "retries-left": "\(jobData.remainingAttempts)"])
do {
try await job._dequeue(self.queue.context, id: id.string, payload: jobData.payload).get()
Expand Down
12 changes: 8 additions & 4 deletions Tests/QueuesTests/AsyncQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ final class AsyncQueueTests: XCTestCase {

app.get("foo") { req in
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"))
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz"))
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux"))
return "done"
}

Expand All @@ -45,8 +47,8 @@ final class AsyncQueueTests: XCTestCase {
XCTAssertEqual(res.body.string, "done")
}

XCTAssertEqual(app.queues.test.queue.count, 1)
XCTAssertEqual(app.queues.test.jobs.count, 1)
XCTAssertEqual(app.queues.test.queue.count, 3)
XCTAssertEqual(app.queues.test.jobs.count, 3)
let job = app.queues.test.first(MyAsyncJob.self)
XCTAssert(app.queues.test.contains(MyAsyncJob.self))
XCTAssertNotNil(job)
Expand All @@ -67,6 +69,8 @@ final class AsyncQueueTests: XCTestCase {

app.get("foo") { req in
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"))
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "baz"))
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "quux"))
return "done"
}

Expand All @@ -75,8 +79,8 @@ final class AsyncQueueTests: XCTestCase {
XCTAssertEqual(res.body.string, "done")
}

XCTAssertEqual(app.queues.asyncTest.queue.count, 1)
XCTAssertEqual(app.queues.asyncTest.jobs.count, 1)
XCTAssertEqual(app.queues.asyncTest.queue.count, 3)
XCTAssertEqual(app.queues.asyncTest.jobs.count, 3)
let job = app.queues.asyncTest.first(MyAsyncJob.self)
XCTAssert(app.queues.asyncTest.contains(MyAsyncJob.self))
XCTAssertNotNil(job)
Expand Down
28 changes: 28 additions & 0 deletions Tests/QueuesTests/QueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,34 @@ final class QueueTests: XCTestCase {
await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar")
}

func testRunUntilEmpty() async throws {
let promise1 = self.app.eventLoopGroup.any().makePromise(of: String.self)
self.app.queues.add(Foo1(promise: promise1))
let promise2 = self.app.eventLoopGroup.any().makePromise(of: String.self)
self.app.queues.add(Foo2(promise: promise2))

self.app.get("foo") { req in
try await req.queue.dispatch(Foo1.self, .init(foo: "bar"))
try await req.queue.dispatch(Foo1.self, .init(foo: "quux"))
try await req.queue.dispatch(Foo2.self, .init(foo: "baz"))
return "done"
}

try await self.app.testable().test(.GET, "foo") { res async in
XCTAssertEqual(res.status, .ok)
XCTAssertEqual(res.body.string, "done")
}

XCTAssertEqual(self.app.queues.test.queue.count, 3)
XCTAssertEqual(self.app.queues.test.jobs.count, 3)
try await self.app.queues.queue.worker.run()
XCTAssertEqual(self.app.queues.test.queue.count, 0)
XCTAssertEqual(self.app.queues.test.jobs.count, 0)

await XCTAssertEqualAsync(try await promise1.futureResult.get(), "quux")
await XCTAssertEqualAsync(try await promise2.futureResult.get(), "baz")
}

func testSettingCustomId() async throws {
let promise = self.app.eventLoopGroup.any().makePromise(of: String.self)
self.app.queues.add(Foo1(promise: promise))
Expand Down