Skip to content

Commit

Permalink
完善基本功能
Browse files Browse the repository at this point in the history
1.请求拦截,扩展时实现 ClientRequestInterceptor接口,默认仅有请求异步回调拦截扩展 RequestResponseFutureInterceptor
2.负载均衡,扩展时实现 LoadBalance接口,当前仅实现随机负载
3.集群支持,支持快速失败、故障转移
4.接口调用, 使用方式参见nrpc-example模块 nrpc-consumer
5.服务发现,远程服务上下线时,会动态改变其可调用的服务列表
6.启动步骤扩展,需要在启动过程中执行一些逻辑时可实现NrpcRunner接口
7.协议支持,lightning(自定义协议,其报文格式参见LightningServerMessageCodec、LightningClientMessageCodec编解码的实现)、rest(普通http请求,仅支持json报文)
8.修复部分漏洞
  • Loading branch information
icodening committed Apr 4, 2021
1 parent b414c28 commit 18a26d5
Show file tree
Hide file tree
Showing 117 changed files with 2,815 additions and 300 deletions.
5 changes: 5 additions & 0 deletions nrpc-common/nrpc-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>cn.icodening</groupId>
<artifactId>nrpc-transport-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package cn.icodening.rpc.common;

import cn.icodening.rpc.common.model.NrpcService;
import cn.icodening.rpc.core.LocalCache;
import cn.icodening.rpc.core.URL;
import cn.icodening.rpc.core.exchange.Response;
import cn.icodening.rpc.core.exchange.StandardResponse;
import cn.icodening.rpc.core.extension.ExtensionLoader;
import cn.icodening.rpc.core.util.ResponseFuture;
import cn.icodening.rpc.transport.Client;
import cn.icodening.rpc.transport.NrpcChannelHandler;
import cn.icodening.rpc.transport.Server;
import cn.icodening.rpc.transport.TransportFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author icodening
* @date 2021.04.02
*/
public abstract class AbstractProtocol implements Protocol {

private final Map<String, Server> serverMap = new ConcurrentHashMap<>();

private final NrpcChannelHandler serverChannelHandler = (nrpcChannel, message) -> {
String targetClazzName = message.getHeader("targetClazz");
String methodName = message.getHeader("method");
Object data = message.getData();
List<Class<?>> classes = new ArrayList<>();
List<Object> objs = new ArrayList<>();
if (data instanceof JSONArray) {
JSONArray d = (JSONArray) data;
for (int i = 0; i < d.size(); i++) {
JSONObject o = d.getJSONObject(i);
o.forEach((k, v) -> {
try {
classes.add(Class.forName(k));
objs.add(v);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
});
}
}
LocalCache<String, NrpcService> service = ExtensionLoader.getExtensionLoader(LocalCache.class).getExtension("service");
NrpcService nrpcService = service.get(targetClazzName);
try {
Method method = nrpcService.getServiceInterface().getMethod(methodName, classes.toArray(new Class[0]));
Object invoke = method.invoke(nrpcService.getRef(), objs.toArray());
StandardResponse standardResponse = new StandardResponse();
standardResponse.setRequestId(message.getId());
standardResponse.setResult(invoke);
standardResponse.setHeaders(message.getHeaders());
nrpcChannel.call(standardResponse);
} catch (Exception e) {
e.printStackTrace();
}
};

private final NrpcChannelHandler clientChannelHandler = (nrpcChannel, message) -> {
LocalCache<Long, ResponseFuture> futureCache = ExtensionLoader.getExtensionLoader(LocalCache.class).getExtension("responseFutureCache");
long requestId = ((Response) message).getRequestId();
ResponseFuture responseFuture = futureCache.get(requestId);
responseFuture.setResponse((Response) message);
};

@Override
public Client refer(URL url) {
String clientType = url.getParameter("client", "netty4");
TransportFactory transportFactory = ExtensionLoader.getExtensionLoader(TransportFactory.class).getExtension(clientType);
return transportFactory.createClient(url, getClientCodec(), getClientHandler());
}


@Override
public Server export(URL url) {
String serverType = url.getParameter("server", "netty4");
Server server = serverMap.get(serverType);
if (server == null) {
synchronized (serverType.intern()) {
server = serverMap.get(serverType);
if (server == null) {
server = createServer(url);
serverMap.putIfAbsent(serverType, server);
}
}
}
return server;
}

protected Server createServer(URL url) {
String serverType = url.getParameter("server", "netty4");
TransportFactory transportFactory = ExtensionLoader.getExtensionLoader(TransportFactory.class).getExtension(serverType);
return transportFactory.createServer(url, getServerCodec(), getServerHandler());
}

protected NrpcChannelHandler getServerHandler() {
return serverChannelHandler;
}

protected NrpcChannelHandler getClientHandler() {
return clientChannelHandler;
}

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cn.icodening.rpc.common;

/**
* @author icodening
* @date 2021.03.28
*/
public interface Invocation {

String getMethodName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cn.icodening.rpc.common;

/**
* @author icodening
* @date 2021.03.28
*/
public interface Invoker {

void invoke(Invocation invocation);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package cn.icodening.rpc.common;

import cn.icodening.rpc.common.codec.ClientCodec;
import cn.icodening.rpc.common.codec.ServerCodec;
import cn.icodening.rpc.core.URL;
import cn.icodening.rpc.core.codec.ClientCodec;
import cn.icodening.rpc.core.codec.ServerCodec;
import cn.icodening.rpc.core.extension.Extensible;
import cn.icodening.rpc.transport.Client;
import cn.icodening.rpc.transport.Server;

/**
* @author icodening
Expand All @@ -23,4 +26,9 @@ public interface Protocol {
ClientCodec getClientCodec();

ServerCodec getServerCodec();

Client refer(URL url);

Server export(URL url);

}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package cn.icodening.rpc.common.model;

/**
* @author icodening
* @date 2021.03.21
*/
public class NrpcService {

private String name;

private String version;

private Object ref;

private Class<?> serviceInterface;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getVersion() {
return version;
}

public void setVersion(String version) {
this.version = version;
}

public Object getRef() {
return ref;
}

public void setRef(Object ref) {
this.ref = ref;
}

public Class<?> getServiceInterface() {
return serviceInterface;
}

public void setServiceInterface(Class<?> interfaceClass) {
this.serviceInterface = interfaceClass;
}
}
Loading

0 comments on commit 18a26d5

Please sign in to comment.