Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to process payload from notify after collected notify? #1131

Open
Barto-Paja opened this issue Apr 15, 2024 · 0 comments
Open

How to process payload from notify after collected notify? #1131

Barto-Paja opened this issue Apr 15, 2024 · 0 comments

Comments

@Barto-Paja
Copy link

Barto-Paja commented Apr 15, 2024

Hello,
I'm trying to process the collected payload from a notification. When I receive notification from PostgreSQL database, I would like to insert new data into the db.
I took the code from example

#[tokio::test]
async fn notifications() {
let (client, mut connection) = connect_raw("user=postgres").await.unwrap();
let (tx, rx) = mpsc::unbounded();
let stream =
stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
let connection = stream.forward(tx).map(|r| r.unwrap());
tokio::spawn(connection);
client
.batch_execute(
"LISTEN test_notifications;
NOTIFY test_notifications, 'hello';
NOTIFY test_notifications, 'world';",
)
.await
.unwrap();
drop(client);
let notifications = rx
.filter_map(|m| match m {
AsyncMessage::Notification(n) => future::ready(Some(n)),
_ => future::ready(None),
})
.collect::<Vec<_>>()
.await;
assert_eq!(notifications.len(), 2);
assert_eq!(notifications[0].channel(), "test_notifications");
assert_eq!(notifications[0].payload(), "hello");
assert_eq!(notifications[1].channel(), "test_notifications");
assert_eq!(notifications[1].payload(), "world");
}

something like this:

let notifications = rx
    .filter_map(|m| match m {
        AsyncMessage::Notification(n) => {
            println!("Notification {:?}", n);
            // insert_stuff(connection_parameters, n.payload()) <-- insert new data into db, but here asking for add await
            future::ready(Some(n))
        },
        _ => future::ready(None),
    })
    .collect::<Vec<_>>()
    .await;

Note: newbie in rust

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant