Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust keepAlive for test under JDK 9 [ci: last-only] #10732

Open
wants to merge 3 commits into
base: 2.13.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 17 additions & 6 deletions src/reflect/scala/reflect/internal/util/ScalaClassLoader.scala
Expand Up @@ -51,17 +51,28 @@ final class RichClassLoader(private val self: JClassLoader) extends AnyVal {
tryToInitializeClass[AnyRef](path).map(_.getConstructor().newInstance()).orNull

/** Create an instance with ctor args, or invoke errorFn before throwing. */
def create[T <: AnyRef : ClassTag](path: String, errorFn: String => Unit)(args: AnyRef*): T = {
def create[T <: AnyRef : ClassTag](path: String, errorFn: String => Unit)(args: Any*): T = {
def fail(msg: String) = error(msg, new IllegalArgumentException(msg))
def error(msg: String, e: Throwable) = { errorFn(msg) ; throw e }
def error(msg: String, e: Throwable) = { errorFn(msg); throw e }
try {
val clazz = Class.forName(path, /*initialize =*/ true, /*loader =*/ self)
if (classTag[T].runtimeClass isAssignableFrom clazz) {
if (classTag[T].runtimeClass.isAssignableFrom(clazz)) {
val ctor = {
val maybes = clazz.getConstructors filter (c => c.getParameterCount == args.size &&
(c.getParameterTypes zip args).forall { case (k, a) => k isAssignableFrom a.getClass })
val bySize = clazz.getConstructors.filter(_.getParameterCount == args.size)
if (bySize.isEmpty) fail(s"No constructor takes ${args.size} parameters.")
def isAssignable(k: Class[?], a: Any): Boolean =
if (k == classOf[Int]) a.isInstanceOf[Integer]
else if (k == classOf[Boolean]) a.isInstanceOf[java.lang.Boolean]
else if (k == classOf[Long]) a.isInstanceOf[java.lang.Long]
else k.isAssignableFrom(a.getClass)
val maybes = bySize.filter(c => c.getParameterTypes.zip(args).forall { case (k, a) => isAssignable(k, a) })
if (maybes.size == 1) maybes.head
else fail(s"Constructor must accept arg list (${args map (_.getClass.getName) mkString ", "}): ${path}")
else if (bySize.size == 1)
fail(s"One constructor takes ${args.size} parameters but ${
bySize.head.getParameterTypes.zip(args).collect { case (k, a) if !isAssignable(k, a) => s"$k != ${a.getClass}" }.mkString("; ")
}.")
else
fail(s"Constructor must accept arg list (${args.map(_.getClass.getName).mkString(", ")}): ${path}")
}
(ctor.newInstance(args: _*)).asInstanceOf[T]
} else {
Expand Down
2 changes: 2 additions & 0 deletions test/files/jvm/scala-concurrent-tck-b.check
@@ -0,0 +1,2 @@
starting testUncaughtExceptionReporting
finished testUncaughtExceptionReporting
122 changes: 122 additions & 0 deletions test/files/jvm/scala-concurrent-tck-b.scala
@@ -0,0 +1,122 @@
// javaVersion: 9+
import scala.concurrent.{
TimeoutException,
ExecutionContext,
ExecutionContextExecutorService,
Await,
Awaitable,
}
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.tools.testkit.AssertUtil.{Fast, Slow, waitFor, waitForIt}
import scala.util.{Try, Success, Failure}
import scala.util.chaining._
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit.{MILLISECONDS => Milliseconds, SECONDS => Seconds}

trait TestBase {

trait Done { def apply(proof: => Boolean): Unit }

def once(body: Done => Unit): Unit = {
import java.util.concurrent.LinkedBlockingQueue
val q = new LinkedBlockingQueue[Try[Boolean]]
body(new Done {
def apply(proof: => Boolean): Unit = q offer Try(proof)
})
var tried: Try[Boolean] = null
def check = q.poll(5000L, Milliseconds).tap(tried = _) != null
waitForIt(check, progress = Slow, label = "concurrent-tck")
assert(tried.isSuccess)
assert(tried.get)
// Check that we don't get more than one completion
assert(q.poll(50, Milliseconds) eq null)
}

def test[T](name: String)(body: => T): T = {
println(s"starting $name")
body.tap(_ => println(s"finished $name"))
}

def await[A](value: Awaitable[A]): A = {
def check: Option[A] =
Try(Await.result(value, Duration(500, "ms"))) match {
case Success(x) => Some(x)
case Failure(_: TimeoutException) => None
case Failure(t) => throw t
}
waitFor(check, progress = Fast, label = "concurrent-tck test result")
}
}

class ReportingExecutionContext extends TestBase {
val progress = Fast
@volatile var thread: Thread = null
@volatile var reportedOn: Thread = null
@volatile var reported: Throwable = null
val latch = new CountDownLatch(1)

def report(t: Thread, e: Throwable): Unit = {
reportedOn = t
reported = e
latch.countDown()
}

def ecesUsingDefaultFactory = {
import java.util.concurrent.{ForkJoinPool}
import java.util.function.Predicate
import scala.reflect.internal.util.RichClassLoader._

val path = "java.util.concurrent.ForkJoinPool"
val n = 2 // parallelism
val factory = scala.concurrent.TestUtil.threadFactory(report)
val ueh: Thread.UncaughtExceptionHandler = report(_, _)
val async = true
val coreSize = 4
val maxSize = 4
val minRun = 1 // minimumRunnable for liveness
val saturate: Predicate[ForkJoinPool] = (fjp: ForkJoinPool) => false // whether to continue after blocking at maxSize
val keepAlive = 2000L
val fjp = new ForkJoinPool(n, factory, ueh, async, coreSize, maxSize, minRun, saturate, keepAlive, Milliseconds)
ExecutionContext.fromExecutorService(fjp, report(null, _))
}

def testUncaughtExceptionReporting(ec: ExecutionContextExecutorService): Unit = once {
done =>
val example = new InterruptedException

@tailrec def spinForThreadDeath(turns: Int): Boolean =
turns > 0 && (thread != null && !thread.isAlive || { Thread.sleep(100L); spinForThreadDeath(turns - 1) })

def truthfully(b: Boolean): Option[Boolean] = if (b) Some(true) else None

// jdk17 thread receives pool exception handler, so wait for thread to die slow and painful expired keepalive
def threadIsDead = waitFor(truthfully(spinForThreadDeath(turns = 10)), progress = progress, label = "concurrent-tck-thread-death")

try {
ec.execute(() => {
thread = Thread.currentThread
throw example
})
latch.await(2, Seconds)
done(threadIsDead && (example.eq(reported) || example.eq(reported.getCause)))
}
finally ec.shutdown()
}

test("testUncaughtExceptionReporting")(testUncaughtExceptionReporting {
ecesUsingDefaultFactory
})
}

object Test extends App {
new ReportingExecutionContext

System.exit(0)
}

package scala.concurrent {
object TestUtil {
def threadFactory(uncaughtExceptionHandler: Thread.UncaughtExceptionHandler) = new impl.ExecutionContextImpl.DefaultThreadFactory(daemonic=true, maxBlockers=256, prefix="test-thread", uncaughtExceptionHandler)
}
}
2 changes: 0 additions & 2 deletions test/files/jvm/scala-concurrent-tck.check
Expand Up @@ -124,8 +124,6 @@ starting rejectedExecutionException
finished rejectedExecutionException
starting testNameOfGlobalECThreads
finished testNameOfGlobalECThreads
starting testUncaughtExceptionReporting
finished testUncaughtExceptionReporting
starting testOnSuccessCustomEC
finished testOnSuccessCustomEC
starting testKeptPromiseCustomEC
Expand Down
40 changes: 5 additions & 35 deletions test/files/jvm/scala-concurrent-tck.scala
@@ -1,9 +1,11 @@

import scala.concurrent.{
Future,
Promise,
TimeoutException,
ExecutionException,
ExecutionContext,
ExecutionContextExecutorService,
CanAwait,
Await,
Awaitable,
Expand All @@ -15,7 +17,7 @@ import scala.reflect.{classTag, ClassTag}
import scala.tools.testkit.AssertUtil.{Fast, Slow, assertThrows, waitFor, waitForIt}
import scala.util.{Try, Success, Failure}
import scala.util.chaining._
import java.util.concurrent.CountDownLatch
import java.util.concurrent.{CountDownLatch, ThreadPoolExecutor}
import java.util.concurrent.TimeUnit.{MILLISECONDS => Milliseconds, SECONDS => Seconds}

trait TestBase {
Expand All @@ -29,7 +31,7 @@ trait TestBase {
def apply(proof: => Boolean): Unit = q offer Try(proof)
})
var tried: Try[Boolean] = null
def check = { tried = q.poll(5000L, Milliseconds) ; tried != null }
def check = q.poll(5000L, Milliseconds).tap(tried = _) != null
waitForIt(check, progress = Slow, label = "concurrent-tck")
assert(tried.isSuccess)
assert(tried.get)
Expand Down Expand Up @@ -747,7 +749,7 @@ class Blocking extends TestBase {

class BlockContexts extends TestBase {
import ExecutionContext.Implicits._
import scala.concurrent.{ Await, Awaitable, BlockContext }
import scala.concurrent.BlockContext

private def getBlockContext(body: => BlockContext): BlockContext = await(Future(body))

Expand Down Expand Up @@ -877,7 +879,6 @@ class GlobalExecutionContext extends TestBase {
}

class CustomExecutionContext extends TestBase {
import scala.concurrent.{ ExecutionContext, Awaitable }

def defaultEC = ExecutionContext.global

Expand Down Expand Up @@ -987,37 +988,6 @@ class CustomExecutionContext extends TestBase {
assert(count >= 1)
}

def testUncaughtExceptionReporting(): Unit = once { done =>
val example = new InterruptedException
val latch = new CountDownLatch(1)
@volatile var thread: Thread = null
@volatile var reported: Throwable = null
val ec = ExecutionContext.fromExecutorService(null, t => {
reported = t
latch.countDown()
})

@tailrec def waitForThreadDeath(turns: Int): Boolean =
turns > 0 && (thread != null && !thread.isAlive || { Thread.sleep(10L) ; waitForThreadDeath(turns - 1) })

def truthfully(b: Boolean): Option[Boolean] = if (b) Some(true) else None

// jdk17 thread receives pool exception handler, so wait for thread to die slow and painful expired keepalive
def threadIsDead =
waitFor(truthfully(waitForThreadDeath(turns = 100)), progress = Slow, label = "concurrent-tck-thread-death")

try {
ec.execute(() => {
thread = Thread.currentThread
throw example
})
latch.await(2, Seconds)
done(threadIsDead && (reported eq example))
}
finally ec.shutdown()
}

test("testUncaughtExceptionReporting")(testUncaughtExceptionReporting())
test("testOnSuccessCustomEC")(testOnSuccessCustomEC())
test("testKeptPromiseCustomEC")(testKeptPromiseCustomEC())
test("testCallbackChainCustomEC")(testCallbackChainCustomEC())
Expand Down