Skip to content

Commit

Permalink
fix(JingleSessionPC): call queued task callbacks upon queue clearing (j…
Browse files Browse the repository at this point in the history
…itsi#2370)

This ensures that promises relying on the task callback will settle.
  • Loading branch information
Auxane committed Oct 11, 2023
1 parent cb416cf commit 5a2cb53
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
20 changes: 20 additions & 0 deletions modules/util/AsyncQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@ import { queue } from 'async-es';

const logger = getLogger(__filename);

/**
* Error to be passed to a callback of a queued task when the queue is cleared.
*/
export class ClearedQueueError extends Error {
/**
* Creates new instance.
*/
constructor(message) {
super(message);
this.name = 'ClearedQueueError';
}
}

/**
* A queue for async task execution.
*/
Expand All @@ -13,12 +26,16 @@ export default class AsyncQueue {
constructor() {
this._queue = queue(this._processQueueTasks.bind(this), 1);
this._stopped = false;
this._taskCallbacks = new Map();
}

/**
* Removes any pending tasks from the queue.
*/
clear() {
for (const finishedCallback of this._taskCallbacks.values()) {
finishedCallback(new ClearedQueueError('The queue has been cleared'));
}
this._queue.kill();
}

Expand All @@ -31,6 +48,8 @@ export default class AsyncQueue {
} catch (error) {
logger.error(`Task failed: ${error?.stack}`);
finishedCallback(error);
} finally {
this._taskCallbacks.delete(task);
}
}

Expand Down Expand Up @@ -64,6 +83,7 @@ export default class AsyncQueue {

return;
}
this._taskCallbacks.set(task, callback);
this._queue.push(task, callback);
}

Expand Down
22 changes: 21 additions & 1 deletion modules/xmpp/JingleSessionPC.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import SDP from '../sdp/SDP';
import SDPDiffer from '../sdp/SDPDiffer';
import SDPUtil from '../sdp/SDPUtil';
import Statistics from '../statistics/statistics';
import AsyncQueue from '../util/AsyncQueue';
import AsyncQueue, { ClearedQueueError } from '../util/AsyncQueue';
import GlobalOnErrorHandler from '../util/GlobalOnErrorHandler';

import browser from './../browser';
Expand Down Expand Up @@ -1321,6 +1321,11 @@ export default class JingleSessionPC extends JingleSession {
workFunction,
error => {
if (error) {
if (error instanceof ClearedQueueError) {
// The session might have been terminated before the task was executed, making it obsolete.
logger.debug(`${this} ICE restart task aborted: session terminated`);
success();
}
logger.error(`${this} ICE restart task failed: ${error}`);
failure(error);
} else {
Expand Down Expand Up @@ -2194,6 +2199,11 @@ export default class JingleSessionPC extends JingleSession {
workFunction,
error => {
if (error) {
if (error instanceof ClearedQueueError) {
// The session might have been terminated before the task was executed, making it obsolete.
logger.debug(`${this} renegotiation after addTrack aborted: session terminated`);
resolve();
}
logger.error(`${this} renegotiation after addTrack error`, error);
reject(error);
} else {
Expand Down Expand Up @@ -2306,6 +2316,11 @@ export default class JingleSessionPC extends JingleSession {
workFunction,
error => {
if (error) {
if (error instanceof ClearedQueueError) {
// The session might have been terminated before the task was executed, making it obsolete.
logger.debug('Replace track aborted: session terminated');
resolve();
}
logger.error(`${this} Replace track error:`, error);
reject(error);
} else {
Expand Down Expand Up @@ -2504,6 +2519,11 @@ export default class JingleSessionPC extends JingleSession {
workFunction,
error => {
if (error) {
if (error instanceof ClearedQueueError) {
// The session might have been terminated before the task was executed, making it obsolete.
logger.debug(`${this} ${operationName} aborted: session terminated`);
resolve();
}
logger.error(`${this} ${operationName} failed`);
reject(error);
} else {
Expand Down
4 changes: 4 additions & 0 deletions types/hand-crafted/modules/util/AsyncQueue.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
export class ClearedQueueError extends Error {
constructor();
}

export default class AsyncQueue {
constructor();
push: ( task: ( callback: ( err?: Error ) => void ) => void, callback?: ( err: Error ) => void ) => void; // TODO: check this
Expand Down

0 comments on commit 5a2cb53

Please sign in to comment.