Skip to content

Commit

Permalink
feat(grpc): initialize default metadata, passing "rpc.method.type"
Browse files Browse the repository at this point in the history
key: rpc.method.type
value: unary|server|client
  • Loading branch information
waitingsong committed Nov 17, 2024
1 parent 437e92d commit 57fa626
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 14 deletions.
34 changes: 32 additions & 2 deletions packages/grpc/src/comsumer/clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
Utils,
ILogger,
} from '@midwayjs/core';
import { credentials, loadPackageDefinition } from '@grpc/grpc-js';
import { credentials, loadPackageDefinition, Metadata } from '@grpc/grpc-js';
import {
DefaultConfig,
IClientOptions,
Expand Down Expand Up @@ -71,6 +71,15 @@ export class GRPCClients extends Map {
connectionService[methodName] = (
clientOptions: IClientOptions = {}
) => {
const meta = new Metadata();

if (clientOptions.metadata) {
meta.merge(clientOptions.metadata);
clientOptions.metadata = meta;
} else {
clientOptions.metadata = meta;
}

return this.getClientRequestImpl(
connectionService,
originMethod,
Expand All @@ -89,35 +98,56 @@ export class GRPCClients extends Map {
return this.get(serviceName);
}

getClientRequestImpl(client, originalFunction, options = {}) {
getClientRequestImpl(client, originalFunction, options: IClientOptions = {}) {
const genericFunctionSelector =
(originalFunction.requestStream ? 2 : 0) |
(originalFunction.responseStream ? 1 : 0);

const meta = new Metadata();
if (options.metadata) {
meta.merge(options.metadata);
options.metadata = meta;
} else {
options.metadata = meta;
}

let genericFunctionName;
const rpcMethod = options.metadata.get('rpc.method.type')?.[0];
switch (genericFunctionSelector) {
case 0:
if (!rpcMethod) {
options.metadata.set('rpc.method.type', 'unary');
}
genericFunctionName = new ClientUnaryRequest(
client,
originalFunction,
options
);
break;
case 1:
if (!rpcMethod) {
options.metadata.set('rpc.method.type', 'server'); // server streaming
}
genericFunctionName = new ClientReadableRequest(
client,
originalFunction,
options
);
break;
case 2:
if (!rpcMethod) {
options.metadata.set('rpc.method.type', 'client'); // client streaming
}
genericFunctionName = new ClientWritableRequest(
client,
originalFunction,
options
);
break;
case 3:
if (!rpcMethod) {
options.metadata.set('rpc.method.type', 'bidi'); // bidirectional streaming
}
genericFunctionName = new ClientDuplexStreamRequest(
client,
originalFunction,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { join } from 'path';

export const grpcServer = {
url: 'localhost:6569',
services: [
{
protoPath: join(__dirname, '../../../proto/hello_world.proto'),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import * as assert from 'assert';
import { GrpcMethod, MSProviderType, Provider, Provide, Inject, Init } from '@midwayjs/core';
import { helloworld, hero2 } from '../interface';
import { Clients } from '../../../../../src';
import { Clients, Context } from '../../../../../src';

@Provide()
@Provider(MSProviderType.GRPC, { package: 'hero2' })
export class HeroService implements hero2.HeroService {

@Inject()
ctx: Context;

@Inject()
grpcClients: Clients;

Expand All @@ -18,6 +22,13 @@ export class HeroService implements hero2.HeroService {

@GrpcMethod()
async findOne(data) {
assert(this.ctx, 'should get context');
const { metadata } = this.ctx;
assert(metadata, 'should get metadata');

const rpcMethodType = metadata.get('rpc.method.type');
assert(rpcMethodType[0] === 'unary', `should get rpc.method.type, but got "${rpcMethodType[0]}"`);

const result = await this.greeterService.sayHello().sendMessage({
name: 'harry'
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { join } from 'path';
export const grpc = {
services: [
{
url: 'localhost:6565',
url: 'localhost:6575',
protoPath: join(__dirname, '../../../proto/helloworld.proto'),
package: 'helloworld',
clientOptions: {
Expand All @@ -14,6 +14,7 @@ export const grpc = {
}

export const grpcServer = {
url: 'localhost:6575',
services: [
{
protoPath: join(__dirname, '../../../proto/hero.proto'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ export namespace hero {
id?: number;
name?: string;
}

export interface HeroService2 {
findOne2(data: HeroById, metadata?: Metadata): Promise<Hero>;
}
export interface HeroService2Client {
findOne2(options?: IClientOptions): IClientUnaryService<HeroById, Hero>;
}
}

export namespace helloworld {
Expand Down
3 changes: 3 additions & 0 deletions packages/grpc/test/fixtures/base-app-stream-meta/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"name": "ali-demo"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { join } from 'path';

export const grpcServer = {
url: 'localhost:6571',
services: [
{
protoPath: join(__dirname, '../../../proto/math.proto'),
package: 'math',
}
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Configuration } from '@midwayjs/core';
import * as grpc from '../../../../src';
import { join } from 'path';

@Configuration({
imports: [
grpc
],
importConfigs: [
join(__dirname, './config'),
]
})
export class AutoConfiguration {
}
41 changes: 41 additions & 0 deletions packages/grpc/test/fixtures/base-app-stream-meta/src/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import {
IClientDuplexStreamService,
IClientReadableStreamService,
IClientUnaryService,
IClientWritableStreamService
} from '../../../../src';

export namespace math {
export interface AddArgs {
id?: number;
num?: number;
}
export interface Num {
id?: number;
num?: number;
}

/**
* server interface
*/
export interface Math {
add(data: AddArgs): Promise<Num>;
addMore(data: AddArgs): Promise<void>;
// 服务端推,客户端读
sumMany(fibArgs: AddArgs): Promise<void>
// 客户端端推,服务端读
addMany(num: AddArgs): Promise<void>;
}

/**
* client interface
*/
export interface MathClient {
add(): IClientUnaryService<AddArgs, Num>;
addMore(): IClientDuplexStreamService<AddArgs, Num>;
// 服务端推,客户端读
sumMany(): IClientReadableStreamService<AddArgs, Num>;
// 客户端端推,服务端读
addMany(): IClientWritableStreamService<AddArgs, Num>;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import * as assert from 'assert';
import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/core';
import { Context } from '../../../../../src';
import { math } from '../interface';
import { Metadata } from '@grpc/grpc-js';

/**
*/
@Provide()
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {

@Inject()
ctx: Context;

sumDataList = [];

@GrpcMethod()
async add(data: math.AddArgs): Promise<math.Num> {
assert(this.ctx, 'should get context');
const { metadata } = this.ctx;
assert(metadata, 'should get metadata');

const rpcMethodType = metadata.get('rpc.method.type');
assert(rpcMethodType[0] === 'unary', `should get rpc.method.type, but got "${rpcMethodType[0]}"`);

return {
num: data.num + 2,
}
}

@GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX, onEnd: 'duplexEnd' })
async addMore(message: math.AddArgs) {
const { metadata } = this.ctx;
assert(metadata, 'should get metadata');

const rpcMethodType = metadata.get('rpc.method.type');
assert(rpcMethodType[0] === 'bidi', `should get rpc.method.type, but got "${rpcMethodType[0]}"`);

this.ctx.write({
id: message.id,
num: message.num + 10,
});
}

async duplexEnd() {
console.log('got client end message');
}

@GrpcMethod({type: GrpcStreamTypeEnum.WRITEABLE })
async sumMany(args: math.AddArgs) {
const { metadata } = this.ctx;
assert(metadata, 'should get metadata');

const rpcMethodType = metadata.get('rpc.method.type');
assert(rpcMethodType[0] === 'server', `should get rpc.method.type, but got "${rpcMethodType[0]}"`);

this.ctx.write({
num: 1 + args.num
});
this.ctx.write({
num: 2 + args.num
});
this.ctx.write({
num: 3 + args.num
});

const meta = new Metadata();
meta.add('xxx', 'bbb');

this.ctx.sendMetadata(meta);
this.ctx.end();
}

@GrpcMethod({type: GrpcStreamTypeEnum.READABLE, onEnd: 'sumEnd' })
async addMany(data: math.Num) {
const { metadata } = this.ctx;
assert(metadata, 'should get metadata');

const rpcMethodType = metadata.get('rpc.method.type');
assert(rpcMethodType[0] === 'client', `should get rpc.method.type, but got "${rpcMethodType[0]}"`);

this.sumDataList.push(data);
}

async sumEnd(): Promise<math.Num> {
const total = this.sumDataList.reduce((pre, cur) => {
return {
num: pre.num + cur.num,
}
});
return total;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { join } from 'path';

export const grpcServer = {
url: 'localhost:6568',
services: [
{
protoPath: join(__dirname, '../../../proto/math.proto'),
Expand Down
Loading

0 comments on commit 57fa626

Please sign in to comment.