[jvm-packages] cancel job instead of killing SparkContext #6019
Conversation
s"$taskEndReason, stopping SparkContext") | ||
TaskFailedListener.startedSparkContextKiller() | ||
s"$taskEndReason, cancelling all jobs") | ||
TaskFailedListener.cancelAllJobs() |
- I don't think we should change the default behavior of killing the SparkContext. Over the years, a lot of fault-recovery strategies in production environments have been built on top of the context-killing behavior. Instead, we should provide both behaviors. Moreover, cancelling the job only helps the small percentage of customers who train multiple XGB models simultaneously or train various models in the same app.
- Here you called cancelAllJobs; I don't think that is any different from killing the SparkContext. You should call cancelJob instead.
Thx @CodingCat. Regarding:
> I don't think we should change the default behavior of killing the SparkContext. Over the years, a lot of fault-recovery strategies in production environments have been built on top of the context-killing behavior. Instead, we should provide both behaviors. Moreover, cancelling the job only helps the small percentage of customers who train multiple XGB models simultaneously or train various models in the same app.
Yeah, makes sense. I will update the change to add a parameter controlling both scenarios.
> Here you called cancelAllJobs; I don't think that is any different from killing the SparkContext. You should call cancelJob instead.
The intent of this PR is that an exception is thrown when cancelAllJobs is performed; users should wrap training in a try/catch:
try {
  val model = xgboostClassifier.fit(dataset)
} catch {
  case e: Exception =>
    // handle the training failure; the SparkContext is still alive here
}
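Because only the jobs are cancelled, the SparkContext remains usable after the exception is caught, so the same application can retry the training or fit other models.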
cancelAllJobs (in Spark's DAGScheduler):
private[scheduler] def doCancelAllJobs(): Unit = {
  // Cancel all running jobs.
  runningStages.map(_.firstJobId).foreach(handleJobCancellation(_,
    Option("as part of cancellation of all jobs")))
  activeJobs.clear()        // These should already be empty by this point,
  jobIdToActiveJob.clear()  // but just in case we lost track of some jobs...
}
cancelJob (via handleJobCancellation):
private[scheduler] def handleJobCancellation(jobId: Int, reason: Option[String]): Unit = {
  if (!jobIdToStageIds.contains(jobId)) {
    logDebug("Trying to cancel unregistered job " + jobId)
  } else {
    failJobAndIndependentStages(
      jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason.getOrElse("")))
  }
}
From the code, cancelAllJobs eventually calls handleJobCancellation, just like cancelJob does.
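For reference, a hedged sketch using Spark's public SparkContext API (the DAGScheduler methods quoted above are private[scheduler]); it only illustrates the difference in scope between the two calls:

import org.apache.spark.SparkContext

// Cancels only the given job; other concurrently running jobs survive.
def cancelOne(sc: SparkContext, jobId: Int): Unit =
  sc.cancelJob(jobId)

// Cancels every job registered in the context, which for concurrent
// workloads is close to killing the SparkContext itself.
def cancelEverything(sc: SparkContext): Unit =
  sc.cancelAllJobs()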
@CodingCat Could you help to review again? The new commit introduces a new parameter, kill_spark_context, to control whether the SparkContext is killed when a training task fails, and adds two test cases for this new feature.
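For illustration, a minimal sketch of how a user might opt out of the context-killing behavior with this parameter (the other parameters and values are illustrative, not from this PR; the parameter is renamed later in this review):

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

// Hypothetical training parameters; only kill_spark_context comes from this PR.
val classifier = new XGBoostClassifier(Map(
  "eta" -> 0.1,
  "num_round" -> 100,
  "num_workers" -> 2,
  "kill_spark_context" -> false // cancel the failing jobs, keep the SparkContext alive
))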
Codecov Report
@@            Coverage Diff            @@
##           master    #6019    +/-   ##
=========================================
  Coverage   79.10%   79.10%
=========================================
  Files          12       12
  Lines        3044     3044
=========================================
  Hits         2408     2408
  Misses        636      636

Continue to review full report at Codecov.
Hi @CodingCat, could you help to review it? Many thx
@CodingCat I would like to say you are indeed a Spark expert. I didn't realize we could submit jobs simultaneously from separate threads. The new commits cancel the jobs the failing task belongs to. Could you help to review it again?
@trivialfis @hcho3 The CI failed when building GPU CUDA 10.2. Could you help to check that?
There's a breakage in conda. I think we are waiting for a fix from them.
The CI has been fixed in the master branch. Rerunning tests now.
This PR changes the default behavior of killing the SparkContext. Instead, it cancels the relevant jobs when a task fails, which means the SparkContext stays alive even when exceptions occur.
@CodingCat Could you help to review it?
LGTM, left some minor things
@@ -220,6 +221,9 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
     val cacheTrainingSet = overridedParams.getOrElse("cache_training_set", false)
       .asInstanceOf[Boolean]

+    val killSparkContext = overridedParams.getOrElse("kill_spark_context", true)
kill_spark_context_on_worker_failure?
Done. Thx for the naming suggestion.
@@ -79,7 +83,7 @@ class SparkParallelismTracker(
   def execute[T](body: => T): T = {
     if (timeout <= 0) {
       logger.info("starting training without setting timeout for waiting for resources")
-      body
+      safeExecute(body)
Why do we have to change to safeExecute()?
safeExecute wraps the TaskFailedListener around the body. I don't know why the body was not executed inside safeExecute in the previous version, since training may hang forever without a TaskFailedListener.
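For clarity, a hedged sketch of the shape safeExecute is assumed to have here (not the exact SparkParallelismTracker code): it registers the failure listener around the body and deregisters it afterwards.

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListener

// Assumed shape of the tracker's safeExecute: the listener observes task
// failures while the training body runs, and is removed afterwards so it
// does not leak into later jobs.
class TrackerSketch(sc: SparkContext, taskFailedListener: SparkListener) {
  def safeExecute[T](body: => T): T = {
    sc.addSparkListener(taskFailedListener)
    try {
      body
    } finally {
      sc.removeSparkListener(taskFailedListener)
    }
  }
}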
    } else {
      val stageId = taskEnd.stageId
      // find job ids according to stage id and then cancel the job
      jobIdToStageIds.foreach(t => {
jobIdToStageIds.foreach { case (jobId, stageIds) =>
...
}
Very good suggestion, really thx. Done.
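Putting the pieces together, a hedged sketch of the cancellation listener discussed in this thread (assumed shape, not the PR's exact code; jobIdToStageIds is filled in onJobStart and consulted on task failure):

import scala.collection.mutable
import org.apache.spark.{SparkContext, TaskFailedReason}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd}

class CancelJobOnTaskFailure(sc: SparkContext) extends SparkListener {
  // Maps each job to its stages so a failing stage can be traced back to its job.
  private val jobIdToStageIds = mutable.Map[Int, Seq[Int]]()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
    jobIdToStageIds(jobStart.jobId) = jobStart.stageIds
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    taskEnd.reason match {
      case _: TaskFailedReason =>
        // Cancel only the job that owns the failing stage.
        jobIdToStageIds.foreach { case (jobId, stageIds) =>
          if (stageIds.contains(taskEnd.stageId)) sc.cancelJob(jobId)
        }
      case _ => // task succeeded; nothing to do
    }
  }
}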
@hcho3 Hi Philip, we may need your help on this CI failure.
Re-triggering the CI. I opened #6074 to keep track of the flaky test.
Thx @hcho3. @CodingCat Could you help to review again?
I wonder: if the thread that runs sparkJobThread.start() starts the job before the other thread executes sc.addSparkListener, then onJobStart will never be called. Is that OK?
Good finding, I will fix this issue. Thx again
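For the record, a hedged sketch of the ordering fix (sparkJobThread and listener are illustrative names, not the PR's exact code): the listener must be registered before the job-submitting thread starts, so onJobStart cannot be missed.

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListener

def launchTraining(sc: SparkContext, listener: SparkListener, sparkJobThread: Thread): Unit = {
  sc.addSparkListener(listener) // register first...
  sparkJobThread.start()        // ...then submit the Spark job
}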
There is a possibility that onJobStart of TaskFailedListener won't be called if the job is submitted before the other thread calls addSparkListener. Details can be found at #6019 (comment).
This PR changes the default behavior of killing the SparkContext. Instead, it cancels the relevant jobs when a task fails, which means the SparkContext stays alive even when exceptions occur.
This PR relates to #4826 and #5978.
@CodingCat @trivialfis @hcho3 Could you help to review this PR?