Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,22 +170,21 @@ protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws I
LOG.debug("copying archive {} to {}", archive, tgt);
archiveFiles.add(archive.toString());
}
mergeSplitBulkloads(activeFiles, archiveFiles, srcTable);
incrementalCopyBulkloadHFiles(tgtFs, srcTable);
mergeSplitAndCopyBulkloadedHFiles(activeFiles, archiveFiles, srcTable, tgtFs);
}
return bulkLoads;
}

private void mergeSplitBulkloads(List<String> activeFiles, List<String> archiveFiles,
TableName tn) throws IOException {
private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
List<String> archiveFiles, TableName tn, FileSystem tgtFs) throws IOException {
int attempt = 1;

while (!activeFiles.isEmpty()) {
LOG.info("MergeSplit {} active bulk loaded files. Attempt={}", activeFiles.size(), attempt++);
// Active file can be archived during copy operation,
// we need to handle this properly
try {
mergeSplitBulkloads(activeFiles, tn);
mergeSplitAndCopyBulkloadedHFiles(activeFiles, tn, tgtFs);
break;
} catch (IOException e) {
int numActiveFiles = activeFiles.size();
Expand All @@ -199,11 +198,12 @@ private void mergeSplitBulkloads(List<String> activeFiles, List<String> archiveF
}

if (!archiveFiles.isEmpty()) {
mergeSplitBulkloads(archiveFiles, tn);
mergeSplitAndCopyBulkloadedHFiles(archiveFiles, tn, tgtFs);
}
}

private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOException {
private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn, FileSystem tgtFs)
throws IOException {
MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
getBulkOutputDirForTable(tn).toString());
Expand All @@ -225,6 +225,8 @@ private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOExce
throw new IOException(
"Failed to run MapReduceHFileSplitterJob with invalid result: " + result);
}

incrementalCopyBulkloadHFiles(tgtFs, tn);
}

private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -101,11 +102,14 @@ public TestIncrementalBackup(Boolean b) {
public void ensurePreviousBackupTestsAreCleanedUp() throws Exception {
TEST_UTIL.flush(table1);
TEST_UTIL.flush(table2);
TEST_UTIL.flush(table1_restore);

TEST_UTIL.truncateTable(table1).close();
TEST_UTIL.truncateTable(table2).close();
TEST_UTIL.truncateTable(table1_restore).close();

if (TEST_UTIL.getAdmin().tableExists(table1_restore)) {
TEST_UTIL.flush(table1_restore);
TEST_UTIL.truncateTable(table1_restore).close();
}

TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(rst -> {
try {
Expand Down Expand Up @@ -429,6 +433,73 @@ public void TestIncBackupRestoreWithOriginalSplitsSeperateFs() throws Exception

}

/**
 * Verifies that an incremental backup succeeds even when one of the bulk-loaded HFiles
 * has been moved into the archive directory between the bulk load and the backup (as
 * compaction/cleanup chores can do), and that restoring that incremental backup yields
 * the same row count as the source table.
 */
@Test
public void TestIncBackupRestoreHandlesArchivedFiles() throws Exception {
byte[] fam2 = Bytes.toBytes("f2");
// Add a second column family so the bulk load below spans multiple families/files.
TableDescriptor newTable1Desc = TableDescriptorBuilder.newBuilder(table1Desc)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam2).build()).build();
TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
try (Connection conn = ConnectionFactory.createConnection(conf1);
BackupAdminImpl admin = new BackupAdminImpl(conn)) {
String backupTargetDir = TEST_UTIL.getDataTestDir("backupTarget").toString();
BACKUP_ROOT_DIR = new File(backupTargetDir).toURI().toString();

List<TableName> tables = Lists.newArrayList(table1);

insertIntoTable(conn, table1, famName, 3, 100);
String fullBackupId = takeFullBackup(tables, admin, true);
assertTrue(checkSucceeded(fullBackupId));

insertIntoTable(conn, table1, famName, 4, 100);

HRegion regionToBulkload = TEST_UTIL.getHBaseCluster().getRegions(table1).get(0);
String regionName = regionToBulkload.getRegionInfo().getEncodedName();
// Requires a multi-family bulkload to ensure we're appropriately handling
// multi-file bulkloads
Path regionDir = doBulkload(table1, regionName, famName, fam2);

// archive the files in the region directory
Path archiveDir =
HFileArchiveUtil.getStoreArchivePath(conf1, table1, regionName, Bytes.toString(famName));
TEST_UTIL.getTestFileSystem().mkdirs(archiveDir);
RemoteIterator<LocatedFileStatus> iter =
TEST_UTIL.getTestFileSystem().listFiles(regionDir, true);
List<Path> paths = new ArrayList<>();
while (iter.hasNext()) {
Path path = iter.next().getPath();
// Bulk-loaded HFiles carry a "_SeqId_" marker in their name.
if (path.toString().contains("_SeqId_")) {
paths.add(path);
}
}
// Need at least two bulk-loaded files so that archiving one still leaves an
// active file for the incremental backup to copy.
assertTrue(paths.size() > 1);
Path path = paths.get(0);
// Path#getName() yields the final path component; no manual separator arithmetic.
Path archiveFile = new Path(archiveDir, path.getName());
// archive 1 of the files
boolean success = TEST_UTIL.getTestFileSystem().rename(path, archiveFile);
assertTrue(success);
assertTrue(TEST_UTIL.getTestFileSystem().exists(archiveFile));
assertFalse(TEST_UTIL.getTestFileSystem().exists(path));

BackupRequest request =
createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR, true);
String incrementalBackupId = admin.backupTables(request);
assertTrue(checkSucceeded(incrementalBackupId));

TableName[] fromTable = new TableName[] { table1 };
TableName[] toTable = new TableName[] { table1_restore };

admin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupId, false,
fromTable, toTable, true));

// The restored table must contain exactly the rows present in the source table,
// including the bulk-loaded rows whose file was archived mid-flight.
int actualRowCount = TEST_UTIL.countRows(table1_restore);
int expectedRowCount = TEST_UTIL.countRows(table1);
assertEquals(expectedRowCount, actualRowCount);
}
}

private void checkThrowsCFMismatch(IOException ex, List<TableName> tables) {
Throwable cause = Throwables.getRootCause(ex);
assertEquals(cause.getClass(), ColumnFamilyMismatchException.class);
Expand All @@ -450,12 +521,13 @@ private String takeFullBackup(List<TableName> tables, BackupAdminImpl backupAdmi
return backupId;
}

private static void doBulkload(TableName tn, String regionName, byte[]... fams)
/**
 * Generates HFiles for the given column families, bulk-loads them into the specified
 * region of {@code tn}, and returns the staging directory the HFiles were created in.
 */
private static Path doBulkload(TableName tn, String regionName, byte[]... fams)
throws IOException {
final Path stagingDir = createHFiles(tn, regionName, fams);
Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded =
BulkLoadHFiles.create(conf1).bulkLoad(tn, stagingDir);
assertFalse(loaded.isEmpty());
return stagingDir;
}

private static Path createHFiles(TableName tn, String regionName, byte[]... fams)
Expand Down