[SPARK-28638][WEBUI] Task summary should only contain successful tasks' metrics

## What changes were proposed in this pull request?

Currently, when summary metrics are requested, cached data is returned if the current number of "SUCCESS" tasks matches the task count recorded with the cached data.
However, the number of "SUCCESS" tasks is computed incorrectly while there are running tasks: in the live `AppStatusStore` the KVStore is an `ElementTrackingStore` rather than an `InMemoryStore`, so the computed count is actually the number of "SUCCESS" tasks plus the number of "RUNNING" tasks.
Thus, even after the running tasks finish, the outdated cached data is still returned.

This PR fixes the code that computes the number of "SUCCESS" tasks.
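
For context, here is a minimal sketch of the cache-reuse condition described above. The names are illustrative rather than the exact Spark internals; in particular, a `taskCount` field on `CachedQuantile` holding the count recorded when the entry was written is an assumption:

```scala
// Illustrative sketch, not the exact AppStatusStore code: a cached quantile
// entry is reused only while the current "SUCCESS" task count still matches
// the count recorded with the entry (taskCount is assumed here).
def canReuseCachedSummary(cached: Seq[CachedQuantile], successCount: Long): Boolean =
  cached.nonEmpty && cached.forall(_.taskCount == successCount)
```

If `successCount` mistakenly includes "RUNNING" tasks, it can end up equal to the recorded count once those tasks finish, so the stale entry keeps being served; that is exactly the symptom shown in the screenshots below.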

## How was this patch tested?

Tested manually: run
```
sc.parallelize(1 to 160, 40).map(i => Thread.sleep(i*100)).collect()
```
and keep refreshing the stage page. The task summary metrics shown are wrong.

### Before fix:
![image](https://user-images.githubusercontent.com/1097932/62560343-6a141780-b8af-11e9-8942-d88540659a93.png)

### After fix:
![image](https://user-images.githubusercontent.com/1097932/62560355-7009f880-b8af-11e9-8ba8-10c083a48d7b.png)

Closes apache#25369 from gengliangwang/fixStagePage.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 48d04f7)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
gengliangwang authored and Raphaël Luta committed Sep 17, 2019
1 parent 823f508 commit addd2e9
Showing 2 changed files with 30 additions and 13 deletions.
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala (9 additions, 2 deletions):

```diff
@@ -126,6 +126,12 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }
 
+  // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
+  // but currently this is very expensive when using a disk store. So we only trigger the slower
+  // code path when we know we have all data in memory. The following method checks whether all
+  // the data will be in memory.
+  private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined
+
   /**
    * Calculates a summary of the task metrics for the given stage attempt, returning the
    * requested quantiles for the recorded metrics.
@@ -146,7 +152,8 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (store.isInstanceOf[InMemoryStore]) {
+        if (isInMemoryStore) {
+          // For Live UI, we should count the tasks with status "SUCCESS" only.
           store.view(classOf[TaskDataWrapper])
             .parent(stageKey)
             .index(TaskIndexNames.STATUS)
@@ -235,7 +242,7 @@ private[spark] class AppStatusStore(
     // and failed tasks differently (would be tricky). Also would require changing the disk store
     // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      if (store.isInstanceOf[InMemoryStore]) {
+      if (isInMemoryStore) {
         val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
```
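
The in-memory path counts successful tasks through the status index on `TaskDataWrapper`. Below is a minimal sketch of that counting pattern, simplified from the surrounding `AppStatusStore` code; it assumes the `KVStore` view API (`first`/`last` bounds on an index, plus a closeable iterator), and since these types are `private[spark]` it would only compile inside Spark itself:

```scala
import org.apache.spark.status.{TaskDataWrapper, TaskIndexNames}
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.KVStore

// Sketch: count only tasks whose STATUS index value is "SUCCESS" by bounding
// the view to that key and walking the iterator. stageKey is
// Array(stageId, stageAttemptId), as in AppStatusStore.
def countSuccessfulTasks(store: KVStore, stageKey: Array[Int]): Long = {
  Utils.tryWithResource(
    store.view(classOf[TaskDataWrapper])
      .parent(stageKey)
      .index(TaskIndexNames.STATUS)
      .first("SUCCESS")
      .last("SUCCESS")
      .closeableIterator()
  ) { it =>
    var count = 0L
    while (it.hasNext()) {
      it.next()
      count += 1
    }
    count
  }
}
```

Because the live UI's `ElementTrackingStore` keeps its data in memory, treating it like an `InMemoryStore` makes this status-bounded scan cheap, which is why `isInMemoryStore` also checks `listener.isDefined`.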
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala (21 additions, 11 deletions):

```diff
@@ -17,8 +17,7 @@
 
 package org.apache.spark.status
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.status.api.v1.TaskMetricDistributions
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.util.Distribution
 import org.apache.spark.util.kvstore._
 
@@ -77,14 +76,23 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
-  test("only successfull task have taskSummary") {
+  private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = {
+    val conf = new SparkConf()
+    val store = new ElementTrackingStore(inMemoryStore, conf)
+    val listener = new AppStatusListener(store, conf, true, None)
+    new AppStatusStore(store, listener = Some(listener))
+  }
+
+  test("SPARK-28638: only successful tasks have taskSummary when with in memory kvstore") {
     val store = new InMemoryStore()
     (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
-    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
-    assert(appStore.size === 0)
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
+      assert(summary.size === 0)
+    }
   }
 
-  test("summary should contain task metrics of only successfull tasks") {
+  test("SPARK-28638: summary should contain successful tasks only when with in memory kvstore") {
     val store = new InMemoryStore()
 
     for (i <- 0 to 5) {
@@ -95,13 +103,15 @@ class AppStatusStoreSuite extends SparkFunSuite {
       }
     }
 
-    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
 
-    val values = Array(0.0, 2.0, 4.0)
+      val values = Array(0.0, 2.0, 4.0)
 
-    val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
-    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
-      assert(expected === actual)
+      val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
+      dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+        assert(expected === actual)
+      }
     }
   }
```
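
Both tests now exercise each scenario against a plain `AppStatusStore` (history server style) and a live store built by `createLiveStore`, so the "SUCCESS"-only counting is verified for both code paths. The updated suite can be run with the usual Spark dev workflow, e.g. `build/sbt "core/testOnly *AppStatusStoreSuite"` (exact invocation may vary by branch).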

