Merge pull request #16 from arenadata/feature/metastore-leak-fixes
Feature/metastore leak fixes
Asmoday authored May 29, 2023
2 parents d8c0204 + 35221e8 commit 790b141
Showing 45 changed files with 1,809 additions and 723 deletions.
9 changes: 9 additions & 0 deletions common/src/java/org/apache/hadoop/hive/common/LogUtils.java
@@ -70,6 +70,15 @@ public LogInitializationException(String msg) {
}
}

/**
* This is an exception that can be passed to a logger just for printing the stack trace.
*/
public static class StackTraceLogger extends Exception {
public StackTraceLogger(final String msg) {
super(msg);
}
}

/**
* Initialize log4j.
*
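The added LogUtils.StackTraceLogger is a plain Exception subclass, so it can be handed to a logging call purely to record the call site's stack trace without throwing anything. A minimal usage sketch, assuming an SLF4J logger as typically used in Hive (the class, logger name, and message below are illustrative, not part of this commit):

import org.apache.hadoop.hive.common.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StackTraceLoggerSketch {
  private static final Logger LOG = LoggerFactory.getLogger(StackTraceLoggerSketch.class);

  void closeClient() {
    // The exception is never thrown; it is attached only so the logger
    // prints the stack trace of the place this method was called from.
    LOG.info("Closing metastore client", new LogUtils.StackTraceLogger("close() called from"));
  }
}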
@@ -79,7 +79,7 @@ public class TimestampTZUtil {
optionalEnd().optionalEnd();
// Zone part
builder.optionalStart().appendLiteral(" ").optionalEnd();
builder.optionalStart().appendZoneText(TextStyle.NARROW).optionalEnd();
builder.optionalStart().appendZoneOrOffsetId().optionalEnd();

FORMATTER = builder.toFormatter();
}
@@ -89,9 +89,13 @@ public static TimestampTZ parse(String s) {
}

public static TimestampTZ parse(String s, ZoneId defaultTimeZone) {
return parse(s, defaultTimeZone, FORMATTER);
}

public static TimestampTZ parse(String s, ZoneId defaultTimeZone, DateTimeFormatter formatter) {
// need to handle offsets with a single-digit hour, see JDK-8066806
s = handleSingleDigitHourOffset(s);
TemporalAccessor accessor = FORMATTER.parse(s);
TemporalAccessor accessor = formatter.parse(s);

LocalDate localDate = accessor.query(TemporalQueries.localDate());

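The new parse(String, ZoneId, DateTimeFormatter) overload lets callers supply their own formatter instead of the built-in FORMATTER. A minimal sketch, assuming TimestampTZ and TimestampTZUtil live in org.apache.hadoop.hive.common.type (the pattern and input value are illustrative only):

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

import org.apache.hadoop.hive.common.type.TimestampTZ;
import org.apache.hadoop.hive.common.type.TimestampTZUtil;

public class TimestampTZParseSketch {
  public static void main(String[] args) {
    // Explicit formatter handed to the new overload instead of the default FORMATTER.
    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss VV");
    TimestampTZ ts = TimestampTZUtil.parse("2023-05-29 10:15:30 UTC", ZoneId.of("UTC"), fmt);
    System.out.println(ts);
  }
}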
8 changes: 7 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2657,6 +2657,8 @@ public static enum ConfVars {
// For Arrow SerDe
HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", Long.MAX_VALUE,
"Arrow root allocator memory size limitation in bytes."),
HIVE_ARROW_BATCH_ALLOCATOR_LIMIT("hive.arrow.batch.allocator.limit", 10_000_000_000L,
"Max bytes per arrow batch. This is a threshold, the memory is not pre-allocated."),
HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows sent in one Arrow batch."),

// For Druid storage handler
@@ -3640,7 +3642,11 @@ public static enum ConfVars {
"internal use only. When false, don't suppress fatal exceptions like\n" +
"NullPointerException, etc so the query will fail and assure it will be noticed",
true),

HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED(
"hive.vectorized.execution.filesink.arrow.native.enabled", false,
"This flag should be set to true to enable the native vectorization\n" +
"of queries using the Arrow SerDe and FileSink.\n" +
"The default value is false."),
HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true, "This property has been extended to control "
+ "whether to check, convert, and normalize partition value to conform to its column type in "
+ "partition operations including but not limited to insert, such as alter, describe etc."),
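The new Arrow-related ConfVars can be set like any other HiveConf knob. A rough sketch of enabling them programmatically (the values are arbitrary examples, not recommendations):

import org.apache.hadoop.hive.conf.HiveConf;

public class ArrowConfSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Per-batch memory cap for Arrow; a threshold only, nothing is pre-allocated.
    conf.setLongVar(HiveConf.ConfVars.HIVE_ARROW_BATCH_ALLOCATOR_LIMIT, 2_000_000_000L);
    // Number of rows sent in one Arrow batch.
    conf.setIntVar(HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE, 5000);
    // Opt in to the native Arrow FileSink vectorization added above (disabled by default).
    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
  }
}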
16 changes: 15 additions & 1 deletion hcatalog/hcatalog-pig-adapter/pom.xml
@@ -106,7 +106,21 @@
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- test intra-project -->
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
5 changes: 5 additions & 0 deletions hcatalog/pom.xml
@@ -65,6 +65,11 @@
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
@@ -22,8 +22,10 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.EventUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatNotificationEvent;
import org.apache.thrift.TException;
Expand All @@ -43,6 +45,9 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Utility class to enable Replv1 compatibility testing.
*
@@ -65,6 +70,8 @@ public class ReplicationV1CompatRule implements TestRule {
private HiveConf hconf = null;
private List<String> testsToSkip = null;

private Hive hiveDb;

public ReplicationV1CompatRule(IMetaStoreClient metaStoreClient, HiveConf hconf){
this(metaStoreClient, hconf, new ArrayList<String>());
}
@@ -79,6 +86,7 @@ protected Long initialValue(){
};
this.testsToSkip = testsToSkip;
LOG.info("Replv1 backward compatibility tester initialized at " + testEventId.get());
this.hiveDb = mock(Hive.class);
}

private Long getCurrentNotificationId(){
@@ -137,16 +145,17 @@ public boolean accept(NotificationEvent notificationEvent) {
return true;
}
};
EventUtils.MSClientNotificationFetcher evFetcher =
new EventUtils.MSClientNotificationFetcher(metaStoreClient);
try {
when(hiveDb.getMSC()).thenReturn(metaStoreClient);
EventUtils.MSClientNotificationFetcher evFetcher =
new EventUtils.MSClientNotificationFetcher(hiveDb);
EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
evFetcher, testEventIdBefore,
Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1,
evFilter);
evFetcher, testEventIdBefore,
Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1,
evFilter);
ReplicationTask.resetFactory(null);
assertTrue("We should have found some events",evIter.hasNext());
while (evIter.hasNext()){
assertTrue("We should have found some events", evIter.hasNext());
while (evIter.hasNext()) {
eventCount++;
NotificationEvent ev = evIter.next();
// convert to HCatNotificationEvent, and then try to instantiate a ReplicationTask on it.
Expand All @@ -155,11 +164,11 @@ public boolean accept(NotificationEvent notificationEvent) {
if (rtask instanceof ErroredReplicationTask) {
unhandledTasks.put(ev, ((ErroredReplicationTask) rtask).getCause());
}
} catch (RuntimeException re){
} catch (RuntimeException re) {
incompatibleTasks.put(ev, re);
}
}
} catch (IOException e) {
} catch (IOException | MetaException e) {
assertNull("Got an exception when we shouldn't have - replv1 backward incompatibility issue:",e);
}

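The test adaptation above reflects an API shift: EventUtils.MSClientNotificationFetcher is now constructed from a Hive handle (whose getMSC() supplies the metastore client) rather than from a raw IMetaStoreClient. A hedged sketch of non-test usage, assuming Hive.get(conf) as the usual way to obtain that handle (this snippet is not part of the commit):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.events.EventUtils;

public class NotificationFetcherSketch {
  public static EventUtils.MSClientNotificationFetcher newFetcher(HiveConf conf) throws Exception {
    // Hive owns and reuses the underlying metastore client connection.
    Hive hiveDb = Hive.get(conf);
    return new EventUtils.MSClientNotificationFetcher(hiveDb);
  }
}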
@@ -76,6 +76,7 @@
import org.apache.hadoop.hive.ql.ErrorMsg;

import javax.annotation.Nullable;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -1555,7 +1556,7 @@ public void testInsertToMultiKeyPartition() throws IOException {
run("USE " + replDbName, driverMirror);
verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)",
"location", "namelist/year=1990/month=5/day=25", driverMirror);
run("USE " + dbName, driverMirror);
run("USE " + dbName, driver);

String[] ptn_data_3 = new String[] { "abraham", "bob", "carter", "david", "fisher" };
String[] data_after_ovwrite = new String[] { "fisher" };
@@ -111,28 +111,9 @@ public abstract class BaseJdbcWithMiniLlap {

// This method should be called by sub-classes in a @BeforeClass initializer
public static void beforeTest(boolean useArrow) throws Exception {
conf = inputConf;
Class.forName(MiniHS2.getJdbcDriverName());

String confDir = "../../data/conf/llap/";
if (confDir != null && !confDir.isEmpty()) {
HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
}

conf = new HiveConf();
conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
if(useArrow) {
conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
} else {
conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
}

conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+ "/tez-site.xml"));

miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);

dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
kvDataFilePath = new Path(dataFileDir, "kv1.txt");
dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
@@ -141,6 +122,19 @@ public static void beforeTest(boolean useArrow) throws Exception {
miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
}

static HiveConf defaultConf() throws Exception {
String confDir = "../../data/conf/llap/";
if (confDir != null && !confDir.isEmpty()) {
HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
}
HiveConf defaultConf = new HiveConf();
defaultConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
defaultConf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml"));
return defaultConf;
}

@Before
public void setUp() throws Exception {
hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
@@ -546,6 +540,8 @@ private int processQuery(String currentDatabase, String query, int numSplits, Ro
rowProcessor.process(row);
++rowCount;
}
// In arrow-mode this will throw an exception unless all buffers have been released
// See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader
reader.close();
}
LlapBaseInputFormat.close(handleId);
@@ -25,6 +25,8 @@
import org.junit.Before;
import org.junit.After;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

/**
* TestJdbcWithMiniLlap for llap Row format.
@@ -33,7 +35,9 @@ public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap {

@BeforeClass
public static void beforeTest() throws Exception {
BaseJdbcWithMiniLlap.beforeTest(false);
HiveConf conf = defaultConf();
conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
BaseJdbcWithMiniLlap.beforeTest(conf);
}

@Override
