diff --git a/packages/grpc/src/comsumer/clients.ts b/packages/grpc/src/comsumer/clients.ts index 2650cfb3d1ea..1c16d309fa73 100644 --- a/packages/grpc/src/comsumer/clients.ts +++ b/packages/grpc/src/comsumer/clients.ts @@ -9,7 +9,7 @@ import { Utils, ILogger, } from '@midwayjs/core'; -import { credentials, loadPackageDefinition } from '@grpc/grpc-js'; +import { credentials, loadPackageDefinition, Metadata } from '@grpc/grpc-js'; import { DefaultConfig, IClientOptions, @@ -71,6 +71,15 @@ export class GRPCClients extends Map { connectionService[methodName] = ( clientOptions: IClientOptions = {} ) => { + const meta = new Metadata(); + + if (clientOptions.metadata) { + meta.merge(clientOptions.metadata); + clientOptions.metadata = meta; + } else { + clientOptions.metadata = meta; + } + return this.getClientRequestImpl( connectionService, originMethod, @@ -89,14 +98,26 @@ export class GRPCClients extends Map { return this.get(serviceName); } - getClientRequestImpl(client, originalFunction, options = {}) { + getClientRequestImpl(client, originalFunction, options: IClientOptions = {}) { const genericFunctionSelector = (originalFunction.requestStream ? 2 : 0) | (originalFunction.responseStream ? 1 : 0); + const meta = new Metadata(); + if (options.metadata) { + meta.merge(options.metadata); + options.metadata = meta; + } else { + options.metadata = meta; + } + let genericFunctionName; + const rpcMethod = options.metadata.get('rpc.method.type')?.[0]; switch (genericFunctionSelector) { case 0: + if (!rpcMethod) { + options.metadata.set('rpc.method.type', 'unary'); + } genericFunctionName = new ClientUnaryRequest( client, originalFunction, @@ -104,6 +125,9 @@ export class GRPCClients extends Map { ); break; case 1: + if (!rpcMethod) { + options.metadata.set('rpc.method.type', 'server'); // server streaming + } genericFunctionName = new ClientReadableRequest( client, originalFunction, @@ -111,6 +135,9 @@ export class GRPCClients extends Map { ); break; case 2: + if (!rpcMethod) { + options.metadata.set('rpc.method.type', 'client'); // client streaming + } genericFunctionName = new ClientWritableRequest( client, originalFunction, @@ -118,6 +145,9 @@ export class GRPCClients extends Map { ); break; case 3: + if (!rpcMethod) { + options.metadata.set('rpc.method.type', 'bidi'); // bidirectional streaming + } genericFunctionName = new ClientDuplexStreamRequest( client, originalFunction, diff --git a/packages/grpc/test/fixtures/base-app-multiple-package/src/config/config.default.ts b/packages/grpc/test/fixtures/base-app-multiple-package/src/config/config.default.ts index 0f3e18ac4090..10639f126369 100644 --- a/packages/grpc/test/fixtures/base-app-multiple-package/src/config/config.default.ts +++ b/packages/grpc/test/fixtures/base-app-multiple-package/src/config/config.default.ts @@ -1,6 +1,7 @@ import { join } from 'path'; export const grpcServer = { + url: 'localhost:6569', services: [ { protoPath: join(__dirname, '../../../proto/hello_world.proto'), diff --git a/packages/grpc/test/fixtures/base-app-multiple-service-2/src/provider/hero.ts b/packages/grpc/test/fixtures/base-app-multiple-service-2/src/provider/hero.ts index 4f69aebf19db..9413f61a348f 100644 --- a/packages/grpc/test/fixtures/base-app-multiple-service-2/src/provider/hero.ts +++ b/packages/grpc/test/fixtures/base-app-multiple-service-2/src/provider/hero.ts @@ -1,11 +1,15 @@ +import * as assert from 'assert'; import { GrpcMethod, MSProviderType, Provider, Provide, Inject, Init } from '@midwayjs/core'; import { helloworld, hero2 } from '../interface'; -import { Clients } from '../../../../../src'; +import { Clients, Context } from '../../../../../src'; @Provide() @Provider(MSProviderType.GRPC, { package: 'hero2' }) export class HeroService implements hero2.HeroService { + @Inject() + ctx: Context; + @Inject() grpcClients: Clients; @@ -18,6 +22,13 @@ export class HeroService implements hero2.HeroService { @GrpcMethod() async findOne(data) { + assert(this.ctx, 'should get context'); + const { metadata } = this.ctx; + assert(metadata, 'should get metadata'); + + const rpcMethodType = metadata.get('rpc.method.type'); + assert(rpcMethodType[0] === 'unary', `should get rpc.method.type, but got "${rpcMethodType[0]}"`); + const result = await this.greeterService.sayHello().sendMessage({ name: 'harry' }); diff --git a/packages/grpc/test/fixtures/base-app-multiple-service/src/config/config.default.ts b/packages/grpc/test/fixtures/base-app-multiple-service/src/config/config.default.ts index 5cd17aecf97c..06ae496b42a7 100644 --- a/packages/grpc/test/fixtures/base-app-multiple-service/src/config/config.default.ts +++ b/packages/grpc/test/fixtures/base-app-multiple-service/src/config/config.default.ts @@ -3,7 +3,7 @@ import { join } from 'path'; export const grpc = { services: [ { - url: 'localhost:6565', + url: 'localhost:6575', protoPath: join(__dirname, '../../../proto/helloworld.proto'), package: 'helloworld', clientOptions: { @@ -14,6 +14,7 @@ export const grpc = { } export const grpcServer = { + url: 'localhost:6575', services: [ { protoPath: join(__dirname, '../../../proto/hero.proto'), diff --git a/packages/grpc/test/fixtures/base-app-multiple-service/src/interface.ts b/packages/grpc/test/fixtures/base-app-multiple-service/src/interface.ts index 227c91ccba6a..0aab159a6788 100644 --- a/packages/grpc/test/fixtures/base-app-multiple-service/src/interface.ts +++ b/packages/grpc/test/fixtures/base-app-multiple-service/src/interface.ts @@ -15,13 +15,6 @@ export namespace hero { id?: number; name?: string; } - - export interface HeroService2 { - findOne2(data: HeroById, metadata?: Metadata): Promise; - } - export interface HeroService2Client { - findOne2(options?: IClientOptions): IClientUnaryService; - } } export namespace helloworld { diff --git a/packages/grpc/test/fixtures/base-app-stream-meta/package.json b/packages/grpc/test/fixtures/base-app-stream-meta/package.json new file mode 100644 index 000000000000..621cdc6a4174 --- /dev/null +++ b/packages/grpc/test/fixtures/base-app-stream-meta/package.json @@ -0,0 +1,3 @@ +{ + "name": "ali-demo" +} diff --git a/packages/grpc/test/fixtures/base-app-stream-meta/src/config/config.default.ts b/packages/grpc/test/fixtures/base-app-stream-meta/src/config/config.default.ts new file mode 100644 index 000000000000..498f37c2c253 --- /dev/null +++ b/packages/grpc/test/fixtures/base-app-stream-meta/src/config/config.default.ts @@ -0,0 +1,11 @@ +import { join } from 'path'; + +export const grpcServer = { + url: 'localhost:6571', + services: [ + { + protoPath: join(__dirname, '../../../proto/math.proto'), + package: 'math', + } + ], +} diff --git a/packages/grpc/test/fixtures/base-app-stream-meta/src/configuration.ts b/packages/grpc/test/fixtures/base-app-stream-meta/src/configuration.ts new file mode 100644 index 000000000000..3067a0e8cf81 --- /dev/null +++ b/packages/grpc/test/fixtures/base-app-stream-meta/src/configuration.ts @@ -0,0 +1,14 @@ +import { Configuration } from '@midwayjs/core'; +import * as grpc from '../../../../src'; +import { join } from 'path'; + +@Configuration({ + imports: [ + grpc + ], + importConfigs: [ + join(__dirname, './config'), + ] +}) +export class AutoConfiguration { +} diff --git a/packages/grpc/test/fixtures/base-app-stream-meta/src/interface.ts b/packages/grpc/test/fixtures/base-app-stream-meta/src/interface.ts new file mode 100644 index 000000000000..8ad8ae763091 --- /dev/null +++ b/packages/grpc/test/fixtures/base-app-stream-meta/src/interface.ts @@ -0,0 +1,41 @@ +import { + IClientDuplexStreamService, + IClientReadableStreamService, + IClientUnaryService, + IClientWritableStreamService +} from '../../../../src'; + +export namespace math { + export interface AddArgs { + id?: number; + num?: number; + } + export interface Num { + id?: number; + num?: number; + } + + /** + * server interface + */ + export interface Math { + add(data: AddArgs): Promise; + addMore(data: AddArgs): Promise; + // 服务端推,客户端读 + sumMany(fibArgs: AddArgs): Promise + // 客户端端推,服务端读 + addMany(num: AddArgs): Promise; + } + + /** + * client interface + */ + export interface MathClient { + add(): IClientUnaryService; + addMore(): IClientDuplexStreamService; + // 服务端推,客户端读 + sumMany(): IClientReadableStreamService; + // 客户端端推,服务端读 + addMany(): IClientWritableStreamService; + } +} diff --git a/packages/grpc/test/fixtures/base-app-stream-meta/src/provider/math.ts b/packages/grpc/test/fixtures/base-app-stream-meta/src/provider/math.ts new file mode 100644 index 000000000000..fc6bbee277c0 --- /dev/null +++ b/packages/grpc/test/fixtures/base-app-stream-meta/src/provider/math.ts @@ -0,0 +1,95 @@ +import * as assert from 'assert'; +import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/core'; +import { Context } from '../../../../../src'; +import { math } from '../interface'; +import { Metadata } from '@grpc/grpc-js'; + +/** + */ +@Provide() +@Provider(MSProviderType.GRPC, { package: 'math' }) +export class Math implements math.Math { + + @Inject() + ctx: Context; + + sumDataList = []; + + @GrpcMethod() + async add(data: math.AddArgs): Promise { + assert(this.ctx, 'should get context'); + const { metadata } = this.ctx; + assert(metadata, 'should get metadata'); + + const rpcMethodType = metadata.get('rpc.method.type'); + assert(rpcMethodType[0] === 'unary', `should get rpc.method.type, but got "${rpcMethodType[0]}"`); + + return { + num: data.num + 2, + } + } + + @GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX, onEnd: 'duplexEnd' }) + async addMore(message: math.AddArgs) { + const { metadata } = this.ctx; + assert(metadata, 'should get metadata'); + + const rpcMethodType = metadata.get('rpc.method.type'); + assert(rpcMethodType[0] === 'bidi', `should get rpc.method.type, but got "${rpcMethodType[0]}"`); + + this.ctx.write({ + id: message.id, + num: message.num + 10, + }); + } + + async duplexEnd() { + console.log('got client end message'); + } + + @GrpcMethod({type: GrpcStreamTypeEnum.WRITEABLE }) + async sumMany(args: math.AddArgs) { + const { metadata } = this.ctx; + assert(metadata, 'should get metadata'); + + const rpcMethodType = metadata.get('rpc.method.type'); + assert(rpcMethodType[0] === 'server', `should get rpc.method.type, but got "${rpcMethodType[0]}"`); + + this.ctx.write({ + num: 1 + args.num + }); + this.ctx.write({ + num: 2 + args.num + }); + this.ctx.write({ + num: 3 + args.num + }); + + const meta = new Metadata(); + meta.add('xxx', 'bbb'); + + this.ctx.sendMetadata(meta); + this.ctx.end(); + } + + @GrpcMethod({type: GrpcStreamTypeEnum.READABLE, onEnd: 'sumEnd' }) + async addMany(data: math.Num) { + const { metadata } = this.ctx; + assert(metadata, 'should get metadata'); + + const rpcMethodType = metadata.get('rpc.method.type'); + assert(rpcMethodType[0] === 'client', `should get rpc.method.type, but got "${rpcMethodType[0]}"`); + + this.sumDataList.push(data); + } + + async sumEnd(): Promise { + const total = this.sumDataList.reduce((pre, cur) => { + return { + num: pre.num + cur.num, + } + }); + return total; + } + +} diff --git a/packages/grpc/test/fixtures/base-app-stream/src/config/config.default.ts b/packages/grpc/test/fixtures/base-app-stream/src/config/config.default.ts index 6287c2287dfb..16f99a3fbed6 100644 --- a/packages/grpc/test/fixtures/base-app-stream/src/config/config.default.ts +++ b/packages/grpc/test/fixtures/base-app-stream/src/config/config.default.ts @@ -1,6 +1,7 @@ import { join } from 'path'; export const grpcServer = { + url: 'localhost:6568', services: [ { protoPath: join(__dirname, '../../../proto/math.proto'), diff --git a/packages/grpc/test/index.test.ts b/packages/grpc/test/index.test.ts index db3addeb0ab4..9cbfcc9bd025 100644 --- a/packages/grpc/test/index.test.ts +++ b/packages/grpc/test/index.test.ts @@ -126,7 +126,7 @@ describe('/test/index.test.ts', function () { const opts = { package: 'hero', protoPath: join(__dirname, 'fixtures/proto/hero.proto'), - url: 'localhost:6565' + url: 'localhost:6575' } const service = await createGRPCConsumer({ ...opts, }); const result = await service.findOne().sendMessage({ id: 123 }); @@ -162,7 +162,105 @@ describe('/test/index.test.ts', function () { const service = await createGRPCConsumer({ package: 'math', protoPath: join(__dirname, 'fixtures/proto/math.proto'), - url: 'localhost:6565' + url: 'localhost:6568' + }); + + // 使用发送消息的写法 + let result1 = await service.add().sendMessage({ + num: 2, + }); + + expect(result1.num).toEqual(4); + + // 服务端推送 + let total = 0; + let result2 = await service.sumMany().sendMessage({ + num: 1, + }); + + result2.forEach(data => { + total += data.num; + }); + + expect(total).toEqual(9); + + // 客户端推送 + const data = await service.addMany() + .sendMessage({num: 1}) + .sendMessage({num: 2}) + .sendMessage({num: 3}) + .end(); + + expect(data.num).toEqual(6); + + // 双向流 + const result3= await new Promise((resolve, reject) => { + const duplexCall = service.addMore().getCall(); + total = 0; + let idx = 0; + + duplexCall.on('data', (data: math.Num) => { + total += data.num; + idx++; + if (idx === 2) { + duplexCall.end(); + resolve(total); + } + }); + + duplexCall.write({ + num: 3, + }); + + duplexCall.write({ + num: 6, + }); + }); + + expect(result3).toEqual(29); + + + // 保证顺序的双向流 + const t = service.addMore({ + messageKey: 'id' + }); + + const result4 = await new Promise((resolve, reject) => { + total = 0; + t.sendMessage({ + num: 2 + }) + .then(res => { + expect(res.num).toEqual(12); + total += res.num; + }) + .catch(err => console.error(err)) + ; + t.sendMessage({ + num: 5 + }) + .then(res => { + expect(res.num).toEqual(15); + total += res.num; + resolve(total); + }) + .catch(err => console.error(err)) + ; + t.end(); + }); + + expect(result4).toEqual(27); + + await closeApp(app); + }); + + it('should support publish stream metadata gRPC server', async () => { + const app = await createServer('base-app-stream-meta'); + + const service = await createGRPCConsumer({ + package: 'math', + protoPath: join(__dirname, 'fixtures/proto/math.proto'), + url: 'localhost:6571' }); // 使用发送消息的写法 @@ -260,7 +358,7 @@ describe('/test/index.test.ts', function () { const service = await createGRPCConsumer({ package: 'hello.world', protoPath: join(__dirname, 'fixtures/proto/hello_world.proto'), - url: 'localhost:6565' + url: 'localhost:6569' }); const result = await service.sayHello().sendMessage({