Skip to content

Commit

Permalink
feat: add package bullmq (#4257)
Browse files Browse the repository at this point in the history
* feat: package bullmq

* fix: use bullmq-logger

---------

Co-authored-by: gqc <[email protected]>
  • Loading branch information
harperKKK and gqc authored Jan 6, 2025
1 parent 5cea26b commit ddceb32
Show file tree
Hide file tree
Showing 22 changed files with 640 additions and 0 deletions.
15 changes: 15 additions & 0 deletions packages/bull/test/fixtures/base-app/src/task/limit.task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { App, Inject, sleep } from '@midwayjs/core';
import { Processor, Application } from '../../../../../src';

@Processor('limit',2,{},{limiter: { max: 3, duration: 1000 }})
export class QueueTask {
@App()
app: Application;

@Inject()
logger;

async execute(params) {
await sleep(3*1000)
}
}
1 change: 1 addition & 0 deletions packages/bullmq/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Change Log
9 changes: 9 additions & 0 deletions packages/bullmq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# @midwayjs/bullmq

this is a sub package for midway.

Document: [https://midwayjs.org](https://midwayjs.org)

## License

[MIT]((https://github.com/midwayjs/midway/blob/master/LICENSE))
20 changes: 20 additions & 0 deletions packages/bullmq/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { ConnectionOptions } from 'bullmq';
export * from './dist/index';
export { Job } from 'bullmq';
import { IQueueOptions, IWorkerOptions } from './dist/index';

declare module '@midwayjs/core/dist/interface' {
// bullmq 新引入了 worker 作为执行任务的实例,一个队列 queue 和 worker 中 connection, prefix 必须一致才能正常执行
// 所以 config 中 connection, prefix 单独配置
// eslint-disable-next-line
interface MidwayConfig {
bullmq?: {
connection: ConnectionOptions;
prefix?: string;
defaultQueueOptions?: IQueueOptions;
defaultWorkerOptions?: IWorkerOptions;
clearRepeatJobWhenStart?: boolean;
contextLoggerFormat?: (info: any) => string;
};
}
}
8 changes: 8 additions & 0 deletions packages/bullmq/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
testPathIgnorePatterns: ['<rootDir>/test/fixtures'],
coveragePathIgnorePatterns: ['<rootDir>/test/'],
setupFilesAfterEnv: ['./jest.setup.js'],
coverageProvider: 'v8',
};
2 changes: 2 additions & 0 deletions packages/bullmq/jest.setup.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
process.env.MIDWAY_TS_MODE = 'true';
jest.setTimeout(30000);
37 changes: 37 additions & 0 deletions packages/bullmq/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"name": "@midwayjs/bullmq",
"version": "1.0.0",
"description": "midway component for bullmq",
"main": "dist/index.js",
"typings": "index.d.ts",
"scripts": {
"build": "tsc",
"test": "node --require=ts-node/register ../../node_modules/.bin/jest --runInBand",
"cov": "node --require=ts-node/register ../../node_modules/.bin/jest --runInBand --coverage --forceExit"
},
"keywords": [
"midway",
"IoC",
"task",
"bullmq",
"plugin"
],
"author": "guo qicong",
"files": [
"dist/**/*.js",
"dist/**/*.d.ts",
"index.d.ts"
],
"license": "MIT",
"devDependencies": {
"@midwayjs/core": "^3.19.0",
"@midwayjs/mock": "^3.19.2"
},
"dependencies": {
"bullmq": "5.34.6"
},
"engines": {
"node": ">=12"
},
"repository": "https://github.com/midwayjs/midway.git"
}
26 changes: 26 additions & 0 deletions packages/bullmq/src/config/config.default.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
export const bullmq = {
prefix: '{midway-bullmq}',
defaultQueueOptions: {
defaultJobOptions: {
removeOnComplete: 3,
removeOnFail: 10,
},
},
defaultWorkerOptions: {
concurrency: 1,
},
clearRepeatJobWhenStart: true,
contextLoggerApplyLogger: 'bullMQLogger',
contextLoggerFormat: info => {
const { jobId, from } = info.ctx;
return `${info.timestamp} ${info.LEVEL} ${info.pid} [${jobId} ${from.name}] ${info.message}`;
},
};

export const midwayLogger = {
clients: {
bullMQLogger: {
fileLogName: 'midway-bullmq.log',
},
},
};
44 changes: 44 additions & 0 deletions packages/bullmq/src/configuration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import {
Configuration,
Init,
Inject,
MidwayDecoratorService,
} from '@midwayjs/core';
import * as DefaultConfig from './config/config.default';
import { BullMQFramework } from './framework';
import { BULLMQ_QUEUE_KEY } from './constants';

@Configuration({
namespace: 'bullmq',
importConfigs: [
{
default: DefaultConfig,
},
],
})
export class BullConfiguration {
@Inject()
framework: BullMQFramework;

@Inject()
decoratorService: MidwayDecoratorService;

@Init()
async init() {
this.decoratorService.registerPropertyHandler(
BULLMQ_QUEUE_KEY,
(
propertyName,
meta: {
queueName: string;
}
) => {
return this.framework.getQueue(meta.queueName);
}
);
}

async onReady() {
this.framework.loadConfig();
}
}
3 changes: 3 additions & 0 deletions packages/bullmq/src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// task
export const BULLMQ_QUEUE_KEY = 'bullmq:queue';
export const BULLMQ_PROCESSOR_KEY = 'bullmq:processor';
40 changes: 40 additions & 0 deletions packages/bullmq/src/decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import {
createCustomPropertyDecorator,
Provide,
saveClassMetadata,
saveModule,
Scope,
ScopeEnum,
} from '@midwayjs/core';
import { IQueueOptions, IWorkerOptions } from './interface';
import { BULLMQ_PROCESSOR_KEY, BULLMQ_QUEUE_KEY } from './constants';
import { JobsOptions } from 'bullmq';

export function Processor(
queueName: string,
jobOptions?: JobsOptions,
workerOptions?: IWorkerOptions,
queueOptions?: IQueueOptions
): ClassDecorator {
return function (target: any) {
saveModule(BULLMQ_PROCESSOR_KEY, target);
saveClassMetadata(
BULLMQ_PROCESSOR_KEY,
{
queueName,
jobOptions,
queueOptions,
workerOptions,
},
target
);
Provide()(target);
Scope(ScopeEnum.Request)(target);
};
}

export function InjectQueue(queueName: string): PropertyDecorator {
return createCustomPropertyDecorator(BULLMQ_QUEUE_KEY, {
queueName,
});
}
Loading

0 comments on commit ddceb32

Please sign in to comment.