Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt R2DBC changes which do not always send a value #3525

Merged
merged 4 commits into from Sep 22, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -8,7 +8,9 @@ import app.cash.sqldelight.db.SqlDriver
import app.cash.sqldelight.db.SqlPreparedStatement
import io.r2dbc.spi.Connection
import io.r2dbc.spi.Statement
import kotlinx.coroutines.reactive.awaitLast
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle

class R2dbcDriver(private val connection: Connection) : SqlDriver {
Expand All @@ -26,10 +28,9 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver {
return QueryResult.AsyncValue {
val result = prepared.execute().awaitSingle()

val rowSet = mutableListOf<Map<Int, Any?>>()
result.map { row, rowMetadata ->
rowSet.add(rowMetadata.columnMetadatas.mapIndexed { index, _ -> index to row.get(index) }.toMap())
}.awaitLast()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awaitLast requires at least one item, but this does not work if your result is empty, eg because the query does not return any rows.

val rowSet = result.map { row, rowMetadata ->
List(rowMetadata.columnMetadatas.size) { index -> row.get(index) }
}.asFlow().toList()

return@AsyncValue mapper(R2dbcCursor(rowSet))
}
Expand All @@ -47,7 +48,7 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver {

return QueryResult.AsyncValue {
val result = prepared.execute().awaitSingle()
return@AsyncValue result.rowsUpdated.awaitSingle()
return@AsyncValue result.rowsUpdated.awaitFirstOrNull() ?: 0
}
}

Expand All @@ -64,7 +65,7 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver {
this.transaction = transaction

if (enclosing == null) {
connection.beginTransaction().awaitSingle()
connection.beginTransaction().awaitFirstOrNull()
}

return@AsyncValue transaction
Expand All @@ -88,9 +89,9 @@ class R2dbcDriver(private val connection: Connection) : SqlDriver {
override fun endTransaction(successful: Boolean): QueryResult<Unit> = QueryResult.AsyncValue {
if (enclosingTransaction == null) {
if (successful) {
connection.commitTransaction().awaitSingle()
connection.commitTransaction().awaitFirstOrNull()
} else {
connection.rollbackTransaction().awaitSingle()
connection.rollbackTransaction().awaitFirstOrNull()
}
}
transaction = enclosingTransaction
Expand Down Expand Up @@ -151,7 +152,7 @@ class R2dbcPreparedStatement(private val statement: Statement) : SqlPreparedStat
/**
* TODO: Write a better async cursor API
*/
class R2dbcCursor(val rowSet: List<Map<Int, Any?>>) : SqlCursor {
class R2dbcCursor(val rowSet: List<List<Any?>>) : SqlCursor {
var row = -1
private set

Expand Down