diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt index 19334ea706..155652fd6f 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt @@ -218,13 +218,15 @@ internal class ArrayBroadcastChannel( override val isBufferAlwaysFull: Boolean get() = error("Should not be used") override val isBufferFull: Boolean get() = error("Should not be used") - override fun onCancelIdempotent(wasClosed: Boolean) { + override fun close(cause: Throwable?): Boolean { + val wasClosed = super.close(cause) if (wasClosed) { broadcastChannel.updateHead(removeSub = this) subLock.withLock { subHead = broadcastChannel.tail } } + return wasClosed } // returns true if subHead was updated and broadcast channel's head must be checked diff --git a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelLeakTest.kt b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelLeakTest.kt new file mode 100644 index 0000000000..66b08c74e4 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelLeakTest.kt @@ -0,0 +1,34 @@ +package kotlinx.coroutines.channels + +import kotlinx.coroutines.* +import org.junit.Test +import kotlin.test.* + +class BroadcastChannelLeakTest : TestBase() { + @Test + fun testArrayBroadcastChannelSubscriptionLeak() { + checkLeak { ArrayBroadcastChannel(1) } + } + + @Test + fun testConflatedBroadcastChannelSubscriptionLeak() { + checkLeak { ConflatedBroadcastChannel() } + } + + enum class TestKind { BROADCAST_CLOSE, SUB_CANCEL, BOTH } + + private fun checkLeak(factory: () -> BroadcastChannel) = runTest { + for (kind in TestKind.values()) { + val broadcast = factory() + val sub = broadcast.openSubscription() + broadcast.send("OK") + assertEquals("OK", sub.receive()) + // now close broadcast + if (kind != TestKind.SUB_CANCEL) broadcast.close() + // and then cancel subscription + if (kind != TestKind.BROADCAST_CLOSE) sub.cancel() + // subscription should not be reachable from the channel anymore + FieldWalker.assertReachableCount(0, broadcast) { it === sub } + } + } +} \ No newline at end of file