diff --git a/drivers/r2dbc-driver/src/main/kotlin/app/cash/sqldelight/driver/r2dbc/R2dbcDriver.kt b/drivers/r2dbc-driver/src/main/kotlin/app/cash/sqldelight/driver/r2dbc/R2dbcDriver.kt index 848110c617b..346cac32c0c 100644 --- a/drivers/r2dbc-driver/src/main/kotlin/app/cash/sqldelight/driver/r2dbc/R2dbcDriver.kt +++ b/drivers/r2dbc-driver/src/main/kotlin/app/cash/sqldelight/driver/r2dbc/R2dbcDriver.kt @@ -8,7 +8,9 @@ import app.cash.sqldelight.db.SqlDriver import app.cash.sqldelight.db.SqlPreparedStatement import io.r2dbc.spi.Connection import io.r2dbc.spi.Statement -import kotlinx.coroutines.reactive.awaitLast +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle class R2dbcDriver(private val connection: Connection) : SqlDriver { @@ -26,10 +28,9 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver { return QueryResult.AsyncValue { val result = prepared.execute().awaitSingle() - val rowSet = mutableListOf>() - result.map { row, rowMetadata -> - rowSet.add(rowMetadata.columnMetadatas.mapIndexed { index, _ -> index to row.get(index) }.toMap()) - }.awaitLast() + val rowSet = result.map { row, rowMetadata -> + List(rowMetadata.columnMetadatas.size) { index -> row.get(index) } + }.asFlow().toList() return@AsyncValue mapper(R2dbcCursor(rowSet)) } @@ -47,7 +48,7 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver { return QueryResult.AsyncValue { val result = prepared.execute().awaitSingle() - return@AsyncValue result.rowsUpdated.awaitSingle() + return@AsyncValue result.rowsUpdated.awaitFirstOrNull() ?: 0 } } @@ -64,7 +65,7 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver { this.transaction = transaction if (enclosing == null) { - connection.beginTransaction().awaitSingle() + connection.beginTransaction().awaitFirstOrNull() } return@AsyncValue transaction @@ -88,9 +89,9 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver { override fun endTransaction(successful: Boolean): QueryResult = QueryResult.AsyncValue { if (enclosingTransaction == null) { if (successful) { - connection.commitTransaction().awaitSingle() + connection.commitTransaction().awaitFirstOrNull() } else { - connection.rollbackTransaction().awaitSingle() + connection.rollbackTransaction().awaitFirstOrNull() } } transaction = enclosingTransaction @@ -151,7 +152,7 @@ class R2dbcPreparedStatement(private val statement: Statement) : SqlPreparedStat /** * TODO: Write a better async cursor API */ -class R2dbcCursor(val rowSet: List>) : SqlCursor { +class R2dbcCursor(val rowSet: List>) : SqlCursor { var row = -1 private set