Skip to content

Commit

Permalink
移除Reference初始化前读取缓存的动作
Browse files Browse the repository at this point in the history
添加友好无服务与无引用的友好提示
  • Loading branch information
icodening committed Apr 5, 2021
1 parent 9dcf7b8 commit 538cc85
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,14 @@
import cn.icodening.rpc.core.URL;
import cn.icodening.rpc.core.boot.AbstractBootAdapter;
import cn.icodening.rpc.core.extension.ExtensionLoader;
import cn.icodening.rpc.core.util.FileUtil;
import cn.icodening.rpc.core.util.MessageManager;
import cn.icodening.rpc.registry.NotifyListener;
import cn.icodening.rpc.registry.Registry;
import cn.icodening.rpc.registry.RegistryFactory;
import cn.icodening.rpc.registry.RegistryKeyConstant;
import cn.icodening.rpc.transport.Client;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -47,49 +42,12 @@ public class ReferenceConfigRunner extends AbstractBootAdapter implements NrpcRu

private static final Logger LOGGER = Logger.getLogger(ReferenceConfigRunner.class);

private static final String REFERENCE_FILE_NAME = "cache/reference_cache.nrpc";

private static final String REFERENCE_FILE_STATUS_OPEN = "OPEN";
private static final String REFERENCE_FILE_STATUS_CLOSE = "CLOSE";

@Override
@SuppressWarnings("unchecked")
protected void doStart() {
//读取缓存
JSONObject cache = loadCache();
if (cache != null) {
String status = cache.getString("status");
if (REFERENCE_FILE_STATUS_OPEN.equals(status)) {
//说明是非正常停机,读取消费者配置,后续订阅读到的消费者
String cacheString = cache.getString("references");
List<ReferenceConfig> referenceConfigs = JSON.parseArray(cacheString, ReferenceConfig.class);
List<RegistryConfig> registryConfigs = NrpcBootstrap.getInstance().getRegistryConfigs();
if (referenceConfigs != null && !referenceConfigs.isEmpty()) {
for (ReferenceConfig referenceConfig : referenceConfigs) {
for (RegistryConfig registryConfig : registryConfigs) {
URL registryConfigUrl = registryConfig.getUrl();
RegistryFactory factory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(registryConfigUrl.getProtocol());
Registry registry = factory.getRegistry(registryConfigUrl);
URL url = new URL();
url.addParameter(RegistryKeyConstant.SERVICE, referenceConfig.getServiceName());
registry.subscribe(url, new NotifyListener() {
@Override
public void onNotify(List<URL> urls) {
System.out.println("ReferenceConfigRunner notify");
for (URL url1 : urls) {
System.out.println(url1);
}
}
});
}
}
}
}
}
//合并配置信息
//TODO
List<ReferenceConfig> referenceConfigs = NrpcBootstrap.getInstance().getReferenceConfigs();
if (referenceConfigs == null || referenceConfigs.isEmpty()) {
LOGGER.info(MessageManager.get("no.reference"));
return;
}
for (ReferenceConfig referenceConfig : referenceConfigs) {
Expand Down Expand Up @@ -121,7 +79,7 @@ public void onNotify(List<URL> urls) {
Method setClient = ClientDelegateInvoker.class.getMethod("setClient", Client.class);
setClient.invoke(AopUtil.getProxyTarget(client), refer);
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn(e);
}
refer.initialize();
refer.start();
Expand Down Expand Up @@ -181,7 +139,7 @@ public void onNotify(List<URL> urls) {
Method setClient = ClientDelegateInvoker.class.getMethod("setClient", Client.class);
setClient.invoke(AopUtil.getProxyTarget(client), refer);
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn(e);
}
refer.initialize();
refer.start();
Expand All @@ -197,19 +155,6 @@ public void onNotify(List<URL> urls) {
}
}

private JSONObject loadCache() {
InputStream inputStream = FileUtil.getInputStream(new File(REFERENCE_FILE_NAME));
try {
if (inputStream == null) {
return null;
}
return JSON.parseObject(inputStream, JSONObject.class);
} catch (IOException e) {
LOGGER.warn("load cache fail !", e);
}
return null;
}

@Override
public int getPriority() {
return REFERENCE_PRIORITY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import cn.icodening.rpc.core.URL;
import cn.icodening.rpc.core.boot.AbstractBootAdapter;
import cn.icodening.rpc.core.extension.ExtensionLoader;
import cn.icodening.rpc.core.util.MessageManager;
import cn.icodening.rpc.transport.Server;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.Map;
Expand All @@ -22,12 +24,15 @@ public class ServiceExportRunner extends AbstractBootAdapter implements NrpcRunn

private static final String DEFAULT_LOCAL_SERVICE_CACHE_KEY = "service";

private static final Logger LOGGER = Logger.getLogger(ServiceExportRunner.class);

@Override
@SuppressWarnings("unchecked")
protected void doStart() {
NrpcBootstrap instance = NrpcBootstrap.getInstance();
List<ServiceConfig> serviceConfigs = instance.getServiceConfigs();
if (serviceConfigs.isEmpty()) {
LOGGER.info(MessageManager.get("no.service"));
return;
}
String localIp = System.getProperty("local.ip");
Expand Down
4 changes: 3 additions & 1 deletion nrpc-config/src/main/resources/message.properties
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
no.available.protocol=No available protocol
no.available.protocol=No available protocol
no.reference=No reference
no.service=No service
4 changes: 3 additions & 1 deletion nrpc-config/src/main/resources/message_zh_CN.properties
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
no.available.protocol=无可用协议
no.available.protocol=无可用协议
no.reference=无远程引用
no.service=无服务
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,22 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
request.setData(argsList);
cluster.invoke(request);
ResponseFuture responseFuture = futureCache.get(request.getId());
futureCache.remove(request.getId());
if (responseFuture == null) {
return null;
}
//FIXME 当服务器突然断开后,此处会被阻塞直到超时
Response response = responseFuture.get(5, TimeUnit.SECONDS);
if (response == null) {
return null;
}
Object result = response.getResult();
if (result instanceof JSONObject) {
return JSON.parseObject(((JSONObject) result).toJSONString(), method.getReturnType());
Object result;
try {
//FIXME 当服务器突然断开后,此处会被阻塞直到超时
Response response = responseFuture.get(5, TimeUnit.SECONDS);
if (response == null) {
return null;
}
result = response.getResult();
if (result instanceof JSONObject) {
return JSON.parseObject(((JSONObject) result).toJSONString(), method.getReturnType());
}
} finally {
futureCache.remove(request.getId());
}
return result;
}
Expand Down

0 comments on commit 538cc85

Please sign in to comment.