Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tickerFlow implementation #1908

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 30 additions & 0 deletions kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt
@@ -0,0 +1,30 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.TickerMode
import kotlinx.coroutines.channels.ticker
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Creates a flow that produces the first item after the given initial delay and subsequent items with the
* given delay between them.
*
* The resulting flow is basically using [ticker]
*
* This Flow stops producing elements immediately after [Job.cancel] invocation.
*
* @param delayMillis delay between each element in milliseconds.
* @param initialDelayMillis delay after which the first element will be produced (it is equal to [delayMillis] by default) in milliseconds.
* @param context context of the producing coroutine.
* @param mode specifies behavior when elements are not received ([FIXED_PERIOD][TickerMode.FIXED_PERIOD] by default).
*/
public fun tickerFlow(
delayMillis: Long,
initialDelayMillis: Long = delayMillis,
context: CoroutineContext = EmptyCoroutineContext,
mode: TickerMode = TickerMode.FIXED_PERIOD
): Flow<Unit> {
require(delayMillis > 0)
return ticker(delayMillis, initialDelayMillis, context, mode).receiveAsFlow()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the channel is hidden here, we probably want to handle cancellation properly.
Using consumeAsFlow() instead of receiveAsFlow() should do the trick I think.

}
99 changes: 99 additions & 0 deletions kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt
@@ -0,0 +1,99 @@
package flow

import kotlinx.coroutines.TestBase
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.tickerFlow
import java.util.concurrent.CancellationException
import kotlin.test.Test
import kotlin.test.assertEquals


class TickerFlowTest : TestBase() {

@Test(expected = IllegalArgumentException::class)
fun testNegativePeriod() = runTest {
// WHEN
tickerFlow(-1).launchIn(this)
}

@Test(expected = IllegalArgumentException::class)
fun testZeroPeriod() = runTest {
// WHEN
tickerFlow(0).launchIn(this)
}

@Test(expected = IllegalArgumentException::class)
fun testNegativeInitialDelay() = runTest {
// WHEN
tickerFlow(100, -1).launchIn(this)
}

@Test
fun testInitialDelay() = runTest {
// GIVEN
val inbox = mutableListOf<Unit>()

// WHEN
tickerFlow(100, 200).onEach {
inbox.add(Unit)
}.launchIn(this)

delay(500)

// THEN
assertEquals(4, inbox.size)
}

@Test
fun testZeroInitialDelay() = runTest {
// GIVEN
val inbox = mutableListOf<Unit>()

// WHEN
tickerFlow(100, 0).onEach {
inbox.add(Unit)
}.launchIn(this)

delay(500)

// THEN
assertEquals(6, inbox.size)
}


@Test
fun testReceive() = runTest {
// GIVEN
val inbox = mutableListOf<Unit>()

// WHEN
tickerFlow(100).onEach {
inbox.add(Unit)
}.launchIn(this)

delay(500)

// THEN
assertEquals(5, inbox.size)
}

@Test
fun testDoNotReceiveAfterCancel() = runTest {
// GIVEN
val inbox = mutableListOf<Unit>()

// WHEN
val periodicTicker =
tickerFlow(100).onEach {
inbox.add(Unit)
}.launchIn(this)

delay(50)
periodicTicker.cancel(CancellationException())

// THEN
assertEquals(0, inbox.size)
}
}