Skip to content

Commit

Permalink
Adopt R2DBC changes which do not always send a value (#3525)
Browse files Browse the repository at this point in the history
* Adopt R2DBC changes which does not always send a value

* Update R2dbcDriver.kt

* Update R2dbcDriver.kt

* Update R2dbcDriver.kt
  • Loading branch information
hfhbd committed Sep 22, 2022
1 parent 76a6f9a commit 45ba524
Showing 1 changed file with 11 additions and 10 deletions.
Expand Up @@ -8,7 +8,9 @@ import app.cash.sqldelight.db.SqlDriver
import app.cash.sqldelight.db.SqlPreparedStatement
import io.r2dbc.spi.Connection
import io.r2dbc.spi.Statement
import kotlinx.coroutines.reactive.awaitLast
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle

class R2dbcDriver(private val connection: Connection) : SqlDriver {
Expand All @@ -26,10 +28,9 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver {
return QueryResult.AsyncValue {
val result = prepared.execute().awaitSingle()

val rowSet = mutableListOf<Map<Int, Any?>>()
result.map { row, rowMetadata ->
rowSet.add(rowMetadata.columnMetadatas.mapIndexed { index, _ -> index to row.get(index) }.toMap())
}.awaitLast()
val rowSet = result.map { row, rowMetadata ->
List(rowMetadata.columnMetadatas.size) { index -> row.get(index) }
}.asFlow().toList()

return@AsyncValue mapper(R2dbcCursor(rowSet))
}
Expand All @@ -47,7 +48,7 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver {

return QueryResult.AsyncValue {
val result = prepared.execute().awaitSingle()
return@AsyncValue result.rowsUpdated.awaitSingle()
return@AsyncValue result.rowsUpdated.awaitFirstOrNull() ?: 0
}
}

Expand All @@ -64,7 +65,7 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver {
this.transaction = transaction

if (enclosing == null) {
connection.beginTransaction().awaitSingle()
connection.beginTransaction().awaitFirstOrNull()
}

return@AsyncValue transaction
Expand All @@ -88,9 +89,9 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver {
override fun endTransaction(successful: Boolean): QueryResult<Unit> = QueryResult.AsyncValue {
if (enclosingTransaction == null) {
if (successful) {
connection.commitTransaction().awaitSingle()
connection.commitTransaction().awaitFirstOrNull()
} else {
connection.rollbackTransaction().awaitSingle()
connection.rollbackTransaction().awaitFirstOrNull()
}
}
transaction = enclosingTransaction
Expand Down Expand Up @@ -151,7 +152,7 @@ class R2dbcPreparedStatement(private val statement: Statement) : SqlPreparedStat
/**
* TODO: Write a better async cursor API
*/
class R2dbcCursor(val rowSet: List<Map<Int, Any?>>) : SqlCursor {
class R2dbcCursor(val rowSet: List<List<Any?>>) : SqlCursor {
var row = -1
private set

Expand Down

0 comments on commit 45ba524

Please sign in to comment.