|
20 | 20 | import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; |
21 | 21 | import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; |
22 | 22 | import static org.junit.Assert.assertEquals; |
| 23 | +import static org.junit.Assert.assertNotEquals; |
23 | 24 | import static org.junit.Assert.assertTrue; |
24 | 25 |
|
25 | 26 | import java.io.File; |
26 | 27 | import java.io.FileOutputStream; |
27 | 28 | import java.io.IOException; |
28 | 29 | import java.net.UnknownHostException; |
| 30 | +import java.util.ArrayList; |
| 31 | +import java.util.HashMap; |
29 | 32 | import java.util.List; |
30 | 33 | import java.util.Map; |
31 | 34 | import java.util.Optional; |
|
66 | 69 | import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; |
67 | 70 | import org.apache.hadoop.hbase.util.Bytes; |
68 | 71 | import org.apache.hadoop.hbase.util.Pair; |
| 72 | +import org.apache.hadoop.hbase.zookeeper.ZKUtil; |
| 73 | +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; |
69 | 74 | import org.apache.hadoop.hdfs.MiniDFSCluster; |
70 | 75 | import org.junit.After; |
71 | 76 | import org.junit.Before; |
|
79 | 84 | import org.slf4j.Logger; |
80 | 85 | import org.slf4j.LoggerFactory; |
81 | 86 |
|
| 87 | +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; |
| 88 | + |
82 | 89 | /** |
83 | 90 | * Integration test for bulk load replication. Defines three clusters, with the following |
84 | 91 | * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 2 |
@@ -322,4 +329,84 @@ public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnviron |
322 | 329 | }); |
323 | 330 | } |
324 | 331 | } |
| 332 | + |
| 333 | + @Test |
| 334 | + public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception { |
| 335 | + Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); |
| 336 | + Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); |
| 337 | + Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); |
| 338 | + byte[] row = Bytes.toBytes("004"); |
| 339 | + byte[] value = Bytes.toBytes("v4"); |
| 340 | + assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable, |
| 341 | + peer3TestTable); |
| 342 | + // additional wait to make sure no extra bulk load happens |
| 343 | + Thread.sleep(400); |
| 344 | + assertEquals(1, BULK_LOADS_COUNT.get()); |
| 345 | + ZKWatcher zkw = UTIL1.getZooKeeperWatcher(); |
| 346 | + List<String> znodes = ZKUtil.listChildrenNoWatch(zkw, "/1/replication/hfile-refs/2"); |
| 347 | + assertTrue(CollectionUtils.isEmpty(znodes)); |
| 348 | + |
| 349 | + } |
| 350 | + |
| 351 | + private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value, |
| 352 | + HBaseTestingUtil utility, Table... tables) throws Exception { |
| 353 | + BULK_LOAD_LATCH = new CountDownLatch(1); |
| 354 | + bulkLoadOnClusterForNoRepFamily(row, value, utility); |
| 355 | + assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); |
| 356 | + assertTableHasValue(tables[0], row, value); |
| 357 | + assertTableNotHasValue(tables[1], row, value); |
| 358 | + assertTableNotHasValue(tables[2], row, value); |
| 359 | + } |
| 360 | + |
| 361 | + private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster) |
| 362 | + throws Exception { |
| 363 | + String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration()); |
| 364 | + Path bulkLoadFilePath = new Path(bulkloadFile); |
| 365 | + copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster()); |
| 366 | + BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration()); |
| 367 | + Map<byte[], List<Path>> family2Files = new HashMap<>(); |
| 368 | + List<Path> files = new ArrayList<>(); |
| 369 | + files.add(new Path( |
| 370 | + BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName())); |
| 371 | + family2Files.put(noRepfamName, files); |
| 372 | + bulkLoadHFilesTool.bulkLoad(tableName, family2Files); |
| 373 | + } |
| 374 | + |
| 375 | + private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig) |
| 376 | + throws IOException { |
| 377 | + ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY); |
| 378 | + cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName) |
| 379 | + .setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put); |
| 380 | + |
| 381 | + HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); |
| 382 | + // TODO We need a way to do this without creating files |
| 383 | + File hFileLocation = testFolder.newFile(); |
| 384 | + FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); |
| 385 | + try { |
| 386 | + hFileFactory.withOutputStream(out); |
| 387 | + hFileFactory.withFileContext(new HFileContextBuilder().build()); |
| 388 | + HFile.Writer writer = hFileFactory.create(); |
| 389 | + try { |
| 390 | + writer.append(new KeyValue(cellBuilder.build())); |
| 391 | + } finally { |
| 392 | + writer.close(); |
| 393 | + } |
| 394 | + } finally { |
| 395 | + out.close(); |
| 396 | + } |
| 397 | + return hFileLocation.getAbsoluteFile().getAbsolutePath(); |
| 398 | + } |
| 399 | + |
| 400 | + private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster) |
| 401 | + throws Exception { |
| 402 | + Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/"); |
| 403 | + cluster.getFileSystem().mkdirs(bulkLoadDir); |
| 404 | + cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); |
| 405 | + } |
| 406 | + |
| 407 | + private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException { |
| 408 | + Get get = new Get(row); |
| 409 | + Result result = table.get(get); |
| 410 | + assertNotEquals(Bytes.toString(value), Bytes.toString(result.value())); |
| 411 | + } |
325 | 412 | } |
0 commit comments