+
+ override fun transform(outputs: TransformOutputs) {
+ ZipFile(inputArtifact.get().asFile).use { zip ->
+ zip.entries().asSequence()
+ .filter { !it.isDirectory }
+ .filter { it.name.endsWith(".jar") }
+ .forEach { zip.unzip(it, outputs.file(it.name)) }
+ }
+ }
+}
+
+private fun ZipFile.unzip(entry: ZipEntry, output: File) {
+ getInputStream(entry).use {
+ Files.copy(it, output.toPath())
+ }
+}
diff --git a/coroutines-guide.md b/coroutines-guide.md
index ea512ba68d..09cfb93cab 100644
--- a/coroutines-guide.md
+++ b/coroutines-guide.md
@@ -20,6 +20,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
* [Closing resources with `finally`](docs/cancellation-and-timeouts.md#closing-resources-with-finally)
* [Run non-cancellable block](docs/cancellation-and-timeouts.md#run-non-cancellable-block)
* [Timeout](docs/cancellation-and-timeouts.md#timeout)
+ * [Asynchronous timeout and resources](docs/cancellation-and-timeouts.md#asynchronous-timeout-and-resources)
* [Composing Suspending Functions](docs/composing-suspending-functions.md#composing-suspending-functions)
* [Sequential by default](docs/composing-suspending-functions.md#sequential-by-default)
diff --git a/docs/cancellation-and-timeouts.md b/docs/cancellation-and-timeouts.md
index d8d5b7bad4..b296bde493 100644
--- a/docs/cancellation-and-timeouts.md
+++ b/docs/cancellation-and-timeouts.md
@@ -11,6 +11,7 @@
* [Closing resources with `finally`](#closing-resources-with-finally)
* [Run non-cancellable block](#run-non-cancellable-block)
* [Timeout](#timeout)
+ * [Asynchronous timeout and resources](#asynchronous-timeout-and-resources)
@@ -355,6 +356,114 @@ Result is null
+### Asynchronous timeout and resources
+
+
+
+The timeout event in [withTimeout] is asynchronous with respect to the code running in its block and may happen at any time,
+even right before the return from inside of the timeout block. Keep this in mind if you open or acquire some
+resource inside the block that needs closing or release outside of the block.
+
+For example, here we imitate a closeable resource with the `Resource` class, that simply keeps track of how many times
+it was created by incrementing the `acquired` counter and decrementing this counter from its `close` function.
+Let us run a lot of coroutines with the small timeout try acquire this resource from inside
+of the `withTimeout` block after a bit of delay and release it from outside.
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+
+//sampleStart
+var acquired = 0
+
+class Resource {
+ init { acquired++ } // Acquire the resource
+ fun close() { acquired-- } // Release the resource
+}
+
+fun main() {
+ runBlocking {
+ repeat(100_000) { // Launch 100K coroutines
+ launch {
+ val resource = withTimeout(60) { // Timeout of 60 ms
+ delay(50) // Delay for 50 ms
+ Resource() // Acquire a resource and return it from withTimeout block
+ }
+ resource.close() // Release the resource
+ }
+ }
+ }
+ // Outside of runBlocking all coroutines have completed
+ println(acquired) // Print the number of resources still acquired
+}
+//sampleEnd
+```
+
+
+
+> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-cancel-08.kt).
+
+
+
+If you run the above code you'll see that it does not always print zero, though it may depend on the timings
+of your machine you may need to tweak timeouts in this example to actually see non-zero values.
+
+> Note, that incrementing and decrementing `acquired` counter here from 100K coroutines is completely safe,
+> since it always happens from the same main thread. More on that will be explained in the next chapter
+> on coroutine context.
+
+To workaround this problem you can store a reference to the resource in the variable as opposed to returning it
+from the `withTimeout` block.
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+
+var acquired = 0
+
+class Resource {
+ init { acquired++ } // Acquire the resource
+ fun close() { acquired-- } // Release the resource
+}
+
+fun main() {
+//sampleStart
+ runBlocking {
+ repeat(100_000) { // Launch 100K coroutines
+ launch {
+ var resource: Resource? = null // Not acquired yet
+ try {
+ withTimeout(60) { // Timeout of 60 ms
+ delay(50) // Delay for 50 ms
+ resource = Resource() // Store a resource to the variable if acquired
+ }
+ // We can do something else with the resource here
+ } finally {
+ resource?.close() // Release the resource if it was acquired
+ }
+ }
+ }
+ }
+ // Outside of runBlocking all coroutines have completed
+ println(acquired) // Print the number of resources still acquired
+//sampleEnd
+}
+```
+
+
+
+> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-cancel-09.kt).
+
+This example always prints zero. Resources do not leak.
+
+
+
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/launch.html
diff --git a/docs/knit.properties b/docs/knit.properties
index ab2508a114..2028ecb416 100644
--- a/docs/knit.properties
+++ b/docs/knit.properties
@@ -4,19 +4,7 @@
knit.package=kotlinx.coroutines.guide
knit.dir=../kotlinx-coroutines-core/jvm/test/guide/
-knit.pattern=example-[a-zA-Z0-9-]+-##\\.kt
-knit.include=knit.code.include
test.package=kotlinx.coroutines.guide.test
test.dir=../kotlinx-coroutines-core/jvm/test/guide/test/
-test.template=knit.test.template
-# Various test validation modes and their corresponding methods from TestUtil
-test.mode.=verifyLines
-test.mode.STARTS_WITH=verifyLinesStartWith
-test.mode.ARBITRARY_TIME=verifyLinesArbitraryTime
-test.mode.FLEXIBLE_TIME=verifyLinesFlexibleTime
-test.mode.FLEXIBLE_THREAD=verifyLinesFlexibleThread
-test.mode.LINES_START_UNORDERED=verifyLinesStartUnordered
-test.mode.LINES_START=verifyLinesStart
-test.mode.EXCEPTION=verifyExceptions
\ No newline at end of file
diff --git a/docs/knit.test.template b/docs/knit.test.template
index a912555a43..727493c662 100644
--- a/docs/knit.test.template
+++ b/docs/knit.test.template
@@ -5,6 +5,7 @@
// This file was automatically generated from ${file.name} by Knit tool. Do not edit.
package ${test.package}
+import kotlinx.coroutines.knit.*
import org.junit.Test
class ${test.name} {
diff --git a/gradle.properties b/gradle.properties
index b778ce2b64..18b95166d6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -3,14 +3,14 @@
#
# Kotlin
-version=1.3.9-SNAPSHOT
+version=1.4.0-M1-SNAPSHOT
group=org.jetbrains.kotlinx
kotlin_version=1.4.0
# Dependencies
junit_version=4.12
atomicfu_version=0.14.4
-knit_version=0.1.3
+knit_version=0.2.0
html_version=0.6.8
lincheck_version=2.7.1
dokka_version=0.9.16-rdev-2-mpp-hacks
@@ -36,10 +36,12 @@ kotlin.js.compiler=both
gradle_node_version=1.2.0
node_version=8.9.3
npm_version=5.7.1
-mocha_version=4.1.0
+mocha_version=6.2.2
mocha_headless_chrome_version=1.8.2
-mocha_teamcity_reporter_version=2.2.2
-source_map_support_version=0.5.3
+mocha_teamcity_reporter_version=3.0.0
+source_map_support_version=0.5.16
+jsdom_version=15.2.1
+jsdom_global_version=3.0.2
# Settings
kotlin.incremental.multiplatform=true
@@ -56,7 +58,6 @@ org.gradle.jvmargs=-Xmx2g
# https://github.com/gradle/gradle/issues/11412
systemProp.org.gradle.internal.publish.checksums.insecure=true
-# This is commented out, and the property is set conditionally in build.gradle, because 1.3.71 doesn't work with it.
-# Once this property is set by default in new versions or 1.3.71 is dropped, either uncomment or remove this line.
+# todo:KLUDGE: This is commented out, and the property is set conditionally in build.gradle, because IDEA doesn't work with it.
#kotlin.mpp.enableGranularSourceSetsMetadata=true
kotlin.mpp.enableCompatibilityMetadataVariant=true
diff --git a/gradle/compile-common.gradle b/gradle/compile-common.gradle
index bee61429df..0dc1b5c014 100644
--- a/gradle/compile-common.gradle
+++ b/gradle/compile-common.gradle
@@ -3,10 +3,6 @@
*/
kotlin.sourceSets {
- commonMain.dependencies {
- api "org.jetbrains.kotlin:kotlin-stdlib-common:$kotlin_version"
- }
-
commonTest.dependencies {
api "org.jetbrains.kotlin:kotlin-test-common:$kotlin_version"
api "org.jetbrains.kotlin:kotlin-test-annotations-common:$kotlin_version"
diff --git a/gradle/compile-js-multiplatform.gradle b/gradle/compile-js-multiplatform.gradle
index 93d371a21f..b52cfc5230 100644
--- a/gradle/compile-js-multiplatform.gradle
+++ b/gradle/compile-js-multiplatform.gradle
@@ -26,10 +26,6 @@ kotlin {
}
sourceSets {
- jsMain.dependencies {
- api "org.jetbrains.kotlin:kotlin-stdlib-js:$kotlin_version"
- }
-
jsTest.dependencies {
api "org.jetbrains.kotlin:kotlin-test-js:$kotlin_version"
}
diff --git a/gradle/compile-js.gradle b/gradle/compile-js.gradle
index d0697cfd3a..55c81fe56e 100644
--- a/gradle/compile-js.gradle
+++ b/gradle/compile-js.gradle
@@ -4,25 +4,29 @@
// Platform-specific configuration to compile JS modules
-apply plugin: 'kotlin2js'
+apply plugin: 'org.jetbrains.kotlin.js'
dependencies {
- compile "org.jetbrains.kotlin:kotlin-stdlib-js:$kotlin_version"
- testCompile "org.jetbrains.kotlin:kotlin-test-js:$kotlin_version"
+ testImplementation "org.jetbrains.kotlin:kotlin-test-js:$kotlin_version"
}
-tasks.withType(compileKotlin2Js.getClass()) {
- kotlinOptions {
- moduleKind = "umd"
- sourceMap = true
- metaInfo = true
+kotlin {
+ js(LEGACY) {
+ moduleName = project.name - "-js"
+ }
+
+ sourceSets {
+ main.kotlin.srcDirs = ['src']
+ test.kotlin.srcDirs = ['test']
+ main.resources.srcDirs = ['resources']
+ test.resources.srcDirs = ['test-resources']
}
}
-compileKotlin2Js {
+tasks.withType(compileKotlinJs.getClass()) {
kotlinOptions {
- // drop -js suffix from outputFile
- def baseName = project.name - "-js"
- outputFile = new File(outputFile.parent, baseName + ".js")
+ moduleKind = "umd"
+ sourceMap = true
+ metaInfo = true
}
}
diff --git a/gradle/compile-jvm-multiplatform.gradle b/gradle/compile-jvm-multiplatform.gradle
index b226c97a57..e72d30511e 100644
--- a/gradle/compile-jvm-multiplatform.gradle
+++ b/gradle/compile-jvm-multiplatform.gradle
@@ -5,19 +5,11 @@
sourceCompatibility = 1.6
targetCompatibility = 1.6
-repositories {
- maven { url "https://dl.bintray.com/devexperts/Maven/" }
-}
-
kotlin {
targets {
fromPreset(presets.jvm, 'jvm')
}
sourceSets {
- jvmMain.dependencies {
- api 'org.jetbrains.kotlin:kotlin-stdlib'
- }
-
jvmTest.dependencies {
api "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
// Workaround to make addSuppressed work in tests
diff --git a/gradle/compile-jvm.gradle b/gradle/compile-jvm.gradle
index a8116595f5..caa5c45f60 100644
--- a/gradle/compile-jvm.gradle
+++ b/gradle/compile-jvm.gradle
@@ -4,13 +4,12 @@
// Platform-specific configuration to compile JVM modules
-apply plugin: 'kotlin'
+apply plugin: 'org.jetbrains.kotlin.jvm'
sourceCompatibility = 1.6
targetCompatibility = 1.6
dependencies {
- compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
// Workaround to make addSuppressed work in tests
testCompile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
@@ -19,10 +18,6 @@ dependencies {
testCompile "junit:junit:$junit_version"
}
-repositories {
- maven { url "https://dl.bintray.com/devexperts/Maven/" }
-}
-
compileKotlin {
kotlinOptions {
freeCompilerArgs += ['-Xexplicit-api=strict']
diff --git a/gradle/compile-native-multiplatform.gradle b/gradle/compile-native-multiplatform.gradle
index 378e4f5f98..4487446799 100644
--- a/gradle/compile-native-multiplatform.gradle
+++ b/gradle/compile-native-multiplatform.gradle
@@ -13,36 +13,24 @@ kotlin {
}
targets {
- if (project.ext.ideaActive) {
- fromPreset(project.ext.ideaPreset, 'native')
- } else {
- addTarget(presets.linuxX64)
- addTarget(presets.iosArm64)
- addTarget(presets.iosArm32)
- addTarget(presets.iosX64)
- addTarget(presets.macosX64)
- addTarget(presets.mingwX64)
- addTarget(presets.tvosArm64)
- addTarget(presets.tvosX64)
- addTarget(presets.watchosArm32)
- addTarget(presets.watchosArm64)
- addTarget(presets.watchosX86)
- }
+ addTarget(presets.linuxX64)
+ addTarget(presets.iosArm64)
+ addTarget(presets.iosArm32)
+ addTarget(presets.iosX64)
+ addTarget(presets.macosX64)
+ addTarget(presets.mingwX64)
+ addTarget(presets.tvosArm64)
+ addTarget(presets.tvosX64)
+ addTarget(presets.watchosArm32)
+ addTarget(presets.watchosArm64)
+ addTarget(presets.watchosX86)
}
sourceSets {
nativeMain { dependsOn commonMain }
- // Empty source set is required in order to have native tests task
- nativeTest {}
+ nativeTest { dependsOn commonTest }
- if (!project.ext.ideaActive) {
- configure(nativeMainSets) {
- dependsOn nativeMain
- }
-
- configure(nativeTestSets) {
- dependsOn nativeTest
- }
- }
+ configure(nativeMainSets) { dependsOn nativeMain }
+ configure(nativeTestSets) { dependsOn nativeTest }
}
}
diff --git a/gradle/targets.gradle b/gradle/targets.gradle
deleted file mode 100644
index 08f3d989aa..0000000000
--- a/gradle/targets.gradle
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-/*
- * This is a hack to avoid creating unsupported native source sets when importing project into IDEA
- */
-project.ext.ideaActive = System.getProperty('idea.active') == 'true'
-
-kotlin {
- targets {
- def manager = project.ext.hostManager
- def linuxEnabled = manager.isEnabled(presets.linuxX64.konanTarget)
- def macosEnabled = manager.isEnabled(presets.macosX64.konanTarget)
- def winEnabled = manager.isEnabled(presets.mingwX64.konanTarget)
-
- project.ext.isLinuxHost = linuxEnabled
- project.ext.isMacosHost = macosEnabled
- project.ext.isWinHost = winEnabled
-
- if (project.ext.ideaActive) {
- def ideaPreset = presets.linuxX64
- if (macosEnabled) ideaPreset = presets.macosX64
- if (winEnabled) ideaPreset = presets.mingwX64
- project.ext.ideaPreset = ideaPreset
- }
- }
-}
diff --git a/gradle/test-mocha-js.gradle b/gradle/test-mocha-js.gradle
index 6676dc9268..7de79b9939 100644
--- a/gradle/test-mocha-js.gradle
+++ b/gradle/test-mocha-js.gradle
@@ -86,8 +86,8 @@ task testMochaChrome(type: NodeTask, dependsOn: prepareMochaChrome) {
task installDependenciesMochaJsdom(type: NpmTask, dependsOn: [npmInstall]) {
args = ['install',
"mocha@$mocha_version",
- 'jsdom@15.2.1',
- 'jsdom-global@3.0.2',
+ "jsdom@$jsdom_version",
+ "jsdom-global@$jsdom_global_version",
"source-map-support@$source_map_support_version",
'--no-save']
if (project.hasProperty("teamcity")) args += ["mocha-teamcity-reporter@$mocha_teamcity_reporter_version"]
diff --git a/integration/kotlinx-coroutines-guava/build.gradle b/integration/kotlinx-coroutines-guava/build.gradle
deleted file mode 100644
index 16bdea50fd..0000000000
--- a/integration/kotlinx-coroutines-guava/build.gradle
+++ /dev/null
@@ -1,16 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-ext.guava_version = '28.0-jre'
-
-dependencies {
- compile "com.google.guava:guava:$guava_version"
-}
-
-tasks.withType(dokka.getClass()) {
- externalDocumentationLink {
- url = new URL("https://google.github.io/guava/releases/$guava_version/api/docs/")
- packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
- }
-}
diff --git a/integration/kotlinx-coroutines-guava/build.gradle.kts b/integration/kotlinx-coroutines-guava/build.gradle.kts
new file mode 100644
index 0000000000..53e91add44
--- /dev/null
+++ b/integration/kotlinx-coroutines-guava/build.gradle.kts
@@ -0,0 +1,13 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+val guavaVersion = "28.0-jre"
+
+dependencies {
+ compile("com.google.guava:guava:$guavaVersion")
+}
+
+externalDocumentationLink(
+ url = "https://google.github.io/guava/releases/$guavaVersion/api/docs/"
+)
diff --git a/integration/kotlinx-coroutines-play-services/build.gradle b/integration/kotlinx-coroutines-play-services/build.gradle
index eb554866ed..29ce3d606f 100644
--- a/integration/kotlinx-coroutines-play-services/build.gradle
+++ b/integration/kotlinx-coroutines-play-services/build.gradle
@@ -2,12 +2,6 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-import org.gradle.api.artifacts.transform.*
-
-import java.nio.file.Files
-import java.util.zip.ZipEntry
-import java.util.zip.ZipFile
-
ext.tasks_version = '16.0.1'
def artifactType = Attribute.of("artifactType", String)
@@ -49,31 +43,3 @@ tasks.withType(dokka.getClass()) {
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
}
}
-
-abstract class UnpackAar implements TransformAction {
- @InputArtifact
- abstract Provider getInputArtifact()
-
- @Override
- void transform(TransformOutputs outputs) {
- ZipFile zip = new ZipFile(inputArtifact.get().asFile)
- try {
- for (entry in zip.entries()) {
- if (!entry.isDirectory() && entry.name.endsWith(".jar")) {
- unzipEntryTo(zip, entry, outputs.file(entry.name))
- }
- }
- } finally {
- zip.close()
- }
- }
-
- private static void unzipEntryTo(ZipFile zip, ZipEntry entry, File output) {
- InputStream stream = zip.getInputStream(entry)
- try {
- Files.copy(stream, output.toPath())
- } finally {
- stream.close()
- }
- }
-}
diff --git a/integration/kotlinx-coroutines-slf4j/build.gradle b/integration/kotlinx-coroutines-slf4j/build.gradle
deleted file mode 100644
index 05accb75d3..0000000000
--- a/integration/kotlinx-coroutines-slf4j/build.gradle
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-dependencies {
- compile 'org.slf4j:slf4j-api:1.7.25'
- testCompile 'io.github.microutils:kotlin-logging:1.5.4'
- testRuntime 'ch.qos.logback:logback-classic:1.2.3'
- testRuntime 'ch.qos.logback:logback-core:1.2.3'
-}
-
-tasks.withType(dokka.getClass()) {
- externalDocumentationLink {
- packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
- url = new URL("https://www.slf4j.org/apidocs/")
- }
-}
diff --git a/integration/kotlinx-coroutines-slf4j/build.gradle.kts b/integration/kotlinx-coroutines-slf4j/build.gradle.kts
new file mode 100644
index 0000000000..c7d0d82d62
--- /dev/null
+++ b/integration/kotlinx-coroutines-slf4j/build.gradle.kts
@@ -0,0 +1,14 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+dependencies {
+ compile("org.slf4j:slf4j-api:1.7.25")
+ testCompile("io.github.microutils:kotlin-logging:1.5.4")
+ testRuntime("ch.qos.logback:logback-classic:1.2.3")
+ testRuntime("ch.qos.logback:logback-core:1.2.3")
+}
+
+externalDocumentationLink(
+ url = "https://www.slf4j.org/apidocs/"
+)
diff --git a/js/example-frontend-js/README.md b/js/example-frontend-js/README.md
index 4e534e427a..ad61372dc9 100644
--- a/js/example-frontend-js/README.md
+++ b/js/example-frontend-js/README.md
@@ -3,7 +3,7 @@
Build application with
```
-gradlew :example-frontend-js:bundle
+gradlew :example-frontend-js:build
```
The resulting application can be found in `build/dist` subdirectory.
@@ -11,7 +11,7 @@ The resulting application can be found in `build/dist` subdirectory.
You can start application with webpack-dev-server using:
```
-gradlew :example-frontend-js:start
+gradlew :example-frontend-js:run
```
Built and deployed application is available at the library documentation site
diff --git a/js/example-frontend-js/build.gradle b/js/example-frontend-js/build.gradle
index 735a70d55b..7abde63964 100644
--- a/js/example-frontend-js/build.gradle
+++ b/js/example-frontend-js/build.gradle
@@ -2,53 +2,32 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-apply plugin: 'kotlin-dce-js'
-apply from: rootProject.file('gradle/node-js.gradle')
-
-// Workaround resolving new Gradle metadata with kotlin2js
-// TODO: Remove once KT-37188 is fixed
-try {
- def jsCompilerType = Class.forName("org.jetbrains.kotlin.gradle.targets.js.KotlinJsCompilerAttribute")
- def jsCompilerAttr = Attribute.of("org.jetbrains.kotlin.js.compiler", jsCompilerType)
- project.dependencies.attributesSchema.attribute(jsCompilerAttr)
- configurations {
- matching {
- it.name.endsWith("Classpath")
- }.forEach {
- it.attributes.attribute(jsCompilerAttr, jsCompilerType.legacy)
+project.kotlin {
+ js(LEGACY) {
+ binaries.executable()
+ browser {
+ distribution {
+ directory = new File(directory.parentFile, "dist")
+ }
+ webpackTask {
+ cssSupport.enabled = true
+ }
+ runTask {
+ cssSupport.enabled = true
+ }
+ testTask {
+ useKarma {
+ useChromeHeadless()
+ webpackConfig.cssSupport.enabled = true
+ }
+ }
}
}
-} catch (java.lang.ClassNotFoundException e) {
- // org.jetbrains.kotlin.gradle.targets.js.JsCompilerType is missing in 1.3.x
- // But 1.3.x doesn't generate Gradle metadata, so this workaround is not needed
-}
-
-dependencies {
- compile "org.jetbrains.kotlinx:kotlinx-html-js:$html_version"
-}
-
-compileKotlin2Js {
- kotlinOptions {
- main = "call"
- }
-}
-
-task bundle(type: NpmTask, dependsOn: [npmInstall, runDceKotlinJs]) {
- inputs.files(fileTree("$buildDir/kotlin-js-min/main"))
- inputs.files(fileTree(file("src/main/web")))
- inputs.file("npm/webpack.config.js")
- outputs.dir("$buildDir/dist")
- args = ["run", "bundle"]
-}
-task start(type: NpmTask, dependsOn: bundle) {
- args = ["run", "start"]
-}
-
-// we have not tests but kotlin-dce-js still tries to work with them and crashed.
-// todo: Remove when KT-22028 is fixed
-afterEvaluate {
- if (tasks.findByName('unpackDependenciesTestKotlinJs')) {
- tasks.unpackDependenciesTestKotlinJs.enabled = false
+ sourceSets {
+ main.dependencies {
+ implementation "org.jetbrains.kotlinx:kotlinx-html-js:$html_version"
+ implementation(npm("html-webpack-plugin", "3.2.0"))
+ }
}
}
diff --git a/js/example-frontend-js/npm/package.json b/js/example-frontend-js/npm/package.json
deleted file mode 100644
index 7668cefba3..0000000000
--- a/js/example-frontend-js/npm/package.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "name": "example-frontend-js",
- "version": "$version",
- "license": "Apache-2.0",
- "repository": {
- "type": "git",
- "url": "https://github.com/Kotlin/kotlinx.coroutines.git"
- },
- "devDependencies": {
- "webpack": "4.29.1",
- "webpack-cli": "3.2.3",
- "webpack-dev-server": "3.1.14",
- "html-webpack-plugin": "3.2.0",
- "uglifyjs-webpack-plugin": "2.1.1",
- "style-loader": "0.23.1",
- "css-loader": "2.1.0"
- },
- "scripts": {
- "bundle": "webpack",
- "start": "webpack-dev-server --open --no-inline"
- }
-}
diff --git a/js/example-frontend-js/npm/webpack.config.js b/js/example-frontend-js/npm/webpack.config.js
deleted file mode 100644
index a208d047b3..0000000000
--- a/js/example-frontend-js/npm/webpack.config.js
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-// This script is copied to "build" directory and run from there
-
-const webpack = require("webpack");
-const HtmlWebpackPlugin = require('html-webpack-plugin');
-const UglifyJSPlugin = require('uglifyjs-webpack-plugin');
-const path = require("path");
-
-const dist = path.resolve(__dirname, "dist");
-
-module.exports = {
- mode: "production",
- entry: {
- main: "main"
- },
- output: {
- filename: "[name].bundle.js",
- path: dist,
- publicPath: ""
- },
- devServer: {
- contentBase: dist
- },
- module: {
- rules: [
- {
- test: /\.css$/,
- use: [
- 'style-loader',
- 'css-loader'
- ]
- }
- ]
- },
- resolve: {
- modules: [
- path.resolve(__dirname, "kotlin-js-min/main"),
- path.resolve(__dirname, "../src/main/web/")
- ]
- },
- devtool: 'source-map',
- plugins: [
- new HtmlWebpackPlugin({
- title: 'Kotlin Coroutines JS Example'
- }),
- new UglifyJSPlugin({
- sourceMap: true
- })
- ]
-};
diff --git a/js/example-frontend-js/src/ExampleMain.kt b/js/example-frontend-js/src/ExampleMain.kt
index 25a73c61c0..da6e4196a6 100644
--- a/js/example-frontend-js/src/ExampleMain.kt
+++ b/js/example-frontend-js/src/ExampleMain.kt
@@ -13,7 +13,10 @@ import kotlin.coroutines.*
import kotlin.math.*
import kotlin.random.Random
+external fun require(resource: String)
+
fun main() {
+ require("style.css")
println("Starting example application...")
document.addEventListener("DOMContentLoaded", {
Application().start()
diff --git a/js/example-frontend-js/webpack.config.d/custom-config.js b/js/example-frontend-js/webpack.config.d/custom-config.js
new file mode 100644
index 0000000000..21939bece0
--- /dev/null
+++ b/js/example-frontend-js/webpack.config.d/custom-config.js
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+;(function (config) {
+ const HtmlWebpackPlugin = require('html-webpack-plugin');
+
+ config.output.filename = "[name].bundle.js"
+
+ config.plugins.push(
+ new HtmlWebpackPlugin({
+ title: 'Kotlin Coroutines JS Example'
+ })
+ )
+
+ // path from /js/packages/example-frontend-js to src/main/web
+ config.resolve.modules.push("../../../../js/example-frontend-js/src/main/web");
+})(config);
diff --git a/knit.properties b/knit.properties
new file mode 100644
index 0000000000..bc177ce44c
--- /dev/null
+++ b/knit.properties
@@ -0,0 +1,16 @@
+#
+# Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+#
+
+knit.include=docs/knit.code.include
+test.template=docs/knit.test.template
+
+# Various test validation modes and their corresponding methods from TestUtil
+test.mode.=verifyLines
+test.mode.STARTS_WITH=verifyLinesStartWith
+test.mode.ARBITRARY_TIME=verifyLinesArbitraryTime
+test.mode.FLEXIBLE_TIME=verifyLinesFlexibleTime
+test.mode.FLEXIBLE_THREAD=verifyLinesFlexibleThread
+test.mode.LINES_START_UNORDERED=verifyLinesStartUnordered
+test.mode.LINES_START=verifyLinesStart
+test.mode.EXCEPTION=verifyExceptions
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index 36cbdb6960..bb1c0f36ab 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -46,6 +46,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
+ public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}
@@ -56,6 +57,8 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public fun (Lkotlin/coroutines/Continuation;I)V
+ public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
+ public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
@@ -75,14 +78,12 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun resumeWith (Ljava/lang/Object;)V
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
+ public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}
public final class kotlinx/coroutines/CancellableContinuationKt {
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
- public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
- public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
- public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
@@ -257,24 +258,21 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls {
public abstract interface class kotlinx/coroutines/Delay {
public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
- public abstract fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
+ public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
}
public final class kotlinx/coroutines/Delay$DefaultImpls {
public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
- public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
+ public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
}
public final class kotlinx/coroutines/DelayKt {
+ public static final fun awaitCancellation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun delay-p9JZ4hM (DLkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
-public final class kotlinx/coroutines/DispatchedContinuationKt {
- public static final fun resumeCancellableWith (Lkotlin/coroutines/Continuation;Ljava/lang/Object;)V
-}
-
public final class kotlinx/coroutines/Dispatchers {
public static final field INSTANCE Lkotlinx/coroutines/Dispatchers;
public static final fun getDefault ()Lkotlinx/coroutines/CoroutineDispatcher;
@@ -580,6 +578,14 @@ public final class kotlinx/coroutines/channels/BroadcastKt {
public static synthetic fun broadcast$default (Lkotlinx/coroutines/channels/ReceiveChannel;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
}
+public final class kotlinx/coroutines/channels/BufferOverflow : java/lang/Enum {
+ public static final field DROP_LATEST Lkotlinx/coroutines/channels/BufferOverflow;
+ public static final field DROP_OLDEST Lkotlinx/coroutines/channels/BufferOverflow;
+ public static final field SUSPEND Lkotlinx/coroutines/channels/BufferOverflow;
+ public static fun valueOf (Ljava/lang/String;)Lkotlinx/coroutines/channels/BufferOverflow;
+ public static fun values ()[Lkotlinx/coroutines/channels/BufferOverflow;
+}
+
public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
public static final field BUFFERED I
public static final field CONFLATED I
@@ -612,8 +618,10 @@ public final class kotlinx/coroutines/channels/ChannelIterator$DefaultImpls {
}
public final class kotlinx/coroutines/channels/ChannelKt {
- public static final fun Channel (I)Lkotlinx/coroutines/channels/Channel;
+ public static final synthetic fun Channel (I)Lkotlinx/coroutines/channels/Channel;
+ public static final fun Channel (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
+ public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
}
public final class kotlinx/coroutines/channels/ChannelsKt {
@@ -868,7 +876,7 @@ public final class kotlinx/coroutines/debug/internal/DebuggerInfo : java/io/Seri
public final fun getState ()Ljava/lang/String;
}
-public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/Flow {
+public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/CancellableFlow, kotlinx/coroutines/flow/Flow {
public fun ()V
public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -895,10 +903,15 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun asFlow ([I)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([J)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun asSharedFlow (Lkotlinx/coroutines/flow/MutableSharedFlow;)Lkotlinx/coroutines/flow/SharedFlow;
+ public static final fun asStateFlow (Lkotlinx/coroutines/flow/MutableStateFlow;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
- public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
+ public static final synthetic fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
+ public static final fun buffer (Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+ public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/channels/BufferOverflow;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun cache (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
@@ -988,10 +1001,15 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun onStart (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun onSubscription (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/SharedFlow;
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
+ public static final fun publish (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun publish (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun receiveAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow;
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun replay (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun replay (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final synthetic fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun retry (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -1003,11 +1021,15 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun shareIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lkotlinx/coroutines/flow/SharedFlow;
+ public static synthetic fun shareIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;IILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow;
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;Ljava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
@@ -1029,17 +1051,63 @@ public final class kotlinx/coroutines/flow/FlowKt {
}
public final class kotlinx/coroutines/flow/LintKt {
+ public static final fun cancel (Lkotlinx/coroutines/flow/FlowCollector;Ljava/util/concurrent/CancellationException;)V
+ public static synthetic fun cancel$default (Lkotlinx/coroutines/flow/FlowCollector;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
+ public static final fun cancellable (Lkotlinx/coroutines/flow/SharedFlow;)Lkotlinx/coroutines/flow/Flow;
public static final fun conflate (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
- public static final fun flowOn (Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flowOn (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun getCoroutineContext (Lkotlinx/coroutines/flow/FlowCollector;)Lkotlin/coroutines/CoroutineContext;
+ public static final fun isActive (Lkotlinx/coroutines/flow/FlowCollector;)Z
}
-public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/StateFlow {
+public abstract interface class kotlinx/coroutines/flow/MutableSharedFlow : kotlinx/coroutines/flow/FlowCollector, kotlinx/coroutines/flow/SharedFlow {
+ public abstract fun getSubscriptionCount ()Lkotlinx/coroutines/flow/StateFlow;
+ public abstract fun resetReplayCache ()V
+ public abstract fun tryEmit (Ljava/lang/Object;)Z
+}
+
+public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/MutableSharedFlow, kotlinx/coroutines/flow/StateFlow {
+ public abstract fun compareAndSet (Ljava/lang/Object;Ljava/lang/Object;)Z
public abstract fun getValue ()Ljava/lang/Object;
public abstract fun setValue (Ljava/lang/Object;)V
}
-public abstract interface class kotlinx/coroutines/flow/StateFlow : kotlinx/coroutines/flow/Flow {
+public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
+ public abstract fun getReplayCache ()Ljava/util/List;
+}
+
+public final class kotlinx/coroutines/flow/SharedFlowKt {
+ public static final fun MutableSharedFlow (IILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/MutableSharedFlow;
+ public static synthetic fun MutableSharedFlow$default (IILkotlinx/coroutines/channels/BufferOverflow;ILjava/lang/Object;)Lkotlinx/coroutines/flow/MutableSharedFlow;
+}
+
+public final class kotlinx/coroutines/flow/SharingCommand : java/lang/Enum {
+ public static final field START Lkotlinx/coroutines/flow/SharingCommand;
+ public static final field STOP Lkotlinx/coroutines/flow/SharingCommand;
+ public static final field STOP_AND_RESET_REPLAY_CACHE Lkotlinx/coroutines/flow/SharingCommand;
+ public static fun valueOf (Ljava/lang/String;)Lkotlinx/coroutines/flow/SharingCommand;
+ public static fun values ()[Lkotlinx/coroutines/flow/SharingCommand;
+}
+
+public abstract interface class kotlinx/coroutines/flow/SharingStarted {
+ public static final field Companion Lkotlinx/coroutines/flow/SharingStarted$Companion;
+ public abstract fun command (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
+}
+
+public final class kotlinx/coroutines/flow/SharingStarted$Companion {
+ public final fun WhileSubscribed (JJ)Lkotlinx/coroutines/flow/SharingStarted;
+ public static synthetic fun WhileSubscribed$default (Lkotlinx/coroutines/flow/SharingStarted$Companion;JJILjava/lang/Object;)Lkotlinx/coroutines/flow/SharingStarted;
+ public final fun getEagerly ()Lkotlinx/coroutines/flow/SharingStarted;
+ public final fun getLazily ()Lkotlinx/coroutines/flow/SharingStarted;
+}
+
+public final class kotlinx/coroutines/flow/SharingStartedKt {
+ public static final fun WhileSubscribed-9tZugJw (Lkotlinx/coroutines/flow/SharingStarted$Companion;DD)Lkotlinx/coroutines/flow/SharingStarted;
+ public static synthetic fun WhileSubscribed-9tZugJw$default (Lkotlinx/coroutines/flow/SharingStarted$Companion;DDILjava/lang/Object;)Lkotlinx/coroutines/flow/SharingStarted;
+}
+
+public abstract interface class kotlinx/coroutines/flow/StateFlow : kotlinx/coroutines/flow/SharedFlow {
public abstract fun getValue ()Ljava/lang/Object;
}
@@ -1050,13 +1118,15 @@ public final class kotlinx/coroutines/flow/StateFlowKt {
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/internal/FusibleFlow {
public final field capacity I
public final field context Lkotlin/coroutines/CoroutineContext;
- public fun (Lkotlin/coroutines/CoroutineContext;I)V
- public fun additionalToStringProps ()Ljava/lang/String;
+ public final field onBufferOverflow Lkotlinx/coroutines/channels/BufferOverflow;
+ public fun (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)V
+ protected fun additionalToStringProps ()Ljava/lang/String;
public fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
- protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
- public fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
+ protected abstract fun create (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
+ public fun dropChannelOperators ()Lkotlinx/coroutines/flow/Flow;
+ public fun fuse (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/Flow;
public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public fun toString ()Ljava/lang/String;
}
@@ -1070,11 +1140,11 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
}
public abstract interface class kotlinx/coroutines/flow/internal/FusibleFlow : kotlinx/coroutines/flow/Flow {
- public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
+ public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/Flow;
}
public final class kotlinx/coroutines/flow/internal/FusibleFlow$DefaultImpls {
- public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/FusibleFlow;
+ public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}
public final class kotlinx/coroutines/flow/internal/SafeCollector_commonKt {
diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle
index 59dc5da894..f98f6a529c 100644
--- a/kotlinx-coroutines-core/build.gradle
+++ b/kotlinx-coroutines-core/build.gradle
@@ -2,14 +2,61 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-apply plugin: 'kotlin-multiplatform'
-apply from: rootProject.file("gradle/targets.gradle")
+apply plugin: 'org.jetbrains.kotlin.multiplatform'
apply from: rootProject.file("gradle/compile-jvm-multiplatform.gradle")
apply from: rootProject.file("gradle/compile-common.gradle")
apply from: rootProject.file("gradle/compile-js-multiplatform.gradle")
apply from: rootProject.file("gradle/compile-native-multiplatform.gradle")
apply from: rootProject.file('gradle/publish-npm-js.gradle')
+/* ==========================================================================
+ Configure source sets structure for kotlinx-coroutines-core:
+
+ TARGETS SOURCE SETS
+ ------- ----------------------------------------------
+
+ js -----------------------------------------------------+
+ |
+ V
+ jvm -------------------------------> concurrent ---> common
+ ^
+ ios \ |
+ macos | ---> nativeDarwin ---> native --+
+ tvos | ^
+ watchos / |
+ |
+ linux \ ---> nativeOther -------+
+ mingw /
+
+ ========================================================================== */
+
+project.ext.sourceSetSuffixes = ["Main", "Test"]
+
+void defineSourceSet(newName, dependsOn, includedInPred) {
+ for (suffix in project.ext.sourceSetSuffixes) {
+ def newSS = kotlin.sourceSets.maybeCreate(newName + suffix)
+ for (dep in dependsOn) {
+ newSS.dependsOn(kotlin.sourceSets[dep + suffix])
+ }
+ for (curSS in kotlin.sourceSets) {
+ def curName = curSS.name
+ if (curName.endsWith(suffix)) {
+ def prefix = curName.substring(0, curName.length() - suffix.length())
+ if (includedInPred(prefix)) curSS.dependsOn(newSS)
+ }
+ }
+ }
+}
+
+static boolean isNativeDarwin(String name) { return ["ios", "macos", "tvos", "watchos"].any { name.startsWith(it) } }
+static boolean isNativeOther(String name) { return ["linux", "mingw"].any { name.startsWith(it) } }
+
+defineSourceSet("concurrent", ["common"]) { it in ["jvm", "native"] }
+defineSourceSet("nativeDarwin", ["native"]) { isNativeDarwin(it) }
+defineSourceSet("nativeOther", ["native"]) { isNativeOther(it) }
+
+/* ========================================================================== */
+
/*
* All platform plugins and configuration magic happens here instead of build.gradle
* because JMV-only projects depend on core, thus core should always be initialized before configuration.
@@ -18,7 +65,7 @@ kotlin {
configure(sourceSets) {
def srcDir = name.endsWith('Main') ? 'src' : 'test'
def platform = name[0..-5]
- kotlin.srcDir "$platform/$srcDir"
+ kotlin.srcDirs = ["$platform/$srcDir"]
if (name == "jvmMain") {
resources.srcDirs = ["$platform/resources"]
} else if (name == "jvmTest") {
@@ -31,12 +78,18 @@ kotlin {
}
configure(targets) {
- def targetName = it.name
- compilations.all { compilation ->
- def compileTask = tasks.getByName(compilation.compileKotlinTaskName)
- // binary compatibility support
- if (targetName.contains("jvm") && compilation.compilationName == "main") {
- compileTask.kotlinOptions.freeCompilerArgs += ["-Xdump-declarations-to=${buildDir}/visibilities.json"]
+ // Configure additional binaries and test runs -- one for each OS
+ if (["macos", "linux", "mingw"].any { name.startsWith(it) }) {
+ binaries {
+ // Test for memory leaks using a special entry point that does not exit but returns from main
+ binaries.getTest("DEBUG").freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
+ // Configure a separate test where code runs in background
+ test("background", [org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.DEBUG]) {
+ freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
+ }
+ }
+ testRuns {
+ background { setExecutionSourceFrom(binaries.backgroundDebugTest) }
}
}
}
@@ -54,23 +107,52 @@ compileKotlinMetadata {
}
}
+// :KLUDGE: Idea.active: This is needed to workaround resolve problems after importing this project to IDEA
+def configureNativeSourceSetPreset(name, preset) {
+ def hostMainCompilation = project.kotlin.targetFromPreset(preset).compilations.main
+ // Look for platform libraries in "implementation" for default source set
+ def implementationConfiguration = configurations[hostMainCompilation.defaultSourceSet.implementationMetadataConfigurationName]
+ // Now find the libraries: Finds platform libs & stdlib, but platform declarations are still not resolved due to IDE bugs
+ def hostNativePlatformLibs = files(
+ provider {
+ implementationConfiguration.findAll {
+ it.path.endsWith(".klib") || it.absolutePath.contains("klib${File.separator}platform") || it.absolutePath.contains("stdlib")
+ }
+ }
+ )
+ // Add all those dependencies
+ for (suffix in sourceSetSuffixes) {
+ configure(kotlin.sourceSets[name + suffix]) {
+ dependencies.add(implementationMetadataConfigurationName, hostNativePlatformLibs)
+ }
+ }
+}
+
+// :KLUDGE: Idea.active: Configure platform libraries for native source sets when working in IDEA
+if (Idea.active) {
+ def manager = project.ext.hostManager
+ def linuxPreset = kotlin.presets.linuxX64
+ def macosPreset = kotlin.presets.macosX64
+ // linux should be always available (cross-compilation capable) -- use it as default
+ assert manager.isEnabled(linuxPreset.konanTarget)
+ // use macOS libs for nativeDarwin if available
+ def macosAvailable = manager.isEnabled(macosPreset.konanTarget)
+ // configure source sets
+ configureNativeSourceSetPreset("native", linuxPreset)
+ configureNativeSourceSetPreset("nativeOther", linuxPreset)
+ configureNativeSourceSetPreset("nativeDarwin", macosAvailable ? macosPreset : linuxPreset)
+}
+
kotlin.sourceSets {
jvmMain.dependencies {
compileOnly "com.google.android:annotations:4.1.1.4"
}
jvmTest.dependencies {
- // This is a workaround for https://youtrack.jetbrains.com/issue/KT-39037
- def excludingCurrentProject = { dependency ->
- def result = project.dependencies.create(dependency)
- result.exclude(module: project.name)
- return result
- }
-
- api(excludingCurrentProject("org.jetbrains.kotlinx:lincheck:$lincheck_version"))
+ api "org.jetbrains.kotlinx:lincheck:$lincheck_version"
api "org.jetbrains.kotlinx:kotlinx-knit-test:$knit_version"
api "com.esotericsoftware:kryo:4.0.0"
- implementation(excludingCurrentProject(project(":android-unit-tests")))
+ implementation project(":android-unit-tests")
}
}
@@ -97,7 +179,7 @@ jvmTest {
enableAssertions = true
systemProperty 'java.security.manager', 'kotlinx.coroutines.TestSecurityManager'
// 'stress' is required to be able to run all subpackage tests like ":jvmTests --tests "*channels*" -Pstress=true"
- if (!project.ext.ideaActive && rootProject.properties['stress'] == null) {
+ if (!Idea.active && rootProject.properties['stress'] == null) {
exclude '**/*StressTest.*'
}
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
diff --git a/kotlinx-coroutines-core/common/src/Await.kt b/kotlinx-coroutines-core/common/src/Await.kt
index dd1e1771f2..7189349024 100644
--- a/kotlinx-coroutines-core/common/src/Await.kt
+++ b/kotlinx-coroutines-core/common/src/Await.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines
import kotlinx.atomicfu.*
+import kotlinx.coroutines.channels.*
import kotlin.coroutines.*
/**
@@ -18,6 +19,8 @@ import kotlin.coroutines.*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun awaitAll(vararg deferreds: Deferred): List =
if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
@@ -33,6 +36,8 @@ public suspend fun awaitAll(vararg deferreds: Deferred): List =
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun Collection>.awaitAll(): List =
if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
@@ -41,8 +46,11 @@ public suspend fun Collection>.awaitAll(): List =
* Suspends current coroutine until all given jobs are complete.
* This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
*
- * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }
@@ -50,8 +58,11 @@ public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }
* Suspends current coroutine until all given jobs are complete.
* This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
*
- * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun Collection.joinAll(): Unit = forEach { it.join() }
diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt
index 64bff500dc..c0924a0238 100644
--- a/kotlinx-coroutines-core/common/src/Builders.common.kt
+++ b/kotlinx-coroutines-core/common/src/Builders.common.kt
@@ -129,8 +129,9 @@ private class LazyDeferredCoroutine(
* This function uses dispatcher from the new context, shifting execution of the [block] into the
* different thread if a new dispatcher is specified, and back to the original dispatcher
* when it completes. Note that the result of `withContext` invocation is
- * dispatched into the original context in a cancellable way, which means that if the original [coroutineContext],
- * in which `withContext` was invoked, is cancelled by the time its dispatcher starts to execute the code,
+ * dispatched into the original context in a cancellable way with a **prompt cancellation guarantee**,
+ * which means that if the original [coroutineContext], in which `withContext` was invoked,
+ * is cancelled by the time its dispatcher starts to execute the code,
* it discards the result of `withContext` and throws [CancellationException].
*/
public suspend fun withContext(
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
index f5b511cb9c..7d9315afbf 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
@@ -15,6 +15,8 @@ import kotlin.coroutines.intrinsics.*
* When the [cancel] function is explicitly invoked, this continuation immediately resumes with a [CancellationException] or
* the specified cancel cause.
*
+ * An instance of `CancellableContinuation` is created by the [suspendCancellableCoroutine] function.
+ *
* Cancellable continuation has three states (as subset of [Job] states):
*
* | **State** | [isActive] | [isCompleted] | [isCancelled] |
@@ -24,11 +26,11 @@ import kotlin.coroutines.intrinsics.*
* | _Canceled_ (final _completed_ state)| `false` | `true` | `true` |
*
* Invocation of [cancel] transitions this continuation from _active_ to _cancelled_ state, while
- * invocation of [resume] or [resumeWithException] transitions it from _active_ to _resumed_ state.
+ * invocation of [Continuation.resume] or [Continuation.resumeWithException] transitions it from _active_ to _resumed_ state.
*
* A [cancelled][isCancelled] continuation implies that it is [completed][isCompleted].
*
- * Invocation of [resume] or [resumeWithException] in _resumed_ state produces an [IllegalStateException],
+ * Invocation of [Continuation.resume] or [Continuation.resumeWithException] in _resumed_ state produces an [IllegalStateException],
* but is ignored in _cancelled_ state.
*
* ```
@@ -41,7 +43,6 @@ import kotlin.coroutines.intrinsics.*
* +-----------+
* | Cancelled |
* +-----------+
- *
* ```
*/
public interface CancellableContinuation : Continuation {
@@ -76,6 +77,14 @@ public interface CancellableContinuation : Continuation {
@InternalCoroutinesApi
public fun tryResume(value: T, idempotent: Any? = null): Any?
+ /**
+ * Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
+ * delivered to the caller because of the dispatch in the process, so that atomicity delivery
+ * guaranteed can be provided by having a cancellation fallback.
+ */
+ @InternalCoroutinesApi
+ public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?
+
/**
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
* or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
@@ -110,8 +119,8 @@ public interface CancellableContinuation : Continuation {
public fun cancel(cause: Throwable? = null): Boolean
/**
- * Registers a [handler] to be **synchronously** invoked on cancellation (regular or exceptional) of this continuation.
- * When the continuation is already cancelled, the handler will be immediately invoked
+ * Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
+ * When the continuation is already cancelled, the handler is immediately invoked
* with the cancellation exception. Otherwise, the handler will be invoked as soon as this
* continuation is cancelled.
*
@@ -120,7 +129,15 @@ public interface CancellableContinuation : Continuation {
* processed as an uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
- * At most one [handler] can be installed on a continuation.
+ * At most one [handler] can be installed on a continuation. Attempt to call `invokeOnCancellation` second
+ * time produces [IllegalStateException].
+ *
+ * This handler is also called when this continuation [resumes][Continuation.resume] normally (with a value) and then
+ * is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
+ * the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
+ *
+ * A typical example for `invokeOnCancellation` usage is given in
+ * the documentation for the [suspendCancellableCoroutine] function.
*
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
* This `handler` can be invoked concurrently with the surrounding code.
@@ -163,7 +180,7 @@ public interface CancellableContinuation : Continuation {
* (see [CoroutineExceptionHandler]).
*
* This function shall be used when resuming with a resource that must be closed by
- * code that called the corresponding suspending function, e.g.:
+ * code that called the corresponding suspending function, for example:
*
* ```
* continuation.resume(resource) {
@@ -171,17 +188,119 @@ public interface CancellableContinuation : Continuation {
* }
* ```
*
+ * A more complete example and further details are given in
+ * the documentation for the [suspendCancellableCoroutine] function.
+ *
* **Note**: The [onCancellation] handler must be fast, non-blocking, and thread-safe.
* It can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context of its invocation.
*/
@ExperimentalCoroutinesApi // since 1.2.0
- public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
+ public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
}
/**
* Suspends the coroutine like [suspendCoroutine], but providing a [CancellableContinuation] to
- * the [block]. This function throws a [CancellationException] if the coroutine is cancelled or completed while suspended.
+ * the [block]. This function throws a [CancellationException] if the [Job] of the coroutine is
+ * cancelled or completed while it is suspended.
+ *
+ * A typical use of this function is to suspend a coroutine while waiting for a result
+ * from a single-shot callback API and to return the result to the caller.
+ * For multi-shot callback APIs see [callbackFlow][kotlinx.coroutines.flow.callbackFlow].
+ *
+ * ```
+ * suspend fun awaitCallback(): T = suspendCancellableCoroutine { continuation ->
+ * val callback = object : Callback { // Implementation of some callback interface
+ * override fun onCompleted(value: T) {
+ * // Resume coroutine with a value provided by the callback
+ * continuation.resume(value)
+ * }
+ * override fun onApiError(cause: Throwable) {
+ * // Resume coroutine with an exception provided by the callback
+ * continuation.resumeWithException(cause)
+ * }
+ * }
+ * // Register callback with an API
+ * api.register(callback)
+ * // Remove callback on cancellation
+ * continuation.invokeOnCancellation { api.unregister(callback) }
+ * // At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
+ * }
+ * ```
+ *
+ * > The callback `register`/`unregister` methods provided by an external API must be thread-safe, because
+ * > `invokeOnCancellation` block can be called at any time due to asynchronous nature of cancellation, even
+ * > concurrently with the call of the callback.
+ *
+ * ### Prompt cancellation guarantee
+ *
+ * This function provides **prompt cancellation guarantee**.
+ * If the [Job] of the current coroutine was cancelled while this function was suspended it will not resume
+ * successfully.
+ *
+ * The cancellation of the coroutine's job is generally asynchronous with respect to the suspended coroutine.
+ * The suspended coroutine is resumed with the call it to its [Continuation.resumeWith] member function or to
+ * [resume][Continuation.resume] extension function.
+ * However, when coroutine is resumed, it does not immediately start executing, but is passed to its
+ * [CoroutineDispatcher] to schedule its execution when dispatcher's resources become available for execution.
+ * The job's cancellation can happen both before, after, and concurrently with the call to `resume`. In any
+ * case, prompt cancellation guarantees that the the coroutine will not resume its code successfully.
+ *
+ * If the coroutine was resumed with an exception (for example, using [Continuation.resumeWithException] extension
+ * function) and cancelled, then the resulting exception of the `suspendCancellableCoroutine` function is determined
+ * by whichever action (exceptional resume or cancellation) that happened first.
+ *
+ * ### Returning resources from a suspended coroutine
+ *
+ * As a result of a prompt cancellation guarantee, when a closeable resource
+ * (like open file or a handle to another native resource) is returned from a suspended coroutine as a value
+ * it can be lost when the coroutine is cancelled. In order to ensure that the resource can be properly closed
+ * in this case, the [CancellableContinuation] interface provides two functions.
+ *
+ * * [invokeOnCancellation][CancellableContinuation.invokeOnCancellation] installs a handler that is called
+ * whenever a suspend coroutine is being cancelled. In addition to the example at the beginning, it can be
+ * used to ensure that a resource that was opened before the call to
+ * `suspendCancellableCoroutine` or in its body is closed in case of cancellation.
+ *
+ * ```
+ * suspendCancellableCoroutine { continuation ->
+ * val resource = openResource() // Opens some resource
+ * continuation.invokeOnCancellation {
+ * resource.close() // Ensures the resource is closed on cancellation
+ * }
+ * // ...
+ * }
+ * ```
+ *
+ * * [resume(value) { ... }][CancellableContinuation.resume] method on a [CancellableContinuation] takes
+ * an optional `onCancellation` block. It can be used when resuming with a resource that must be closed by
+ * the code that called the corresponding suspending function.
+ *
+ * ```
+ * suspendCancellableCoroutine { continuation ->
+ * val callback = object : Callback { // Implementation of some callback interface
+ * // A callback provides a reference to some closeable resource
+ * override fun onCompleted(resource: T) {
+ * // Resume coroutine with a value provided by the callback and ensure the resource is closed in case
+ * // when the coroutine is cancelled before the caller gets a reference to the resource.
+ * continuation.resume(resource) {
+ * resource.close() // Close the resource on cancellation
+ * }
+ * }
+ * // ...
+ * }
+ * ```
+ *
+ * ### Implementation details and custom continuation interceptors
+ *
+ * The prompt cancellation guarantee is the result of a coordinated implementation inside `suspendCancellableCoroutine`
+ * function and the [CoroutineDispatcher] class. The coroutine dispatcher checks for the status of the [Job] immediately
+ * before continuing its normal execution and aborts this normal execution, calling all the corresponding
+ * cancellation handlers, if the job was cancelled.
+ *
+ * If a custom implementation of [ContinuationInterceptor] is used in a coroutine's context that does not extend
+ * [CoroutineDispatcher] class, then there is no prompt cancellation guarantee. A custom continuation interceptor
+ * can resume execution of a previously suspended coroutine even if its job was already cancelled.
*/
public suspend inline fun suspendCancellableCoroutine(
crossinline block: (CancellableContinuation) -> Unit
@@ -199,29 +318,10 @@ public suspend inline fun suspendCancellableCoroutine(
}
/**
- * Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
- *
- * When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
- * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
- * continue to execute even after it was cancelled from the same thread in the case when the continuation
- * was already resumed and was posted for execution to the thread's queue.
- *
- * @suppress **This an internal API and should not be used from general code.**
+ * Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
+ * [CancellableContinuationImpl] is reused.
*/
-@InternalCoroutinesApi
-public suspend inline fun suspendAtomicCancellableCoroutine(
- crossinline block: (CancellableContinuation) -> Unit
-): T =
- suspendCoroutineUninterceptedOrReturn { uCont ->
- val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
- block(cancellable)
- cancellable.getResult()
- }
-
-/**
- * Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
- */
-internal suspend inline fun suspendAtomicCancellableCoroutineReusable(
+internal suspend inline fun suspendCancellableCoroutineReusable(
crossinline block: (CancellableContinuation) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
@@ -232,12 +332,12 @@ internal suspend inline fun suspendAtomicCancellableCoroutineReusable(
internal fun getOrCreateCancellableContinuation(delegate: Continuation): CancellableContinuationImpl {
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation) {
- return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
+ return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
}
/*
* Attempt to claim reusable instance.
*
- * suspendAtomicCancellableCoroutineReusable { // <- claimed
+ * suspendCancellableCoroutineReusable { // <- claimed
* // Any asynchronous cancellation is "postponed" while this block
* // is being executed
* } // postponed cancellation is checked here.
@@ -248,26 +348,13 @@ internal fun getOrCreateCancellableContinuation(delegate: Continuation):
* thus leaking CC instance for indefinite time.
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
*/
- return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
- ?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
+ return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetStateReusable() }
+ ?: return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
}
/**
- * @suppress **Deprecated**
- */
-@Deprecated(
- message = "holdCancellability parameter is deprecated and is no longer used",
- replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
-)
-@InternalCoroutinesApi
-public suspend inline fun suspendAtomicCancellableCoroutine(
- holdCancellability: Boolean = false,
- crossinline block: (CancellableContinuation) -> Unit
-): T =
- suspendAtomicCancellableCoroutine(block)
-
-/**
- * Removes the specified [node] on cancellation.
+ * Removes the specified [node] on cancellation. This function assumes that this node is already
+ * removed on successful resume and does not try to remove it if the continuation is cancelled during dispatch.
*/
internal fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinkedListNode) =
invokeOnCancellation(handler = RemoveOnCancel(node).asHandler)
@@ -288,7 +375,7 @@ public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHa
// --------------- implementation details ---------------
-private class RemoveOnCancel(private val node: LockFreeLinkedListNode) : CancelHandler() {
+private class RemoveOnCancel(private val node: LockFreeLinkedListNode) : BeforeResumeCancelHandler() {
override fun invoke(cause: Throwable?) { node.remove() }
override fun toString() = "RemoveOnCancel[$node]"
}
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
index e25ebd3a37..cdb1b78882 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
@@ -27,6 +27,10 @@ internal open class CancellableContinuationImpl(
final override val delegate: Continuation,
resumeMode: Int
) : DispatchedTask(resumeMode), CancellableContinuation, CoroutineStackFrame {
+ init {
+ assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl
+ }
+
public override val context: CoroutineContext = delegate.context
/*
@@ -88,15 +92,17 @@ internal open class CancellableContinuationImpl(
private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
/**
- * Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work.
- * Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
+ * Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work.
+ * Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
*/
- @JvmName("resetState") // Prettier stack traces
- internal fun resetState(): Boolean {
+ @JvmName("resetStateReusable") // Prettier stack traces
+ internal fun resetStateReusable(): Boolean {
+ assert { resumeMode == MODE_CANCELLABLE_REUSABLE } // invalid mode for CancellableContinuationImpl
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
assert { state !is NotCompleted }
- if (state is CompletedIdempotentResult) {
+ if (state is CompletedContinuation && state.idempotentResume != null) {
+ // Cannot reuse continuation that was resumed with idempotent marker
detachChild()
return false
}
@@ -114,7 +120,6 @@ internal open class CancellableContinuationImpl(
if (checkCompleted()) return
if (parentHandle !== null) return // fast path 2 -- was already initialized
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
- parent.start() // make sure the parent is started
val handle = parent.invokeOnCompletion(
onCancelling = true,
handler = ChildContinuation(parent, this).asHandler
@@ -130,7 +135,7 @@ internal open class CancellableContinuationImpl(
private fun checkCompleted(): Boolean {
val completed = isCompleted
- if (resumeMode != MODE_ATOMIC_DEFAULT) return completed // Do not check postponed cancellation for non-reusable continuations
+ if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
if (!completed) {
@@ -147,10 +152,26 @@ internal open class CancellableContinuationImpl(
override fun takeState(): Any? = state
- override fun cancelResult(state: Any?, cause: Throwable) {
- if (state is CompletedWithCancellation) {
- invokeHandlerSafely {
- state.onCancellation(cause)
+ // Note: takeState does not clear the state so we don't use takenState
+ // and we use the actual current state where in CAS-loop
+ override fun cancelCompletedResult(takenState: Any?, cause: Throwable): Unit = _state.loop { state ->
+ when (state) {
+ is NotCompleted -> error("Not completed")
+ is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
+ is CompletedContinuation -> {
+ check(!state.cancelled) { "Must be called at most once" }
+ val update = state.copy(cancelCause = cause)
+ if (_state.compareAndSet(state, update)) {
+ state.invokeHandlers(this, cause)
+ return // done
+ }
+ }
+ else -> {
+ // completed normally without marker class, promote to CompletedContinuation in case
+ // if invokeOnCancellation if called later
+ if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) {
+ return // done
+ }
}
}
}
@@ -159,7 +180,7 @@ internal open class CancellableContinuationImpl(
* Attempt to postpone cancellation for reusable cancellable continuation
*/
private fun cancelLater(cause: Throwable): Boolean {
- if (resumeMode != MODE_ATOMIC_DEFAULT) return false
+ if (!resumeMode.isReusableMode) return false
val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
return dispatched.postponeCancellation(cause)
}
@@ -171,10 +192,10 @@ internal open class CancellableContinuationImpl(
val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
// Invoke cancel handler if it was present
- if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
+ (state as? CancelHandler)?.let { callCancelHandler(it, cause) }
// Complete state update
detachChildIfNonResuable()
- dispatchResume(mode = MODE_ATOMIC_DEFAULT)
+ dispatchResume(resumeMode) // no need for additional cancellation checks
return true
}
}
@@ -186,14 +207,36 @@ internal open class CancellableContinuationImpl(
detachChildIfNonResuable()
}
- private inline fun invokeHandlerSafely(block: () -> Unit) {
+ private inline fun callCancelHandlerSafely(block: () -> Unit) {
+ try {
+ block()
+ } catch (ex: Throwable) {
+ // Handler should never fail, if it does -- it is an unhandled exception
+ handleCoroutineException(
+ context,
+ CompletionHandlerException("Exception in invokeOnCancellation handler for $this", ex)
+ )
+ }
+ }
+
+ private fun callCancelHandler(handler: CompletionHandler, cause: Throwable?) =
+ /*
+ * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
+ * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
+ */
+ callCancelHandlerSafely { handler.invokeIt(cause) }
+
+ fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
+ callCancelHandlerSafely { handler.invoke(cause) }
+
+ fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
try {
- block()
+ onCancellation.invoke(cause)
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
context,
- CompletionHandlerException("Exception in cancellation handler for $this", ex)
+ CompletionHandlerException("Exception in resume onCancellation handler for $this", ex)
)
}
}
@@ -232,64 +275,75 @@ internal open class CancellableContinuationImpl(
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
// if the parent job was already cancelled, then throw the corresponding cancellation exception
- // otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
+ // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
// before the block returns. This getResult would return a result as opposed to cancellation
// exception that should have happened if the continuation is dispatched for execution later.
- if (resumeMode == MODE_CANCELLABLE) {
+ if (resumeMode.isCancellableMode) {
val job = context[Job]
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
- cancelResult(state, cause)
+ cancelCompletedResult(state, cause)
throw recoverStackTrace(cause, this)
}
}
return getSuccessfulResult(state)
}
- override fun resumeWith(result: Result) {
+ override fun resumeWith(result: Result) =
resumeImpl(result.toState(this), resumeMode)
- }
- override fun resume(value: T, onCancellation: (cause: Throwable) -> Unit) {
- val cancelled = resumeImpl(CompletedWithCancellation(value, onCancellation), resumeMode)
- if (cancelled != null) {
- // too late to resume (was cancelled) -- call handler
- invokeHandlerSafely {
- onCancellation(cancelled.cause)
- }
- }
- }
+ override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
+ resumeImpl(value, resumeMode, onCancellation)
public override fun invokeOnCancellation(handler: CompletionHandler) {
- var handleCache: CancelHandler? = null
+ val cancelHandler = makeCancelHandler(handler)
_state.loop { state ->
when (state) {
is Active -> {
- val node = handleCache ?: makeHandler(handler).also { handleCache = it }
- if (_state.compareAndSet(state, node)) return // quit on cas success
+ if (_state.compareAndSet(state, cancelHandler)) return // quit on cas success
}
is CancelHandler -> multipleHandlersError(handler, state)
- is CancelledContinuation -> {
+ is CompletedExceptionally -> {
/*
- * Continuation was already cancelled, invoke directly.
+ * Continuation was already cancelled or completed exceptionally.
* NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
- * so we check to make sure that handler was installed just once.
+ * so we check to make sure handler was installed just once.
*/
if (!state.makeHandled()) multipleHandlersError(handler, state)
/*
+ * Call the handler only if it was cancelled (not called when completed exceptionally).
* :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
* because we play type tricks on Kotlin/JS and handler is not necessarily a function there
*/
- invokeHandlerSafely { handler.invokeIt((state as? CompletedExceptionally)?.cause) }
+ if (state is CancelledContinuation) {
+ callCancelHandler(handler, (state as? CompletedExceptionally)?.cause)
+ }
return
}
+ is CompletedContinuation -> {
+ /*
+ * Continuation was already completed, and might already have cancel handler.
+ */
+ if (state.cancelHandler != null) multipleHandlersError(handler, state)
+ // BeforeResumeCancelHandler does not need to be called on a completed continuation
+ if (cancelHandler is BeforeResumeCancelHandler) return
+ if (state.cancelled) {
+ // Was already cancelled while being dispatched -- invoke the handler directly
+ callCancelHandler(handler, state.cancelCause)
+ return
+ }
+ val update = state.copy(cancelHandler = cancelHandler)
+ if (_state.compareAndSet(state, update)) return // quit on cas success
+ }
else -> {
/*
- * Continuation was already completed, do nothing.
- * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
- * but we have no way to check that it was installed just once in this case.
+ * Continuation was already completed normally, but might get cancelled while being dispatched.
+ * Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
+ * does not need to be called in this case.
*/
- return
+ if (cancelHandler is BeforeResumeCancelHandler) return
+ val update = CompletedContinuation(state, cancelHandler = cancelHandler)
+ if (_state.compareAndSet(state, update)) return // quit on cas success
}
}
}
@@ -299,7 +353,7 @@ internal open class CancellableContinuationImpl(
error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
}
- private fun makeHandler(handler: CompletionHandler): CancelHandler =
+ private fun makeCancelHandler(handler: CompletionHandler): CancelHandler =
if (handler is CancelHandler) handler else InvokeOnCancel(handler)
private fun dispatchResume(mode: Int) {
@@ -308,15 +362,39 @@ internal open class CancellableContinuationImpl(
dispatch(mode)
}
- // returns null when successfully dispatched resumed, CancelledContinuation if too late (was already cancelled)
- private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
+ private fun resumedState(
+ state: NotCompleted,
+ proposedUpdate: Any?,
+ resumeMode: Int,
+ onCancellation: ((cause: Throwable) -> Unit)?,
+ idempotent: Any?
+ ): Any? = when {
+ proposedUpdate is CompletedExceptionally -> {
+ assert { idempotent == null } // there are no idempotent exceptional resumes
+ assert { onCancellation == null } // only successful results can be cancelled
+ proposedUpdate
+ }
+ !resumeMode.isCancellableMode && idempotent == null -> proposedUpdate // cannot be cancelled in process, all is fine
+ onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null ->
+ // mark as CompletedContinuation if special cases are present:
+ // Cancellation handlers that shall be called after resume or idempotent resume
+ CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
+ else -> proposedUpdate // simple case -- use the value directly
+ }
+
+ private fun resumeImpl(
+ proposedUpdate: Any?,
+ resumeMode: Int,
+ onCancellation: ((cause: Throwable) -> Unit)? = null
+ ) {
_state.loop { state ->
when (state) {
is NotCompleted -> {
- if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
+ val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
+ if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
detachChildIfNonResuable()
- dispatchResume(resumeMode)
- return null
+ dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
+ return // done
}
is CancelledContinuation -> {
/*
@@ -324,14 +402,48 @@ internal open class CancellableContinuationImpl(
* because cancellation is asynchronous and may race with resume.
* Racy exceptions will be lost, too.
*/
- if (state.makeResumed()) return state // tried to resume just once, but was cancelled
+ if (state.makeResumed()) { // check if trying to resume one (otherwise error)
+ // call onCancellation
+ onCancellation?.let { callOnCancellation(it, state.cause) }
+ return // done
+ }
+ }
+ }
+ alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
+ }
+ }
+
+ /**
+ * Similar to [tryResume], but does not actually completes resume (needs [completeResume] call).
+ * Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled.
+ */
+ private fun tryResumeImpl(
+ proposedUpdate: Any?,
+ idempotent: Any?,
+ onCancellation: ((cause: Throwable) -> Unit)?
+ ): Symbol? {
+ _state.loop { state ->
+ when (state) {
+ is NotCompleted -> {
+ val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent)
+ if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
+ detachChildIfNonResuable()
+ return RESUME_TOKEN
+ }
+ is CompletedContinuation -> {
+ return if (idempotent != null && state.idempotentResume === idempotent) {
+ assert { state.result == proposedUpdate } // "Non-idempotent resume"
+ RESUME_TOKEN // resumed with the same token -- ok
+ } else {
+ null // resumed with a different token or non-idempotent -- too late
+ }
}
+ else -> return null // cannot resume -- not active anymore
}
- alreadyResumedError(proposedUpdate) // otherwise -- an error (second resume attempt)
}
}
- private fun alreadyResumedError(proposedUpdate: Any?) {
+ private fun alreadyResumedError(proposedUpdate: Any?): Nothing {
error("Already resumed, but proposed with update $proposedUpdate")
}
@@ -343,7 +455,7 @@ internal open class CancellableContinuationImpl(
/**
* Detaches from the parent.
- * Invariant: used used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
+ * Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
*/
internal fun detachChild() {
val handle = parentHandle
@@ -352,42 +464,14 @@ internal open class CancellableContinuationImpl(
}
// Note: Always returns RESUME_TOKEN | null
- override fun tryResume(value: T, idempotent: Any?): Any? {
- _state.loop { state ->
- when (state) {
- is NotCompleted -> {
- val update: Any? = if (idempotent == null) value else
- CompletedIdempotentResult(idempotent, value)
- if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
- detachChildIfNonResuable()
- return RESUME_TOKEN
- }
- is CompletedIdempotentResult -> {
- return if (state.idempotentResume === idempotent) {
- assert { state.result === value } // "Non-idempotent resume"
- RESUME_TOKEN
- } else {
- null
- }
- }
- else -> return null // cannot resume -- not active anymore
- }
- }
- }
+ override fun tryResume(value: T, idempotent: Any?): Any? =
+ tryResumeImpl(value, idempotent, onCancellation = null)
- override fun tryResumeWithException(exception: Throwable): Any? {
- _state.loop { state ->
- when (state) {
- is NotCompleted -> {
- val update = CompletedExceptionally(exception)
- if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
- detachChildIfNonResuable()
- return RESUME_TOKEN
- }
- else -> return null // cannot resume -- not active anymore
- }
- }
- }
+ override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
+ tryResumeImpl(value, idempotent, onCancellation)
+
+ override fun tryResumeWithException(exception: Throwable): Any? =
+ tryResumeImpl(CompletedExceptionally(exception), idempotent = null, onCancellation = null)
// note: token is always RESUME_TOKEN
override fun completeResume(token: Any) {
@@ -408,11 +492,15 @@ internal open class CancellableContinuationImpl(
@Suppress("UNCHECKED_CAST")
override fun getSuccessfulResult(state: Any?): T =
when (state) {
- is CompletedIdempotentResult -> state.result as T
- is CompletedWithCancellation -> state.result as T
+ is CompletedContinuation -> state.result as T
else -> state as T
}
+ // The exceptional state in CancellableContinuationImpl is stored directly and it is not recovered yet.
+ // The stacktrace recovery is invoked here.
+ override fun getExceptionalResult(state: Any?): Throwable? =
+ super.getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) }
+
// For nicer debugging
public override fun toString(): String =
"${nameString()}(${delegate.toDebugString()}){$state}@$hexAddress"
@@ -429,8 +517,20 @@ private object Active : NotCompleted {
override fun toString(): String = "Active"
}
+/**
+ * Base class for all [CancellableContinuation.invokeOnCancellation] handlers to avoid an extra instance
+ * on JVM, yet support JS where you cannot extend from a functional type.
+ */
internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted
+/**
+ * Base class for all [CancellableContinuation.invokeOnCancellation] handlers that don't need to be invoked
+ * if continuation is cancelled after resumption, during dispatch, because the corresponding resources
+ * were already released before calling `resume`. This cancel handler is called only before `resume`.
+ * It avoids allocation of [CompletedContinuation] instance during resume on JVM.
+ */
+internal abstract class BeforeResumeCancelHandler : CancelHandler()
+
// Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly
private class InvokeOnCancel( // Clashes with InvokeOnCancellation
private val handler: CompletionHandler
@@ -441,16 +541,18 @@ private class InvokeOnCancel( // Clashes with InvokeOnCancellation
override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
}
-private class CompletedIdempotentResult(
- @JvmField val idempotentResume: Any?,
- @JvmField val result: Any?
-) {
- override fun toString(): String = "CompletedIdempotentResult[$result]"
-}
-
-private class CompletedWithCancellation(
+// Completed with additional metadata
+private data class CompletedContinuation(
@JvmField val result: Any?,
- @JvmField val onCancellation: (cause: Throwable) -> Unit
+ @JvmField val cancelHandler: CancelHandler? = null, // installed via invokeOnCancellation
+ @JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null, // installed via resume block
+ @JvmField val idempotentResume: Any? = null,
+ @JvmField val cancelCause: Throwable? = null
) {
- override fun toString(): String = "CompletedWithCancellation[$result]"
+ val cancelled: Boolean get() = cancelCause != null
+
+ fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
+ cancelHandler?.let { cont.callCancelHandler(it, cause) }
+ onCancellation?.let { cont.callOnCancellation(it, cause) }
+ }
}
diff --git a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt
index d24f1837cd..0605817afa 100644
--- a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt
+++ b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt
@@ -19,7 +19,7 @@ import kotlinx.coroutines.selects.*
* All functions on this interface are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*
- * **`CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
+ * **The `CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
* as new methods might be added to this interface in the future, but is stable for use.
*/
public interface CompletableDeferred : Deferred {
diff --git a/kotlinx-coroutines-core/common/src/CompletableJob.kt b/kotlinx-coroutines-core/common/src/CompletableJob.kt
index 8e6b1ab02f..74a92e36e5 100644
--- a/kotlinx-coroutines-core/common/src/CompletableJob.kt
+++ b/kotlinx-coroutines-core/common/src/CompletableJob.kt
@@ -11,7 +11,7 @@ package kotlinx.coroutines
* All functions on this interface are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*
- * **`CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
+ * **The `CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
* as new methods might be added to this interface in the future, but is stable for use.
*/
public interface CompletableJob : Job {
diff --git a/kotlinx-coroutines-core/common/src/CompletedExceptionally.kt b/kotlinx-coroutines-core/common/src/CompletionState.kt
similarity index 78%
rename from kotlinx-coroutines-core/common/src/CompletedExceptionally.kt
rename to kotlinx-coroutines-core/common/src/CompletionState.kt
index b426785bd7..f09aa3ccd9 100644
--- a/kotlinx-coroutines-core/common/src/CompletedExceptionally.kt
+++ b/kotlinx-coroutines-core/common/src/CompletionState.kt
@@ -9,10 +9,17 @@ import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
-internal fun Result.toState(): Any? = fold({ it }, { CompletedExceptionally(it) })
+internal fun Result.toState(
+ onCancellation: ((cause: Throwable) -> Unit)? = null
+): Any? = fold(
+ onSuccess = { if (onCancellation != null) CompletedWithCancellation(it, onCancellation) else it },
+ onFailure = { CompletedExceptionally(it) }
+)
-internal fun Result.toState(caller: CancellableContinuation<*>): Any? = fold({ it },
- { CompletedExceptionally(recoverStackTrace(it, caller)) })
+internal fun Result.toState(caller: CancellableContinuation<*>): Any? = fold(
+ onSuccess = { it },
+ onFailure = { CompletedExceptionally(recoverStackTrace(it, caller)) }
+)
@Suppress("RESULT_CLASS_IN_RETURN_TYPE", "UNCHECKED_CAST")
internal fun recoverResult(state: Any?, uCont: Continuation): Result =
@@ -21,6 +28,11 @@ internal fun recoverResult(state: Any?, uCont: Continuation): Result =
else
Result.success(state as T)
+internal data class CompletedWithCancellation(
+ @JvmField val result: Any?,
+ @JvmField val onCancellation: (cause: Throwable) -> Unit
+)
+
/**
* Class for an internal state of a job that was cancelled (completed exceptionally).
*
diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
index 1b6e7eb00f..ab1e814b8a 100644
--- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
+++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines
+import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
/**
diff --git a/kotlinx-coroutines-core/common/src/CoroutineScope.kt b/kotlinx-coroutines-core/common/src/CoroutineScope.kt
index 7dbd6a6d7b..0dde6c9352 100644
--- a/kotlinx-coroutines-core/common/src/CoroutineScope.kt
+++ b/kotlinx-coroutines-core/common/src/CoroutineScope.kt
@@ -226,10 +226,10 @@ public fun CoroutineScope.cancel(message: String, cause: Throwable? = null): Uni
/**
* Ensures that current scope is [active][CoroutineScope.isActive].
- * Throws [IllegalStateException] if the context does not have a job in it.
*
* If the job is no longer active, throws [CancellationException].
* If the job was cancelled, thrown exception contains the original cancellation cause.
+ * This function does not do anything if there is no [Job] in the scope's [coroutineContext][CoroutineScope.coroutineContext].
*
* This method is a drop-in replacement for the following code, but with more precise exception:
* ```
@@ -237,6 +237,8 @@ public fun CoroutineScope.cancel(message: String, cause: Throwable? = null): Uni
* throw CancellationException()
* }
* ```
+ *
+ * @see CoroutineContext.ensureActive
*/
public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive()
diff --git a/kotlinx-coroutines-core/common/src/Deferred.kt b/kotlinx-coroutines-core/common/src/Deferred.kt
index 72f3fde141..ff996756a3 100644
--- a/kotlinx-coroutines-core/common/src/Deferred.kt
+++ b/kotlinx-coroutines-core/common/src/Deferred.kt
@@ -43,6 +43,8 @@ public interface Deferred : Job {
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*
* This function can be used in [select] invocation with [onAwait] clause.
* Use [isCompleted] to check for completion of this deferred value without waiting.
diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt
index ab80912269..f7948443fa 100644
--- a/kotlinx-coroutines-core/common/src/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/Delay.kt
@@ -21,9 +21,12 @@ import kotlin.time.*
public interface Delay {
/**
* Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
+ *
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun delay(time: Long) {
if (time <= 0) return // don't delay
@@ -54,15 +57,57 @@ public interface Delay {
*
* This implementation uses a built-in single-threaded scheduled executor service.
*/
- public fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
- DefaultDelay.invokeOnTimeout(timeMillis, block)
+ public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
+ DefaultDelay.invokeOnTimeout(timeMillis, block, context)
}
+/**
+ * Suspends until cancellation, in which case it will throw a [CancellationException].
+ *
+ * This function returns [Nothing], so it can be used in any coroutine,
+ * regardless of the required return type.
+ *
+ * Usage example in callback adapting code:
+ *
+ * ```kotlin
+ * fun currentTemperature(): Flow = callbackFlow {
+ * val callback = SensorCallback { degreesCelsius: Double ->
+ * trySend(Temperature.celsius(degreesCelsius))
+ * }
+ * try {
+ * registerSensorCallback(callback)
+ * awaitCancellation() // Suspends to keep getting updates until cancellation.
+ * } finally {
+ * unregisterSensorCallback(callback)
+ * }
+ * }
+ * ```
+ *
+ * Usage example in (non declarative) UI code:
+ *
+ * ```kotlin
+ * suspend fun showStuffUntilCancelled(content: Stuff): Nothing {
+ * someSubView.text = content.title
+ * anotherSubView.text = content.description
+ * someView.visibleInScope {
+ * awaitCancellation() // Suspends so the view stays visible.
+ * }
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
+
/**
* Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
+ *
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
+ *
+ * If you want to delay forever (until cancellation), consider using [awaitCancellation] instead.
*
* Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
@@ -72,15 +117,23 @@ public interface Delay {
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation ->
- cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
+ // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
+ if (timeMillis < Long.MAX_VALUE) {
+ cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
+ }
}
}
/**
* Delays coroutine for a given [duration] without blocking a thread and resumes it after the specified time.
+ *
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
+ *
+ * If you want to delay forever (until cancellation), consider using [awaitCancellation] instead.
*
* Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt
index c8e4455c92..4bfff118e8 100644
--- a/kotlinx-coroutines-core/common/src/Timeout.kt
+++ b/kotlinx-coroutines-core/common/src/Timeout.kt
@@ -24,7 +24,14 @@ import kotlin.time.*
* The sibling function that does not throw an exception on timeout is [withTimeoutOrNull].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
- * Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
+ * **The timeout event is asynchronous with respect to the code running in the block** and may happen at any time,
+ * even right before the return from inside of the timeout [block]. Keep this in mind if you open or acquire some
+ * resource inside the [block] that needs closing or release outside of the block.
+ * See the
+ * [Asynchronous timeout and resources][https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources]
+ * section of the coroutines guide for details.
+ *
+ * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*
* @param timeMillis timeout time in milliseconds.
*/
@@ -48,7 +55,14 @@ public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineSco
* The sibling function that does not throw an exception on timeout is [withTimeoutOrNull].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
- * Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
+ * **The timeout event is asynchronous with respect to the code running in the block** and may happen at any time,
+ * even right before the return from inside of the timeout [block]. Keep this in mind if you open or acquire some
+ * resource inside the [block] that needs closing or release outside of the block.
+ * See the
+ * [Asynchronous timeout and resources][https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources]
+ * section of the coroutines guide for details.
+ *
+ * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T {
@@ -68,7 +82,14 @@ public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineSc
* The sibling function that throws an exception on timeout is [withTimeout].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
- * Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
+ * **The timeout event is asynchronous with respect to the code running in the block** and may happen at any time,
+ * even right before the return from inside of the timeout [block]. Keep this in mind if you open or acquire some
+ * resource inside the [block] that needs closing or release outside of the block.
+ * See the
+ * [Asynchronous timeout and resources][https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources]
+ * section of the coroutines guide for details.
+ *
+ * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*
* @param timeMillis timeout time in milliseconds.
*/
@@ -101,7 +122,14 @@ public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend Corout
* The sibling function that throws an exception on timeout is [withTimeout].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
- * Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
+ * **The timeout event is asynchronous with respect to the code running in the block** and may happen at any time,
+ * even right before the return from inside of the timeout [block]. Keep this in mind if you open or acquire some
+ * resource inside the [block] that needs closing or release outside of the block.
+ * See the
+ * [Asynchronous timeout and resources][https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources]
+ * section of the coroutines guide for details.
+ *
+ * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? =
@@ -114,7 +142,7 @@ private fun setupTimeout(
// schedule cancellation of this coroutine on time
val cont = coroutine.uCont
val context = cont.context
- coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
+ coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
// restart the block using a new coroutine with a new job,
// however, start it undispatched, because we already are in the proper context
return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block)
diff --git a/kotlinx-coroutines-core/common/src/Yield.kt b/kotlinx-coroutines-core/common/src/Yield.kt
index e0af04ddb7..0d8bd3bc2f 100644
--- a/kotlinx-coroutines-core/common/src/Yield.kt
+++ b/kotlinx-coroutines-core/common/src/Yield.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines
+import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
@@ -13,6 +14,8 @@ import kotlin.coroutines.intrinsics.*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed when this suspending function is invoked or while
* this function is waiting for dispatch, it resumes with a [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*
* **Note**: This function always [checks for cancellation][ensureActive] even when it does not suspend.
*
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 28c7ceabe1..53ecf06a2c 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -16,7 +16,9 @@ import kotlin.native.concurrent.*
/**
* Abstract send channel. It is a base class for all send channel implementations.
*/
-internal abstract class AbstractSendChannel : SendChannel {
+internal abstract class AbstractSendChannel(
+ @JvmField protected val onUndeliveredElement: OnUndeliveredElement?
+) : SendChannel {
/** @suppress **This is unstable API and it is subject to change.** */
protected val queue = LockFreeLinkedListHead()
@@ -151,24 +153,34 @@ internal abstract class AbstractSendChannel : SendChannel {
// We should check for closed token on offer as well, otherwise offer won't be linearizable
// in the face of concurrent close()
// See https://github.com/Kotlin/kotlinx.coroutines/issues/359
- throw recoverStackTrace(helpCloseAndGetSendException(closedForSend ?: return false))
+ throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false))
+ }
+ result is Closed<*> -> {
+ throw recoverStackTrace(helpCloseAndGetSendException(element, result))
}
- result is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(result))
else -> error("offerInternal returned $result")
}
}
- private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
+ private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
// To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
helpClose(closed)
+ // Element was not delivered -> cals onUndeliveredElement
+ onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
+ // If it crashes, add send exception as suppressed for better diagnostics
+ it.addSuppressed(closed.sendException)
+ throw it
+ }
return closed.sendException
}
- private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
+ private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
loop@ while (true) {
if (isFullImpl) {
- val send = SendElement(element, cont)
+ val send = if (onUndeliveredElement == null)
+ SendElement(element, cont) else
+ SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
val enqueueResult = enqueueSend(send)
when {
enqueueResult == null -> { // enqueued successfully
@@ -176,7 +188,7 @@ internal abstract class AbstractSendChannel : SendChannel {
return@sc
}
enqueueResult is Closed<*> -> {
- cont.helpCloseAndResumeWithSendException(enqueueResult)
+ cont.helpCloseAndResumeWithSendException(element, enqueueResult)
return@sc
}
enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
@@ -193,7 +205,7 @@ internal abstract class AbstractSendChannel : SendChannel {
}
offerResult === OFFER_FAILED -> continue@loop
offerResult is Closed<*> -> {
- cont.helpCloseAndResumeWithSendException(offerResult)
+ cont.helpCloseAndResumeWithSendException(element, offerResult)
return@sc
}
else -> error("offerInternal returned $offerResult")
@@ -201,9 +213,15 @@ internal abstract class AbstractSendChannel : SendChannel {
}
}
- private fun Continuation<*>.helpCloseAndResumeWithSendException(closed: Closed<*>) {
+ private fun Continuation<*>.helpCloseAndResumeWithSendException(element: E, closed: Closed<*>) {
helpClose(closed)
- resumeWithException(closed.sendException)
+ val sendException = closed.sendException
+ onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
+ it.addSuppressed(sendException)
+ resumeWithException(it)
+ return
+ }
+ resumeWithException(sendException)
}
/**
@@ -375,7 +393,7 @@ internal abstract class AbstractSendChannel : SendChannel {
select.disposeOnSelect(node)
return
}
- enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
+ enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, enqueueResult))
enqueueResult === ENQUEUE_FAILED -> {} // try to offer
enqueueResult is Receive<*> -> {} // try to offer
else -> error("enqueueSend returned $enqueueResult ")
@@ -391,7 +409,7 @@ internal abstract class AbstractSendChannel : SendChannel {
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
return
}
- offerResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(offerResult))
+ offerResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, offerResult))
else -> error("offerSelectInternal returned $offerResult")
}
}
@@ -431,7 +449,7 @@ internal abstract class AbstractSendChannel : SendChannel {
// ------ private ------
private class SendSelect(
- override val pollResult: Any?,
+ override val pollResult: E, // E | Closed - the result pollInternal returns when it rendezvous with this node
@JvmField val channel: AbstractSendChannel,
@JvmField val select: SelectInstance,
@JvmField val block: suspend (SendChannel) -> R
@@ -440,11 +458,13 @@ internal abstract class AbstractSendChannel : SendChannel {
select.trySelectOther(otherOp) as Symbol? // must return symbol
override fun completeResumeSend() {
- block.startCoroutine(receiver = channel, completion = select.completion)
+ block.startCoroutineCancellable(receiver = channel, completion = select.completion)
}
override fun dispose() { // invoked on select completion
- remove()
+ if (!remove()) return
+ // if the node was successfully removed (meaning it was added but was not received) then element not delivered
+ undeliveredElement()
}
override fun resumeSendClosed(closed: Closed<*>) {
@@ -452,6 +472,10 @@ internal abstract class AbstractSendChannel : SendChannel {
select.resumeSelectWithException(closed.sendException)
}
+ override fun undeliveredElement() {
+ channel.onUndeliveredElement?.callUndeliveredElement(pollResult, select.completion.context)
+ }
+
override fun toString(): String = "SendSelect@$hexAddress($pollResult)[$channel, $select]"
}
@@ -469,7 +493,9 @@ internal abstract class AbstractSendChannel : SendChannel {
/**
* Abstract send/receive channel. It is a base class for all channel implementations.
*/
-internal abstract class AbstractChannel : AbstractSendChannel(), Channel {
+internal abstract class AbstractChannel(
+ onUndeliveredElement: OnUndeliveredElement?
+) : AbstractSendChannel(onUndeliveredElement), Channel {
// ------ extension points for buffered channels ------
/**
@@ -501,6 +527,8 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
- val receive = ReceiveElement(cont as CancellableContinuation, receiveMode)
+ private suspend fun receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
+ val receive = if (onUndeliveredElement == null)
+ ReceiveElement(cont as CancellableContinuation, receiveMode) else
+ ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation, receiveMode, onUndeliveredElement)
while (true) {
if (enqueueReceive(receive)) {
removeReceiveOnCancel(cont, receive)
@@ -561,7 +591,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel
@@ -785,7 +820,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel, receive: Receive<*>) =
cont.invokeOnCancellation(handler = RemoveReceiveOnCancel(receive).asHandler)
- private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : CancelHandler() {
+ private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : BeforeResumeCancelHandler() {
override fun invoke(cause: Throwable?) {
if (receive.remove())
onReceiveDequeued()
@@ -793,7 +828,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel(val channel: AbstractChannel) : ChannelIterator {
+ private class Itr(@JvmField val channel: AbstractChannel) : ChannelIterator {
var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
override suspend fun hasNext(): Boolean {
@@ -814,7 +849,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel
+ private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable sc@ { cont ->
val receive = ReceiveHasNext(this, cont)
while (true) {
if (channel.enqueueReceive(receive)) {
@@ -832,7 +867,8 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel(
+ private open class ReceiveElement(
@JvmField val cont: CancellableContinuation,
@JvmField val receiveMode: Int
) : Receive() {
@@ -860,9 +896,8 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel value
}
- @Suppress("IMPLICIT_CAST_TO_ANY")
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
- val token = cont.tryResume(resumeValue(value), otherOp?.desc) ?: return null
+ val token = cont.tryResume(resumeValue(value), otherOp?.desc, resumeOnCancellationFun(value)) ?: return null
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare()
@@ -881,12 +916,22 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel(
+ private class ReceiveElementWithUndeliveredHandler(
+ cont: CancellableContinuation,
+ receiveMode: Int,
+ @JvmField val onUndeliveredElement: OnUndeliveredElement
+ ) : ReceiveElement(cont, receiveMode) {
+ override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
+ onUndeliveredElement.bindCancellationFun(value, cont.context)
+ }
+
+ private open class ReceiveHasNext(
@JvmField val iterator: Itr,
@JvmField val cont: CancellableContinuation
) : Receive() {
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
- val token = cont.tryResume(true, otherOp?.desc) ?: return null
+ val token = cont.tryResume(true, otherOp?.desc, resumeOnCancellationFun(value))
+ ?: return null
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare()
@@ -906,13 +951,17 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel Unit)? =
+ iterator.channel.onUndeliveredElement?.bindCancellationFun(value, cont.context)
+
override fun toString(): String = "ReceiveHasNext@$hexAddress"
}
@@ -927,16 +976,20 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel) {
if (!select.trySelect()) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
- RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed(closed.closeCause), select.completion)
+ RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed(closed.closeCause), select.completion)
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
- block.startCoroutine(null, select.completion)
+ block.startCoroutineCancellable(null, select.completion)
} else {
select.resumeSelectWithException(closed.receiveException)
}
@@ -948,6 +1001,9 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel Unit)? =
+ channel.onUndeliveredElement?.bindCancellationFun(value, select.completion.context)
+
override fun toString(): String = "ReceiveSelect@$hexAddress[$select,receiveMode=$receiveMode]"
}
}
@@ -959,23 +1015,27 @@ internal const val RECEIVE_RESULT = 2
@JvmField
@SharedImmutable
-internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
+internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels
+
+@JvmField
+@SharedImmutable
+internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")
@JvmField
@SharedImmutable
-internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
+internal val OFFER_FAILED = Symbol("OFFER_FAILED")
@JvmField
@SharedImmutable
-internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
+internal val POLL_FAILED = Symbol("POLL_FAILED")
@JvmField
@SharedImmutable
-internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
+internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")
@JvmField
@SharedImmutable
-internal val HANDLER_INVOKED: Any = Symbol("ON_CLOSE_HANDLER_INVOKED")
+internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")
internal typealias Handler = (Throwable?) -> Unit
@@ -983,7 +1043,7 @@ internal typealias Handler = (Throwable?) -> Unit
* Represents sending waiter in the queue.
*/
internal abstract class Send : LockFreeLinkedListNode() {
- abstract val pollResult: Any? // E | Closed
+ abstract val pollResult: Any? // E | Closed - the result pollInternal returns when it rendezvous with this node
// Returns: null - failure,
// RETRY_ATOMIC for retry (only when otherOp != null),
// RESUME_TOKEN on success (call completeResumeSend)
@@ -991,6 +1051,7 @@ internal abstract class Send : LockFreeLinkedListNode() {
abstract fun tryResumeSend(otherOp: PrepareOp?): Symbol?
abstract fun completeResumeSend()
abstract fun resumeSendClosed(closed: Closed<*>)
+ open fun undeliveredElement() {}
}
/**
@@ -1009,9 +1070,8 @@ internal interface ReceiveOrClosed {
/**
* Represents sender for a specific element.
*/
-@Suppress("UNCHECKED_CAST")
-internal class SendElement(
- override val pollResult: Any?,
+internal open class SendElement(
+ override val pollResult: E,
@JvmField val cont: CancellableContinuation
) : Send() {
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
@@ -1021,9 +1081,27 @@ internal class SendElement(
otherOp?.finishPrepare() // finish preparations
return RESUME_TOKEN
}
+
override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
- override fun toString(): String = "SendElement@$hexAddress($pollResult)"
+ override fun toString(): String = "$classSimpleName@$hexAddress($pollResult)"
+}
+
+internal class SendElementWithUndeliveredHandler(
+ pollResult: E,
+ cont: CancellableContinuation,
+ @JvmField val onUndeliveredElement: OnUndeliveredElement
+) : SendElement(pollResult, cont) {
+ override fun remove(): Boolean {
+ if (!super.remove()) return false
+ // if the node was successfully removed (meaning it was added but was not received) then we have undelivered element
+ undeliveredElement()
+ return true
+ }
+
+ override fun undeliveredElement() {
+ onUndeliveredElement.callUndeliveredElement(pollResult, cont.context)
+ }
}
/**
@@ -1048,6 +1126,7 @@ internal class Closed(
internal abstract class Receive : LockFreeLinkedListNode(), ReceiveOrClosed {
override val offerResult get() = OFFER_SUCCESS
abstract fun resumeReceiveClosed(closed: Closed<*>)
+ open fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? = null
}
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
index 155652fd6f..91b5473c41 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
@@ -28,7 +28,7 @@ internal class ArrayBroadcastChannel(
* Buffer capacity.
*/
val capacity: Int
-) : AbstractSendChannel(), BroadcastChannel {
+) : AbstractSendChannel(null), BroadcastChannel {
init {
require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
}
@@ -180,6 +180,8 @@ internal class ArrayBroadcastChannel(
this.tail = tail + 1
return@withLock // go out of lock to wakeup this sender
}
+ // Too late, already cancelled, but we removed it from the queue and need to release resources.
+ // However, ArrayBroadcastChannel does not support onUndeliveredElement, so nothing to do
}
}
}
@@ -205,7 +207,7 @@ internal class ArrayBroadcastChannel(
private class Subscriber(
private val broadcastChannel: ArrayBroadcastChannel
- ) : AbstractChannel(), ReceiveChannel {
+ ) : AbstractChannel(null), ReceiveChannel {
private val subLock = ReentrantLock()
private val _subHead = atomic(0L)
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
index e26579eff7..80cb8aa011 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
@@ -23,25 +23,31 @@ internal open class ArrayChannel(
/**
* Buffer capacity.
*/
- val capacity: Int
-) : AbstractChannel() {
+ private val capacity: Int,
+ private val onBufferOverflow: BufferOverflow,
+ onUndeliveredElement: OnUndeliveredElement?
+) : AbstractChannel(onUndeliveredElement) {
init {
+ // This check is actually used by the Channel(...) constructor function which checks only for known
+ // capacities and calls ArrayChannel constructor for everything else.
require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
}
private val lock = ReentrantLock()
+
/*
* Guarded by lock.
* Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
*/
- private var buffer: Array = arrayOfNulls(min(capacity, 8))
+ private var buffer: Array = arrayOfNulls(min(capacity, 8)).apply { fill(EMPTY) }
+
private var head: Int = 0
private val size = atomic(0) // Invariant: size <= capacity
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = size.value == 0
protected final override val isBufferAlwaysFull: Boolean get() = false
- protected final override val isBufferFull: Boolean get() = size.value == capacity
+ protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow.SUSPEND
override val isFull: Boolean get() = lock.withLock { isFullImpl }
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
@@ -53,31 +59,26 @@ internal open class ArrayChannel(
lock.withLock {
val size = this.size.value
closedForSend?.let { return it }
- if (size < capacity) {
- // tentatively put element to buffer
- this.size.value = size + 1 // update size before checking queue (!!!)
- // check for receivers that were waiting on empty queue
- if (size == 0) {
- loop@ while (true) {
- receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
- if (receive is Closed) {
- this.size.value = size // restore size
- return receive!!
- }
- val token = receive!!.tryResumeReceive(element, null)
- if (token != null) {
- assert { token === RESUME_TOKEN }
- this.size.value = size // restore size
- return@withLock
- }
+ // update size before checking queue (!!!)
+ updateBufferSize(size)?.let { return it }
+ // check for receivers that were waiting on empty queue
+ if (size == 0) {
+ loop@ while (true) {
+ receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
+ if (receive is Closed) {
+ this.size.value = size // restore size
+ return receive!!
+ }
+ val token = receive!!.tryResumeReceive(element, null)
+ if (token != null) {
+ assert { token === RESUME_TOKEN }
+ this.size.value = size // restore size
+ return@withLock
}
}
- ensureCapacity(size)
- buffer[(head + size) % buffer.size] = element // actually queue element
- return OFFER_SUCCESS
}
- // size == capacity: full
- return OFFER_FAILED
+ enqueueElement(size, element)
+ return OFFER_SUCCESS
}
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
@@ -90,41 +91,36 @@ internal open class ArrayChannel(
lock.withLock {
val size = this.size.value
closedForSend?.let { return it }
- if (size < capacity) {
- // tentatively put element to buffer
- this.size.value = size + 1 // update size before checking queue (!!!)
- // check for receivers that were waiting on empty queue
- if (size == 0) {
- loop@ while (true) {
- val offerOp = describeTryOffer(element)
- val failure = select.performAtomicTrySelect(offerOp)
- when {
- failure == null -> { // offered successfully
- this.size.value = size // restore size
- receive = offerOp.result
- return@withLock
- }
- failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
- failure === RETRY_ATOMIC -> {} // retry
- failure === ALREADY_SELECTED || failure is Closed<*> -> {
- this.size.value = size // restore size
- return failure
- }
- else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
+ // update size before checking queue (!!!)
+ updateBufferSize(size)?.let { return it }
+ // check for receivers that were waiting on empty queue
+ if (size == 0) {
+ loop@ while (true) {
+ val offerOp = describeTryOffer(element)
+ val failure = select.performAtomicTrySelect(offerOp)
+ when {
+ failure == null -> { // offered successfully
+ this.size.value = size // restore size
+ receive = offerOp.result
+ return@withLock
+ }
+ failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
+ failure === RETRY_ATOMIC -> {} // retry
+ failure === ALREADY_SELECTED || failure is Closed<*> -> {
+ this.size.value = size // restore size
+ return failure
}
+ else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
}
}
- // let's try to select sending this element to buffer
- if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
- this.size.value = size // restore size
- return ALREADY_SELECTED
- }
- ensureCapacity(size)
- buffer[(head + size) % buffer.size] = element // actually queue element
- return OFFER_SUCCESS
}
- // size == capacity: full
- return OFFER_FAILED
+ // let's try to select sending this element to buffer
+ if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
+ this.size.value = size // restore size
+ return ALREADY_SELECTED
+ }
+ enqueueElement(size, element)
+ return OFFER_SUCCESS
}
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
@@ -135,6 +131,35 @@ internal open class ArrayChannel(
super.enqueueSend(send)
}
+ // Guarded by lock
+ // Result is `OFFER_SUCCESS | OFFER_FAILED | null`
+ private fun updateBufferSize(currentSize: Int): Symbol? {
+ if (currentSize < capacity) {
+ size.value = currentSize + 1 // tentatively put it into the buffer
+ return null // proceed
+ }
+ // buffer is full
+ return when (onBufferOverflow) {
+ BufferOverflow.SUSPEND -> OFFER_FAILED
+ BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
+ BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
+ }
+ }
+
+ // Guarded by lock
+ private fun enqueueElement(currentSize: Int, element: E) {
+ if (currentSize < capacity) {
+ ensureCapacity(currentSize)
+ buffer[(head + currentSize) % buffer.size] = element // actually queue element
+ } else {
+ // buffer is full
+ assert { onBufferOverflow == BufferOverflow.DROP_OLDEST } // the only way we can get here
+ buffer[head % buffer.size] = null // drop oldest element
+ buffer[(head + currentSize) % buffer.size] = element // actually queue element
+ head = (head + 1) % buffer.size
+ }
+ }
+
// Guarded by lock
private fun ensureCapacity(currentSize: Int) {
if (currentSize >= buffer.size) {
@@ -143,6 +168,7 @@ internal open class ArrayChannel(
for (i in 0 until currentSize) {
newBuffer[i] = buffer[(head + i) % buffer.size]
}
+ newBuffer.fill(EMPTY, currentSize, newSize)
buffer = newBuffer
head = 0
}
@@ -172,6 +198,8 @@ internal open class ArrayChannel(
replacement = send!!.pollResult
break@loop
}
+ // too late, already cancelled, but we removed it from the queue and need to notify on undelivered element
+ send!!.undeliveredElement()
}
}
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
@@ -254,17 +282,23 @@ internal open class ArrayChannel(
// Note: this function is invoked when channel is already closed
override fun onCancelIdempotent(wasClosed: Boolean) {
// clear buffer first, but do not wait for it in helpers
- if (wasClosed) {
- lock.withLock {
- repeat(size.value) {
- buffer[head] = 0
- head = (head + 1) % buffer.size
+ val onUndeliveredElement = onUndeliveredElement
+ var undeliveredElementException: UndeliveredElementException? = null // first cancel exception, others suppressed
+ lock.withLock {
+ repeat(size.value) {
+ val value = buffer[head]
+ if (onUndeliveredElement != null && value !== EMPTY) {
+ @Suppress("UNCHECKED_CAST")
+ undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(value as E, undeliveredElementException)
}
- size.value = 0
+ buffer[head] = EMPTY
+ head = (head + 1) % buffer.size
}
+ size.value = 0
}
// then clean all queued senders
super.onCancelIdempotent(wasClosed)
+ undeliveredElementException?.let { throw it } // throw cancel exception at the end if there was one
}
// ------ debug ------
diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
index 863d1387fc..790580e0a3 100644
--- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
@@ -10,7 +10,6 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
-import kotlin.native.concurrent.*
/**
* Broadcasts all elements of the channel.
@@ -34,8 +33,10 @@ import kotlin.native.concurrent.*
*
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
- * the broadcasting coroutine in hard-to-specify ways. It will be replaced with
- * sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
+ * the broadcasting coroutine in hard-to-specify ways.
+ *
+ * **Note: This API is obsolete.** It will be deprecated and replaced with the
+ * [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable.
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
diff --git a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
index 312480f943..d356566f17 100644
--- a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
@@ -7,9 +7,9 @@
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
+import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
/**
@@ -20,9 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
* See `BroadcastChannel()` factory function for the description of available
* broadcast channel implementations.
*
- * **Note: This is an experimental api.** It may be changed in the future updates.
+ * **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow]
+ * when it becomes stable.
*/
-@ExperimentalCoroutinesApi
+@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
public interface BroadcastChannel : SendChannel {
/**
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
diff --git a/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
new file mode 100644
index 0000000000..99994ea81b
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.channels
+
+import kotlinx.coroutines.*
+
+/**
+ * A strategy for buffer overflow handling in [channels][Channel] and [flows][kotlinx.coroutines.flow.Flow] that
+ * controls what is going to be sacrificed on buffer overflow:
+ *
+ * * [SUSPEND] — the upstream that is [sending][SendChannel.send] or
+ * is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full.
+ * * [DROP_OLDEST] — drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
+ * * [DROP_LATEST] — drop **the latest** value that is being added to the buffer right now on buffer overflow
+ * (so that buffer contents stay the same), do not suspend.
+ */
+@ExperimentalCoroutinesApi
+public enum class BufferOverflow {
+ /**
+ * Suspend on buffer overflow.
+ */
+ SUSPEND,
+
+ /**
+ * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
+ */
+ DROP_OLDEST,
+
+ /**
+ * Drop **the latest** value that is being added to the buffer right now on buffer overflow
+ * (so that buffer contents stay the same), do not suspend.
+ */
+ DROP_LATEST
+}
diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt
index c4b4a9b25e..72c08e1acd 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt
@@ -44,19 +44,17 @@ public interface SendChannel {
* Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full
* or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
*
- * Note that closing a channel _after_ this function has suspended does not cause this suspended [send] invocation
+ * [Closing][close] a channel _after_ this function has suspended does not cause this suspended [send] invocation
* to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
* All elements sent over the channel are delivered in first-in first-out order. The sent element
* will be delivered to receivers before the close token.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with a [CancellationException].
- *
- * *Cancellation of suspended `send` is atomic*: when this function
- * throws a [CancellationException], it means that the [element] was not sent to this channel.
- * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
- * continue to execute even after it was cancelled from the same thread in the case when this `send` operation
- * was already resumed and the continuation was posted for execution to the thread's queue.
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. The `send` call can send the element to the channel,
+ * but then throw [CancellationException], thus an exception should not be treated as a failure to deliver the element.
+ * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
@@ -81,6 +79,11 @@ public interface SendChannel {
* in situations when `send` suspends.
*
* Throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
+ *
+ * When `offer` call returns `false` it guarantees that the element was not delivered to the consumer and it
+ * it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed,
+ * then it calls `onUndeliveredElement` before throwing an exception.
+ * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*/
public fun offer(element: E): Boolean
@@ -170,12 +173,10 @@ public interface ReceiveChannel