Skip to content

Commit

Permalink
query_as: don't stop stream after decoding error (#1887)
Browse files Browse the repository at this point in the history
* query_as: don't stop stream after decoding error

Fixes #1884

When a single row cannot be converted to the target type of query_as,
it should not prevent the library user from accessing the other rows

Otherwise, the user cannot access all query results in query_as.

* use union in tests to maximize db compatibility
  • Loading branch information
lovasoa committed Jun 2, 2022
1 parent 24baac7 commit 20d61f4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 12 deletions.
20 changes: 8 additions & 12 deletions sqlx-core/src/query_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,14 @@ where
O: 'e,
A: 'e,
{
Box::pin(try_stream! {
let mut s = executor.fetch_many(self.inner);

while let Some(v) = s.try_next().await? {
r#yield!(match v {
Either::Left(v) => Either::Left(v),
Either::Right(row) => Either::Right(O::from_row(&row)?),
});
}

Ok(())
})
executor
.fetch_many(self.inner)
.map(|v| match v {
Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
Ok(Either::Left(v)) => Ok(Either::Left(v)),
Err(e) => Err(e),
})
.boxed()
}

/// Execute the query and return all the generated results, collected into a [`Vec`].
Expand Down
29 changes: 29 additions & 0 deletions tests/any/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,35 @@ async fn it_executes_with_pool() -> anyhow::Result<()> {
Ok(())
}

#[sqlx_macros::test]
async fn it_does_not_stop_stream_after_decoding_error() -> anyhow::Result<()> {
use futures::stream::StreamExt;
// see https://github.com/launchbadge/sqlx/issues/1884
let pool = sqlx_test::pool::<Any>().await?;

#[derive(Debug, PartialEq)]
struct MyType;
impl<'a> sqlx::FromRow<'a, AnyRow> for MyType {
fn from_row(row: &'a AnyRow) -> sqlx::Result<Self> {
let n = row.try_get::<i32, _>(0)?;
if n == 1 {
Err(sqlx::Error::RowNotFound)
} else {
Ok(MyType)
}
}
}

let rows = sqlx::query_as("SELECT 0 UNION ALL SELECT 1 UNION ALL SELECT 2")
.fetch(&pool)
.map(|r| r.ok())
.collect::<Vec<_>>()
.await;

assert_eq!(rows, vec![Some(MyType), None, Some(MyType)]);
Ok(())
}

#[sqlx_macros::test]
async fn it_gets_by_name() -> anyhow::Result<()> {
let mut conn = new::<Any>().await?;
Expand Down

0 comments on commit 20d61f4

Please sign in to comment.