
Commit 8856e9f

shahidki31 authored and Marcelo Vanzin committed
[SPARK-26219][CORE] Executor summary should get updated for failed jobs in the history server UI
The root cause of the problem: whenever a taskEnd event arrives after the stageCompleted event, the executor summary is updated only in the live UI; it needs to be updated in the history UI as well. For the previous discussion, see the PR for apache#23038 and https://issues.apache.org/jira/browse/SPARK-26100.

Added a unit test and verified manually. Steps to reproduce:

```
bin/spark-shell --master yarn --conf spark.executor.instances=3
sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect()
```

Then open the Executors page from the History UI.

Before patch:
![screenshot from 2018-11-29 22-13-34](https://user-images.githubusercontent.com/23054875/49246338-a21ead00-f43a-11e8-8214-f1020420be52.png)

After patch:
![screenshot from 2018-11-30 00-54-49](https://user-images.githubusercontent.com/23054875/49246353-aa76e800-f43a-11e8-98ef-7faecaa7a50e.png)

Closes apache#23181 from shahidki31/executorUpdate.

Authored-by: Shahid <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 36edbac · commit 8856e9f

File tree

2 files changed: +64 −47 lines


core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 8 additions & 11 deletions

```diff
@@ -641,9 +641,14 @@ private[spark] class AppStatusListener(
         }
       }
 
-      // Force an update on live applications when the number of active tasks reaches 0. This is
-      // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-      conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
+      // Force an update on both live and history applications when the number of active tasks
+      // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
+      // reliably up to date.
+      if (exec.activeTasks == 0) {
+        update(exec, now)
+      } else {
+        maybeUpdate(exec, now)
+      }
     }
   }
 
@@ -1024,14 +1029,6 @@ private[spark] class AppStatusListener(
     }
   }
 
-  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
-    if (condition) {
-      liveUpdate(entity, now)
-    } else {
-      maybeUpdate(entity, now)
-    }
-  }
-
   private def cleanupExecutors(count: Long): Unit = {
     // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.
```
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 56 additions & 36 deletions

```diff
@@ -1273,48 +1273,68 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(allJobs.head.numFailedStages == 1)
   }
 
-  test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
-    val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+  Seq(true, false).foreach { live =>
+    test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") {
 
-    val listener = new AppStatusListener(store, testConf, true)
+      val testConf = if (live) {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+      } else {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L)
+      }
 
-    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
-    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
-    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+      val listener = new AppStatusListener(store, testConf, live)
 
-    val tasks = createTasks(4, Array("1", "2"))
-    tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
-    }
+      listener.onExecutorAdded(createExecutorAddedEvent(1))
+      listener.onExecutorAdded(createExecutorAddedEvent(2))
+      val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+      listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+      listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
-    time += 1
-    tasks(0).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
-      Success, tasks(0), null))
-    time += 1
-    tasks(1).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
-      Success, tasks(1), null))
+      val tasks = createTasks(4, Array("1", "2"))
+      tasks.foreach { task =>
+        listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+      }
 
-    stage.failureReason = Some("Failed")
-    listener.onStageCompleted(SparkListenerStageCompleted(stage))
-    time += 1
-    listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
+      time += 1
+      tasks(0).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(0), null))
+      time += 1
+      tasks(1).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(1), null))
 
-    time += 1
-    tasks(2).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
-      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
-    time += 1
-    tasks(3).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
-      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
-
-    val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
-    esummary.foreach { execSummary =>
-      assert(execSummary.failedTasks === 1)
-      assert(execSummary.succeededTasks === 1)
-      assert(execSummary.killedTasks === 0)
+      stage.failureReason = Some("Failed")
+      listener.onStageCompleted(SparkListenerStageCompleted(stage))
+      time += 1
+      listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(
+        new RuntimeException("Bad Executor"))))
+
+      time += 1
+      tasks(2).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
+      time += 1
+      tasks(3).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
+
+      val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+      esummary.foreach { execSummary =>
+        assert(execSummary.failedTasks === 1)
+        assert(execSummary.succeededTasks === 1)
+        assert(execSummary.killedTasks === 0)
+      }
+
+      val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
+      assert(allExecutorSummary.size === 2)
+      allExecutorSummary.foreach { allExecSummary =>
+        assert(allExecSummary.failedTasks === 1)
+        assert(allExecSummary.activeTasks === 0)
+        assert(allExecSummary.completedTasks === 1)
+      }
+      store.delete(classOf[ExecutorSummaryWrapper], "1")
+      store.delete(classOf[ExecutorSummaryWrapper], "2")
     }
   }
 
```
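One note on the parametrization above: the `live = false` variant sets `LIVE_ENTITY_UPDATE_PERIOD` to `-1L`, mimicking history replay, where the rate-limited `maybeUpdate` path never writes; before this patch that variant would fail because nothing flushed the final counts to the store. A minimal, self-contained sketch of such a rate-limit guard (hypothetical names, assuming writes are skipped when the period is negative):

```scala
// Self-contained sketch (hypothetical names) of a maybeUpdate-style
// rate limiter: with a negative update period -- the history-replay
// configuration in the live = false test above -- the guard never
// passes, so only an explicit update() persists anything.
object MaybeUpdateDemo {
  var lastWriteTime: Long = -1L
  var writes: Int = 0

  def update(now: Long): Unit = { writes += 1; lastWriteTime = now }

  def maybeUpdate(now: Long, updatePeriod: Long): Unit = {
    // Assumption: a negative period disables periodic writes entirely.
    if (updatePeriod >= 0 && now - lastWriteTime > updatePeriod) {
      update(now)
    }
  }

  def main(args: Array[String]): Unit = {
    (1L to 100L).foreach(now => maybeUpdate(now, updatePeriod = -1L))
    println(writes)   // 0: nothing reaches the store during "history replay"
    update(101L)      // the forced write the patch adds when activeTasks == 0
    println(writes)   // 1
  }
}
```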
