Skip to content

Commit

Permalink
[SPARK-39553][CORE] Multi-thread unregister shuffle shouldn't throw N…
Browse files Browse the repository at this point in the history
…PE when using Scala 2.13

This pr add a `shuffleStatus != null` condition to `o.a.s.MapOutputTrackerMaster#unregisterShuffle` method to avoid throwing NPE when using Scala 2.13.

Ensure that no NPE is thrown when `o.a.s.MapOutputTrackerMaster#unregisterShuffle` is called by multiple threads, this pr is only for Scala 2.13.

`o.a.s.MapOutputTrackerMaster#unregisterShuffle` method will be called concurrently by the following two paths:

- BlockManagerStorageEndpoint:

https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala#L56-L62

- ContextCleaner:

https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L234-L241

When test with Scala 2.13, for example `sql/core` module,  there are many log as follows,although these did not cause UTs failure:

```
17:44:09.957 WARN org.apache.spark.storage.BlockManagerMaster: Failed to remove shuffle 87 - null
java.lang.NullPointerException
	at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
	at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
	at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:59)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.scala:17)
	at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17:44:09.958 ERROR org.apache.spark.ContextCleaner: Error cleaning shuffle 94
java.lang.NullPointerException
	at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
	at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
	at org.apache.spark.ContextCleaner.doCleanupShuffle(ContextCleaner.scala:241)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:202)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1432)
	at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
	at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)
```

I think this is a bug of Scala 2.13.8 and already submit an issue to scala/bug#12613, this PR is only for protection, we should remove this protection after Scala 2.13(maybe scala/scala#9957) fixes this issue.

No

- Pass GA
- Add new test `SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE` to `MapOutputTrackerSuite`, we can test manually as follows:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl core -am -Pscala-2.13
mvn clean test -pl core -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.MapOutputTrackerSuite
```

**Before**

```
- SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE *** FAILED ***
  3 did not equal 0 (MapOutputTrackerSuite.scala:971)
Run completed in 17 seconds, 505 milliseconds.
Total number of tests run: 25
Suites: completed 2, aborted 0
Tests: succeeded 24, failed 1, canceled 0, ignored 1, pending 0
*** 1 TEST FAILED ***
```

**After**

```
- SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE
Run completed in 17 seconds, 996 milliseconds.
Total number of tests run: 25
Suites: completed 2, aborted 0
Tests: succeeded 25, failed 0, canceled 0, ignored 1, pending 0
All tests passed.

```

Closes #37024 from LuciferYang/SPARK-39553.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit 2925896)
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
LuciferYang authored and srowen committed Jun 29, 2022
1 parent c0d9109 commit 1387af7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Expand Up @@ -832,8 +832,12 @@ private[spark] class MapOutputTrackerMaster(
/** Unregister shuffle data */
def unregisterShuffle(shuffleId: Int): Unit = {
shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
shuffleStatus.invalidateSerializedMapOutputStatusCache()
shuffleStatus.invalidateSerializedMergeOutputStatusCache()
// SPARK-39553: Add protection for Scala 2.13 due to https://github.com/scala/bug/issues/12613
// We should revert this if Scala 2.13 solves this issue.
if (shuffleStatus != null) {
shuffleStatus.invalidateSerializedMapOutputStatusCache()
shuffleStatus.invalidateSerializedMergeOutputStatusCache()
}
}
}

Expand Down
34 changes: 34 additions & 0 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark

import java.util.concurrent.atomic.LongAdder

import scala.collection.mutable.ArrayBuffer

import org.mockito.ArgumentMatchers.any
Expand Down Expand Up @@ -910,4 +912,36 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
rpcEnv.shutdown()
slaveRpcEnv.shutdown()
}

test("SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE") {
val rpcEnv = createRpcEnv("test")
val tracker = newTrackerMaster()
tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
val shuffleIdRange = 0 until 100
shuffleIdRange.foreach { shuffleId =>
tracker.registerShuffle(shuffleId, 2, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
}
val npeCounter = new LongAdder()
// More threads will help to reproduce the problem
val threads = new Array[Thread](5)
threads.indices.foreach { i =>
threads(i) = new Thread() {
override def run(): Unit = {
shuffleIdRange.foreach { shuffleId =>
try {
tracker.unregisterShuffle(shuffleId)
} catch {
case _: NullPointerException => npeCounter.increment()
}
}
}
}
}
threads.foreach(_.start())
threads.foreach(_.join())
tracker.stop()
rpcEnv.shutdown()
assert(npeCounter.intValue() == 0)
}
}

0 comments on commit 1387af7

Please sign in to comment.