-
Notifications
You must be signed in to change notification settings - Fork 90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: clear stale queues #1952
feat: clear stale queues #1952
Changes from 10 commits
7218788
0512a25
3972c59
66c9984
a0be9dd
49dd6a2
378078d
75f828b
2c17244
8c005fb
fb1ac1a
781c216
85f3969
27161cd
f011db9
a8197e7
4857797
b92e726
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -367,7 +367,12 @@ class RetryQueue implements IQueue<QueueItemData> { | |
let queue = | ||
(this.getStorageEntry(QueueStatuses.QUEUE) as Nullable<QueueItem<QueueItemData>[]>) ?? []; | ||
|
||
queue = queue.slice(-(this.maxItems - 1)); | ||
if (this.maxItems > 1) { | ||
queue = queue.slice(-(this.maxItems - 1)); | ||
} else { | ||
queue = []; | ||
} | ||
|
||
queue.push(curEntry); | ||
queue = queue.sort(sortByTime); | ||
|
||
|
@@ -541,7 +546,16 @@ class RetryQueue implements IQueue<QueueItemData> { | |
const willBeRetried = this.shouldRetry(el.item, el.attemptNumber + 1); | ||
this.processQueueCb(el.item, el.done, el.attemptNumber, this.maxAttempts, willBeRetried); | ||
} catch (err) { | ||
// drop the event from in progress queue as we're unable to process it | ||
el.done(); | ||
this.logger?.error(RETRY_QUEUE_PROCESS_ERROR(RETRY_QUEUE), err); | ||
this.logger?.error('Debugging data dump starts'); | ||
this.logger?.error('Queue item', el); | ||
this.logger?.error('RetryQueue Instance', this); | ||
this.logger?.error('Primary Queue', this.getStorageEntry(QueueStatuses.QUEUE)); | ||
this.logger?.error('In-Progress Queue', this.getStorageEntry(QueueStatuses.IN_PROGRESS)); | ||
this.logger?.error('RudderStack Globals', (globalThis as typeof window).RudderStackGlobals); | ||
this.logger?.error('Debugging data dump ends'); | ||
} | ||
}); | ||
|
||
|
@@ -584,9 +598,9 @@ class RetryQueue implements IQueue<QueueItemData> { | |
validKeys: QueueStatuses, | ||
type: LOCAL_STORAGE, | ||
}); | ||
const our = { | ||
queue: (this.getStorageEntry(QueueStatuses.QUEUE) ?? []) as QueueItem[], | ||
}; | ||
|
||
const reclaimedQueueItems: QueueItem[] = []; | ||
|
||
const their = { | ||
inProgress: other.get(QueueStatuses.IN_PROGRESS) ?? {}, | ||
batchQueue: other.get(QueueStatuses.BATCH_QUEUE) ?? [], | ||
|
@@ -609,7 +623,7 @@ class RetryQueue implements IQueue<QueueItemData> { | |
// and the new entries will have the type field set | ||
const type = Array.isArray(el.item) ? BATCH_QUEUE_ITEM_TYPE : SINGLE_QUEUE_ITEM_TYPE; | ||
|
||
our.queue.push({ | ||
reclaimedQueueItems.push({ | ||
item: el.item, | ||
attemptNumber: el.attemptNumber + incrementAttemptNumberBy, | ||
time: this.schedule.now(), | ||
|
@@ -652,9 +666,15 @@ class RetryQueue implements IQueue<QueueItemData> { | |
// if the queue is abandoned, all the in-progress are failed. retry them immediately and increment the attempt# | ||
addConcatQueue(their.inProgress, 1); | ||
|
||
our.queue = our.queue.sort(sortByTime); | ||
let ourQueue = (this.getStorageEntry(QueueStatuses.QUEUE) as QueueItem[]) ?? []; | ||
const roomInQueue = Math.max(0, this.maxItems - ourQueue.length); | ||
if (roomInQueue > 0) { | ||
ourQueue.push(...reclaimedQueueItems.slice(0, roomInQueue)); | ||
} | ||
|
||
ourQueue = ourQueue.sort(sortByTime); | ||
|
||
this.setStorageEntry(QueueStatuses.QUEUE, our.queue); | ||
this.setStorageEntry(QueueStatuses.QUEUE, ourQueue); | ||
|
||
// remove all keys one by on next tick to avoid NS_ERROR_STORAGE_BUSY error | ||
this.clearQueueEntries(other, 1); | ||
|
@@ -704,40 +724,6 @@ class RetryQueue implements IQueue<QueueItemData> { | |
} | ||
|
||
checkReclaim() { | ||
const createReclaimStartTask = (store: IStore) => () => { | ||
if (store.get(QueueStatuses.RECLAIM_END) !== this.id) { | ||
return; | ||
} | ||
|
||
if (store.get(QueueStatuses.RECLAIM_START) !== this.id) { | ||
return; | ||
} | ||
|
||
this.reclaim(store.id); | ||
}; | ||
const createReclaimEndTask = (store: IStore) => () => { | ||
if (store.get(QueueStatuses.RECLAIM_START) !== this.id) { | ||
return; | ||
} | ||
|
||
store.set(QueueStatuses.RECLAIM_END, this.id); | ||
|
||
this.schedule.run( | ||
createReclaimStartTask(store), | ||
this.timeouts.reclaimWait, | ||
ScheduleModes.ABANDON, | ||
); | ||
}; | ||
const tryReclaim = (store: IStore) => { | ||
store.set(QueueStatuses.RECLAIM_START, this.id); | ||
store.set(QueueStatuses.ACK, this.schedule.now()); | ||
|
||
this.schedule.run( | ||
createReclaimEndTask(store), | ||
this.timeouts.reclaimWait, | ||
ScheduleModes.ABANDON, | ||
); | ||
}; | ||
const findOtherQueues = (name: string): IStore[] => { | ||
const res: IStore[] = []; | ||
const storageEngine = this.store.getOriginalEngine(); | ||
|
@@ -778,15 +764,8 @@ class RetryQueue implements IQueue<QueueItemData> { | |
return res; | ||
}; | ||
|
||
findOtherQueues(this.name).forEach(store => { | ||
if (this.schedule.now() - store.get(QueueStatuses.ACK) < this.timeouts.reclaimTimeout) { | ||
return; | ||
} | ||
|
||
tryReclaim(store); | ||
}); | ||
|
||
this.schedule.run(this.checkReclaim, this.timeouts.reclaimTimer, ScheduleModes.RESCHEDULE); | ||
// Instead of reclaiming stale queues, clear them | ||
findOtherQueues(this.name).forEach(store => this.clearQueueEntries(store, 0)); | ||
Comment on lines
+830
to
+831
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential data loss by clearing stale queues instead of reclaiming Changing the Consider evaluating whether it's acceptable to discard these items. If not, you might need to implement a mechanism to reclaim unprocessed items from stale queues to ensure data integrity. |
||
} | ||
|
||
clear() { | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,7 +1,7 @@ | ||||||
import { LOG_CONTEXT_SEPARATOR } from '../../shared-chunks/common'; | ||||||
|
||||||
const RETRY_QUEUE_PROCESS_ERROR = (context: string): string => | ||||||
`${context}${LOG_CONTEXT_SEPARATOR}Process function threw an error.`; | ||||||
`${context}${LOG_CONTEXT_SEPARATOR}Process function threw an error while processing the queue item. The item is be dropped.`; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix grammatical error in error message The error message contains a grammatical error: "The item is be dropped". This should be either "The item is dropped" or "The item will be dropped". Apply this fix: - `${context}${LOG_CONTEXT_SEPARATOR}Process function threw an error while processing the queue item. The item is be dropped.`;
+ `${context}${LOG_CONTEXT_SEPARATOR}Process function threw an error while processing the queue item. The item will be dropped.`; 📝 Committable suggestion
Suggested change
|
||||||
|
||||||
const RETRY_QUEUE_ENTRY_REMOVE_ERROR = (context: string, entry: string, attempt: number): string => | ||||||
`${context}${LOG_CONTEXT_SEPARATOR}Failed to remove local storage entry "${entry}" (attempt: ${attempt}.`; | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix potential queue size management issue.
The current implementation has a logic flaw in queue size management. When
maxItems
is 1, it clears the queue before adding the new item, which could lead to data loss if the new item fails to be added.📝 Committable suggestion