Commit 4dfcd1b

chandrasekhar-188k authored and Apache9 committed
HBASE-22335 do add hfile ref only when replication_scope is 1 (#6955)
Co-authored-by: chenxu14 <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit e3aacaf)
1 parent 01832e7 commit 4dfcd1b
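
A column family replicates only when its REPLICATION_SCOPE is 1, i.e. HConstants.REPLICATION_SCOPE_GLOBAL; the default scope 0 (REPLICATION_SCOPE_LOCAL) keeps the family out of replication entirely. This commit makes the bulk-load path honor that flag: an HFile reference is queued for replication only if the family's scope is global. As background, here is a minimal, hypothetical sketch (the class and family names are illustrative, not from this patch) of how a family ends up with scope 1, using the same descriptor accessor the new guard reads:

```java
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public final class ReplicationScopeSketch {
  public static void main(String[] args) {
    // Build a family descriptor with global replication scope (== 1).
    ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("f"))
      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL)
      .build();
    // getScope() is what ReplicationObserver.preCommitStoreFile now checks
    // before queueing bulk-loaded HFile references for replication.
    System.out.println("scope = " + family.getScope()); // prints: scope = 1
  }
}
```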

2 files changed (+124, -27 lines)

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java

Lines changed: 15 additions & 1 deletion
@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -65,11 +67,23 @@ public void preCommitStoreFile(final ObserverContext<? extends RegionCoprocessor
         + "data replication.");
       return;
     }
+    TableName tableName = env.getRegionInfo().getTable();
+    if (
+      env.getRegion().getTableDescriptor().getColumnFamily(family).getScope()
+          != HConstants.REPLICATION_SCOPE_GLOBAL
+    ) {
+      LOG
+        .debug("Skipping recording bulk load entries in preCommitStoreFile for table:{}, family:{},"
+          + " Because the replication is not enabled", tableName, Bytes.toString(family));
+      return;
+    }
+
     // This is completely cheating AND getting a HRegionServer from a RegionServerEnvironment is
     // just going to break. This is all private. Not allowed. Regions shouldn't assume they are
     // hosted in a RegionServer. TODO: fix.
     RegionServerServices rss = ((HasRegionServerServices) env).getRegionServerServices();
     Replication rep = (Replication) ((HRegionServer) rss).getReplicationSourceService();
-    rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
+
+    rep.addHFileRefsToQueue(tableName, family, pairs);
   }
 }

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java

Lines changed: 109 additions & 26 deletions
@@ -20,12 +20,15 @@
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -60,14 +63,15 @@
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -121,6 +125,10 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   @ClassRule
   public static TemporaryFolder testFolder = new TemporaryFolder();
 
+  private static ReplicationQueueStorage queueStorage;
+
+  private static boolean replicationPeersAdded = false;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
@@ -129,6 +137,8 @@ public static void setUpBeforeClass() throws Exception {
     setupConfig(UTIL3, "/3");
     TestReplicationBase.setUpBeforeClass();
     startThirdCluster();
+    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(),
+      UTIL1.getConfiguration());
   }
 
   private static void startThirdCluster() throws Exception {
@@ -152,22 +162,27 @@ private static void startThirdCluster() throws Exception {
   @Before
   @Override
   public void setUpBase() throws Exception {
-    // "super.setUpBase()" already sets replication from 1->2,
-    // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
-    // So we have following topology: "1 <-> 2 <->3"
-    super.setUpBase();
-    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
-    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
-    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
-    // adds cluster1 as a remote peer on cluster2
-    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
-    // adds cluster3 as a remote peer on cluster2
-    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
-    // adds cluster2 as a remote peer on cluster3
-    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
-    setupCoprocessor(UTIL1);
-    setupCoprocessor(UTIL2);
-    setupCoprocessor(UTIL3);
+    // removing the peer and adding again causing the previously completed bulk load jobs getting
+    // submitted again, adding a check to add the peers only once.
+    if (!replicationPeersAdded) {
+      // "super.setUpBase()" already sets replication from 1->2,
+      // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
+      // So we have following topology: "1 <-> 2 <->3"
+      super.setUpBase();
+      ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
+      ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
+      ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
+      // adds cluster1 as a remote peer on cluster2
+      UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+      // adds cluster3 as a remote peer on cluster2
+      UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+      // adds cluster2 as a remote peer on cluster3
+      UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
+      setupCoprocessor(UTIL1);
+      setupCoprocessor(UTIL2);
+      setupCoprocessor(UTIL3);
+      replicationPeersAdded = true;
+    }
     BULK_LOADS_COUNT = new AtomicInteger(0);
   }
 
@@ -195,15 +210,6 @@ private void setupCoprocessor(HBaseTestingUtil cluster) {
     });
   }
 
-  @After
-  @Override
-  public void tearDownBase() throws Exception {
-    super.tearDownBase();
-    UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
-    UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
-    UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
-  }
-
   protected static void setupBulkLoadConfigsForCluster(Configuration config,
     String clusterReplicationId) throws Exception {
     config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
@@ -322,4 +328,81 @@ public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnviron
       });
     }
   }
+
+  @Test
+  public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception {
+    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
+    byte[] row = Bytes.toBytes("004");
+    byte[] value = Bytes.toBytes("v4");
+    assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable,
+      peer3TestTable);
+    // additional wait to make sure no extra bulk load happens
+    Thread.sleep(400);
+    assertEquals(1, BULK_LOADS_COUNT.get());
+    assertEquals(0, queueStorage.getAllHFileRefs().size());
+  }
+
+  private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value,
+    HBaseTestingUtil utility, Table... tables) throws Exception {
+    BULK_LOAD_LATCH = new CountDownLatch(1);
+    bulkLoadOnClusterForNoRepFamily(row, value, utility);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+    assertTableHasValue(tables[0], row, value);
+    assertTableNotHasValue(tables[1], row, value);
+    assertTableNotHasValue(tables[2], row, value);
+  }
+
+  private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster)
+    throws Exception {
+    String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration());
+    Path bulkLoadFilePath = new Path(bulkloadFile);
+    copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster());
+    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
+    Map<byte[], List<Path>> family2Files = new HashMap<>();
+    List<Path> files = new ArrayList<>();
+    files.add(new Path(
+      BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName()));
+    family2Files.put(noRepfamName, files);
+    bulkLoadHFilesTool.bulkLoad(tableName, family2Files);
+  }
+
+  private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig)
+    throws IOException {
+    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName)
+      .setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put);
+
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+    // TODO We need a way to do this without creating files
+    File hFileLocation = testFolder.newFile();
+    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContextBuilder().build());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(new KeyValue(cellBuilder.build()));
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster)
+    throws Exception {
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/");
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertNotEquals(Bytes.toString(value), Bytes.toString(result.value()));
+  }
 }
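
The new test bulk loads a row into noRepfamName, a family whose replication scope is 0, and then asserts that exactly one bulk load ran and that queueStorage.getAllHFileRefs() stays empty; before this change, bulk loads into such families still enqueued HFile references even though their edits are never shipped to peers. A rough standalone version of that queue inspection, assuming the same ReplicationStorageFactory call the test makes in setUpBeforeClass (the class name and connection setup here are illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;

public final class HFileRefQueueCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      // Same storage handle the test builds in setUpBeforeClass().
      ReplicationQueueStorage storage =
        ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
      // After bulk loading into a scope-0 family, this count should stay at 0.
      System.out.println("queued hfile refs: " + storage.getAllHFileRefs().size());
    }
  }
}
```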
