Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tickerFlow implementation #1908

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 33 additions & 0 deletions kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt
@@ -0,0 +1,33 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.awaitClose
import java.util.*
import kotlin.concurrent.schedule

/**
* Creates a flow that produces the first item after the given initial delay and subsequent items with the
* given delay between them.
*
* The resulting flow is a callback flow, which basically listens @see [Timer.schedule]
*
* This Flow stops producing elements immediately after [Job.cancel] invocation.
*
* @param period period between each element in milliseconds.
* @param initialDelay delay after which the first element will be produced (it is equal to [period] by default) in milliseconds.
*/
public fun tickerFlow(
period: Long,
initialDelay: Long = period
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To follow the example of functions like withTimeout and debounce, I think we should add the Millis suffix to these parameter names, and maybe provide an overload using kotlin.time.Duration?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point for naming and we have .receiveAsFlow() extension function.

Directly we can use kotlinx-coroutine's ticker. Also this can be better idea to have single implementation.
What do you think about it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could indeed be done this way, but that would couple it to JVM platform 😥
I am not sure a channel would be necessary behind the scenes to implement it, but I will let the maintainers comment on this.

EDIT: moved part of this comment to the relevant conversation

): Flow<Unit> = callbackFlow {
require(period > 0)
require(initialDelay > -1)

val timer = Timer()
timer.schedule(initialDelay, period) {
offer(Unit)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation uses a background thread and is JVM-specific.
Couldn't we simply use a loop with delay() and make it multiplatform? Or am I naive here?
I fail to see the benefit of running another thread here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! very good point.
Also ticker implementation can be moved to commonMain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see now why a concurrent behaviour is necessary and would like to backtrack a bit: we can't use a simple loop with delays because otherwise the ticker delay is impacted by the execution time of the collector code, which is probably not desirable in this case.

That's why we do need some concurrent process (a coroutine?) to send the ticks.
This still doesn't mean we have to use a thread and Java's timer.
There may be a multiplatform way to do this.

I'll let the maintainers of the lib comment on this, though


awaitClose { timer.cancel() }
}

111 changes: 111 additions & 0 deletions kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt
@@ -0,0 +1,111 @@
package flow

import kotlinx.coroutines.TestBase
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.tickerFlow
import java.util.concurrent.CancellationException
import kotlin.test.Test
import kotlin.test.assertEquals


class TickerFlowTest : TestBase() {

@Test(expected = IllegalArgumentException::class)
fun testNegativePeriod() = runTest {
// WHEN
tickerFlow(-1).launchIn(this)
}

@Test(expected = IllegalArgumentException::class)
fun testZeroPeriod() = runTest {
// WHEN
tickerFlow(0).launchIn(this)
}

@Test(expected = IllegalArgumentException::class)
fun testNegativeInitialDelay() = runTest {
// WHEN
tickerFlow(100, -1).launchIn(this)
}

@Test
fun testInitialDelay() = runTest {
// GIVEN
val inbox = mutableListOf<Unit>()

// WHEN
val periodicTicker =
tickerFlow(100, 100).onEach {
inbox.add(Unit)
}.launchIn(this)

delay(500)

// THEN
assertEquals(4, inbox.size)

periodicTicker.cancelAndJoin()
}

@Test
fun testZeroInitialDelay() = runTest {
// GIVEN
val inbox = mutableListOf<Unit>()

// WHEN
val periodicTicker =
tickerFlow(100, 0).onEach {
inbox.add(Unit)
}.launchIn(this)

delay(500)

// THEN
assertEquals(5, inbox.size)

periodicTicker.cancelAndJoin()
}


@Test
fun testReceive() = runTest {
// GIVEN
val inbox = mutableListOf<Unit>()

// WHEN
val periodicTicker =
tickerFlow(100).onEach {
inbox.add(Unit)
}.launchIn(this)

delay(500)

// THEN
assertEquals(4, inbox.size)

periodicTicker.cancelAndJoin()
}

@Test
fun testDoNotReceiveAfterCancel() = runTest {
// GIVEN
val inbox = mutableListOf<Unit>()

// WHEN
val periodicTicker =
tickerFlow(100).onEach {
inbox.add(Unit)
}.launchIn(this)

delay(50)
periodicTicker.cancel(CancellationException())

// THEN
assertEquals(0, inbox.size)
}


}