Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mapped_futures #2751

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open

Conversation

StoicDeveloper
Copy link

@StoicDeveloper StoicDeveloper commented Jun 11, 2023

Adds the functionality asked for in this issue: #2329. I wanted this feature, searched for it, found only people wishing that the feature existed, and so decided to write it myself. A previous attempt that wrapped FuturesUnordered didn't work; I wanted to avoid anything unsafe which I haven't used before. Having worked with it now though, I can see that unsafe rust is really just C with some guardrails.

The motivation is to have a HashMap that can run futures. Adds the MappedFutures struct, which supports inserting, get_muting, cancelling, removing, and replacing futures by key, and running them in an identical style as FuturesUnordered.

Internally it uses a HashSet and adds a hashable wrapper around the Task struct from FuturesUnordered.
In terms of ergonomics, the API is perhaps a bit confused, including both the insert(key, fut) -> bool and replace(key, fut) -> Option<Fut> methods which are more similar to the methods of HashSet than HashMap. The source of this confusion is that some methods can only work if the Future is Unpin; ie. anything that would return an owned future or future reference.

Some justifications:

  • I set the output for the stream as (key, fut_output) to be similar to a LinkedHashMap, which in some ways this library is similar to (but with next().await in place of pop_front(). I think it will be typical for a consumer to want to know the key of whichever future has completed, though of course any future could be mapped as such prior to insertion.
  • an earlier version used HashMap internally, which was the obvious solution for a struct which is "A hashmap that runs futures", but returning the key with the output required having a copy of the key in both the HashMap and in the Task, and so the Key had to be Clone. I added a wrapper HashTask for Arc<Task> which is placed in a HashSet to get around this requirement. Its possible that a different solution would have worked (like pointers to and from a Key wrapper and Task), but this method seemed the most expedient.

Potential improvements:

  • Allow selecting which hasher to use, like how HashSet does. This should be a pretty easy change.

I did already publish these changes in a crate which I will yank if this gets merged. https://crates.io/crates/mapped_futures

@StoicDeveloper
Copy link
Author

StoicDeveloper commented Jun 12, 2023

I'm also considering further additions for more complex data structure support, such as:

  • MultiMapFutures which allows a key to index for one or more futures
  • BiMapFutures which allows futures to be accessed from either of two key types (really the BiMap is between Key-Value pairs, neither of which is a future, and each pair is associated with a future) so that each key, value, or key-value pair will index to 0 or 1 futures.
  • BiMultiMapFutures which is like BiMapFutures, but allows duplicates among keys and values, so that each key or value may index to 1 or more futures, though each key-value pair will index to only 0 or 1 futures.

@taiki-e
Copy link
Member

taiki-e commented Jun 18, 2023

Thanks for the PR! It looks like a lot of the code added here is a copy from FutureUnordered. Is it possible to replace some of them with re-exports or wrappers for the types and functions used in FutureUnordered?

@taiki-e taiki-e added the A-stream Area: futures::stream label Jun 18, 2023
@StoicDeveloper
Copy link
Author

Thanks for the PR! It looks like a lot of the code added here is a copy from FutureUnordered. Is it possible to replace some of them with re-exports or wrappers for the types and functions used in FutureUnordered?

This was my first approach, but it didn't work out. This PR required several small internal changes to FuturesUnordered, such that I don't think it would be possible to implement this as a simple wrapper. However, it may be possible to do so by making changes within the actual FuturesUnordered module that would make it easier to reuse. I'll look into that.

@taiki-e
Copy link
Member

taiki-e commented Jul 19, 2023

However, it may be possible to do so by making changes within the actual FuturesUnordered module that would make it easier to reuse. I'll look into that.

Yeah, I guess making mapped_futures a submodule of the futures_unordered module or creating a new module for the code used in both can remove a lot of duplication.

@StoicDeveloper
Copy link
Author

StoicDeveloper commented Sep 30, 2023

@taiki-e

I've implemented a new module futures_keyed which contains most of the code that was in futures_unordered. futures_unordered and mapped_futures now consume the API of futures_keyed. There is more work to do, and there are some things about this implementation that are not ideal

Work remaining:

  • implement more mapping types, like mapped_streams, bi_multi_map_futures, and bi_multi_map_streams, which I have implemented in the mapped_futures crate, but I wanted to check whether what I have so far is acceptable to this crate's maintainers before going further
  • other housekeeping, remove commented code, add more comments and documentation, implement Debug on some structs to silence compiler warnings

Things that aren't ideal

  • The API for futures_keyed adds the ReleasesTask trait, and consumers that contain KeyedFutures must provide an implementor of this trait. I personally find this to be very unergonomic, but it was the only way I could think of to make this change. The API for futures_unordered is of course unchanged, but internally it now creates a DummyStruct that implements ReleasesTask using logic that does nothing, since the existing FuturesKeyed::release_task() contains all necessary logic. This trait was necessary so that a consumer can remove residual elements from its internal data structure if release_task is called. Ex. MappedFutures uses a HashSet of Tasks, but calling release_task on the Task will not remove the corresponding struct from the HashSet; now release_task() will handle that by calling code from the trait.
  • I couldn't think of a better name than futures_keyed; perhaps futures_unordered_internal. It might be better to not say "keyed" since there is the potential to use the new key field of Task for things other than keys.
    • For example, a MultiMapFutures struct, in which a single key maps to 0 or more futures (such that a getter function could have the signature MultiMapFutures<K,F>::get(&self, key: K) -> &Vec<Entry<Task<K,F>>>), but each future does not have a unique key, could work if Task contains a pointer to the Entry instead of a key. Such a struct could be implemented as a consumer of FuturesKeyed without further modification, but the naming scheme wouldn't make sense.

Please let me know of any problems with this implementation; if you find none then I will proceed with implementing more consumers of futures_keyed

@taiki-e
Copy link
Member

taiki-e commented Oct 8, 2023

I've implemented a new module futures_keyed which contains most of the code that was in futures_unordered.

Thanks.

  • implement more mapping types, like mapped_streams, bi_multi_map_futures, and bi_multi_map_streams, which I have implemented in the mapped_futures crate, but I wanted to check whether what I have so far is acceptable to this crate's maintainers before going further

For now, I would like to add a minimum (mapped_futures and mapped_streams; other ones are implemented on top of them, right?).

I couldn't think of a better name than futures_keyed; perhaps futures_unordered_internal. It might be better to not say "keyed" since there is the potential to use the new key field of Task for things other than keys.

Yeah, futures_unordered_internal would be better.

Comment on lines +236 to +239
pub fn get(&mut self, key: &K) -> Option<&Fut>
where
Fut: Unpin,
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pin always implements Deref (i.e., we can always safely get &T from Pin<&T>/Pin<&mut T>), so I think we can remove the Unpin bound here.

use super::task::Task;
use super::FuturesUnordered;
use core::marker::PhantomData;
use crate::stream::futures_keyed;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All Send/Sync implementations in this file can be removed since they are propagated from internal types.

Comment on lines +127 to +140

// SAFETY: we do nothing thread-local and there is no interior mutability,
// so the usual structural `Send`/`Sync` apply.
unsafe impl<K: Hash + Eq, Fut: Send> Send for IterPinRef<'_, K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Sync> Sync for IterPinRef<'_, K, Fut> {}

unsafe impl<K: Hash + Eq, Fut: Send> Send for IterPinMut<'_, K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Sync> Sync for IterPinMut<'_, K, Fut> {}

unsafe impl<K: Hash + Eq, Fut: Send + Unpin> Send for IntoIter<K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Sync + Unpin> Sync for IntoIter<K, Fut> {}

unsafe impl<K: Hash + Eq, Fut: Send + Unpin> Send for Keys<'_, K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Sync + Unpin> Sync for Keys<'_, K, Fut> {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Suggested change
// SAFETY: we do nothing thread-local and there is no interior mutability,
// so the usual structural `Send`/`Sync` apply.
unsafe impl<K: Hash + Eq, Fut: Send> Send for IterPinRef<'_, K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Sync> Sync for IterPinRef<'_, K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Send> Send for IterPinMut<'_, K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Sync> Sync for IterPinMut<'_, K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Send + Unpin> Send for IntoIter<K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Sync + Unpin> Sync for IntoIter<K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Send + Unpin> Send for Keys<'_, K, Fut> {}
unsafe impl<K: Hash + Eq, Fut: Sync + Unpin> Sync for Keys<'_, K, Fut> {}

Comment on lines +169 to +178
// SAFETY: we do nothing thread-local and there is no interior mutability,
// so the usual structural `Send`/`Sync` apply.
unsafe impl<K, Fut: Send, S: ReleasesTask<K>> Send for IterPinRef<'_, K, Fut, S> {}
unsafe impl<K, Fut: Sync, S: ReleasesTask<K>> Sync for IterPinRef<'_, K, Fut, S> {}

unsafe impl<K, Fut: Send, S: ReleasesTask<K>> Send for IterPinMut<'_, K, Fut, S> {}
unsafe impl<K, Fut: Sync, S: ReleasesTask<K>> Sync for IterPinMut<'_, K, Fut, S> {}

unsafe impl<K, Fut: Send + Unpin, S: ReleasesTask<K>> Send for IntoIter<K, Fut, S> {}
unsafe impl<K, Fut: Sync + Unpin, S: ReleasesTask<K>> Sync for IntoIter<K, Fut, S> {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keys are also returned from the iterator, so they must also be Send or Sync.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-stream Area: futures::stream
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants