Skip to content

Commit

Permalink
Hot buffered flow is required
Browse files Browse the repository at this point in the history
  • Loading branch information
greggiacovelli committed Dec 22, 2021
1 parent 345cff7 commit 5ee26f6
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.trySendBlocking

private const val DEFAULT_BUFFER = 128
internal class ChannelForwarder<T> : Stream.Observer<T> {
private val _channel = Channel<T>(DEFAULT_BUFFER)
val channel: ReceiveChannel<T> = _channel
internal class ChannelForwarder<T>(bufferSize: Int) : Stream.Observer<T> {
private val channel = Channel<T>(bufferSize, BufferOverflow.DROP_OLDEST)
private var disposable: Stream.Disposable? = null

fun start(stream: Stream<T>): ReceiveChannel<T> {
Expand All @@ -17,17 +16,17 @@ internal class ChannelForwarder<T> : Stream.Observer<T> {
}

override fun onComplete() {
_channel.close()
channel.close()
disposable?.dispose()
}

override fun onError(throwable: Throwable) {
_channel.close(throwable)
channel.close(throwable)
disposable?.dispose()
}

override fun onNext(data: T) {
_channel.trySendBlocking(data)
channel.trySendBlocking(data)
.exceptionOrNull() ?.let { throw it }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ import java.lang.reflect.Type
/**
* A [stream adapter factory][StreamAdapter.Factory] that uses ReceiveChannel.
*/
class CoroutinesStreamAdapterFactory : StreamAdapter.Factory {
private const val DEFAULT_BUFFER = 128

class CoroutinesStreamAdapterFactory(
private val bufferSize: Int = DEFAULT_BUFFER
) : StreamAdapter.Factory {

override fun create(type: Type): StreamAdapter<Any, Any> {
return when (type.getRawType()) {
Flow::class.java -> FlowStreamAdapter()
ReceiveChannel::class.java -> ReceiveChannelAdapter()
Flow::class.java -> FlowStreamAdapter(bufferSize)
ReceiveChannel::class.java -> ReceiveChannelAdapter(bufferSize)
else -> throw IllegalArgumentException()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ package com.tinder.streamadapter.coroutines
import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.receiveAsFlow

class FlowStreamAdapter<T> : StreamAdapter<T, Flow<T>> {
class FlowStreamAdapter<T>(private val buffer: Int) : StreamAdapter<T, Flow<T>> {

override fun adapt(stream: Stream<T>): Flow<T> {
val channelForwarder = ChannelForwarder<T>()
return channelForwarder.start(stream).receiveAsFlow()
return ChannelForwarder<T>(buffer).start(stream).receiveAsFlow()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import kotlinx.coroutines.channels.ReceiveChannel

class ReceiveChannelAdapter<T> : StreamAdapter<T, ReceiveChannel<T>> {
class ReceiveChannelAdapter<T>(private val buffer: Int) : StreamAdapter<T, ReceiveChannel<T>> {

override fun adapt(stream: Stream<T>): ReceiveChannel<T> {
val channelForwarder = ChannelForwarder<T>()
val channelForwarder = ChannelForwarder<T>(buffer)
return channelForwarder.start(stream)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class FlowStreamAdapterTest {
}

@Test
fun `adapt - given a stream of strings, provides a Flow interface bound to the stream`() = runTest {
fun `adapt - given a stream of strings, provides a Flow interface bound to the stream`() = runTest(dispatchTimeoutMs = 1000) {
// Given
val (client, server) = clientServerModel.givenConnectionIsEstablished()
val textMessage1 = "Hello"
Expand Down

0 comments on commit 5ee26f6

Please sign in to comment.