diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift index b9ff152..679adc6 100644 --- a/Sources/Queues/QueuesCommand.swift +++ b/Sources/Queues/QueuesCommand.swift @@ -196,7 +196,7 @@ public final class QueuesCommand: Command { } // stop all scheduled jobs self.scheduledTasks.values.forEach { - $0.task.syncCancel(on: self.eventLoopGroup.next()) + $0.task.cancel() } } diff --git a/Sources/Queues/ScheduledJob.swift b/Sources/Queues/ScheduledJob.swift index 6c57c72..04b6e6a 100644 --- a/Sources/Queues/ScheduledJob.swift +++ b/Sources/Queues/ScheduledJob.swift @@ -1,7 +1,8 @@ -import class NIO.RepeatedTask +import struct NIO.Scheduled /// Describes a job that can be scheduled and repeated public protocol ScheduledJob { + /// The name unique to this `ScheduledJob` var name: String { get } /// The method called when the job is run /// - Parameter context: A `JobContext` that can be used @@ -24,10 +25,10 @@ class AnyScheduledJob { extension AnyScheduledJob { struct Task { - let task: RepeatedTask + let task: Scheduled let done: EventLoopFuture } - + func schedule(context: QueueContext) -> Task? { context.logger.trace("Beginning the scheduler process") guard let date = self.scheduler.nextDate() else { @@ -36,15 +37,21 @@ extension AnyScheduledJob { } context.logger.debug("Scheduling \(self.job.name) to run at \(date)") let promise = context.eventLoop.makePromise(of: Void.self) - let task = context.eventLoop.scheduleRepeatedTask( - initialDelay: .microseconds(Int64(date.timeIntervalSinceNow * 1_000_000)), - delay: .seconds(0) - ) { task in - // always cancel - task.cancel() + let initialDelay: TimeAmount = .nanoseconds( + Int64(date.timeIntervalSinceNow * 1000 * 1000 * 1000)) + let task = context.eventLoop.scheduleTask(in: initialDelay) { context.logger.trace("Running the scheduled job \(self.job.name)") self.job.run(context: context).cascade(to: promise) } +// let task = context.eventLoop.scheduleRepeatedTask( +// initialDelay: .nanoseconds(Int64(date.timeIntervalSinceNow * 1000 * 1000 * 1000)), +// delay: .zero +// ) { task in +// // always cancel +// task.cancel() +// context.logger.trace("Running the scheduled job \(self.job.name)") +// self.job.run(context: context).cascade(to: promise) +// } return .init(task: task, done: promise.futureResult) } }