2020import static org .apache .hadoop .hbase .HConstants .REPLICATION_CLUSTER_ID ;
2121import static org .apache .hadoop .hbase .HConstants .REPLICATION_CONF_DIR ;
2222import static org .junit .Assert .assertEquals ;
23+ import static org .junit .Assert .assertNotEquals ;
2324import static org .junit .Assert .assertTrue ;
2425
2526import java .io .File ;
2627import java .io .FileOutputStream ;
2728import java .io .IOException ;
2829import java .net .UnknownHostException ;
30+ import java .util .ArrayList ;
31+ import java .util .HashMap ;
2932import java .util .List ;
3033import java .util .Map ;
3134import java .util .Optional ;
6063import org .apache .hadoop .hbase .io .hfile .HFile ;
6164import org .apache .hadoop .hbase .io .hfile .HFileContextBuilder ;
6265import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
66+ import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
67+ import org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
6368import org .apache .hadoop .hbase .replication .TestReplicationBase ;
6469import org .apache .hadoop .hbase .testclassification .MediumTests ;
6570import org .apache .hadoop .hbase .testclassification .ReplicationTests ;
@@ -121,6 +126,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
121126 @ ClassRule
122127 public static TemporaryFolder testFolder = new TemporaryFolder ();
123128
129+ private static ReplicationQueueStorage queueStorage ;
130+
124131 @ BeforeClass
125132 public static void setUpBeforeClass () throws Exception {
126133 setupBulkLoadConfigsForCluster (CONF1 , PEER1_CLUSTER_ID );
@@ -129,6 +136,8 @@ public static void setUpBeforeClass() throws Exception {
129136 setupConfig (UTIL3 , "/3" );
130137 TestReplicationBase .setUpBeforeClass ();
131138 startThirdCluster ();
139+ queueStorage = ReplicationStorageFactory .getReplicationQueueStorage (UTIL1 .getConnection (),
140+ UTIL1 .getConfiguration ());
132141 }
133142
134143 private static void startThirdCluster () throws Exception {
@@ -322,4 +331,81 @@ public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnviron
322331 });
323332 }
324333 }
334+
335+ @ Test
336+ public void testBulkloadReplicationActiveActiveForNoRepFamily () throws Exception {
337+ Table peer1TestTable = UTIL1 .getConnection ().getTable (TestReplicationBase .tableName );
338+ Table peer2TestTable = UTIL2 .getConnection ().getTable (TestReplicationBase .tableName );
339+ Table peer3TestTable = UTIL3 .getConnection ().getTable (TestReplicationBase .tableName );
340+ byte [] row = Bytes .toBytes ("004" );
341+ byte [] value = Bytes .toBytes ("v4" );
342+ assertBulkLoadConditionsForNoRepFamily (row , value , UTIL1 , peer1TestTable , peer2TestTable ,
343+ peer3TestTable );
344+ // additional wait to make sure no extra bulk load happens
345+ Thread .sleep (400 );
346+ assertEquals (1 , BULK_LOADS_COUNT .get ());
347+ assertEquals (0 , queueStorage .getAllHFileRefs ().size ());
348+ }
349+
350+ private void assertBulkLoadConditionsForNoRepFamily (byte [] row , byte [] value ,
351+ HBaseTestingUtil utility , Table ... tables ) throws Exception {
352+ BULK_LOAD_LATCH = new CountDownLatch (1 );
353+ bulkLoadOnClusterForNoRepFamily (row , value , utility );
354+ assertTrue (BULK_LOAD_LATCH .await (1 , TimeUnit .MINUTES ));
355+ assertTableHasValue (tables [0 ], row , value );
356+ assertTableNotHasValue (tables [1 ], row , value );
357+ assertTableNotHasValue (tables [2 ], row , value );
358+ }
359+
360+ private void bulkLoadOnClusterForNoRepFamily (byte [] row , byte [] value , HBaseTestingUtil cluster )
361+ throws Exception {
362+ String bulkloadFile = createHFileForNoRepFamilies (row , value , cluster .getConfiguration ());
363+ Path bulkLoadFilePath = new Path (bulkloadFile );
364+ copyToHdfsForNoRepFamily (bulkloadFile , cluster .getDFSCluster ());
365+ BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool (cluster .getConfiguration ());
366+ Map <byte [], List <Path >> family2Files = new HashMap <>();
367+ List <Path > files = new ArrayList <>();
368+ files .add (new Path (
369+ BULK_LOAD_BASE_DIR + "/" + Bytes .toString (noRepfamName ) + "/" + bulkLoadFilePath .getName ()));
370+ family2Files .put (noRepfamName , files );
371+ bulkLoadHFilesTool .bulkLoad (tableName , family2Files );
372+ }
373+
374+ private String createHFileForNoRepFamilies (byte [] row , byte [] value , Configuration clusterConfig )
375+ throws IOException {
376+ ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory .create (CellBuilderType .DEEP_COPY );
377+ cellBuilder .setRow (row ).setFamily (TestReplicationBase .noRepfamName )
378+ .setQualifier (Bytes .toBytes ("1" )).setValue (value ).setType (Cell .Type .Put );
379+
380+ HFile .WriterFactory hFileFactory = HFile .getWriterFactoryNoCache (clusterConfig );
381+ // TODO We need a way to do this without creating files
382+ File hFileLocation = testFolder .newFile ();
383+ FSDataOutputStream out = new FSDataOutputStream (new FileOutputStream (hFileLocation ), null );
384+ try {
385+ hFileFactory .withOutputStream (out );
386+ hFileFactory .withFileContext (new HFileContextBuilder ().build ());
387+ HFile .Writer writer = hFileFactory .create ();
388+ try {
389+ writer .append (new KeyValue (cellBuilder .build ()));
390+ } finally {
391+ writer .close ();
392+ }
393+ } finally {
394+ out .close ();
395+ }
396+ return hFileLocation .getAbsoluteFile ().getAbsolutePath ();
397+ }
398+
399+ private void copyToHdfsForNoRepFamily (String bulkLoadFilePath , MiniDFSCluster cluster )
400+ throws Exception {
401+ Path bulkLoadDir = new Path (BULK_LOAD_BASE_DIR + "/" + Bytes .toString (noRepfamName ) + "/" );
402+ cluster .getFileSystem ().mkdirs (bulkLoadDir );
403+ cluster .getFileSystem ().copyFromLocalFile (new Path (bulkLoadFilePath ), bulkLoadDir );
404+ }
405+
406+ private void assertTableNotHasValue (Table table , byte [] row , byte [] value ) throws IOException {
407+ Get get = new Get (row );
408+ Result result = table .get (get );
409+ assertNotEquals (Bytes .toString (value ), Bytes .toString (result .value ()));
410+ }
325411}
0 commit comments