diff --git a/99-integration/dubbo-samples-router-chain-switch/src/main/java/org/apache/dubbo/samples/provider/Application.java b/99-integration/dubbo-samples-router-chain-switch/src/main/java/org/apache/dubbo/samples/provider/Application.java index c8ddfb4527..6b8dda4b87 100644 --- a/99-integration/dubbo-samples-router-chain-switch/src/main/java/org/apache/dubbo/samples/provider/Application.java +++ b/99-integration/dubbo-samples-router-chain-switch/src/main/java/org/apache/dubbo/samples/provider/Application.java @@ -17,6 +17,8 @@ package org.apache.dubbo.samples.provider; +import java.util.concurrent.CountDownLatch; + import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.config.ApplicationConfig; import org.apache.dubbo.config.ProtocolConfig; @@ -27,8 +29,6 @@ import org.apache.dubbo.samples.api.GreetingService; import org.apache.dubbo.samples.zookeeper.EmbeddedZooKeeper; -import java.util.concurrent.CountDownLatch; - public class Application { @@ -39,7 +39,8 @@ public static void main(String[] args) throws Exception { FrameworkModel frameworkModel = new FrameworkModel(); ApplicationConfig applicationConfig = new ApplicationConfig("first-dubbo-provider"); - applicationConfig.setMetadataType("remote"); + applicationConfig.setMetadataType("interface"); + applicationConfig.setRegisterMode("instance"); ServiceConfig service = new ServiceConfig<>(); service.setInterface(GreetingService.class); @@ -59,6 +60,7 @@ public static void main(String[] args) throws Exception { ApplicationConfig applicationConfig = new ApplicationConfig("second-dubbo-provider"); applicationConfig.setMetadataType("remote"); + applicationConfig.setRegisterMode("interface"); ServiceConfig service = new ServiceConfig<>(); service.setInterface(GreetingService.class); diff --git a/99-integration/dubbo-samples-router-chain-switch/src/main/resources/log4j.properties b/99-integration/dubbo-samples-router-chain-switch/src/main/resources/log4j.properties new file mode 100644 index 0000000000..d6ecd5ce34 --- /dev/null +++ b/99-integration/dubbo-samples-router-chain-switch/src/main/resources/log4j.properties @@ -0,0 +1,26 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +###set log levels### +log4j.rootLogger=info, stdout +###output to the console### +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy hh:mm:ss:sss z}] %t %5p %c{2}: %m%n \ No newline at end of file diff --git a/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/DemoClientIT.java b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/DemoClientIT.java index c87f9ae0c2..c629a83446 100644 --- a/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/DemoClientIT.java +++ b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/DemoClientIT.java @@ -16,9 +16,12 @@ */ package org.apache.dubbo.samples; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.config.ApplicationConfig; +import org.apache.dubbo.config.MetadataReportConfig; import org.apache.dubbo.config.ProtocolConfig; import org.apache.dubbo.config.ReferenceConfig; import org.apache.dubbo.config.RegistryConfig; @@ -27,38 +30,45 @@ import org.apache.dubbo.rpc.model.ModuleModel; import org.apache.dubbo.samples.api.GreetingService; import org.apache.dubbo.samples.provider.GreetingServiceImpl; - import org.junit.Assert; import org.junit.Test; -import java.util.concurrent.atomic.AtomicBoolean; - public class DemoClientIT { @Test public void testInterfaceDiscovery() throws InterruptedException { ApplicationModel applicationModel = ApplicationModel.defaultModel(); - applicationModel.getApplicationConfigManager().setApplication(new ApplicationConfig("first-dubbo-consumer")); + ApplicationConfig applicationConfig = new ApplicationConfig("first-dubbo-consumer"); + applicationConfig.setRegisterMode("interface"); + applicationModel.getApplicationConfigManager().setApplication(applicationConfig); ModuleModel moduleModel = applicationModel.newModule(); String zookeeperAddress = System.getProperty("zookeeper.address", "127.0.0.1"); ReferenceConfig reference = new ReferenceConfig<>(moduleModel); reference.setInterface(GreetingService.class); reference.setRegistry(new RegistryConfig( + "zookeeper://" + zookeeperAddress + ":" + "2181?file.cache=false®istry-protocol-type=mock-registry-protocol")); + reference.setMetadataReportConfig(new MetadataReportConfig( "zookeeper://" + zookeeperAddress + ":" + "2181?file.cache=false")); + GreetingService service = reference.get(); AtomicBoolean stop = new AtomicBoolean(false); - new Thread(() -> { - while (!stop.get()) { - try { - service.sayHi(); - } catch (Exception e) { - e.printStackTrace(); + for (int i = 0; i < 32; i++) { + new Thread(() -> { + while (!stop.get()) { + try { + service.sayHi(); + } catch (Exception e) { + e.printStackTrace(); + } } - } - }).start(); + }).start(); + } + + LongWaitRouter.setEnd(false); + MockRegistryDirectory.getShouldWait().set(true); ServiceConfig serviceConfig = new ServiceConfig<>(moduleModel); serviceConfig.setInterface(GreetingService.class); @@ -76,7 +86,8 @@ public void testInterfaceDiscovery() throws InterruptedException { Assert.assertFalse(LongWaitRouter.isFoundFailed()); } finally { stop.set(true); - LongWaitRouter.setEnd(); + LongWaitRouter.setEnd(true); + MockRegistryDirectory.getShouldWait().set(false); } } diff --git a/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/LongWaitRouter.java b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/LongWaitRouter.java index 2338eb1825..9b347d8c5c 100644 --- a/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/LongWaitRouter.java +++ b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/LongWaitRouter.java @@ -16,6 +16,10 @@ */ package org.apache.dubbo.samples; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.Holder; import org.apache.dubbo.rpc.Invocation; @@ -25,10 +29,6 @@ import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter; import org.apache.dubbo.rpc.cluster.router.state.BitList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - public class LongWaitRouter extends AbstractStateRouter { public LongWaitRouter(URL url) { super(url); @@ -37,7 +37,7 @@ public LongWaitRouter(URL url) { private final AtomicReference>> expectedInvokers = new AtomicReference<>(); private static final AtomicBoolean foundFailed = new AtomicBoolean(false); - private static final AtomicBoolean end = new AtomicBoolean(false); + private static final AtomicBoolean end = new AtomicBoolean(true); private static final AtomicInteger invokeCount = new AtomicInteger(0); @@ -73,7 +73,7 @@ public static boolean isFoundFailed() { return foundFailed.get(); } - public static void setEnd() { - end.set(true); + public static void setEnd(boolean end) { + LongWaitRouter.end.set(end); } } diff --git a/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/MockRegistryDirectory.java b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/MockRegistryDirectory.java new file mode 100644 index 0000000000..b8e76b2145 --- /dev/null +++ b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/MockRegistryDirectory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.samples; + +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.integration.RegistryDirectory; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.cluster.router.state.BitList; + +import static org.awaitility.Awaitility.await; + +public class MockRegistryDirectory extends RegistryDirectory { + public MockRegistryDirectory(Class serviceType, URL url) { + super(serviceType, url); + } + + private final AtomicInteger count = new AtomicInteger(0); + + private final AtomicInteger waitCount = new AtomicInteger(0); + + private static final AtomicBoolean shouldWait = new AtomicBoolean(false); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private final Semaphore semaphore = new Semaphore(3); + + @Override + public List doList(BitList bitList, Invocation invocation) { + count.incrementAndGet(); + if (semaphore.tryAcquire()) { + try { + lock.readLock().lock(); + return super.doList(bitList, invocation); + } finally { + waitCount.incrementAndGet(); + lock.readLock().unlock(); + semaphore.release(); + } + } else { + return super.doList(bitList, invocation); + } + } + + @Override + protected void setInvokers(BitList bitList) { + try { + semaphore.acquire(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + lock.writeLock().lock(); + semaphore.release(3); + if (shouldWait.get()) { + await().atMost(60, TimeUnit.SECONDS).until(() -> semaphore.availablePermits() == 0); + } + super.setInvokers(bitList); + count.set(0); + if (shouldWait.get()) { + await().atMost(60, TimeUnit.SECONDS).until(() -> count.get() > 10); + } + waitCount.set(0); + lock.writeLock().unlock(); + if (shouldWait.get()) { + await().atMost(60, TimeUnit.SECONDS).until(() -> waitCount.get() > 0); + } + + } + + public static AtomicBoolean getShouldWait() { + return shouldWait; + } +} diff --git a/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/MockRegistryProtocol.java b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/MockRegistryProtocol.java new file mode 100644 index 0000000000..b44d247d70 --- /dev/null +++ b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/MockRegistryProtocol.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.samples; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.Registry; +import org.apache.dubbo.registry.integration.DynamicDirectory; +import org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol; +import org.apache.dubbo.rpc.cluster.Cluster; +import org.apache.dubbo.rpc.cluster.ClusterInvoker; + +public class MockRegistryProtocol extends InterfaceCompatibleRegistryProtocol { + + @Override + public ClusterInvoker getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class type, URL url) { + DynamicDirectory directory = new MockServiceDiscoveryRegistryDirectory(type, url); + return doCreateInvoker(directory, cluster, registry, type); + } + + @Override + public ClusterInvoker getInvoker(Cluster cluster, Registry registry, Class type, URL url) { + DynamicDirectory directory = new MockRegistryDirectory(type, url); + return doCreateInvoker(directory, cluster, registry, type); + } +} diff --git a/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/MockServiceDiscoveryRegistryDirectory.java b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/MockServiceDiscoveryRegistryDirectory.java new file mode 100644 index 0000000000..6a53010751 --- /dev/null +++ b/99-integration/dubbo-samples-router-chain-switch/src/test/java/org/apache/dubbo/samples/MockServiceDiscoveryRegistryDirectory.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.samples; + +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.client.ServiceDiscoveryRegistryDirectory; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.cluster.router.state.BitList; + +import static org.awaitility.Awaitility.await; + +public class MockServiceDiscoveryRegistryDirectory extends ServiceDiscoveryRegistryDirectory { + public MockServiceDiscoveryRegistryDirectory(Class serviceType, URL url) { + super(serviceType, url); + } + + private final AtomicInteger count = new AtomicInteger(0); + + private final AtomicInteger waitCount = new AtomicInteger(0); + + private static final AtomicBoolean shouldWait = new AtomicBoolean(false); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private final Semaphore semaphore = new Semaphore(3); + + @Override + public List doList(BitList bitList, Invocation invocation) { + count.incrementAndGet(); + if (semaphore.tryAcquire()) { + try { + lock.readLock().lock(); + return super.doList(bitList, invocation); + } finally { + waitCount.incrementAndGet(); + lock.readLock().unlock(); + semaphore.release(); + } + } else { + return super.doList(bitList, invocation); + } + } + + @Override + protected void setInvokers(BitList bitList) { + lock.writeLock().lock(); + await().until(() -> semaphore.availablePermits() == 0); + super.setInvokers(bitList); + count.set(0); + if (shouldWait.get()) { + await().atMost(60, TimeUnit.SECONDS).until(() -> count.get() > 0); + } + waitCount.set(0); + lock.writeLock().unlock(); + if (shouldWait.get()) { + await().atMost(60, TimeUnit.SECONDS).until(() -> waitCount.get() > 0); + } + + } + + public static AtomicBoolean getShouldWait() { + return shouldWait; + } +} diff --git a/99-integration/dubbo-samples-router-chain-switch/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol b/99-integration/dubbo-samples-router-chain-switch/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol new file mode 100644 index 0000000000..0ab16fa71e --- /dev/null +++ b/99-integration/dubbo-samples-router-chain-switch/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol @@ -0,0 +1 @@ +mock-registry-protocol=org.apache.dubbo.samples.MockRegistryProtocol \ No newline at end of file