diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index 9400aa58ce8..ac070cde2b0 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -94,6 +94,7 @@ public final class arrow/fx/coroutines/CircuitBreaker$State$Open : arrow/fx/coro } public abstract class arrow/fx/coroutines/ExitCase { + public static final field Companion Larrow/fx/coroutines/ExitCase$Companion; } public final class arrow/fx/coroutines/ExitCase$Cancelled : arrow/fx/coroutines/ExitCase { @@ -107,6 +108,10 @@ public final class arrow/fx/coroutines/ExitCase$Cancelled : arrow/fx/coroutines/ public fun toString ()Ljava/lang/String; } +public final class arrow/fx/coroutines/ExitCase$Companion { + public final fun ExitCase (Ljava/lang/Throwable;)Larrow/fx/coroutines/ExitCase; +} + public final class arrow/fx/coroutines/ExitCase$Completed : arrow/fx/coroutines/ExitCase { public static final field INSTANCE Larrow/fx/coroutines/ExitCase$Completed; public fun toString ()Ljava/lang/String; @@ -303,6 +308,7 @@ public final class arrow/fx/coroutines/Race3Kt { public abstract class arrow/fx/coroutines/Resource { public static final field Companion Larrow/fx/coroutines/Resource$Companion; + public final fun allocated (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun ap (Larrow/fx/coroutines/Resource;)Larrow/fx/coroutines/Resource; public final fun flatMap (Lkotlin/jvm/functions/Function1;)Larrow/fx/coroutines/Resource; public final fun map (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource; diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt index d92e4be3343..d9273f3ef95 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt @@ -13,6 +13,11 @@ public sealed class ExitCase { public data class Cancelled(val exception: CancellationException) : ExitCase() public data class Failure(val failure: Throwable) : ExitCase() + + public companion object { + public fun ExitCase(error: Throwable): ExitCase = + if (error is CancellationException) Cancelled(error) else Failure(error) + } } /** diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index 8fef5fbe0b6..e622f634cf1 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -4,14 +4,15 @@ import arrow.core.continuations.AtomicRef import arrow.core.continuations.update import arrow.core.identity import arrow.core.prependTo +import arrow.fx.coroutines.ExitCase.Companion.ExitCase import arrow.fx.coroutines.continuations.ResourceScope -import kotlin.coroutines.CoroutineContext -import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.withContext +import kotlin.coroutines.CoroutineContext import kotlin.experimental.ExperimentalTypeInference /** @@ -180,9 +181,8 @@ public sealed class Resource { val a = dsl(effect) f(a) } catch (e: Throwable) { - val ex = if (e is CancellationException) ExitCase.Cancelled(e) else ExitCase.Failure(e) val ee = withContext(NonCancellable) { - effect.finalizers.get().cancelAll(ex, e) ?: e + effect.finalizers.get().cancelAll(ExitCase(e), e) ?: e } throw ee } @@ -318,7 +318,7 @@ public sealed class Resource { public inline fun zip( b: Resource, c: Resource, - crossinline map: (A, B, C) -> D, + crossinline map: (A, B, C) -> D ): Resource = arrow.fx.coroutines.continuations.resource { map(bind(), b.bind(), c.bind()) @@ -328,7 +328,7 @@ public sealed class Resource { b: Resource, c: Resource, d: Resource, - crossinline map: (A, B, C, D) -> E, + crossinline map: (A, B, C, D) -> E ): Resource = arrow.fx.coroutines.continuations.resource { map(bind(), b.bind(), c.bind(), d.bind()) @@ -339,7 +339,7 @@ public sealed class Resource { c: Resource, d: Resource, e: Resource, - crossinline map: (A, B, C, D, E) -> G, + crossinline map: (A, B, C, D, E) -> G ): Resource = arrow.fx.coroutines.continuations.resource { map(bind(), b.bind(), c.bind(), d.bind(), e.bind()) @@ -351,7 +351,7 @@ public sealed class Resource { d: Resource, e: Resource, f: Resource, - crossinline map: (A, B, C, D, E, F) -> G, + crossinline map: (A, B, C, D, E, F) -> G ): Resource = arrow.fx.coroutines.continuations.resource { map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind()) @@ -364,7 +364,7 @@ public sealed class Resource { e: Resource, f: Resource, g: Resource, - crossinline map: (A, B, C, D, E, F, G) -> H, + crossinline map: (A, B, C, D, E, F, G) -> H ): Resource = arrow.fx.coroutines.continuations.resource { map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind()) @@ -378,7 +378,7 @@ public sealed class Resource { f: Resource, g: Resource, h: Resource, - crossinline map: (A, B, C, D, E, F, G, H) -> I, + crossinline map: (A, B, C, D, E, F, G, H) -> I ): Resource = arrow.fx.coroutines.continuations.resource { map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind(), h.bind()) @@ -393,7 +393,7 @@ public sealed class Resource { g: Resource, h: Resource, i: Resource, - crossinline map: (A, B, C, D, E, F, G, H, I) -> J, + crossinline map: (A, B, C, D, E, F, G, H, I) -> J ): Resource = arrow.fx.coroutines.continuations.resource { map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind(), h.bind(), i.bind()) @@ -409,7 +409,7 @@ public sealed class Resource { h: Resource, i: Resource, j: Resource, - crossinline map: (A, B, C, D, E, F, G, H, I, J) -> K, + crossinline map: (A, B, C, D, E, F, G, H, I, J) -> K ): Resource = arrow.fx.coroutines.continuations.resource { map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind(), h.bind(), i.bind(), j.bind()) @@ -464,12 +464,74 @@ public sealed class Resource { public fun parZip( ctx: CoroutineContext = Dispatchers.Default, fb: Resource, - f: suspend (A, B) -> C, + f: suspend (A, B) -> C ): Resource = arrow.fx.coroutines.continuations.resource { parZip(ctx, { this@Resource.bind() }, { fb.bind() }) { a, b -> f(a, b) } } + /** + * Deconstruct [Resource] into an `acquire` and `release` handlers. + * The `release` action **must** always be called with resource [A] returned from `acquire`, + * if the `release` step is never called, then the resource [A] will leak. The `acquire` and `release` + * steps are already made `NonCancellable` to guarantee correct invocation like `Resource` or `bracketCase`. + * + * ```kotlin + * import arrow.fx.coroutines.* + * import arrow.fx.coroutines.ExitCase.Companion.ExitCase + * + * val resource = Resource({ println("Acquire") }) { _, exitCase -> + * println("Release $exitCase") + * } + * + * suspend fun main(): Unit { + * val (acquire, release) = resource.allocated() + * val a = acquire() + * try { + * /** Do something with A */ + * release(a, ExitCase.Completed) + * } catch(e: Throwable) { + * val e2 = runCatching { release(a, ExitCase(e)) }.exceptionOrNull() + * throw Platform.composeErrors(e, e2) + * } + * } + * ``` + * + * + * This is a **delicate** API. It is easy to accidentally create resource or memory leaks `allocated` is used. + * A `Resource` allocated by `allocated` is not subject to the guarantees that [Resource] makes, + * instead the caller is responsible for correctly invoking the `release` handler at the appropriate time. + * This API is useful for building inter-op APIs between [Resource] and non-suspending code, such as Java libraries. + */ + @DelicateCoroutinesApi + public suspend fun allocated(): Pair A, suspend (@UnsafeVariance A, ExitCase) -> Unit> = + when (this) { + is Bind<*, A> -> + Dsl { + val any = source.bind() + val ff = f as (Any?) -> Resource + ff(any).bind() + }.allocated() + is Allocate -> acquire to release + is Defer -> resource().allocated() + is Dsl -> { + val effect = ResourceScopeImpl() + val allocated = try { + val allocate: suspend () -> A = suspend { dsl(effect) } + val release: suspend (A, ExitCase) -> Unit = { _, e -> + effect.finalizers.get().cancelAll(e)?.let { throw it } + } + allocate to release + } catch (e: Throwable) { + val ee = withContext(NonCancellable) { + effect.finalizers.get().cancelAll(ExitCase(e), e) ?: e + } + throw ee + } + allocated + } + } + @Deprecated( "Bind is being deprecated. Use resource DSL instead", ReplaceWith( @@ -481,7 +543,7 @@ public sealed class Resource { public class Allocate( public val acquire: suspend () -> A, - public val release: suspend (A, ExitCase) -> Unit, + public val release: suspend (A, ExitCase) -> Unit ) : Resource() @Deprecated( @@ -517,11 +579,11 @@ public sealed class Resource { * } * } * ``` - * + * */ public operator fun invoke( acquire: suspend () -> A, - release: suspend (A, ExitCase) -> Unit, + release: suspend (A, ExitCase) -> Unit ): Resource = Allocate(acquire, release) /** @@ -571,7 +633,7 @@ public sealed class Resource { * println(res) * } * ``` - * + * */ @Deprecated( "Use the resource computation DSL instead", @@ -661,7 +723,7 @@ public inline fun Iterable.traverseResource(crossinline f: (A) -> Reso * res.forEach(::println) * } * ``` - * + * */ @OptIn(ExperimentalTypeInference::class) @OverloadResolutionByLambdaReturnType @@ -706,7 +768,7 @@ public inline fun Iterable.traverse(crossinline f: (A) -> Resource) * res.forEach(::println) * } * ``` - * + * */ @Suppress("NOTHING_TO_INLINE") public inline fun Iterable>.sequence(): Resource> = @@ -775,7 +837,7 @@ private class ResourceScopeImpl : ResourceScope { private suspend fun List Unit>.cancelAll( exitCase: ExitCase, - first: Throwable? = null, + first: Throwable? = null ): Throwable? = fold(first) { acc, finalizer -> val other = kotlin.runCatching { finalizer(exitCase) }.exceptionOrNull() other?.let { diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt index f68e052ed3d..318607a2f6e 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt @@ -3,7 +3,9 @@ package arrow.fx.coroutines import arrow.core.Either import arrow.core.identity import arrow.core.left +import arrow.fx.coroutines.ExitCase.Companion.ExitCase import io.kotest.assertions.fail +import io.kotest.inspectors.forAll import io.kotest.matchers.collections.shouldContainExactly import io.kotest.matchers.should import io.kotest.matchers.shouldBe @@ -21,6 +23,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList +import kotlin.random.Random class ResourceTest : ArrowFxSpec( spec = { @@ -217,15 +220,19 @@ class ResourceTest : ArrowFxSpec( val res = if (isLeft) Resource({ latch.await() shouldBe (1..depth).sum() throw cancel - }) { _, _ -> }.parZip(resource.flatMap { - Resource({ require(latch.complete(it)) }) { _, _ -> } - }) { _, _ -> } + }) { _, _ -> }.parZip( + resource.flatMap { + Resource({ require(latch.complete(it)) }) { _, _ -> } + } + ) { _, _ -> } else resource.flatMap { Resource({ require(latch.complete(it)) }) { _, _ -> } - }.parZip(Resource({ - latch.await() shouldBe (1..depth).sum() - throw cancel - }) { _, _ -> }) { _, _ -> } + }.parZip( + Resource({ + latch.await() shouldBe (1..depth).sum() + throw cancel + }) { _, _ -> } + ) { _, _ -> } res.use { fail("It should never reach here") } }.shouldBeTypeOf() @@ -265,9 +272,11 @@ class ResourceTest : ArrowFxSpec( started.await() throw cancel }) { _, _ -> } - .parZip(Resource({ require(started.complete(Unit)); i }, { ii, ex -> - require(released.complete(ii to ex)) - })) { _, _ -> } + .parZip( + Resource({ require(started.complete(Unit)); i }, { ii, ex -> + require(released.complete(ii to ex)) + }) + ) { _, _ -> } .use { fail("It should never reach here") } }.shouldBeTypeOf() @@ -310,7 +319,8 @@ class ResourceTest : ArrowFxSpec( Resource( { require(started.complete(Unit)); i }, { ii, ex -> require(released.complete(ii to ex)) } - )) { _, _ -> } + ) + ) { _, _ -> } .use { fail("It should never reach here") } } shouldBe throwable @@ -426,10 +436,12 @@ class ResourceTest : ArrowFxSpec( modifyGate.await() r.update { i -> "$i$a" } }) { _, _ -> } - .parZip(Resource({ - r.set("$b") - require(modifyGate.complete(0)) - }) { _, _ -> }) { _a, _b -> _a to _b } + .parZip( + Resource({ + r.set("$b") + require(modifyGate.complete(0)) + }) { _, _ -> } + ) { _a, _b -> _a to _b } .use { r.get() shouldBe "$b$a" } @@ -443,6 +455,47 @@ class ResourceTest : ArrowFxSpec( r.asFlow().map { it + 1 }.toList() shouldBe listOf(n + 1) } } + + suspend fun checkAllocated(mkResource: (() -> Int, (Int, ExitCase) -> Unit) -> Resource) { + listOf( + ExitCase.Completed, + ExitCase.Failure(Exception()), + ExitCase.Cancelled(CancellationException(null, null)) + ).forAll { exit -> + val released = CompletableDeferred() + val seed = Random.nextInt() + + val (allocate, release) = mkResource({ seed }) { i, _ -> released.complete(i) }.allocated() + + release(allocate(), exit) + + released.getCompleted() shouldBe seed + } + } + + "allocated - Allocate" { + checkAllocated { allocate, release -> + Resource(allocate, release) + } + } + + "allocated - Defer" { + checkAllocated { allocate, release -> + Resource.defer { Resource(allocate, release) } + } + } + + "allocated - Bind" { + checkAllocated { allocate, release -> + Resource.Bind(Resource(allocate, release)) { Resource.just(it) } + } + } + + "allocated - Dsl" { + checkAllocated { allocate, close -> + arrow.fx.coroutines.continuations.resource { allocate() } releaseCase (close) + } + } } ) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-08.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-08.kt index b1743db3bdf..fa5813146e6 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-08.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-08.kt @@ -2,13 +2,20 @@ package arrow.fx.coroutines.examples.exampleResource08 import arrow.fx.coroutines.* +import arrow.fx.coroutines.ExitCase.Companion.ExitCase -suspend fun acquireResource(): Int = 42.also { println("Getting expensive resource") } -suspend fun releaseResource(r: Int, exitCase: ExitCase): Unit = println("Releasing expensive resource: $r, exit: $exitCase") +val resource = Resource({ println("Acquire") }) { _, exitCase -> + println("Release $exitCase") +} suspend fun main(): Unit { - val resource = Resource(::acquireResource, ::releaseResource) - resource.use { - println("Expensive resource under use! $it") + val (acquire, release) = resource.allocated() + val a = acquire() + try { + /** Do something with A */ + release(a, ExitCase.Completed) + } catch(e: Throwable) { + val e2 = runCatching { release(a, ExitCase(e)) }.exceptionOrNull() + throw Platform.composeErrors(e, e2) } } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-09.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-09.kt index 7b9020cc4c0..d492b4e97b7 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-09.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-09.kt @@ -3,24 +3,12 @@ package arrow.fx.coroutines.examples.exampleResource09 import arrow.fx.coroutines.* -class File(url: String) { - suspend fun open(): File = this - suspend fun close(): Unit {} - override fun toString(): String = "This file contains some interesting content!" -} - -suspend fun openFile(uri: String): File = File(uri).open() -suspend fun closeFile(file: File): Unit = file.close() -suspend fun fileToString(file: File): String = file.toString() +suspend fun acquireResource(): Int = 42.also { println("Getting expensive resource") } +suspend fun releaseResource(r: Int, exitCase: ExitCase): Unit = println("Releasing expensive resource: $r, exit: $exitCase") suspend fun main(): Unit { - val res = resource { - openFile("data.json") - } release { file -> - closeFile(file) - } use { file -> - fileToString(file) + val resource = Resource(::acquireResource, ::releaseResource) + resource.use { + println("Expensive resource under use! $it") } - - println(res) } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-10.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-10.kt index 57e567884b4..d0bd9907f85 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-10.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-10.kt @@ -14,18 +14,13 @@ suspend fun closeFile(file: File): Unit = file.close() suspend fun fileToString(file: File): String = file.toString() suspend fun main(): Unit { - val res: List = listOf( - "data.json", - "user.json", - "resource.json" - ).traverse { uri -> - resource { - openFile(uri) - } release { file -> - closeFile(file) - } - }.use { files -> - files.map { fileToString(it) } + val res = resource { + openFile("data.json") + } release { file -> + closeFile(file) + } use { file -> + fileToString(file) } - res.forEach(::println) + + println(res) } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-11.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-11.kt index 3987d148522..7a2ed86df12 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-11.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-11.kt @@ -18,13 +18,13 @@ suspend fun main(): Unit { "data.json", "user.json", "resource.json" - ).map { uri -> + ).traverse { uri -> resource { openFile(uri) } release { file -> closeFile(file) } - }.sequence().use { files -> + }.use { files -> files.map { fileToString(it) } } res.forEach(::println) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-12.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-12.kt new file mode 100644 index 00000000000..ffcb368f565 --- /dev/null +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-12.kt @@ -0,0 +1,31 @@ +// This file was automatically generated from Resource.kt by Knit tool. Do not edit. +package arrow.fx.coroutines.examples.exampleResource12 + +import arrow.fx.coroutines.* + +class File(url: String) { + suspend fun open(): File = this + suspend fun close(): Unit {} + override fun toString(): String = "This file contains some interesting content!" +} + +suspend fun openFile(uri: String): File = File(uri).open() +suspend fun closeFile(file: File): Unit = file.close() +suspend fun fileToString(file: File): String = file.toString() + +suspend fun main(): Unit { + val res: List = listOf( + "data.json", + "user.json", + "resource.json" + ).map { uri -> + resource { + openFile(uri) + } release { file -> + closeFile(file) + } + }.sequence().use { files -> + files.map { fileToString(it) } + } + res.forEach(::println) +}