From 60bc7975c905e9758f78d07d829fb36dee035c1f Mon Sep 17 00:00:00 2001 From: gqc Date: Sun, 5 Jan 2025 20:11:46 +0800 Subject: [PATCH 1/2] feat: package bullmq --- .../fixtures/base-app/src/task/limit.task.ts | 15 ++ packages/bullmq/CHANGELOG.md | 1 + packages/bullmq/README.md | 9 + packages/bullmq/index.d.ts | 20 ++ packages/bullmq/jest.config.js | 8 + packages/bullmq/jest.setup.js | 2 + packages/bullmq/package.json | 37 +++ packages/bullmq/src/config/config.default.ts | 29 ++ packages/bullmq/src/configuration.ts | 44 +++ packages/bullmq/src/constants.ts | 3 + packages/bullmq/src/decorator.ts | 40 +++ packages/bullmq/src/framework.ts | 252 ++++++++++++++++++ packages/bullmq/src/index.ts | 4 + packages/bullmq/src/interface.ts | 33 +++ .../test/fixtures/base-app/package.json | 3 + .../fixtures/base-app/src/configuration.ts | 14 + .../fixtures/base-app/src/task/hello.task.ts | 19 ++ .../fixtures/base-app/src/task/limit.task.ts | 15 ++ .../fixtures/base-app/src/task/queue.task.ts | 16 ++ packages/bullmq/test/index.test.ts | 47 ++++ packages/bullmq/tsconfig.json | 11 + packages/bullmq/typedoc.json | 4 + 22 files changed, 626 insertions(+) create mode 100644 packages/bull/test/fixtures/base-app/src/task/limit.task.ts create mode 100644 packages/bullmq/CHANGELOG.md create mode 100644 packages/bullmq/README.md create mode 100644 packages/bullmq/index.d.ts create mode 100644 packages/bullmq/jest.config.js create mode 100644 packages/bullmq/jest.setup.js create mode 100644 packages/bullmq/package.json create mode 100644 packages/bullmq/src/config/config.default.ts create mode 100644 packages/bullmq/src/configuration.ts create mode 100644 packages/bullmq/src/constants.ts create mode 100644 packages/bullmq/src/decorator.ts create mode 100644 packages/bullmq/src/framework.ts create mode 100644 packages/bullmq/src/index.ts create mode 100644 packages/bullmq/src/interface.ts create mode 100644 packages/bullmq/test/fixtures/base-app/package.json create mode 100644 packages/bullmq/test/fixtures/base-app/src/configuration.ts create mode 100644 packages/bullmq/test/fixtures/base-app/src/task/hello.task.ts create mode 100644 packages/bullmq/test/fixtures/base-app/src/task/limit.task.ts create mode 100644 packages/bullmq/test/fixtures/base-app/src/task/queue.task.ts create mode 100644 packages/bullmq/test/index.test.ts create mode 100644 packages/bullmq/tsconfig.json create mode 100644 packages/bullmq/typedoc.json diff --git a/packages/bull/test/fixtures/base-app/src/task/limit.task.ts b/packages/bull/test/fixtures/base-app/src/task/limit.task.ts new file mode 100644 index 000000000000..6ab7fd47895d --- /dev/null +++ b/packages/bull/test/fixtures/base-app/src/task/limit.task.ts @@ -0,0 +1,15 @@ +import { App, Inject, sleep } from '@midwayjs/core'; +import { Processor, Application } from '../../../../../src'; + +@Processor('limit',2,{},{limiter: { max: 3, duration: 1000 }}) +export class QueueTask { + @App() + app: Application; + + @Inject() + logger; + + async execute(params) { + await sleep(3*1000) + } +} diff --git a/packages/bullmq/CHANGELOG.md b/packages/bullmq/CHANGELOG.md new file mode 100644 index 000000000000..fa4d35e687c4 --- /dev/null +++ b/packages/bullmq/CHANGELOG.md @@ -0,0 +1 @@ +# Change Log \ No newline at end of file diff --git a/packages/bullmq/README.md b/packages/bullmq/README.md new file mode 100644 index 000000000000..45c39ad21028 --- /dev/null +++ b/packages/bullmq/README.md @@ -0,0 +1,9 @@ +# @midwayjs/bullmq + +this is a sub package for midway. + +Document: [https://midwayjs.org](https://midwayjs.org) + +## License + +[MIT]((https://github.com/midwayjs/midway/blob/master/LICENSE)) diff --git a/packages/bullmq/index.d.ts b/packages/bullmq/index.d.ts new file mode 100644 index 000000000000..80571ab680b5 --- /dev/null +++ b/packages/bullmq/index.d.ts @@ -0,0 +1,20 @@ +import { ConnectionOptions } from 'bullmq'; +export * from './dist/index'; +export { Job } from 'bullmq'; +import { IQueueOptions, IWorkerOptions } from './dist/index'; + +declare module '@midwayjs/core/dist/interface' { + // bullmq 新引入了 worker 作为执行任务的实例,一个队列 queue 和 worker 中 connection, prefix 必须一致才能正常执行 + // 所以 config 中 connection, prefix 单独配置 + // eslint-disable-next-line + interface MidwayConfig { + bullmq?: { + connection: ConnectionOptions; + prefix?: string; + defaultQueueOptions?: IQueueOptions; + defaultWorkerOptions?: IWorkerOptions; + clearRepeatJobWhenStart?: boolean; + contextLoggerFormat?: (info: any) => string; + }; + } +} diff --git a/packages/bullmq/jest.config.js b/packages/bullmq/jest.config.js new file mode 100644 index 000000000000..5b131e2a107a --- /dev/null +++ b/packages/bullmq/jest.config.js @@ -0,0 +1,8 @@ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', + testPathIgnorePatterns: ['/test/fixtures'], + coveragePathIgnorePatterns: ['/test/'], + setupFilesAfterEnv: ['./jest.setup.js'], + coverageProvider: 'v8', +}; diff --git a/packages/bullmq/jest.setup.js b/packages/bullmq/jest.setup.js new file mode 100644 index 000000000000..53c7930592d0 --- /dev/null +++ b/packages/bullmq/jest.setup.js @@ -0,0 +1,2 @@ +process.env.MIDWAY_TS_MODE = 'true'; +jest.setTimeout(30000); diff --git a/packages/bullmq/package.json b/packages/bullmq/package.json new file mode 100644 index 000000000000..befb515d74f0 --- /dev/null +++ b/packages/bullmq/package.json @@ -0,0 +1,37 @@ +{ + "name": "@midwayjs/bullmq", + "version": "1.0.0", + "description": "midway component for bullmq", + "main": "dist/index.js", + "typings": "index.d.ts", + "scripts": { + "build": "tsc", + "test": "node --require=ts-node/register ../../node_modules/.bin/jest --runInBand", + "cov": "node --require=ts-node/register ../../node_modules/.bin/jest --runInBand --coverage --forceExit" + }, + "keywords": [ + "midway", + "IoC", + "task", + "bullmq", + "plugin" + ], + "author": "guo qicong", + "files": [ + "dist/**/*.js", + "dist/**/*.d.ts", + "index.d.ts" + ], + "license": "MIT", + "devDependencies": { + "@midwayjs/core": "^3.19.0", + "@midwayjs/mock": "^3.19.2" + }, + "dependencies": { + "bullmq": "5.34.6" + }, + "engines": { + "node": ">=12" + }, + "repository": "https://github.com/midwayjs/midway.git" +} diff --git a/packages/bullmq/src/config/config.default.ts b/packages/bullmq/src/config/config.default.ts new file mode 100644 index 000000000000..61578cde32fb --- /dev/null +++ b/packages/bullmq/src/config/config.default.ts @@ -0,0 +1,29 @@ +export const bullmq = { + prefix: '{midway-bullmq}', + connection: { + host: '127.0.0.1', + port: 6379, + }, + defaultQueueOptions: { + defaultJobOptions: { + removeOnComplete: 3, + removeOnFail: 10, + }, + }, + defaultWorkerOptions: { + concurrency: 1, + }, + clearRepeatJobWhenStart: true, + contextLoggerFormat: info => { + const { jobId, from } = info.ctx; + return `${info.timestamp} ${info.LEVEL} ${info.pid} [${jobId} ${from.name}] ${info.message}`; + }, +}; + +export const midwayLogger = { + clients: { + bullLogger: { + fileLogName: 'midway-bull.log', + }, + }, +}; diff --git a/packages/bullmq/src/configuration.ts b/packages/bullmq/src/configuration.ts new file mode 100644 index 000000000000..d4b71f8355e7 --- /dev/null +++ b/packages/bullmq/src/configuration.ts @@ -0,0 +1,44 @@ +import { + Configuration, + Init, + Inject, + MidwayDecoratorService, +} from '@midwayjs/core'; +import * as DefaultConfig from './config/config.default'; +import { BullMQFramework } from './framework'; +import { BULLMQ_QUEUE_KEY } from './constants'; + +@Configuration({ + namespace: 'bullmq', + importConfigs: [ + { + default: DefaultConfig, + }, + ], +}) +export class BullConfiguration { + @Inject() + framework: BullMQFramework; + + @Inject() + decoratorService: MidwayDecoratorService; + + @Init() + async init() { + this.decoratorService.registerPropertyHandler( + BULLMQ_QUEUE_KEY, + ( + propertyName, + meta: { + queueName: string; + } + ) => { + return this.framework.getQueue(meta.queueName); + } + ); + } + + async onReady() { + this.framework.loadConfig(); + } +} diff --git a/packages/bullmq/src/constants.ts b/packages/bullmq/src/constants.ts new file mode 100644 index 000000000000..91255fc2f7c7 --- /dev/null +++ b/packages/bullmq/src/constants.ts @@ -0,0 +1,3 @@ +// task +export const BULLMQ_QUEUE_KEY = 'bullmq:queue'; +export const BULLMQ_PROCESSOR_KEY = 'bullmq:processor'; diff --git a/packages/bullmq/src/decorator.ts b/packages/bullmq/src/decorator.ts new file mode 100644 index 000000000000..e648cd686d27 --- /dev/null +++ b/packages/bullmq/src/decorator.ts @@ -0,0 +1,40 @@ +import { + createCustomPropertyDecorator, + Provide, + saveClassMetadata, + saveModule, + Scope, + ScopeEnum, +} from '@midwayjs/core'; +import { IQueueOptions, IWorkerOptions } from './interface'; +import { BULLMQ_PROCESSOR_KEY, BULLMQ_QUEUE_KEY } from './constants'; +import { JobsOptions } from 'bullmq'; + +export function Processor( + queueName: string, + jobOptions?: JobsOptions, + workerOptions?: IWorkerOptions, + queueOptions?: IQueueOptions +): ClassDecorator { + return function (target: any) { + saveModule(BULLMQ_PROCESSOR_KEY, target); + saveClassMetadata( + BULLMQ_PROCESSOR_KEY, + { + queueName, + jobOptions, + queueOptions, + workerOptions, + }, + target + ); + Provide()(target); + Scope(ScopeEnum.Request)(target); + }; +} + +export function InjectQueue(queueName: string): PropertyDecorator { + return createCustomPropertyDecorator(BULLMQ_QUEUE_KEY, { + queueName, + }); +} diff --git a/packages/bullmq/src/framework.ts b/packages/bullmq/src/framework.ts new file mode 100644 index 000000000000..5e7beee02d3d --- /dev/null +++ b/packages/bullmq/src/framework.ts @@ -0,0 +1,252 @@ +import { + BaseFramework, + extend, + IMidwayBootstrapOptions, + Framework, + getClassMetadata, + listModule, + Utils, + MidwayInvokeForbiddenError, +} from '@midwayjs/core'; +import { + Application, + Context, + IProcessor, + IQueue, + IQueueManager, + IQueueOptions, + IWorkerOptions, +} from './interface'; +import { + Job, + JobsOptions, + Queue, + Worker, + QueueOptions, + ConnectionOptions, +} from 'bullmq'; +import { BULLMQ_PROCESSOR_KEY } from './constants'; + +export class BullMQQueue extends Queue implements IQueue { + constructor(queueName: string, queueOptions: QueueOptions) { + super(queueName, queueOptions); + } + getJob(name: string): Promise { + throw new Error('Method not implemented.'); + } + + // bullmq 在 queue.add 新增第一个参数 jobName + // runJob 与 @midwayjs/bull 保持一致,如果想要使用 jobName 则可以直接调用 queue.add + public async runJob(data: any, options?: JobsOptions): Promise { + const { repeat, ...OtherOptions } = options ?? {}; + if (repeat) { + return this.upsertJobScheduler(this.name, repeat, { + name: 'jobName', + data, + opts: OtherOptions, // additional job options + }); + } + return this.add('jobName', data || {}, options); + } + + public getQueueName(): string { + return this.name; + } +} + +@Framework() +export class BullMQFramework + extends BaseFramework + implements IQueueManager +{ + private connection: ConnectionOptions; + private prefix?: string; + private defaultQueueConfig: IQueueOptions; + private defaultWorkerConfig: IWorkerOptions; + private clearRepeatJobWhenStart: boolean; + private queueMap: Map = new Map(); + /** keep a map of workers for gracefully shutdown */ + private workerMap: Map = new Map(); + + async applicationInitialize(options: IMidwayBootstrapOptions) { + this.app = {} as any; + } + + public loadConfig() { + this.connection = this.configService.getConfiguration('bullmq.connection'); + this.prefix = this.configService.getConfiguration('bullmq.prefix'); + this.defaultQueueConfig = this.configService.getConfiguration( + 'bullmq.defaultQueueOptions' + ); + this.defaultWorkerConfig = this.configService.getConfiguration( + 'bullmq.defaultWorkerOptions' + ); + this.clearRepeatJobWhenStart = this.configService.getConfiguration( + 'bullmq.clearRepeatJobWhenStart' + ); + } + + configure() { + return this.configService.getConfiguration('bullmq'); + } + + getFrameworkName(): string { + return 'bullmq'; + } + + async run() { + const processorModules = listModule(BULLMQ_PROCESSOR_KEY); + for (const mod of processorModules) { + const options = getClassMetadata(BULLMQ_PROCESSOR_KEY, mod) as { + queueName: string; + jobOptions?: JobsOptions; + queueOptions?: IQueueOptions; + workerOptions?: IWorkerOptions; + }; + const { repeat, ...otherOptions } = options.jobOptions ?? {}; + const queueOptions = options.queueOptions ?? {}; + const currentQueue = this.ensureQueue(options.queueName, { + ...queueOptions, + defaultJobOptions: otherOptions, + }); + if (!currentQueue) throw Error('ensureQueue failed'); + // clear old repeat job when start + if (this.clearRepeatJobWhenStart) { + const jobs = await currentQueue.getJobSchedulers(); + for (const job of jobs) { + await currentQueue.removeJobScheduler(job.key); + } + // Repeatable in jobOptions is depecrate + const repeatableJobs = await currentQueue.getRepeatableJobs(); + for (const job of repeatableJobs) { + await currentQueue.removeRepeatableByKey(job.key); + } + } + + await this.addProcessor(mod, options.queueName, options.workerOptions); + if (repeat) { + await this.runJob(options.queueName, {}, options.jobOptions); + } + } + } + + protected async beforeStop() { + // loop queueMap and stop all queue + for (const queue of this.queueMap.values()) { + await queue.close(); + } + for (const worker of this.workerMap.values()) { + await worker.close(); + } + } + + public createQueue(name: string, queueOptions: IQueueOptions) { + const queue = new BullMQQueue( + name, + extend(true, {}, this.defaultQueueConfig, queueOptions, { + connection: this.connection, + prefix: this.prefix, + }) + ); + this.queueMap.set(name, queue); + queue.on('error', err => { + this.app.getCoreLogger().error(err); + }); + return queue; + } + + public getQueue(name: string) { + return this.queueMap.get(name); + } + + public ensureQueue(name: string, queueOptions: IQueueOptions) { + if (!this.queueMap.has(name)) { + this.createQueue(name, queueOptions); + } + return this.queueMap.get(name); + } + + public getQueueList() { + return Array.from(this.queueMap.values()); + } + + public getWorker(name: string) { + return this.workerMap.get(name); + } + + public async addProcessor( + processor: new (...args) => IProcessor, + queueName: string, + workerOptions?: IWorkerOptions + ) { + const queue = this.queueMap.get(queueName); + if (!queue) throw Error(`queue not found ${queueName}`); + + const worker = new Worker( + queueName, + async (job: Job) => { + const ctx = this.app.createAnonymousContext({ + jobId: job.id, + job, + from: processor, + }); + + try { + ctx.logger.info(`start process job ${job.id} from ${processor.name}`); + + const isPassed = await this.app + .getFramework() + .runGuard(ctx, processor, 'execute'); + if (!isPassed) { + throw new MidwayInvokeForbiddenError('execute', processor); + } + + const service = await ctx.requestContext.getAsync( + processor as any + ); + const fn = await this.applyMiddleware(async ctx => { + return await Utils.toAsyncFunction(service.execute.bind(service))( + job.data, + job + ); + }); + const result = await Promise.resolve(await fn(ctx)); + ctx.logger.info( + `complete process job ${job.id} from ${processor.name}` + ); + return result; + } catch (err) { + ctx.logger.error(err); + return Promise.reject(err); + } + }, + extend(true, {}, this.defaultWorkerConfig, workerOptions, { + connection: this.connection, + prefix: this.prefix, + }) + ); + + this.workerMap.set(queueName, worker); + } + + public async runJob( + queueName: string, + jobData: any, + options?: JobsOptions + ): Promise { + const queue = this.queueMap.get(queueName); + if (queue) { + return await queue.runJob(jobData, options); + } + } + + public async getJob( + queueName: string, + jobName: string + ): Promise { + const queue = this.queueMap.get(queueName); + if (queue) { + return queue.getJob(jobName); + } + } +} diff --git a/packages/bullmq/src/index.ts b/packages/bullmq/src/index.ts new file mode 100644 index 000000000000..b2cb92dbe887 --- /dev/null +++ b/packages/bullmq/src/index.ts @@ -0,0 +1,4 @@ +export { BullConfiguration as Configuration } from './configuration'; +export { BullMQFramework as Framework, BullMQQueue } from './framework'; +export * from './decorator'; +export * from './interface'; diff --git a/packages/bullmq/src/interface.ts b/packages/bullmq/src/interface.ts new file mode 100644 index 000000000000..536e32b99558 --- /dev/null +++ b/packages/bullmq/src/interface.ts @@ -0,0 +1,33 @@ +import { IMidwayApplication, IMidwayContext, NextFunction as BaseNextFunction } from '@midwayjs/core'; +import { WorkerOptions, QueueOptions, Job, Worker } from 'bullmq'; + +export interface IProcessor { + execute(data: any); +} + +export interface IQueue { + runJob(data: Record, options?: unknown): Promise; + getJob(name: string): Promise; + getQueueName(): string; +} + +export interface IQueueManager, Job> { + runJob(queueName: string, jobData: any, options?: unknown): Promise; + getJob(queueName: string, jobName: string): Promise; + createQueue(queueName: string, queueOptions?: unknown): Queue; + getQueue(queueName: string): Queue | undefined; + getWorker(queueName: string): Worker | undefined; +} + +export interface Application extends IMidwayApplication { } +export type NextFunction = BaseNextFunction; + +export interface Context extends IMidwayContext { + jobId: string; + job: Job; + from: new (...args) => IProcessor; +} + +export type IWorkerOptions = Omit +export type IQueueOptions = Omit + diff --git a/packages/bullmq/test/fixtures/base-app/package.json b/packages/bullmq/test/fixtures/base-app/package.json new file mode 100644 index 000000000000..621cdc6a4174 --- /dev/null +++ b/packages/bullmq/test/fixtures/base-app/package.json @@ -0,0 +1,3 @@ +{ + "name": "ali-demo" +} diff --git a/packages/bullmq/test/fixtures/base-app/src/configuration.ts b/packages/bullmq/test/fixtures/base-app/src/configuration.ts new file mode 100644 index 000000000000..14f19e62e09a --- /dev/null +++ b/packages/bullmq/test/fixtures/base-app/src/configuration.ts @@ -0,0 +1,14 @@ +import { Configuration } from '@midwayjs/core'; +import * as bullmq from '../../../../src'; + +@Configuration({ + imports: [ + bullmq + ], +}) +export class ContainerConfiguration { + + async onReady() { + + } +} diff --git a/packages/bullmq/test/fixtures/base-app/src/task/hello.task.ts b/packages/bullmq/test/fixtures/base-app/src/task/hello.task.ts new file mode 100644 index 000000000000..941c769ebf2d --- /dev/null +++ b/packages/bullmq/test/fixtures/base-app/src/task/hello.task.ts @@ -0,0 +1,19 @@ +import { FORMAT, App, Inject, IMidwayApplication } from '@midwayjs/core'; +import { Processor, IProcessor } from '../../../../../src'; + +@Processor('HelloTask', { + repeat: { + pattern: FORMAT.CRONTAB.EVERY_PER_5_SECOND + } +}) +export class HelloTask implements IProcessor { + @App() + app: IMidwayApplication; + + @Inject() + logger; + + async execute() { + this.app.setAttr(`task`, 'task'); + } +} diff --git a/packages/bullmq/test/fixtures/base-app/src/task/limit.task.ts b/packages/bullmq/test/fixtures/base-app/src/task/limit.task.ts new file mode 100644 index 000000000000..119a5eb9ee75 --- /dev/null +++ b/packages/bullmq/test/fixtures/base-app/src/task/limit.task.ts @@ -0,0 +1,15 @@ +import { App, sleep, Inject } from '@midwayjs/core'; +import { Processor, Application } from '../../../../../src'; + +@Processor('concurrency', {}, { limiter: { max: 3, duration: 1000 }, concurrency: 3 }, {}) +export class QueueTask { + @App() + app: Application; + + @Inject() + logger; + + async execute(params) { + await sleep(3 * 1000); + } +} diff --git a/packages/bullmq/test/fixtures/base-app/src/task/queue.task.ts b/packages/bullmq/test/fixtures/base-app/src/task/queue.task.ts new file mode 100644 index 000000000000..d56c1270b8d4 --- /dev/null +++ b/packages/bullmq/test/fixtures/base-app/src/task/queue.task.ts @@ -0,0 +1,16 @@ +import { App, Inject } from '@midwayjs/core'; +import { Processor, Application } from '../../../../../src'; + +@Processor('test') +export class QueueTask { + @App() + app: Application; + + @Inject() + logger; + + async execute(params) { + this.logger.info(`====>QueueTask execute`); + this.app.setAttr(`queueConfig`, JSON.stringify(params)); + } +} diff --git a/packages/bullmq/test/index.test.ts b/packages/bullmq/test/index.test.ts new file mode 100644 index 000000000000..d8e09ea521f3 --- /dev/null +++ b/packages/bullmq/test/index.test.ts @@ -0,0 +1,47 @@ + +import { createApp, close } from '@midwayjs/mock'; +import { join } from 'path'; +import { sleep } from '@midwayjs/core'; +import * as bullmq from '../src'; +import expect from 'expect'; + + +describe(`/test/index.test.ts`, () => { + it('test auto repeat processor', async () => { + const app = await createApp(join(__dirname, 'fixtures', 'base-app'), {}, bullmq); + + await sleep(5 * 1000); + let res = app.getAttr(`task`); + expect(res).toEqual(`task`); + + // run job + const bullFramework = app.getApplicationContext().get(bullmq.Framework); + expect(bullFramework.getCoreLogger()).toBeDefined(); + const testQueue = bullFramework.getQueue('test'); + expect(testQueue).toBeDefined(); + + const params = { + name: 'stone-jin', + }; + const job = await testQueue?.runJob(params, { delay: 1000 }); + expect(await job?.getState()).toEqual('delayed'); + await sleep(1200); + expect(app.getAttr(`queueConfig`)).toBe(JSON.stringify(params)); + expect(await job?.getState()).toEqual('completed'); + + const concurrencyQueue = bullFramework.getQueue('concurrency'); + await concurrencyQueue.setGlobalConcurrency(2); + for (let index = 0; index < 6; index++) { + concurrencyQueue.runJob(index); + } + await sleep(1 * 1000); + expect((await concurrencyQueue.getJobCounts()).active).toEqual(2); + await concurrencyQueue.setGlobalConcurrency(4); + await sleep(4 * 1000); + expect((await concurrencyQueue.getJobCounts()).active).toEqual(3); + await sleep(3 * 1000); + await close(app); + }); + +}); + diff --git a/packages/bullmq/tsconfig.json b/packages/bullmq/tsconfig.json new file mode 100644 index 000000000000..324fe88c9b14 --- /dev/null +++ b/packages/bullmq/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.json", + "compileOnSave": true, + "compilerOptions": { + "rootDir": "src", + "outDir": "dist" + }, + "include": [ + "./src/**/*.ts" + ] +} diff --git a/packages/bullmq/typedoc.json b/packages/bullmq/typedoc.json new file mode 100644 index 000000000000..f593f276c273 --- /dev/null +++ b/packages/bullmq/typedoc.json @@ -0,0 +1,4 @@ +{ + "extends": ["../../typedoc.base.json"], + "entryPoints": ["src/index.ts"] +} From 7e2178fa6bb5646a04c27c5124e65774f90cf1cc Mon Sep 17 00:00:00 2001 From: gqc Date: Mon, 6 Jan 2025 17:20:39 +0800 Subject: [PATCH 2/2] fix: use bullmq-logger --- packages/bullmq/src/config/config.default.ts | 9 +++------ packages/bullmq/src/framework.ts | 7 ++++++- .../test/fixtures/base-app/src/configuration.ts | 12 ++++++++++++ 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/packages/bullmq/src/config/config.default.ts b/packages/bullmq/src/config/config.default.ts index 61578cde32fb..96607182407a 100644 --- a/packages/bullmq/src/config/config.default.ts +++ b/packages/bullmq/src/config/config.default.ts @@ -1,9 +1,5 @@ export const bullmq = { prefix: '{midway-bullmq}', - connection: { - host: '127.0.0.1', - port: 6379, - }, defaultQueueOptions: { defaultJobOptions: { removeOnComplete: 3, @@ -14,6 +10,7 @@ export const bullmq = { concurrency: 1, }, clearRepeatJobWhenStart: true, + contextLoggerApplyLogger: 'bullMQLogger', contextLoggerFormat: info => { const { jobId, from } = info.ctx; return `${info.timestamp} ${info.LEVEL} ${info.pid} [${jobId} ${from.name}] ${info.message}`; @@ -22,8 +19,8 @@ export const bullmq = { export const midwayLogger = { clients: { - bullLogger: { - fileLogName: 'midway-bull.log', + bullMQLogger: { + fileLogName: 'midway-bullmq.log', }, }, }; diff --git a/packages/bullmq/src/framework.ts b/packages/bullmq/src/framework.ts index 5e7beee02d3d..3538acb259b1 100644 --- a/packages/bullmq/src/framework.ts +++ b/packages/bullmq/src/framework.ts @@ -7,6 +7,8 @@ import { listModule, Utils, MidwayInvokeForbiddenError, + Logger, + ILogger, } from '@midwayjs/core'; import { Application, @@ -68,6 +70,9 @@ export class BullMQFramework /** keep a map of workers for gracefully shutdown */ private workerMap: Map = new Map(); + @Logger('bullMQLogger') + protected bullMQLogger: ILogger; + async applicationInitialize(options: IMidwayBootstrapOptions) { this.app = {} as any; } @@ -150,7 +155,7 @@ export class BullMQFramework ); this.queueMap.set(name, queue); queue.on('error', err => { - this.app.getCoreLogger().error(err); + this.bullMQLogger.error(err); }); return queue; } diff --git a/packages/bullmq/test/fixtures/base-app/src/configuration.ts b/packages/bullmq/test/fixtures/base-app/src/configuration.ts index 14f19e62e09a..624198619bf4 100644 --- a/packages/bullmq/test/fixtures/base-app/src/configuration.ts +++ b/packages/bullmq/test/fixtures/base-app/src/configuration.ts @@ -5,6 +5,18 @@ import * as bullmq from '../../../../src'; imports: [ bullmq ], + importConfigs: [ + { + default: { + bullmq: { + connection: { + host: '127.0.0.1', + port: 6379, + } + }, + }, + }, + ], }) export class ContainerConfiguration {