
[SPARK-48900] Add reason field for cancelJobGroup and `cancelJobsWithTag`

### What changes were proposed in this pull request?

This PR introduces the optional `reason` field for `cancelJobGroup` and `cancelJobsWithTag` in `SparkContext.scala`, while keeping the old APIs without the `reason`, similar to how `cancelJob` is implemented currently.
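
For reference, a signature-level sketch of the cancellation API on `SparkContext` after this change, collected from the diff in this commit. The `CancellationApi` trait is only an illustrative grouping for this write-up, not something added by the PR; the real methods live on `org.apache.spark.SparkContext`.

```scala
// Illustrative grouping only -- not part of the PR itself.
trait CancellationApi {
  // Existing overloads, kept as-is so current callers keep compiling.
  def cancelJobGroup(groupId: String): Unit
  def cancelJobGroupAndFutureJobs(groupId: String): Unit
  def cancelJobsWithTag(tag: String): Unit
  def cancelJob(jobId: Int, reason: String): Unit // has accepted a reason since SPARK-19549

  // Reason-aware overloads introduced by this PR (@since 4.0.0).
  def cancelJobGroup(groupId: String, reason: String): Unit
  def cancelJobGroupAndFutureJobs(groupId: String, reason: String): Unit
  def cancelJobsWithTag(tag: String, reason: String): Unit
}
```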

### Why are the changes needed?

Today it is difficult to determine why a job, stage, or job group was canceled. We should leverage existing Spark functionality to provide a reason string explaining the cancellation cause, and should add new APIs to let us provide this reason when canceling job groups.

**Details:**

Since [SPARK-19549](https://issues.apache.org/jira/browse/SPARK-19549) "Allow providing reasons for stage/job cancelling" (Spark 2.2.0), Spark's `cancelJob` and `cancelStage` methods accept an optional `reason: String` that is added to logging output and user-facing error messages when jobs or stages are canceled. In our internal calls to these methods, we should always supply a reason. For example, we should set an appropriate reason when the "kill" links are clicked in the Spark UI (see [code](https://github.com/apache/spark/blob/b14c1f036f8f394ad1903998128c05d04dd584a9/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala#L54C1-L55)).
Other APIs currently lack a reason field. For example, `cancelJobGroup` and `cancelJobsWithTag` don't provide any way to specify a reason, so we only see generic logs like "asked to cancel job group <group name>". We should add the ability to pass in a cancellation reason and thread it through into the scheduler's logging and job failure reasons; a small sketch of that behavior follows.
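
A minimal sketch of the intended fallback behavior, mirroring `handleJobGroupCancelled` in the `DAGScheduler` diff below: a caller-supplied reason wins, otherwise the old generic message is kept. The `jobFailureReason` helper and the strings passed to it are hypothetical and exist only for illustration.

```scala
object ReasonFallbackSketch {
  // Hypothetical helper mirroring DAGScheduler.handleJobGroupCancelled:
  // use the caller-supplied reason when present, else the old generic message.
  def jobFailureReason(groupId: String, reason: Option[String]): String =
    reason.getOrElse(s"part of cancelled job group $groupId")

  def main(args: Array[String]): Unit = {
    println(jobFailureReason("etl-group", Some("input dataset failed validation")))
    // prints: input dataset failed validation
    println(jobFailureReason("etl-group", None))
    // prints: part of cancelled job group etl-group
  }
}
```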

This feature can be implemented in two PRs:

1. Modify `SparkContext` and its downstream APIs, such as `cancelJobGroup` and `cancelJobsWithTag`, to accept an optional reason string.

2. Add reasons for all internal calls to these methods.

**Note: This is the first of the two PRs to implement this new feature**

### Does this PR introduce _any_ user-facing change?

Yes, it modifies the `SparkContext` API, allowing users to pass an optional `reason: String` to `cancelJobsWithTag` and `cancelJobGroup`, while the old methods without the `reason` are kept. This creates a more uniform interface where the user can supply an optional reason for all job/stage cancellation calls.
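
A hedged usage sketch of the new overload from a driver program. The group name, reason string, and the crude `Thread.sleep` wait are illustrative only; the new `JobCancellationSuite` tests in this commit coordinate with a listener and a semaphore instead.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Try}

import org.apache.spark.{SparkContext, SparkException}

object CancelWithReasonDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "cancel-with-reason-demo")

    // Submit a slow job asynchronously under a named job group.
    val job = Future {
      sc.setJobGroup("demo-group", "job to be cancelled")
      sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
    }

    Thread.sleep(2000) // crude wait until the job is running
    sc.cancelJobGroup("demo-group", "cancelled: superseded by a newer run")

    Try(Await.result(job, Duration.Inf)) match {
      case Failure(e: SparkException) =>
        // The supplied reason is propagated into the job failure message.
        println(e.getMessage)
      case other =>
        println(s"unexpected outcome: $other")
    }
    sc.stop()
  }
}
```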

### How was this patch tested?

New tests are added to `JobCancellationSuite` to test the reason fields for these calls.

For the API changes in R and PySpark, tests are added to these files:
- R/pkg/tests/fulltests/test_context.R
- python/pyspark/tests/test_pin_thread.py

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47361 from mingkangli-db/reason_job_cancellation.

Authored-by: Mingkang Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
mingkangli-db authored and cloud-fan committed Jul 18, 2024
1 parent 9d4ebf7 commit 41b37ae
Showing 5 changed files with 162 additions and 21 deletions.
56 changes: 52 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2633,21 +2633,69 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`
* for more information.
*
* @param groupId the group ID to cancel
* @param reason reason for cancellation
*
* @since 4.0.0
*/
def cancelJobGroup(groupId: String, reason: String): Unit = {
assertNotStopped()
dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = false, Option(reason))
}

/**
* Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`
* for more information.
*
* @param groupId the group ID to cancel
*/
def cancelJobGroup(groupId: String): Unit = {
assertNotStopped()
dagScheduler.cancelJobGroup(groupId)
dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = false, None)
}

/**
* Cancel active jobs for the specified group, as well as the future jobs in this job group.
* Note: the maximum number of job groups that can be tracked is set by
* 'spark.scheduler.numCancelledJobGroupsToTrack'. Once the limit is reached and a new job group
* is to be added, the oldest job group tracked will be discarded.
*
* @param groupId the group ID to cancel
* @param reason reason for cancellation
*
* @since 4.0.0
*/
def cancelJobGroupAndFutureJobs(groupId: String, reason: String): Unit = {
assertNotStopped()
dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true, Option(reason))
}

/**
* Cancel active jobs for the specified group, as well as the future jobs in this job group.
* Note: the maximum number of job groups that can be tracked is set by
* 'spark.scheduler.numCancelledJobGroupsToTrack'. Once the limit is reached and a new job group
* is to be added, the oldest job group tracked will be discarded.
*
* @param groupId the group ID to cancel
*/
def cancelJobGroupAndFutureJobs(groupId: String): Unit = {
assertNotStopped()
dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true)
dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true, None)
}

/**
* Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`.
*
* @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
* @param reason reason for cancellation
*
* @since 4.0.0
*/
def cancelJobsWithTag(tag: String, reason: String): Unit = {
SparkContext.throwIfInvalidTag(tag)
assertNotStopped()
dagScheduler.cancelJobsWithTag(tag, Option(reason))
}

/**
@@ -2660,7 +2708,7 @@
def cancelJobsWithTag(tag: String): Unit = {
SparkContext.throwIfInvalidTag(tag)
assertNotStopped()
dagScheduler.cancelJobsWithTag(tag)
dagScheduler.cancelJobsWithTag(tag, None)
}

/** Cancel all jobs that have been scheduled or are running. */
@@ -2673,7 +2721,7 @@
* Cancel a given job if it's scheduled or running.
*
* @param jobId the job ID to cancel
* @param reason optional reason for cancellation
* @param reason reason for cancellation
* @note Throws `InterruptedException` if the cancel message cannot be sent
*/
def cancelJob(jobId: Int, reason: String): Unit = {
@@ -763,9 +763,32 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/**
* Cancel active jobs for the specified group. See
* `org.apache.spark.api.java.JavaSparkContext.setJobGroup` for more information.
*
* @param groupId the group ID to cancel
* @param reason reason for cancellation
*
* @since 4.0.0
*/
def cancelJobGroup(groupId: String, reason: String): Unit = sc.cancelJobGroup(groupId, reason)

/**
* Cancel active jobs for the specified group. See
* `org.apache.spark.api.java.JavaSparkContext.setJobGroup` for more information.
*
* @param groupId the group ID to cancel
*/
def cancelJobGroup(groupId: String): Unit = sc.cancelJobGroup(groupId)

/**
* Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`.
*
* @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
* @param reason reason for cancellation
*
* @since 4.0.0
*/
def cancelJobsWithTag(tag: String, reason: String): Unit = sc.cancelJobsWithTag(tag, reason)

/**
* Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`.
*
29 changes: 15 additions & 14 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1108,19 +1108,19 @@ private[spark] class DAGScheduler(
* Cancel all jobs in the given job group ID.
* @param cancelFutureJobs if true, future submitted jobs in this job group will be cancelled
*/
def cancelJobGroup(groupId: String, cancelFutureJobs: Boolean = false): Unit = {
def cancelJobGroup(groupId: String, cancelFutureJobs: Boolean = false, reason: Option[String]): Unit = {
logInfo(log"Asked to cancel job group ${MDC(GROUP_ID, groupId)} with " +
log"cancelFutureJobs=${MDC(CANCEL_FUTURE_JOBS, cancelFutureJobs)}")
eventProcessLoop.post(JobGroupCancelled(groupId, cancelFutureJobs))
eventProcessLoop.post(JobGroupCancelled(groupId, cancelFutureJobs, reason))
}

/**
* Cancel all jobs with a given tag.
*/
def cancelJobsWithTag(tag: String): Unit = {
def cancelJobsWithTag(tag: String, reason: Option[String]): Unit = {
SparkContext.throwIfInvalidTag(tag)
logInfo(log"Asked to cancel jobs with tag ${MDC(TAG, tag)}")
eventProcessLoop.post(JobTagCancelled(tag))
eventProcessLoop.post(JobTagCancelled(tag, reason))
}

/**
@@ -1209,7 +1209,8 @@

private[scheduler] def handleJobGroupCancelled(
groupId: String,
cancelFutureJobs: Boolean): Unit = {
cancelFutureJobs: Boolean,
reason: Option[String]): Unit = {
// If cancelFutureJobs is true, store the cancelled job group id into internal states.
// When a job belonging to this job group is submitted, skip running it.
if (cancelFutureJobs) {
@@ -1229,11 +1230,11 @@
log"Cannot find active jobs for it.")
}
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation(_,
Option("part of cancelled job group %s".format(groupId))))
val updatedReason = reason.getOrElse("part of cancelled job group %s".format(groupId))
jobIds.foreach(handleJobCancellation(_, Option(updatedReason)))
}

private[scheduler] def handleJobTagCancelled(tag: String): Unit = {
private[scheduler] def handleJobTagCancelled(tag: String, reason: Option[String]): Unit = {
// Cancel all jobs that have this tag.
// First find all active jobs with this tag, and then kill stages for them.
val jobIds = activeJobs.filter { activeJob =>
Expand All @@ -1242,8 +1243,8 @@ private[spark] class DAGScheduler(
.split(SparkContext.SPARK_JOB_TAGS_SEP).filter(!_.isEmpty).toSet.contains(tag)
}
}.map(_.jobId)
jobIds.foreach(handleJobCancellation(_,
Option(s"part of cancelled job tag $tag")))
val updatedReason = reason.getOrElse("part of cancelled job tag %s".format(tag))
jobIds.foreach(handleJobCancellation(_, Option(updatedReason)))
}

private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
@@ -3109,11 +3110,11 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)

case JobGroupCancelled(groupId, cancelFutureJobs) =>
dagScheduler.handleJobGroupCancelled(groupId, cancelFutureJobs)
case JobGroupCancelled(groupId, cancelFutureJobs, reason) =>
dagScheduler.handleJobGroupCancelled(groupId, cancelFutureJobs, reason)

case JobTagCancelled(tag) =>
dagScheduler.handleJobTagCancelled(tag)
case JobTagCancelled(tag, reason) =>
dagScheduler.handleJobTagCancelled(tag, reason)

case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
@@ -65,10 +65,13 @@ private[scheduler] case class JobCancelled(

private[scheduler] case class JobGroupCancelled(
groupId: String,
cancelFutureJobs: Boolean = false)
cancelFutureJobs: Boolean = false,
reason: Option[String])
extends DAGSchedulerEvent

private[scheduler] case class JobTagCancelled(tagName: String) extends DAGSchedulerEvent
private[scheduler] case class JobTagCancelled(
tagName: String,
reason: Option[String]) extends DAGSchedulerEvent

private[scheduler] case object AllJobsCancelled extends DAGSchedulerEvent

68 changes: 67 additions & 1 deletion core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -155,6 +155,35 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(jobB.get() === 100)
}

test("job group with custom reason") {
sc = new SparkContext("local[2]", "test")

// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
sem.release()
}
})

// jobA is the one to be cancelled.
val jobA = Future {
sc.setJobGroup("jobA", "this is a job to be cancelled")
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}

// Block until both tasks of job A have started and cancel job A.
sem.acquire(2)

val reason = "cancelled: test for custom reason string"
sc.clearJobGroup()
sc.cancelJobGroup("jobA", reason)

val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, Duration.Inf) }.getCause
assert(e.getMessage contains "cancel")
assert(e.getMessage contains reason)
}

test("if cancel job group and future jobs, skip running jobs in the same job group") {
sc = new SparkContext("local[2]", "test")

@@ -204,6 +233,39 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(sc.parallelize(1 to 100).count() == 100)
}

test("cancel job group and future jobs with custom reason") {
sc = new SparkContext("local[2]", "test")

val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
sem.release()
}
})

// run a job, cancel the job group and its future jobs
val jobGroupName = "job-group"
val job = Future {
sc.setJobGroup(jobGroupName, "")
sc.parallelize(1 to 1000).map { i => Thread.sleep (100); i}.count()
}
// block until job starts
sem.acquire(1)
// cancel the job group and future jobs
val reason = "cancelled: test for custom reason string"
sc.cancelJobGroupAndFutureJobs(jobGroupName, reason)
ThreadUtils.awaitReady(job, Duration.Inf).failed.foreach { case e: SparkException =>
checkError(
exception = e,
errorClass = "SPARK_JOB_CANCELLED",
sqlState = "XXKDA",
parameters = scala.collection.immutable.Map(
"jobId" -> "0",
"reason" -> reason)
)
}
}

test("only keeps limited number of cancelled job groups") {
val conf = new SparkConf()
.set(NUM_CANCELLED_JOB_GROUPS_TO_TRACK, 5)
@@ -325,11 +387,15 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
val acquired1 = sem.tryAcquire(4, 1, TimeUnit.MINUTES)
assert(acquired1 == true)

sc.cancelJobsWithTag("two")
// Test custom cancellation reason for job tags
val reason = "job tag cancelled: custom reason test"
sc.cancelJobsWithTag("two", reason)
val eB = intercept[SparkException] {
ThreadUtils.awaitResult(jobB, 1.minute)
}.getCause
assert(eB.getMessage contains "cancel")
assert(eB.getMessage contains reason)

val eC = intercept[SparkException] {
ThreadUtils.awaitResult(jobC, 1.minute)
}.getCause
