Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Kotlin and the libs for coroutines #200

Merged
merged 14 commits into from
Jan 30, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ jobs:
name: Disable PreDexing
command: echo "disablePreDex" >> gradle.properties
- run: if [ -e ./gradlew ]; then ./gradlew dependencies;else gradle dependencies;fi
- run: mkdir -p test-reports/junit/
- run: ./gradlew test
- run: mkdir -p $CIRCLE_TEST_REPORTS/junit/
- run: find . -type f -regex ".*/build/test-results/.*xml" -exec cp {} $CIRCLE_TEST_REPORTS/junit/ \;
- run:
name: gather_test_results
command: find . -type f -regex ".*/build/test-results/.*xml" -exec cp {} test-reports/junit/ \;
when: always
- store_test_results:
path: test-reports
- run:
name: Deploy Snapshot
command: ./publish.sh
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Top-level build file where you can add configuration options common to all sub-projects/modules.

buildscript {
ext.kotlin_version = '1.4.31'
ext.kotlin_version = '1.5.30'
ext.dokka_version = '0.9.16'
repositories {
mavenCentral()
Expand All @@ -10,7 +10,7 @@ buildscript {
}
dependencies {
classpath 'com.vanniktech:gradle-maven-publish-plugin:0.8.0'
classpath 'com.android.tools.build:gradle:4.1.2'
classpath 'com.android.tools.build:gradle:4.1.3'
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
}
Expand Down
1 change: 1 addition & 0 deletions demo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation project(':scarlet-message-adapter-moshi')
implementation project(':scarlet-message-adapter-protobuf')
implementation project(':scarlet-stream-adapter-rxjava2')
implementation project(':scarlet-stream-adapter-coroutines')

implementation libs.appCompat
implementation libs.material
Expand Down
10 changes: 5 additions & 5 deletions demo/src/main/java/com/tinder/app/echo/api/EchoService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ import com.tinder.scarlet.Event
import com.tinder.scarlet.State
import com.tinder.scarlet.ws.Receive
import com.tinder.scarlet.ws.Send
import io.reactivex.Flowable
import kotlinx.coroutines.flow.Flow

interface EchoService {
@Receive
fun observeState(): Flowable<State>
fun observeState(): Flow<State>

@Receive
fun observeEvent(): Flowable<Event>
fun observeEvent(): Flow<Event>

@Receive
fun observeText(): Flowable<String>
fun observeText(): Flow<String>

@Receive
fun observeBitmap(): Flowable<Bitmap>
fun observeBitmap(): Flow<Bitmap>

@Send
fun sendText(message: String): Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import com.tinder.scarlet.State
import com.tinder.scarlet.WebSocket
import io.reactivex.Flowable
import io.reactivex.processors.BehaviorProcessor
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.joda.time.DateTime
import timber.log.Timber
import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -30,8 +33,7 @@ class ChatMessageRepository @Inject constructor(

init {
echoService.observeEvent()
.observeOn(Schedulers.io())
.subscribe({ event ->
.onEach { event ->
val description = when (event) {
is Event.OnLifecycle.StateChange<*> -> when (event.state) {
Lifecycle.State.Started -> "\uD83C\uDF1D On Lifecycle Start"
Expand All @@ -57,39 +59,38 @@ class ChatMessageRepository @Inject constructor(
}
Event.OnRetry -> "⏰ On Retry"
}
val chatMessage = ChatMessage.Text(generateMessageId(), description, ChatMessage.Source.RECEIVED)
val chatMessage =
ChatMessage.Text(generateMessageId(), description, ChatMessage.Source.RECEIVED)
addChatMessage(chatMessage)
}, { e ->
}.catch { e ->
Timber.e(e)
})
}.launchIn(GlobalScope)
greggiacovelli marked this conversation as resolved.
Show resolved Hide resolved

echoService.observeText()
.observeOn(Schedulers.io())
.subscribe({ text ->
.onEach { text ->
val chatMessage = ChatMessage.Text(
generateMessageId(),
text,
ChatMessage.Source.RECEIVED,
DateTime.now().plusMillis(50)
)
addChatMessage(chatMessage)
}, { e ->
}.catch { e ->
Timber.e(e)
})
}.launchIn(GlobalScope)

echoService.observeBitmap()
.observeOn(Schedulers.io())
.subscribe({ bitmap ->
.onEach { bitmap ->
val chatMessage = ChatMessage.Image(
generateMessageId(),
bitmap,
ChatMessage.Source.RECEIVED,
DateTime.now().plusMillis(50)
)
addChatMessage(chatMessage)
}, { e ->
}.catch { e ->
Timber.e(e)
})
}.launchIn(GlobalScope)
}

fun observeChatMessage(): Flowable<List<ChatMessage>> = messagesProcessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.tinder.scarlet.Scarlet
import com.tinder.scarlet.lifecycle.android.AndroidLifecycle
import com.tinder.scarlet.streamadapter.rxjava2.RxJava2StreamAdapterFactory
import com.tinder.scarlet.websocket.okhttp.newWebSocketFactory
import com.tinder.streamadapter.coroutines.CoroutinesStreamAdapterFactory
import dagger.Component
import dagger.Module
import dagger.Provides
Expand Down Expand Up @@ -59,6 +60,7 @@ interface EchoBotComponent {
.lifecycle(lifecycle)
.addMessageAdapterFactory(BitmapMessageAdapter.Factory())
.addStreamAdapterFactory(RxJava2StreamAdapterFactory())
.addStreamAdapterFactory(CoroutinesStreamAdapterFactory())
.build()
return scarlet.create()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class GdaxFragment : Fragment(), GdaxTarget {
setDrawValues(false)
}

val minPrice = priceEntries.minBy { it.y }?.y ?: 0F
val minPrice = priceEntries.minByOrNull { it.y }?.y ?: 0F
val minPriceEntries = listOf(
Entry(minutesAgo.millisOfDay.toFloat(), minPrice),
Entry(now.millisOfDay.toFloat(), minPrice)
Expand Down
7 changes: 5 additions & 2 deletions dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ext.versions = [
rxAndroid: '2.0.2',
rxKotlin: '2.2.0',
rxJava1: '1.3.4',
kotlinxCoroutines: '1.4.3',
kotlinxCoroutines: '1.6.0',

stetho: '1.5.0',
stethoOkHttp: '1.5.0',
Expand Down Expand Up @@ -42,6 +42,7 @@ ext.versions = [

junit: '4.12',
assertJ: '3.8.0',
truth: '1.1.3',

stateMachine: "0.2.0"
]
Expand All @@ -61,7 +62,8 @@ ext.libs = [
kotlinx: [
coroutines: [
core: "org.jetbrains.kotlinx:kotlinx-coroutines-core:$versions.kotlinxCoroutines",
reactive: "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$versions.kotlinxCoroutines"
reactive: "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$versions.kotlinxCoroutines",
test: "org.jetbrains.kotlinx:kotlinx-coroutines-test:$versions.kotlinxCoroutines"
]
],

Expand Down Expand Up @@ -100,6 +102,7 @@ ext.libs = [
junit: "junit:junit:$versions.junit",
mockito: "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0",
assertJ: "org.assertj:assertj-core:$versions.assertJ",
truth: "com.google.truth:truth:$versions.truth",

kotlin: [
stdlib: "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version",
Expand Down
3 changes: 2 additions & 1 deletion scarlet-stream-adapter-coroutines/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ dependencies {
testImplementation libs.junit
testImplementation libs.mockito
testImplementation libs.kotlin.reflect
testImplementation libs.assertJ
testImplementation libs.kotlinx.coroutines.test
testImplementation libs.truth
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.trySendBlocking

internal class ChannelForwarder<T>(bufferSize: Int) : Stream.Observer<T> {
private val channel = Channel<T>(bufferSize, BufferOverflow.DROP_OLDEST)
private var disposable: Stream.Disposable? = null

fun start(stream: Stream<T>): ReceiveChannel<T> {
disposable = stream.start(this)
return channel
}

override fun onComplete() {
channel.close()
disposable?.dispose()
}

override fun onError(throwable: Throwable) {
channel.close(throwable)
disposable?.dispose()
}

override fun onNext(data: T) {
channel.trySendBlocking(data)
.exceptionOrNull() ?.let { throw it }
greggiacovelli marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@ package com.tinder.streamadapter.coroutines
import com.tinder.scarlet.StreamAdapter
import com.tinder.scarlet.utils.getRawType
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import java.lang.reflect.Type

/**
* A [stream adapter factory][StreamAdapter.Factory] that uses ReceiveChannel.
* A [stream adapter factory][StreamAdapter.Factory] that allows for [ReceiveChannel]
* and [Flow] based streams.
* [bufferSize] is configurable for the underlying channels, defaults to [DEFAULT_BUFFER]
*/
class CoroutinesStreamAdapterFactory : StreamAdapter.Factory {
private const val DEFAULT_BUFFER = 128

class CoroutinesStreamAdapterFactory(
private val bufferSize: Int = DEFAULT_BUFFER
) : StreamAdapter.Factory {

override fun create(type: Type): StreamAdapter<Any, Any> {
return when (type.getRawType()) {
ReceiveChannel::class.java -> ReceiveChannelStreamAdapter()
Flow::class.java -> FlowStreamAdapter(bufferSize)
ReceiveChannel::class.java -> ReceiveChannelStreamAdapter(bufferSize)
else -> throw IllegalArgumentException()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED
*/

package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.receiveAsFlow

class FlowStreamAdapter<T>(private val buffer: Int) : StreamAdapter<T, Flow<T>> {

override fun adapt(stream: Stream<T>): Flow<T> {
return ChannelForwarder<T>(buffer).start(stream).receiveAsFlow()
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
/*
* © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED
*/

package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.reactive.openSubscription

class ReceiveChannelStreamAdapter<T> : StreamAdapter<T, ReceiveChannel<T>> {
class ReceiveChannelStreamAdapter<T>(private val buffer: Int) : StreamAdapter<T, ReceiveChannel<T>> {

override fun adapt(stream: Stream<T>) = stream.openSubscription()
}
override fun adapt(stream: Stream<T>): ReceiveChannel<T> {
val channelForwarder = ChannelForwarder<T>(buffer)
return channelForwarder.start(stream)
}
}