Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid FuturesUnordered #1647

Merged
merged 6 commits into from
Oct 17, 2023
Merged

Conversation

dignifiedquire
Copy link
Contributor

Description

Workaround for #1646 and likely makes more sense in the context of Tokio usage anyway.

In general JoinSet is simply used instead.

Workaround for #1646  and likely makes more sense in the context of Tokio usage anyway.

In general `JoinSet` is simply used instead.
@dignifiedquire
Copy link
Contributor Author

CI is sad because MSRV bump to 1.71 is needed due to the new dep.

@Arqu
Copy link
Collaborator

Arqu commented Oct 16, 2023

/netsim

@github-actions
Copy link

fix-avoid-futures-unordered.2ed574d0617698ab2783c815e87c28830e5f4ba5
Perf report:

test case throughput_gbps throughput_transfer
iroh_latency_20ms 1_to_1 4.00 5.11
iroh_latency_20ms 1_to_3 8.02 9.64
iroh_latency_20ms 1_to_5 8.03 8.14
iroh_latency_20ms 1_to_10 8.34 8.08
iroh_latency_20ms 2_to_2 6.33 7.46
iroh_latency_20ms 2_to_4 10.76 12.05
iroh_latency_20ms 2_to_6 11.88 12.77
iroh_latency_20ms 2_to_10 15.27 15.45
iroh 1_to_1 3.31 3.95
iroh 1_to_3 8.06 9.04
iroh 1_to_5 8.02 8.16
iroh 1_to_10 8.60 8.41
iroh 2_to_2 6.28 7.38
iroh 2_to_4 11.49 13.08
iroh 2_to_6 13.58 14.63
iroh 2_to_10 14.00 14.00
iroh_latency_200ms 1_to_1 3.34 3.98
iroh_latency_200ms 1_to_3 7.44 8.75
iroh_latency_200ms 1_to_5 7.62 8.06
iroh_latency_200ms 1_to_10 7.77 7.37
iroh_latency_200ms 2_to_2 6.70 8.01
iroh_latency_200ms 2_to_4 10.77 12.07
iroh_latency_200ms 2_to_6 12.12 13.10
iroh_latency_200ms 2_to_10 14.17 14.18

@Arqu
Copy link
Collaborator

Arqu commented Oct 17, 2023

Ran tests with valgrind last night. Made 52 loops before it died. No errors reported. LGTM

@rklaehn
Copy link
Contributor

rklaehn commented Oct 17, 2023

So what exactly is the difference in terms of how the futures are being polled? Or is this just trying something until it works?

@dignifiedquire
Copy link
Contributor Author

So what exactly is the difference in terms of how the futures are being polled? Or is this just trying something until it works?

this is not about how futures are polled, it is about how the unsafe code in FuturesUnordered produces invalid memory reads and writes :(

.pending
.join_next()
.await
.expect("not canceled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it not be safer to log the JoinError and skip to the next item in the set if it was cancelled? I don't even know right now if this is a guarantee upheld. It was still in pending_peers i guess but is that a guarantee? And it seems like something that could easily change in a refactor without realising.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, to try to find the first working one

std::task::Poll::Ready(Some(Err(e))) => {
// Should not happen unless the task paniced or got canceled
// TODO: is this what we want to do here?
panic!("{:?}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log the error? panicking because we couldn't dial something seems a bit harsh

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe, usually panics in one part of the code result in the other parts not quite working as intended anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now only logs

Some(Ok(Err(_))) => (),
Some(Err(e)) => {
warn!("fatal probes error: {:?}", e);
probes.abort_all();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if one probeset pancicked we don't need to abort all the other probesets I think. it should be fine to let the other probesets continue. (same for cancelled, but less likely to happen due to how we cancel things i think)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continues now

self.handle_abort_probes();
}
None => {
probes.abort_all();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is nothing to abort anymore, they're all done. it probably doesn't do any harm though, but line 303 already ensures it cleans up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@@ -253,6 +252,7 @@ impl Actor {

_ = &mut probe_timer => {
warn!("probes timed out");
probes.abort_all();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The probes are aborted on line 303 and they get a log message there. if you abort here i'm not sure if you get the log message. (admittedly the log message there is a bit misleading in case of a timeout, maybe it could be tweaked)

Then self.handle_abort_probes() makes sure that we break out of the loop. It's designed this way because it should not matter whether we fall in this condition by the timeout or by the actor message, and this way everything is done by the same code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

async fn prepare_probes_task(
&mut self,
) -> Result<FuturesUnordered<Pin<Box<impl Future<Output = Result<ProbeReport>>>>>> {
async fn prepare_probes_task(&mut self) -> Result<JoinSet<Result<ProbeReport>>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This no longer prepares a future but will spawn the futures on tasks and they'll start running right away. Maybe rename the function to start_probe_tasks or spawn_probe_tasks and update the first line of the docstring as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

Comment on lines 637 to 640
Err(err) => {
warn!("fatal probe set error, aborting: {:#}", err);
return Err(anyhow::anyhow!(err));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a single probe panics (or is cancelled) I think it's probably fine to continue the other probes. Not really sure if all of them would panic as they run the same code... but really none should panic so I guess I'd still choose to carry on.

Not sure if we can log the panic in a reasonable way though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kept the logging, can continue now

@@ -266,11 +266,19 @@ impl Actor {
}

// Drive the probes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe update this comment to say something like "wait for probe tasks to finish" as this no longer drives the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

@rklaehn
Copy link
Contributor

rklaehn commented Oct 17, 2023

So what exactly is the difference in terms of how the futures are being polled? Or is this just trying something until it works?

this is not about how futures are polled, it is about how the unsafe code in FuturesUnordered produces invalid memory reads and writes :(

Oh boy... Did you open an issue?

@dignifiedquire
Copy link
Contributor Author

Oh boy... Did you open an issue?

of course : #1646 and rust-lang/futures-rs#2781

@dignifiedquire dignifiedquire added this pull request to the merge queue Oct 17, 2023
Merged via the queue into main with commit 5813e09 Oct 17, 2023
15 checks passed
@dignifiedquire dignifiedquire deleted the fix-avoid-futures-unordered branch October 17, 2023 13:43
dignifiedquire added a commit that referenced this pull request Oct 18, 2023
## Description

Workaround for #1646 and likely makes more sense in the context of Tokio
usage anyway.

In general `JoinSet` is simply used instead.
@b5 b5 added this to the v0.8.0 milestone Oct 19, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

None yet

5 participants