
Lossy Channels #400

Open
neachdainn opened this issue Jul 3, 2019 · 15 comments

Comments

@neachdainn

Is there any current work for adding lossy bounded channels? Or any strong reason to not add them? If not, I might open a PR in the next few weeks with an implementation.

@vorner
Contributor

vorner commented Jul 7, 2019

What do you mean by lossy? If „drop stuff when it's full“, shouldn't try_send serve your needs?

@neachdainn
Author

neachdainn commented Jul 7, 2019

Apologies for not being clear. By "lossy channel", I mean one that discards old data when the buffer is full. This is useful for things like robotics where new sensor information is more important than old information and likely makes the old information completely worthless.

The try_send method sort of works, but it either adds an unnecessary lock or requires a racy loop.
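The racy loop in question might look something like this sketch, using `std::sync::mpsc` rather than crossbeam and a hypothetical `lossy_send` helper. The gap between `try_recv` and the retried `try_send` is the race: with another sender active, the slot can be refilled before the retry.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

// Hypothetical "lossy send": when the channel is full, pop the oldest
// element and retry. With concurrent senders the loop may spin, and the
// sender needs access to the receiving end, which is the awkward part.
fn lossy_send<T>(tx: &SyncSender<T>, rx: &Receiver<T>, mut value: T) {
    loop {
        match tx.try_send(value) {
            Ok(()) => return,
            Err(TrySendError::Full(v)) => {
                let _ = rx.try_recv(); // evict the oldest element
                value = v; // retry with the same value
            }
            Err(TrySendError::Disconnected(_)) => return,
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel(2);
    for i in 0..5 {
        lossy_send(&tx, &rx, i);
    }
    // Only the two newest values survive.
    assert_eq!(rx.try_recv().unwrap(), 3);
    assert_eq!(rx.try_recv().unwrap(), 4);
}
```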

@vorner
Contributor

vorner commented Jul 7, 2019

I see; try_send drops the new data rather than the old.

I'm not sure if you really need a full channel for such a thing ‒ do you need a „buffer“ for several updates, then? If I were doing something like sensor data, I'd probably have some kind of atomic storage for a single (newest) snapshot of each sensor, plus a mechanism to wake up the thread that consumes the data. The wake-up could be a single-element bounded channel with element = (). That one could use try_send, since it doesn't matter whether I throw out the old or new ().
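The snapshot-plus-wakeup idea above can be sketched as follows, a minimal version using `std` only; the `Sensor` type and `f64` readings are hypothetical stand-ins.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};

// A shared slot holds only the newest reading, and a one-element channel
// of () wakes the consumer. try_send on the wakeup channel may fail when
// full, which is fine: a wakeup is already pending, and it doesn't matter
// whether the old or new () is kept.
struct Sensor {
    slot: Arc<Mutex<Option<f64>>>,
    wakeup: SyncSender<()>,
}

impl Sensor {
    fn publish(&self, reading: f64) {
        *self.slot.lock().unwrap() = Some(reading); // overwrite old snapshot
        let _ = self.wakeup.try_send(()); // ok if a wakeup is already queued
    }
}

fn consume_latest(slot: &Arc<Mutex<Option<f64>>>, wakeup: &Receiver<()>) -> Option<f64> {
    wakeup.recv().ok()?; // block until at least one update has happened
    slot.lock().unwrap().take()
}

fn main() {
    let (tx, rx) = sync_channel(1);
    let slot = Arc::new(Mutex::new(None));
    let sensor = Sensor { slot: slot.clone(), wakeup: tx };
    sensor.publish(1.0);
    sensor.publish(2.5); // overwrites 1.0; the second wakeup is dropped
    assert_eq!(consume_latest(&slot, &rx), Some(2.5));
}
```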

@neachdainn
Author

neachdainn commented Jul 7, 2019

I'm not sure if you really need a full channel for such a thing ‒ do you need a „buffer“ for several updates, then?

You may want the last N updates. You might also want all the updates you can get, but the sensor vastly outpaces the processing, so you have to put a hard cap on it. Once you start doing that, you either have to lock your buffer or make it atomic, which is essentially a lossy channel.

A sizeable chunk of the robotics community, and probably most of robotics in academia, heavily utilizes lossy channels like this (see ROS). The select macro has been a nice change of pace from callbacks, which don't work very well in Rust, so I would like to see something like lossy channels be usable with the macro.

@ghost

ghost commented Jul 8, 2019

@neachdainn In case the channel is full and you want to "overwrite" the oldest element in it, what if you called .recv() to drop the oldest element and then follow up with .send() to send the new element?

I wonder if that would work for you?

@neachdainn
Author

That is what I'm doing now, but it requires a racy loop.

@Chopinsky

I can see @neachdainn's point here: a common channel use case requires separating the sender and the receiver, where the sender is owned by the data producer and the receiver by the consumer. In this pattern, the producer usually doesn't have access to the receiver to pop one or more messages when the channel is full (short of cloning a receiver solely for popping from a full channel, which seems wasteful), and it is also hard for the consumer to decide whether it should discard some "old" messages to make room for "newer" ones, since it can't foresee whether more messages are coming.

I think a naive implementation of a lossy sender would just be a wrapper that pops a message from the full channel before sending a new one, but that could save some headache for channel users.
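A naive drop-oldest queue along those lines might be sketched like this. This is hypothetical, not crossbeam's API, and built on `Mutex<VecDeque>` from `std`; the `Mutex` is exactly the lock a built-in lossy channel could avoid.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Sketch of a bounded lossy queue: it evicts the oldest message instead
// of rejecting the newest when full. Clones share the same buffer, so
// producer and consumer sides can be handed out separately.
#[derive(Clone)]
struct LossyQueue<T> {
    inner: Arc<Mutex<VecDeque<T>>>,
    cap: usize,
}

impl<T> LossyQueue<T> {
    fn with_capacity(cap: usize) -> Self {
        LossyQueue { inner: Arc::new(Mutex::new(VecDeque::with_capacity(cap))), cap }
    }

    fn send(&self, value: T) {
        let mut q = self.inner.lock().unwrap();
        if q.len() == self.cap {
            q.pop_front(); // drop the oldest message to make room
        }
        q.push_back(value);
    }

    fn recv(&self) -> Option<T> {
        self.inner.lock().unwrap().pop_front()
    }
}

fn main() {
    let q = LossyQueue::with_capacity(3);
    for i in 1..=5 {
        q.send(i);
    }
    // Capacity 3: the two oldest messages (1 and 2) were dropped.
    assert_eq!(q.recv(), Some(3));
    assert_eq!(q.recv(), Some(4));
    assert_eq!(q.recv(), Some(5));
    assert_eq!(q.recv(), None);
}
```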

@neachdainn
Author

Just to be clear: I am not asking for someone to implement this for me. I am willing to spend time implementing this, I just want to know if a PR for something like this would be (potentially) accepted or if someone is already working on a similar channel flavor.

@ghost

ghost commented Jul 22, 2019

I don't think adding a whole new channel flavor is worth it. At best, we might introduce a new method, perhaps named force_send. Unlike try_send, it would overwrite the last element in the channel instead of failing when the channel is full.

But even so, I find this a bit of a niche use case and the problem is not difficult to get around manually. I'm wary of adding small helper methods like these -- the channel interface is already complex enough, and there's always a bunch of new methods we could add.
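The contrast between the two semantics can be illustrated with `std::sync::mpsc`; `force_send` itself is hypothetical here and is emulated by a racy receive-then-retry.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// try_send rejects the NEW value when the channel is full; a force_send
// would instead evict the OLDEST value to make room.
fn force_send_demo() -> &'static str {
    let (tx, rx) = sync_channel(1);
    tx.try_send("old").unwrap();

    // Full channel: try_send hands the new value back in the error.
    assert!(matches!(tx.try_send("new"), Err(TrySendError::Full("new"))));

    // Emulated force_send: receive once, then retry. With concurrent
    // senders this is racy, which is why a dedicated method would help.
    let _ = rx.try_recv();
    tx.try_send("new").unwrap();
    rx.recv().unwrap()
}

fn main() {
    assert_eq!(force_send_demo(), "new");
}
```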

@neachdainn
Author

neachdainn commented Jul 22, 2019

I don't think adding a whole new channel flavor is worth it. At best, we might introduce a new method, perhaps named force_send. Unlike try_send, it would overwrite the last element in the channel instead of failing when the channel is full.

That would definitely cover my use-case (assuming "last element" means the oldest element) and a new flavor is probably overkill.

But even so, I find this a bit of a niche use case and the problem is not difficult to get around manually.

My example might be niche, but I think the concept of a channel that drops the oldest item is not uncommon. And while it isn't difficult to work around manually, doing so is inelegant. At the very least, figuring out when to break out of the try_send/recv loop is not trivial. Currently, my code either tries to send twice before giving up or relies on a timestamp added to the structure, both of which are far less elegant than a force_send-style method would be.

I'm wary of adding small helper methods like these -- the channel interface is already complex enough, and there's always a bunch of new methods we could add.

That is fair. I really like how Crossbeam and select have been able to clean up my code and would like to keep using it, but I can definitely refactor and write my own queue if I need to.

@Matthias247

FWIW, I added an asynchronous variant of such a channel type to https://github.com/Matthias247/futures-intrusive, which I called StateBroadcastChannel.

The motivation is more or less what @neachdainn was asking for: a component generates state updates that must be distributed to potentially more than one consumer. I had use cases for that before when working on embedded (soft) real-time systems.

In my case I did not add any additional buffering to the channel. I guess if that were required, it would be more of a buffer per consumer rather than one inside the channel.

@gterzian

gterzian commented Oct 22, 2019

Perhaps a Mutex<VecDeque> could be more appropriate to implement the shared data buffer.

You could try to use a technique similar to graphics programming where you would swap buffers, see https://computergraphics.stackexchange.com/questions/4550/how-double-buffers-works-in-opengl

For example, let's say the consumer always wants a batch of N updates. Then you could have the producer keep pushing updates (push_back) into a shared queue, and when the consumer is ready to take the next batch, it could:

  1. Let shared_queue be an Arc<Mutex<VecDeque<Update>>>, shared between consumer and producer.
  2. Let consumer_queue be a VecDeque<Update>, owned by the consumer thread, initially empty.
  3. Let consumer_queue be the result of mem::replace(&mut *shared_queue.lock().unwrap(), consumer_queue).
  4. Trim consumer_queue down to the newest N updates, for example by popping from the front while its length exceeds N.
  5. Process consumer_queue.
  6. When consumer_queue is empty, go back to 3.
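The steps above can be sketched as follows; `Update` is a hypothetical type alias, and the batch-trimming step keeps the newest N by discarding from the front.

```rust
use std::collections::VecDeque;
use std::mem;
use std::sync::{Arc, Mutex};

type Update = u32; // hypothetical update type

// Steps 3 and 4: swap the shared queue for an empty one under the lock,
// then trim the swapped-out batch down to the newest `n` updates.
fn take_batch(shared: &Arc<Mutex<VecDeque<Update>>>, n: usize) -> VecDeque<Update> {
    let mut batch = mem::replace(&mut *shared.lock().unwrap(), VecDeque::new());
    while batch.len() > n {
        batch.pop_front(); // discard older updates
    }
    batch
}

fn main() {
    let shared = Arc::new(Mutex::new(VecDeque::new()));
    // Producer side: push_back updates as they arrive.
    for u in 1..=10 {
        shared.lock().unwrap().push_back(u);
    }
    // Consumer side: grab the newest 3 updates in one swap.
    let batch = take_batch(&shared, 3);
    assert_eq!(batch, VecDeque::from(vec![8, 9, 10]));
    // The shared queue is left empty for the producer to refill.
    assert!(shared.lock().unwrap().is_empty());
}
```

The swap holds the lock only for the duration of a pointer exchange, so the producer is never blocked while the consumer processes a batch.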

Alternatively, you could maybe do something from the producer thread that would combine buffering updates locally for the "next batch", while doing a send_timeout of the current batch to the consumer. Then when the timeout hits, that means the consumer has missed a batch, so you discard it and take a new one from the buffer that has accumulated updates in the meantime. That might require a third thread, depending on what the source of updates is.

I really like how Crossbeam and select have been able to clean up my code and would like to keep using it, but I can definitely refactor and write my own queue if I need to.

You could still use a channel and select to signal various control operations between consumer and producer, while the channel itself might be a poorer fit as the underlying buffer.

@neachdainn
Author

neachdainn commented Oct 22, 2019

What I have now works well enough for me - I opened the issue originally because I thought it would be a feature that people (including myself) would appreciate having. I've since started digging through the code and I've come to two conclusions:

  1. Out of the three flavors, this really only makes sense for one of them and as a result I am less convinced this functionality belongs here.
  2. Implementing this myself will probably take more time than I will be able to muster up for a long time.

Based on those conclusions, I would be fine if this issue is closed.

@jstoneham

We are still interested in this feature.

@taiki-e
Member

taiki-e commented Dec 2, 2023

This should be implementable just by porting #789 to the channel.
