Skip to content

Commit

Permalink
remove the jobId from the map when one job failed.
Browse files Browse the repository at this point in the history
  • Loading branch information
wbo4958 committed Aug 26, 2020
1 parent 8852f59 commit f1df786
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 1 deletion.
Expand Up @@ -135,6 +135,7 @@ class TaskFailedListener(killSparkContext: Boolean = true) extends SparkListener

if (stageIds.contains(stageId)) {
logger.error("Cancelling jobId:" + jobId)
jobIdToStageIds.remove(jobId)
SparkContext.getOrCreate().cancelJob(jobId)
}
})
Expand Down
Expand Up @@ -106,7 +106,7 @@ class SparkParallelismTrackerSuite extends FunSuite with PerTest {
}
}

test("tracker should cancel correct job when killSparkContext=false") {
test("tracker should cancel the correct job when killSparkContext=false") {
val nWorkers = 2
val tracker = new SparkParallelismTracker(sc, 0, nWorkers, false)
val rdd: RDD[Int] = sc.parallelize(1 to 10, nWorkers)
Expand Down

0 comments on commit f1df786

Please sign in to comment.