@@ -23,13 +23,14 @@ public abstract class RoutingAlgorithm<T> implements ZuClusterEventListener{
2323 private volatile Set <Integer > shards = null ;
2424 private final InetSocketAddressDecorator <T > socketDecorator ;
2525 private volatile Map <InetSocketAddress , T > addrMap = new HashMap <InetSocketAddress , T >();
26-
26+ private volatile Map <T , InetSocketAddress > serviceMap = new HashMap <T , InetSocketAddress >();
27+
2728 public RoutingAlgorithm (InetSocketAddressDecorator <T > socketDecorator ) {
2829 this .socketDecorator = socketDecorator ;
2930 }
30-
31+
3132 public abstract T route (byte [] key , int shard );
32-
33+
3334 public Set <Integer > getShards () {
3435 return shards == null ? new HashSet <Integer >() : shards ;
3536 }
@@ -38,7 +39,8 @@ public final void clusterChanged(Map<Integer, List<InetSocketAddress>> view){
3839 shards = view .keySet ();
3940 Map <Integer , ArrayList <T >> clusterView = new HashMap <Integer , ArrayList <T >>();
4041 Map <InetSocketAddress , T > newAddrMap = new HashMap <InetSocketAddress , T >();
41-
42+ Map <T , InetSocketAddress > newServiceMap = new HashMap <T , InetSocketAddress >();
43+
4244 for (Entry <Integer ,List <InetSocketAddress >> entry : view .entrySet ()) {
4345 Integer key = entry .getKey ();
4446 List <InetSocketAddress > value = entry .getValue ();
@@ -52,40 +54,39 @@ public final void clusterChanged(Map<Integer, List<InetSocketAddress>> view){
5254 elem = socketDecorator .decorate (addr );
5355 if (elem != null ) {
5456 newAddrMap .put (addr , elem );
57+ newServiceMap .put (elem , addr );
5558 }
5659 }
5760 list .add (elem );
5861 }
5962 clusterView .put (key , list );
6063 }
61-
64+
6265 updateCluster (clusterView );
6366 addrMap = newAddrMap ;
67+ serviceMap = newServiceMap ;
6468 }
65-
69+
6670 @ Override
6771 public void nodesRemoved (Set <InetSocketAddress > removedNodes ) {
68- Set <T > set = new HashSet <T >();
69- for (InetSocketAddress host : removedNodes ) {
70- //addrMap.get() returns null here, as it has been updated and doesn't contain removed hosts
71- set .add (addrMap .get (host ));
72- }
73- //commented out as it causes NPE inside cleanup
74- //socketDecorator.cleanup(set);
7572 }
7673
7774 public void updateCluster (Map <Integer ,ArrayList <T >> clusterView ){
7875 this .clusterView = clusterView ;
7976 }
8077
78+ public InetSocketAddress getServiceAddress (T service ) {
79+ return serviceMap .get (service );
80+ }
81+
8182 public static class RandomAlgorithm <T > extends RoutingAlgorithm <T > {
82- private Random rand = new Random ();
83-
84-
83+ private final Random rand = new Random ();
84+
85+
8586 public RandomAlgorithm (InetSocketAddressDecorator <T > socketDecorator ){
8687 super (socketDecorator );
8788 }
88-
89+
8990 @ Override
9091 public T route (byte [] key , int partition ) {
9192 if (clusterView == null ) return null ;
@@ -94,14 +95,14 @@ public T route(byte[] key, int partition) {
9495 return nodes .get (rand .nextInt (nodes .size ()));
9596 }
9697 }
97-
98+
9899 public static class RoundRobinAlgorithm <T > extends RoutingAlgorithm <T > {
99100 private final Map <Integer ,AtomicLong > countMap = Collections .synchronizedMap (new HashMap <Integer ,AtomicLong >());
100-
101+
101102 public RoundRobinAlgorithm (InetSocketAddressDecorator <T > socketDecorator ){
102103 super (socketDecorator );
103104 }
104-
105+
105106 @ Override
106107 public T route (byte [] key , int partition ) {
107108 if (clusterView == null ) {
@@ -118,7 +119,7 @@ public T route(byte[] key, int partition) {
118119 else {
119120 idxVal = idx .incrementAndGet ();
120121 }
121- return nodes .get ((int )(idxVal % ( long ) nodes .size ()));
122+ return nodes .get ((int )(idxVal % nodes .size ()));
122123 }
123124 }
124125}
0 commit comments