|
72 | 72 | import org.apache.hadoop.hbase.util.Bytes; |
73 | 73 | import org.apache.hadoop.hbase.util.Pair; |
74 | 74 | import org.apache.hadoop.hdfs.MiniDFSCluster; |
75 | | -import org.junit.After; |
76 | 75 | import org.junit.Before; |
77 | 76 | import org.junit.BeforeClass; |
78 | 77 | import org.junit.ClassRule; |
@@ -128,6 +127,8 @@ public class TestBulkLoadReplication extends TestReplicationBase { |
128 | 127 |
|
129 | 128 | private static ReplicationQueueStorage queueStorage; |
130 | 129 |
|
| 130 | + private static boolean replicationPeersAdded = false; |
| 131 | + |
131 | 132 | @BeforeClass |
132 | 133 | public static void setUpBeforeClass() throws Exception { |
133 | 134 | setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID); |
@@ -161,22 +162,27 @@ private static void startThirdCluster() throws Exception { |
161 | 162 | @Before |
162 | 163 | @Override |
163 | 164 | public void setUpBase() throws Exception { |
164 | | - // "super.setUpBase()" already sets replication from 1->2, |
165 | | - // then on the subsequent lines, sets 2->1, 2->3 and 3->2. |
166 | | - // So we have following topology: "1 <-> 2 <->3" |
167 | | - super.setUpBase(); |
168 | | - ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1); |
169 | | - ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2); |
170 | | - ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3); |
171 | | - // adds cluster1 as a remote peer on cluster2 |
172 | | - UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config); |
173 | | - // adds cluster3 as a remote peer on cluster2 |
174 | | - UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config); |
175 | | - // adds cluster2 as a remote peer on cluster3 |
176 | | - UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config); |
177 | | - setupCoprocessor(UTIL1); |
178 | | - setupCoprocessor(UTIL2); |
179 | | - setupCoprocessor(UTIL3); |
| 165 | + // Removing the peers and re-adding them causes previously completed bulk load jobs to be |
| 166 | + // submitted again, so add a check to ensure the peers are only added once. |
| 167 | + if (!replicationPeersAdded) { |
| 168 | + // "super.setUpBase()" already sets replication from 1->2, |
| 169 | + // then on the subsequent lines, sets 2->1, 2->3 and 3->2. |
| 170 | + // So we have the following topology: "1 <-> 2 <-> 3" |
| 171 | + super.setUpBase(); |
| 172 | + ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1); |
| 173 | + ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2); |
| 174 | + ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3); |
| 175 | + // adds cluster1 as a remote peer on cluster2 |
| 176 | + UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config); |
| 177 | + // adds cluster3 as a remote peer on cluster2 |
| 178 | + UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config); |
| 179 | + // adds cluster2 as a remote peer on cluster3 |
| 180 | + UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config); |
| 181 | + setupCoprocessor(UTIL1); |
| 182 | + setupCoprocessor(UTIL2); |
| 183 | + setupCoprocessor(UTIL3); |
| 184 | + replicationPeersAdded = true; |
| 185 | + } |
180 | 186 | BULK_LOADS_COUNT = new AtomicInteger(0); |
181 | 187 | } |
182 | 188 |
|
@@ -204,15 +210,6 @@ private void setupCoprocessor(HBaseTestingUtil cluster) { |
204 | 210 | }); |
205 | 211 | } |
206 | 212 |
|
207 | | - @After |
208 | | - @Override |
209 | | - public void tearDownBase() throws Exception { |
210 | | - super.tearDownBase(); |
211 | | - UTIL2.getAdmin().removeReplicationPeer(PEER_ID1); |
212 | | - UTIL2.getAdmin().removeReplicationPeer(PEER_ID3); |
213 | | - UTIL3.getAdmin().removeReplicationPeer(PEER_ID2); |
214 | | - } |
215 | | - |
216 | 213 | protected static void setupBulkLoadConfigsForCluster(Configuration config, |
217 | 214 | String clusterReplicationId) throws Exception { |
218 | 215 | config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); |
|
0 commit comments