Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Start queue worker with service start instead of first call #34918

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/fifty-apricots-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@rocket.chat/omnichannel-services": patch
---

Fixes a behavior when running microservices that caused queue worker to process just the first 60 seconds of request.

This was due to a mistakenly bound context. Queue Worker was changed to start doing work only after it received the first request.

However, with the introduction of ASL and actual context on calls, the worker registration was absorbing the context of the call that created them, causing service calls happening inside the callbacks to fail because of a timeout.
19 changes: 4 additions & 15 deletions ee/packages/omnichannel-services/src/QueueWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
protected retryCount = 5;

// Default delay is 5 seconds
protected retryDelay = 5000;
protected retryDelay = Number(process.env.RETRY_DELAY) || 5000;

protected queue: MessageQueue;

private logger: Logger;

private queueStarted = false;

constructor(
private readonly db: Db,
loggerClass: typeof Logger,
Expand All @@ -28,7 +26,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
// eslint-disable-next-line new-cap
this.logger = new loggerClass('QueueWorker');
this.queue = new MessageQueue();
this.queue.pollingInterval = 5000;
this.queue.pollingInterval = Number(process.env.POLLING_INTERVAL) || 5000;
}

isServiceNotFoundMessage(message: string): boolean {
Expand All @@ -46,6 +44,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {

try {
await this.createIndexes();
this.registerWorkers();
} catch (e) {
this.logger.fatal(e, 'Fatal error occurred when registering workers');
process.exit(1);
Expand All @@ -55,7 +54,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
async createIndexes(): Promise<void> {
this.logger.info('Creating indexes for queue worker');

// Library doesnt create indexes by itself, for some reason
// Library doesn't create indexes by itself, for some reason
// This should create the indexes we need and improve queue perf on reading
await this.db.collection(this.queue.collectionName).createIndex({ type: 1 });
await this.db.collection(this.queue.collectionName).createIndex({ rejectedTime: 1 }, { sparse: true });
Expand Down Expand Up @@ -105,8 +104,6 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {

this.logger.info('Registering workers of type "workComplete"');
this.queue.registerWorker('workComplete', this.workerCallback.bind(this));

this.queueStarted = true;
}

private matchServiceCall(service: string): boolean {
Expand All @@ -123,10 +120,6 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
// This is a "generic" job that allows you to call any service
async queueWork<T extends Record<string, unknown>>(queue: Actions, to: string, data: T): Promise<void> {
this.logger.info(`Queueing work for ${to}`);
if (!this.queueStarted) {
this.registerWorkers();
}

if (!this.matchServiceCall(to)) {
// We don't want to queue calls to invalid service names
throw new Error(`Invalid service name ${to}`);
Expand All @@ -150,8 +143,4 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
])
.toArray();
}

async isQueueStarted(): Promise<boolean> {
return this.queueStarted;
}
}
Loading