2020import static org .apache .hadoop .hbase .HConstants .REPLICATION_CLUSTER_ID ;
2121import static org .apache .hadoop .hbase .HConstants .REPLICATION_CONF_DIR ;
2222import static org .junit .Assert .assertEquals ;
23+ import static org .junit .Assert .assertNotEquals ;
2324import static org .junit .Assert .assertTrue ;
2425
2526import java .io .File ;
2627import java .io .FileOutputStream ;
2728import java .io .IOException ;
2829import java .net .UnknownHostException ;
30+ import java .util .ArrayList ;
31+ import java .util .HashMap ;
2932import java .util .List ;
3033import java .util .Map ;
3134import java .util .Optional ;
6063import org .apache .hadoop .hbase .io .hfile .HFile ;
6164import org .apache .hadoop .hbase .io .hfile .HFileContextBuilder ;
6265import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
66+ import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
67+ import org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
6368import org .apache .hadoop .hbase .replication .TestReplicationBase ;
6469import org .apache .hadoop .hbase .testclassification .MediumTests ;
6570import org .apache .hadoop .hbase .testclassification .ReplicationTests ;
6671import org .apache .hadoop .hbase .tool .BulkLoadHFilesTool ;
6772import org .apache .hadoop .hbase .util .Bytes ;
6873import org .apache .hadoop .hbase .util .Pair ;
6974import org .apache .hadoop .hdfs .MiniDFSCluster ;
70- import org .junit .After ;
7175import org .junit .Before ;
7276import org .junit .BeforeClass ;
7377import org .junit .ClassRule ;
@@ -121,6 +125,10 @@ public class TestBulkLoadReplication extends TestReplicationBase {
121125 @ ClassRule
122126 public static TemporaryFolder testFolder = new TemporaryFolder ();
123127
128+ private static ReplicationQueueStorage queueStorage ;
129+
130+ private static boolean replicationPeersAdded = false ;
131+
124132 @ BeforeClass
125133 public static void setUpBeforeClass () throws Exception {
126134 setupBulkLoadConfigsForCluster (CONF1 , PEER1_CLUSTER_ID );
@@ -129,6 +137,8 @@ public static void setUpBeforeClass() throws Exception {
129137 setupConfig (UTIL3 , "/3" );
130138 TestReplicationBase .setUpBeforeClass ();
131139 startThirdCluster ();
140+ queueStorage = ReplicationStorageFactory .getReplicationQueueStorage (UTIL1 .getConnection (),
141+ UTIL1 .getConfiguration ());
132142 }
133143
134144 private static void startThirdCluster () throws Exception {
@@ -152,22 +162,27 @@ private static void startThirdCluster() throws Exception {
152162 @ Before
153163 @ Override
154164 public void setUpBase () throws Exception {
155- // "super.setUpBase()" already sets replication from 1->2,
156- // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
157- // So we have following topology: "1 <-> 2 <->3"
158- super .setUpBase ();
159- ReplicationPeerConfig peer1Config = getPeerConfigForCluster (UTIL1 );
160- ReplicationPeerConfig peer2Config = getPeerConfigForCluster (UTIL2 );
161- ReplicationPeerConfig peer3Config = getPeerConfigForCluster (UTIL3 );
162- // adds cluster1 as a remote peer on cluster2
163- UTIL2 .getAdmin ().addReplicationPeer (PEER_ID1 , peer1Config );
164- // adds cluster3 as a remote peer on cluster2
165- UTIL2 .getAdmin ().addReplicationPeer (PEER_ID3 , peer3Config );
166- // adds cluster2 as a remote peer on cluster3
167- UTIL3 .getAdmin ().addReplicationPeer (PEER_ID2 , peer2Config );
168- setupCoprocessor (UTIL1 );
169- setupCoprocessor (UTIL2 );
170- setupCoprocessor (UTIL3 );
165+ // removing the peer and adding again causing the previously completed bulk load jobs getting
166+ // submitted again, adding a check to add the peers only once.
167+ if (!replicationPeersAdded ) {
168+ // "super.setUpBase()" already sets replication from 1->2,
169+ // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
170+ // So we have following topology: "1 <-> 2 <->3"
171+ super .setUpBase ();
172+ ReplicationPeerConfig peer1Config = getPeerConfigForCluster (UTIL1 );
173+ ReplicationPeerConfig peer2Config = getPeerConfigForCluster (UTIL2 );
174+ ReplicationPeerConfig peer3Config = getPeerConfigForCluster (UTIL3 );
175+ // adds cluster1 as a remote peer on cluster2
176+ UTIL2 .getAdmin ().addReplicationPeer (PEER_ID1 , peer1Config );
177+ // adds cluster3 as a remote peer on cluster2
178+ UTIL2 .getAdmin ().addReplicationPeer (PEER_ID3 , peer3Config );
179+ // adds cluster2 as a remote peer on cluster3
180+ UTIL3 .getAdmin ().addReplicationPeer (PEER_ID2 , peer2Config );
181+ setupCoprocessor (UTIL1 );
182+ setupCoprocessor (UTIL2 );
183+ setupCoprocessor (UTIL3 );
184+ replicationPeersAdded = true ;
185+ }
171186 BULK_LOADS_COUNT = new AtomicInteger (0 );
172187 }
173188
@@ -195,15 +210,6 @@ private void setupCoprocessor(HBaseTestingUtil cluster) {
195210 });
196211 }
197212
198- @ After
199- @ Override
200- public void tearDownBase () throws Exception {
201- super .tearDownBase ();
202- UTIL2 .getAdmin ().removeReplicationPeer (PEER_ID1 );
203- UTIL2 .getAdmin ().removeReplicationPeer (PEER_ID3 );
204- UTIL3 .getAdmin ().removeReplicationPeer (PEER_ID2 );
205- }
206-
207213 protected static void setupBulkLoadConfigsForCluster (Configuration config ,
208214 String clusterReplicationId ) throws Exception {
209215 config .setBoolean (HConstants .REPLICATION_BULKLOAD_ENABLE_KEY , true );
@@ -322,4 +328,81 @@ public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnviron
322328 });
323329 }
324330 }
331+
332+ @ Test
333+ public void testBulkloadReplicationActiveActiveForNoRepFamily () throws Exception {
334+ Table peer1TestTable = UTIL1 .getConnection ().getTable (TestReplicationBase .tableName );
335+ Table peer2TestTable = UTIL2 .getConnection ().getTable (TestReplicationBase .tableName );
336+ Table peer3TestTable = UTIL3 .getConnection ().getTable (TestReplicationBase .tableName );
337+ byte [] row = Bytes .toBytes ("004" );
338+ byte [] value = Bytes .toBytes ("v4" );
339+ assertBulkLoadConditionsForNoRepFamily (row , value , UTIL1 , peer1TestTable , peer2TestTable ,
340+ peer3TestTable );
341+ // additional wait to make sure no extra bulk load happens
342+ Thread .sleep (400 );
343+ assertEquals (1 , BULK_LOADS_COUNT .get ());
344+ assertEquals (0 , queueStorage .getAllHFileRefs ().size ());
345+ }
346+
347+ private void assertBulkLoadConditionsForNoRepFamily (byte [] row , byte [] value ,
348+ HBaseTestingUtil utility , Table ... tables ) throws Exception {
349+ BULK_LOAD_LATCH = new CountDownLatch (1 );
350+ bulkLoadOnClusterForNoRepFamily (row , value , utility );
351+ assertTrue (BULK_LOAD_LATCH .await (1 , TimeUnit .MINUTES ));
352+ assertTableHasValue (tables [0 ], row , value );
353+ assertTableNotHasValue (tables [1 ], row , value );
354+ assertTableNotHasValue (tables [2 ], row , value );
355+ }
356+
357+ private void bulkLoadOnClusterForNoRepFamily (byte [] row , byte [] value , HBaseTestingUtil cluster )
358+ throws Exception {
359+ String bulkloadFile = createHFileForNoRepFamilies (row , value , cluster .getConfiguration ());
360+ Path bulkLoadFilePath = new Path (bulkloadFile );
361+ copyToHdfsForNoRepFamily (bulkloadFile , cluster .getDFSCluster ());
362+ BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool (cluster .getConfiguration ());
363+ Map <byte [], List <Path >> family2Files = new HashMap <>();
364+ List <Path > files = new ArrayList <>();
365+ files .add (new Path (
366+ BULK_LOAD_BASE_DIR + "/" + Bytes .toString (noRepfamName ) + "/" + bulkLoadFilePath .getName ()));
367+ family2Files .put (noRepfamName , files );
368+ bulkLoadHFilesTool .bulkLoad (tableName , family2Files );
369+ }
370+
371+ private String createHFileForNoRepFamilies (byte [] row , byte [] value , Configuration clusterConfig )
372+ throws IOException {
373+ ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory .create (CellBuilderType .DEEP_COPY );
374+ cellBuilder .setRow (row ).setFamily (TestReplicationBase .noRepfamName )
375+ .setQualifier (Bytes .toBytes ("1" )).setValue (value ).setType (Cell .Type .Put );
376+
377+ HFile .WriterFactory hFileFactory = HFile .getWriterFactoryNoCache (clusterConfig );
378+ // TODO We need a way to do this without creating files
379+ File hFileLocation = testFolder .newFile ();
380+ FSDataOutputStream out = new FSDataOutputStream (new FileOutputStream (hFileLocation ), null );
381+ try {
382+ hFileFactory .withOutputStream (out );
383+ hFileFactory .withFileContext (new HFileContextBuilder ().build ());
384+ HFile .Writer writer = hFileFactory .create ();
385+ try {
386+ writer .append (new KeyValue (cellBuilder .build ()));
387+ } finally {
388+ writer .close ();
389+ }
390+ } finally {
391+ out .close ();
392+ }
393+ return hFileLocation .getAbsoluteFile ().getAbsolutePath ();
394+ }
395+
396+ private void copyToHdfsForNoRepFamily (String bulkLoadFilePath , MiniDFSCluster cluster )
397+ throws Exception {
398+ Path bulkLoadDir = new Path (BULK_LOAD_BASE_DIR + "/" + Bytes .toString (noRepfamName ) + "/" );
399+ cluster .getFileSystem ().mkdirs (bulkLoadDir );
400+ cluster .getFileSystem ().copyFromLocalFile (new Path (bulkLoadFilePath ), bulkLoadDir );
401+ }
402+
403+ private void assertTableNotHasValue (Table table , byte [] row , byte [] value ) throws IOException {
404+ Get get = new Get (row );
405+ Result result = table .get (get );
406+ assertNotEquals (Bytes .toString (value ), Bytes .toString (result .value ()));
407+ }
325408}
0 commit comments