Using async iterator-like SQLX fetch with Rayon #1090

Open
nyurik opened this issue Sep 16, 2023 · 4 comments

Comments

@nyurik

nyurik commented Sep 16, 2023

Cross-posting from Stack Overflow:

The Rust sqlx library provides a stream interface, fetch(...), usually consumed with a while let Some(row) = rows.try_next().await? {...} loop. In my case, each row may take some time to process, so I would like to use Rayon's par_iter(), but that requires a real iterator. Using fetch_all is not an option because the full result set may not fit into memory.

How can I use Rayon's par_iter to process millions of rows produced by sqlx's fetch stream?

@cuviper
Member

cuviper commented Sep 18, 2023

You may have an easier time if you just use an async threadpool, like tokio provides.

What sort of results come out of your parallel process? The shape of that will affect how you might shoehorn this into rayon.

@nyurik
Author

nyurik commented Sep 18, 2023

Thanks @cuviper, each operation essentially returns a bool. I am trying to parallelize validation of a large dataset, searching for any row that does not pass validation. Ordering is not important, but it would be good to abort early if any validation fails.

@cuviper
Member

cuviper commented Sep 18, 2023

One approach you could try is tokio channels, with an async send on the producer side and a synchronous blocking_recv on the Rayon side, something like:

std::iter::from_fn(move || rx.blocking_recv()).par_bridge() //...
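
To make the shape of that bridge concrete, here is a minimal std-only sketch. It is an illustration under stated assumptions, not the tokio/Rayon code itself: `std::sync::mpsc::sync_channel` stands in for a bounded tokio channel, `recv()` stands in for `blocking_recv()`, a plain thread stands in for the async task reading from sqlx, and the `Row` type, `validate` rule, and `all_rows_valid` helper are all hypothetical. The consuming side is sequential here; with Rayon one would put `.par_bridge()` on the `from_fn` iterator as in the snippet above.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for a database row.
#[derive(Debug)]
pub struct Row {
    pub id: u64,
    pub value: i64,
}

/// Hypothetical validation rule: a row is valid when `value` is non-negative.
pub fn validate(row: &Row) -> bool {
    row.value >= 0
}

/// Streams `rows` through a bounded channel and validates them on the
/// receiving side. Returns `(all_valid, rows_sent_by_producer)`.
pub fn all_rows_valid(rows: Vec<Row>, capacity: usize) -> (bool, usize) {
    // Bounded channel: the producer blocks once `capacity` rows are queued,
    // so the whole data set never has to sit in memory at once.
    let (tx, rx) = sync_channel::<Row>(capacity);

    // Producer thread (stand-in for the async task draining the fetch stream).
    let producer = thread::spawn(move || {
        let mut sent = 0usize;
        for row in rows {
            // `send` fails once the receiver is gone, which is the signal
            // that the consumer aborted early.
            if tx.send(row).is_err() {
                break;
            }
            sent += 1;
        }
        sent
    });

    // Turn the receiver into an ordinary blocking iterator. `all` stops at
    // the first invalid row; the iterator (and with it `rx`) is dropped at
    // the end of this statement, which unblocks the producer.
    let ok = std::iter::from_fn(move || rx.recv().ok()).all(|row| validate(&row));

    let sent = producer.join().expect("producer thread panicked");
    (ok, sent)
}
```

Calling `all_rows_valid(rows, 64)` returns whether every row passed, plus how many rows the producer managed to send before it finished or noticed the abort. The bounded capacity is what keeps memory use flat, and dropping the receiver is what propagates the early abort back to the producer.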

@adamreichold
Collaborator

adamreichold commented Sep 18, 2023

If you are using sqlx and therefore already run within an async runtime providing a thread pool, I also suspect that just chunking the rows using something like StreamExt::chunks and spawning a task for each chunk would be preferable to trying to force this into a parallel iterator. Alternatively, spawning a task per hardware thread and using an async mpmc channel like flume or kanal might work as well.
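
A rough std-only sketch of the second idea (one worker per hardware thread pulling chunks from a shared channel) might look like the following. Everything here is an assumption for illustration: plain threads stand in for async tasks, an `Arc<Mutex<Receiver>>` stands in for a real mpmc channel such as flume or kanal, rows are reduced to `i64` values, and `is_valid` and `validate_in_chunks` are hypothetical names. Batching into `Vec`s mirrors what `StreamExt::chunks` would do on the stream side.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical validation rule over an integer standing in for a row.
pub fn is_valid(value: i64) -> bool {
    value >= 0
}

/// Validates `rows` in chunks on one worker per hardware thread.
/// Returns `true` only if every row passed validation.
pub fn validate_in_chunks<I>(rows: I, chunk_size: usize) -> bool
where
    I: IntoIterator<Item = i64>,
{
    assert!(chunk_size > 0, "chunk_size must be positive");
    let workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);

    // Bounded queue of chunks; the shared, locked receiver emulates mpmc.
    let (tx, rx) = sync_channel::<Vec<i64>>(workers * 2);
    let rx = Arc::new(Mutex::new(rx));
    let failed = Arc::new(AtomicBool::new(false));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let failed = Arc::clone(&failed);
            thread::spawn(move || loop {
                // The lock guard is dropped at the end of this statement,
                // so validation below runs without holding the lock.
                let received = rx.lock().unwrap().recv();
                let chunk = match received {
                    Ok(chunk) => chunk,
                    Err(_) => break, // channel closed: no more work
                };
                // After a failure, keep draining so the producer never
                // stays blocked on a full queue, but skip the real work.
                if failed.load(Ordering::Relaxed) {
                    continue;
                }
                if !chunk.iter().all(|&v| is_valid(v)) {
                    failed.store(true, Ordering::Relaxed);
                }
            })
        })
        .collect();

    // Producer: batch rows into chunks and stop early once a failure is seen.
    let mut iter = rows.into_iter();
    loop {
        if failed.load(Ordering::Relaxed) {
            break;
        }
        let chunk: Vec<i64> = iter.by_ref().take(chunk_size).collect();
        if chunk.is_empty() || tx.send(chunk).is_err() {
            break;
        }
    }
    drop(tx); // closing the channel lets the workers exit their loops

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    !failed.load(Ordering::Relaxed)
}
```

The shared `AtomicBool` gives the early abort asked for earlier in the thread: the producer stops reading further rows as soon as any worker reports an invalid one, and at most the chunks already queued are discarded. Chunking mainly amortizes the per-message channel and locking overhead, so the chunk size is a tuning knob rather than a correctness concern.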
