-
Notifications
You must be signed in to change notification settings - Fork 4
Handlers
Handlers are application code that is run when jobs are available on a queue.
Handler concurrency is configurable, allowing job processing to be parallelized over multiple CPUs, or serialized over just one.
Handlers are always associated with a single queue.
The simplest Handler consists of the queue name that it processes, and the function to run when jobs are available on that queue.
e.g. A handler that listens on the my_jobs
queue and prints got a job
to stdout:
h := handler.New("my_jobs", func(ctx context.Context) (err error) {
fmt.Println("got a job")
return
}))
Dont forget to start listening on this queue by calling Start()
:
err = nq.Start(ctx, h)
if err != nil {
log.Println("error listening to queue", err)
}
Of course, this example is very simple, and the handler doesn't even inspect the Job
that triggered it. See below for more detailed examples.
Typically, a handler needs to know more about the Job
that triggered it. The Job
is available on the context.Context
passed into our handlers, which can be accessed as follows:
h := handler.New("my_jobs", func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = jobs.FromContext(ctx)
if err != nil {
return err
}
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
return
})
Handlers can be serialized (concurrency 1) or paralellized (concurrency >= 2)
Example
Concurrently fetch up to 4
jobs off the my_jobs
queue.
h := handler.New(queue, func(ctx context.Context) (err error) {
var j *jobs.Job
j, _ = jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
return
}, handler.Concurrency(4))
The job timeout enforces a strict duration that the handler may spend handling a single job. Jobs that run longer than JobTimeout
are retried with exponential backoff.
Example
Jobs on my_jobs
will never succeed because the handler is given only one millisecond per job to complete its work, but the handler function sleeps for 1 second. Jobs will be retried, but never succeed, and eventually will be moved to the dead queue.
h := handler.New("my_jobs", func(ctx context.Context) (err error) {
time.Sleep(1 * time.Second)
return
}, handler.JobTimeout(1*time.Millisecond))