-
Notifications
You must be signed in to change notification settings - Fork 192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(user-data): use semaphore to limit reads COMPASS-7256 #6427
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
import { expect } from 'chai'; | ||
import { Semaphore } from './semaphore'; | ||
|
||
describe('semaphore', function () { | ||
const maxConcurrentOps = 5; | ||
let semaphore: Semaphore; | ||
let taskHandler: (id: number) => Promise<number>; | ||
|
||
beforeEach(() => { | ||
semaphore = new Semaphore(maxConcurrentOps); | ||
taskHandler = async (id: number) => { | ||
const release = await semaphore.waitForRelease(); | ||
const delay = Math.floor(Math.random() * 450) + 50; | ||
try { | ||
await new Promise((resolve) => setTimeout(resolve, delay)); | ||
return id; | ||
} finally { | ||
release(); | ||
} | ||
}; | ||
}); | ||
|
||
it('should run operations concurrently', async function () { | ||
const tasks = Array.from({ length: 10 }, (_, i) => taskHandler(i)); | ||
const results = await Promise.all(tasks); | ||
expect(results).to.have.lengthOf(10); | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
export class Semaphore { | ||
private currentCount = 0; | ||
private queue: (() => void)[] = []; | ||
constructor(private maxConcurrentOps: number) {} | ||
|
||
waitForRelease(): Promise<() => void> { | ||
return new Promise((resolve) => { | ||
const attempt = () => { | ||
this.currentCount++; | ||
resolve(this.release.bind(this)); | ||
}; | ||
if (this.currentCount < this.maxConcurrentOps) { | ||
attempt(); | ||
} else { | ||
this.queue.push(attempt); | ||
} | ||
}); | ||
} | ||
|
||
private release() { | ||
this.currentCount--; | ||
if (this.queue.length > 0) { | ||
const next = this.queue.shift(); | ||
next && next(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ import { createLogger } from '@mongodb-js/compass-logging'; | |
import { getStoragePath } from '@mongodb-js/compass-utils'; | ||
import type { z } from 'zod'; | ||
import writeFile from 'write-file-atomic'; | ||
import { Semaphore } from './semaphore'; | ||
|
||
const { log, mongoLogId } = createLogger('COMPASS-USER-STORAGE'); | ||
|
||
|
@@ -68,6 +69,7 @@ export class UserData<T extends z.Schema> { | |
private readonly serialize: SerializeContent<z.input<T>>; | ||
private readonly deserialize: DeserializeContent; | ||
private readonly getFileName: GetFileName; | ||
private readonly semaphore = new Semaphore(1000); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Node.js thread pool for fs operations ( |
||
|
||
constructor( | ||
private readonly validator: T, | ||
|
@@ -122,7 +124,9 @@ export class UserData<T extends z.Schema> { | |
let data: string; | ||
let stats: Stats; | ||
let handle: fs.FileHandle | undefined = undefined; | ||
let release: (() => void) | undefined = undefined; | ||
try { | ||
release = await this.semaphore.waitForRelease(); | ||
handle = await fs.open(absolutePath, 'r'); | ||
[stats, data] = await Promise.all([ | ||
handle.stat(), | ||
|
@@ -139,6 +143,7 @@ export class UserData<T extends z.Schema> { | |
throw error; | ||
} finally { | ||
await handle?.close(); | ||
release?.(); | ||
} | ||
|
||
try { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this test also run the risk of running into file descriptor limitations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah that's a good point. i'll remove this test.