Skip to content

Commit

Permalink
feat: add debounce and repeating task to utils (#2795)
Browse files Browse the repository at this point in the history
This functionality is required in multiple places so add it to the
utils module.
  • Loading branch information
achingbrain committed Nov 2, 2024
1 parent ad5cfd6 commit 0a3406a
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 0 deletions.
8 changes: 8 additions & 0 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
"types": "./dist/src/close-source.d.ts",
"import": "./dist/src/close-source.js"
},
"./debounce": {
"types": "./dist/src/debounce.d.ts",
"import": "./dist/src/debounce.js"
},
"./filters": {
"types": "./dist/src/filters/index.d.ts",
"import": "./dist/src/filters/index.js"
Expand Down Expand Up @@ -112,6 +116,10 @@
"types": "./dist/src/rate-limiter.d.ts",
"import": "./dist/src/rate-limiter.js"
},
"./repeating-task": {
"types": "./dist/src/repeating-task.d.ts",
"import": "./dist/src/repeating-task.js"
},
"./stream-to-ma-conn": {
"types": "./dist/src/stream-to-ma-conn.d.ts",
"import": "./dist/src/stream-to-ma-conn.js"
Expand Down
30 changes: 30 additions & 0 deletions packages/utils/src/debounce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import type { Startable } from '@libp2p/interface'

export interface DebouncedFunction extends Startable {
(): void
}

/**
* Returns a function wrapper that will only call the passed function once
*
* Important - the passed function should not throw or reject
*/
export function debounce (func: () => void | Promise<void>, wait: number): DebouncedFunction {
let timeout: ReturnType<typeof setTimeout> | undefined

const output = function (): void {
const later = function (): void {
timeout = undefined
void func()
}

clearTimeout(timeout)
timeout = setTimeout(later, wait)
}
output.start = () => {}
output.stop = () => {
clearTimeout(timeout)
}

return output
}
82 changes: 82 additions & 0 deletions packages/utils/src/repeating-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { setMaxListeners } from '@libp2p/interface'
import { anySignal } from 'any-signal'
import type { AbortOptions } from '@libp2p/interface'

export interface RepeatingTask {
start(): void
stop(): void
}

export interface RepeatingTaskOptions {
/**
* How long the task is allowed to run before the passed AbortSignal fires an
* abort event
*/
timeout?: number

/**
* Whether to schedule the task to run immediately
*/
runImmediately?: boolean
}

export function repeatingTask (fn: (options?: AbortOptions) => void | Promise<void>, interval: number, options?: RepeatingTaskOptions): RepeatingTask {
let timeout: ReturnType<typeof setTimeout>
let shutdownController: AbortController

function runTask (): void {
const opts: AbortOptions = {
signal: shutdownController.signal
}

if (options?.timeout != null) {
const signal = anySignal([shutdownController.signal, AbortSignal.timeout(options.timeout)])
setMaxListeners(Infinity, signal)

opts.signal = signal
}

Promise.resolve().then(async () => {
await fn(opts)
})
.catch(() => {})
.finally(() => {
if (shutdownController.signal.aborted) {
// task has been cancelled, bail
return
}

// reschedule
timeout = setTimeout(runTask, interval)
})
}

let started = false

return {
start: () => {
if (started) {
return
}

started = true
shutdownController = new AbortController()
setMaxListeners(Infinity, shutdownController.signal)

// run now
if (options?.runImmediately === true) {
queueMicrotask(() => {
runTask()
})
} else {
// run later
timeout = setTimeout(runTask, interval)
}
},
stop: () => {
clearTimeout(timeout)
shutdownController?.abort()
started = false
}
}
}
46 changes: 46 additions & 0 deletions packages/utils/test/debounce.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { stop } from '@libp2p/interface'
import { expect } from 'aegir/chai'
import delay from 'delay'
import { debounce } from '../src/debounce.js'

describe('debounce', () => {
it('should debounce function', async () => {
let invocations = 0
const fn = (): void => {
invocations++
}

const debounced = debounce(fn, 10)

debounced()
debounced()
debounced()
debounced()
debounced()

await delay(500)

expect(invocations).to.equal(1)
})

it('should cancel debounced function', async () => {
let invocations = 0
const fn = (): void => {
invocations++
}

const debounced = debounce(fn, 10000)

debounced()
debounced()
debounced()
debounced()
debounced()

await stop(debounced)

await delay(500)

expect(invocations).to.equal(0)
})
})
70 changes: 70 additions & 0 deletions packages/utils/test/repeating-task.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { expect } from 'aegir/chai'
import delay from 'delay'
import pDefer from 'p-defer'
import { repeatingTask } from '../src/repeating-task.js'

describe('repeating-task', () => {
it('should repeat a task', async () => {
let count = 0

const task = repeatingTask(() => {
count++
}, 100)
task.start()

await delay(1000)

task.stop()

expect(count).to.be.greaterThan(1)
})

it('should run a task immediately', async () => {
let count = 0

const task = repeatingTask(() => {
count++
}, 60000, {
runImmediately: true
})
task.start()

await delay(10)

task.stop()

expect(count).to.equal(1)
})

it('should time out a task', async () => {
const deferred = pDefer()

const task = repeatingTask((opts) => {
opts?.signal?.addEventListener('abort', () => {
deferred.resolve()
})
}, 100, {
timeout: 10
})
task.start()

await deferred.promise
task.stop()
})

it('should repeat a task that throws', async () => {
let count = 0

const task = repeatingTask(() => {
count++
throw new Error('Urk!')
}, 100)
task.start()

await delay(1000)

task.stop()

expect(count).to.be.greaterThan(1)
})
})
2 changes: 2 additions & 0 deletions packages/utils/typedoc.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"./src/array-equals.ts",
"./src/close.ts",
"./src/close-source.ts",
"./src/debounce.ts",
"./src/filters/index.ts",
"./src/ip-port-to-multiaddr.ts",
"./src/is-promise.ts",
Expand All @@ -18,6 +19,7 @@
"./src/private-ip.ts",
"./src/queue/index.ts",
"./src/rate-limiter.ts",
"./src/repeating-task.ts",
"./src/stream-to-ma-conn.ts",
"./src/tracked-list.ts",
"./src/tracked-map.ts"
Expand Down

0 comments on commit 0a3406a

Please sign in to comment.