2020import static org .apache .hadoop .hbase .HConstants .REPLICATION_CLUSTER_ID ;
2121import static org .apache .hadoop .hbase .HConstants .REPLICATION_CONF_DIR ;
2222import static org .junit .Assert .assertEquals ;
23+ import static org .junit .Assert .assertNotEquals ;
2324import static org .junit .Assert .assertTrue ;
2425
2526import java .io .File ;
2627import java .io .FileOutputStream ;
2728import java .io .IOException ;
2829import java .net .UnknownHostException ;
30+ import java .util .ArrayList ;
31+ import java .util .HashMap ;
2932import java .util .List ;
3033import java .util .Map ;
3134import java .util .Optional ;
6063import org .apache .hadoop .hbase .io .hfile .HFile ;
6164import org .apache .hadoop .hbase .io .hfile .HFileContextBuilder ;
6265import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
66+ import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
67+ import org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
6368import org .apache .hadoop .hbase .replication .TestReplicationBase ;
6469import org .apache .hadoop .hbase .testclassification .MediumTests ;
6570import org .apache .hadoop .hbase .testclassification .ReplicationTests ;
6671import org .apache .hadoop .hbase .tool .BulkLoadHFilesTool ;
6772import org .apache .hadoop .hbase .util .Bytes ;
6873import org .apache .hadoop .hbase .util .Pair ;
74+ import org .apache .hadoop .hbase .zookeeper .ZKUtil ;
75+ import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
6976import org .apache .hadoop .hdfs .MiniDFSCluster ;
7077import org .junit .After ;
7178import org .junit .Before ;
7986import org .slf4j .Logger ;
8087import org .slf4j .LoggerFactory ;
8188
89+ import org .apache .hbase .thirdparty .org .apache .commons .collections4 .CollectionUtils ;
90+
8291/**
8392 * Integration test for bulk load replication. Defines three clusters, with the following
8493 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 2
@@ -121,6 +130,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
121130 @ ClassRule
122131 public static TemporaryFolder testFolder = new TemporaryFolder ();
123132
133+ private static ReplicationQueueStorage queueStorage ;
134+
124135 @ BeforeClass
125136 public static void setUpBeforeClass () throws Exception {
126137 setupBulkLoadConfigsForCluster (CONF1 , PEER1_CLUSTER_ID );
@@ -129,6 +140,8 @@ public static void setUpBeforeClass() throws Exception {
129140 setupConfig (UTIL3 , "/3" );
130141 TestReplicationBase .setUpBeforeClass ();
131142 startThirdCluster ();
143+ queueStorage = ReplicationStorageFactory .getReplicationQueueStorage (UTIL1 .getConnection (),
144+ UTIL1 .getConfiguration ());
132145 }
133146
134147 private static void startThirdCluster () throws Exception {
@@ -322,4 +335,81 @@ public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnviron
322335 });
323336 }
324337 }
338+
339+ @ Test
340+ public void testBulkloadReplicationActiveActiveForNoRepFamily () throws Exception {
341+ Table peer1TestTable = UTIL1 .getConnection ().getTable (TestReplicationBase .tableName );
342+ Table peer2TestTable = UTIL2 .getConnection ().getTable (TestReplicationBase .tableName );
343+ Table peer3TestTable = UTIL3 .getConnection ().getTable (TestReplicationBase .tableName );
344+ byte [] row = Bytes .toBytes ("004" );
345+ byte [] value = Bytes .toBytes ("v4" );
346+ assertBulkLoadConditionsForNoRepFamily (row , value , UTIL1 , peer1TestTable , peer2TestTable ,
347+ peer3TestTable );
348+ // additional wait to make sure no extra bulk load happens
349+ Thread .sleep (400 );
350+ assertEquals (1 , BULK_LOADS_COUNT .get ());
351+ assertEquals (0 , queueStorage .getAllHFileRefs ().size ());
352+ }
353+
354+ private void assertBulkLoadConditionsForNoRepFamily (byte [] row , byte [] value ,
355+ HBaseTestingUtil utility , Table ... tables ) throws Exception {
356+ BULK_LOAD_LATCH = new CountDownLatch (1 );
357+ bulkLoadOnClusterForNoRepFamily (row , value , utility );
358+ assertTrue (BULK_LOAD_LATCH .await (1 , TimeUnit .MINUTES ));
359+ assertTableHasValue (tables [0 ], row , value );
360+ assertTableNotHasValue (tables [1 ], row , value );
361+ assertTableNotHasValue (tables [2 ], row , value );
362+ }
363+
364+ private void bulkLoadOnClusterForNoRepFamily (byte [] row , byte [] value , HBaseTestingUtil cluster )
365+ throws Exception {
366+ String bulkloadFile = createHFileForNoRepFamilies (row , value , cluster .getConfiguration ());
367+ Path bulkLoadFilePath = new Path (bulkloadFile );
368+ copyToHdfsForNoRepFamily (bulkloadFile , cluster .getDFSCluster ());
369+ BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool (cluster .getConfiguration ());
370+ Map <byte [], List <Path >> family2Files = new HashMap <>();
371+ List <Path > files = new ArrayList <>();
372+ files .add (new Path (
373+ BULK_LOAD_BASE_DIR + "/" + Bytes .toString (noRepfamName ) + "/" + bulkLoadFilePath .getName ()));
374+ family2Files .put (noRepfamName , files );
375+ bulkLoadHFilesTool .bulkLoad (tableName , family2Files );
376+ }
377+
378+ private String createHFileForNoRepFamilies (byte [] row , byte [] value , Configuration clusterConfig )
379+ throws IOException {
380+ ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory .create (CellBuilderType .DEEP_COPY );
381+ cellBuilder .setRow (row ).setFamily (TestReplicationBase .noRepfamName )
382+ .setQualifier (Bytes .toBytes ("1" )).setValue (value ).setType (Cell .Type .Put );
383+
384+ HFile .WriterFactory hFileFactory = HFile .getWriterFactoryNoCache (clusterConfig );
385+ // TODO We need a way to do this without creating files
386+ File hFileLocation = testFolder .newFile ();
387+ FSDataOutputStream out = new FSDataOutputStream (new FileOutputStream (hFileLocation ), null );
388+ try {
389+ hFileFactory .withOutputStream (out );
390+ hFileFactory .withFileContext (new HFileContextBuilder ().build ());
391+ HFile .Writer writer = hFileFactory .create ();
392+ try {
393+ writer .append (new KeyValue (cellBuilder .build ()));
394+ } finally {
395+ writer .close ();
396+ }
397+ } finally {
398+ out .close ();
399+ }
400+ return hFileLocation .getAbsoluteFile ().getAbsolutePath ();
401+ }
402+
403+ private void copyToHdfsForNoRepFamily (String bulkLoadFilePath , MiniDFSCluster cluster )
404+ throws Exception {
405+ Path bulkLoadDir = new Path (BULK_LOAD_BASE_DIR + "/" + Bytes .toString (noRepfamName ) + "/" );
406+ cluster .getFileSystem ().mkdirs (bulkLoadDir );
407+ cluster .getFileSystem ().copyFromLocalFile (new Path (bulkLoadFilePath ), bulkLoadDir );
408+ }
409+
410+ private void assertTableNotHasValue (Table table , byte [] row , byte [] value ) throws IOException {
411+ Get get = new Get (row );
412+ Result result = table .get (get );
413+ assertNotEquals (Bytes .toString (value ), Bytes .toString (result .value ()));
414+ }
325415}
0 commit comments