From 76490d404fc19f86ab2b8413d25b606157b1821f Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 3 Sep 2024 16:15:44 +0530 Subject: [PATCH] TEZ-4561: Improve reported exception when DAGAppMaster is shutting down. (#365). (Ayush Saxena, reviewed by Laszlo Bodor) --- .../org/apache/tez/dag/app/DAGAppMaster.java | 24 +++++++++++++-- .../apache/tez/dag/app/TestDAGAppMaster.java | 29 +++++++++++++++++-- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 263ac76b4c..9c7cc18b60 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -36,6 +36,7 @@ import java.util.Arrays; import java.util.Calendar; import java.util.Collections; +import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -936,6 +937,15 @@ public void handle(DAGAppMasterEvent event) { protected class DAGAppMasterShutdownHandler { private AtomicBoolean shutdownHandled = new AtomicBoolean(false); private long sleepTimeBeforeExit = TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT; + private long shutdownTime; + + public Date getShutdownTime() { + return new Date(shutdownTime); + } + + public void setShutdownTime(long shutdownTime) { + this.shutdownTime = shutdownTime; + } void setSleepTimeBeforeExit(long sleepTimeBeforeExit) { this.sleepTimeBeforeExit = sleepTimeBeforeExit; @@ -954,6 +964,7 @@ public void shutdown(boolean now) { synchronized (shutdownHandlerRunning) { shutdownHandlerRunning.set(true); + setShutdownTime(System.currentTimeMillis()); } LOG.info("Handling DAGAppMaster shutdown"); @@ -1680,9 +1691,11 @@ public HadoopShim getHadoopShim() { @Override public Map getApplicationACLs() { - if (getServiceState() != STATE.STARTED) { + STATE serviceState = getServiceState(); + if (serviceState != STATE.STARTED) { throw new 
TezUncheckedException( - "Cannot get ApplicationACLs before all services have started"); + "Cannot get ApplicationACLs before all services have started, The current service state is " + serviceState + + "." + getShutdownTimeString()); } return taskSchedulerManager.getApplicationAcls(); } @@ -1743,6 +1756,24 @@ public void setQueueName(String queueName) { } } + /** + * Builds the shutdown-time suffix appended to the ApplicationACLs error message. + * + * @return the shutdown start time phrase, or an empty string if the shutdown handler is absent + * or has not recorded a shutdown time + */ + private String getShutdownTimeString() { + if (shutdownHandler == null) { + return ""; + } + Date shutdownTime = shutdownHandler.getShutdownTime(); + // getShutdownTime() wraps a primitive long, so an unset time surfaces as epoch 0 rather than null. + if (shutdownTime == null || shutdownTime.getTime() <= 0) { + return ""; + } + return " The shutdown hook started at " + shutdownTime; + } + private static class ServiceWithDependency implements ServiceStateChangeListener { ServiceWithDependency(Service service) { this.service = service; diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index bb1e6de505..46e8c98510 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -32,7 +32,8 @@ import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -49,6 +50,7 @@ import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; @@ -67,18 +69,19 @@ import org.junit.Before; import org.junit.Test; 
import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Field; -import java.net.URI; import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -493,6 +496,26 @@ public void testDagCredentialsWithMerge() throws Exception { testDagCredentials(true); } + @Test + public void testGetACLFailure() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2); + DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true); + TezConfiguration conf = new TezConfiguration(false); + conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); + dam.init(conf); + LambdaTestUtils.intercept(TezUncheckedException.class, + "Cannot get ApplicationACLs before all services have started, The current service state is INITED", + () -> dam.getContext().getApplicationACLs()); + dam.start(); + dam.stop(); + Mockito.when(dam.mockShutdown.getShutdownTime()).thenReturn(Date.from(Instant.ofEpochMilli(Time.now()))); + LambdaTestUtils.intercept(TezUncheckedException.class, + " Cannot get ApplicationACLs before all services have started, " + + "The current service state is STOPPED. The shutdown hook started at " + + dam.mockShutdown.getShutdownTime(), () -> dam.getContext().getApplicationACLs()); + } + @Test public void testBadProgress() throws Exception { TezConfiguration conf = new TezConfiguration();