Add method to Mpsc(Unbounded)ArrayQueue to allow non-consumer thread to get a snapshot of the queue content #228

Closed
srdo opened this issue Jan 17, 2019 · 2 comments

Comments


srdo commented Jan 17, 2019

Hi,

I would like to be able to peek at the contents of the MpscArrayQueue or MpscUnboundedArrayQueue from a non-consumer thread.

I'm working on Apache Storm, which has the following queue setup to process a stream of data:

Inbound thread -> inputQueue -> user code -> outputQueue -> Outbound thread

I would like for a thread that is not the user code or the outbound thread to be able to peek at, but not modify, the elements of inputQueue and outputQueue. Essentially I would like a public Collection<E> unorderedSnapshot() method that just returns a copy of the contents of the queue's buffer, filtering out any nulls or SKIP instructions.
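To make the request concrete, here is a minimal sketch of what such a method might do. It is not JCTools code: the `SnapshotSketch` class, the `SKIP` sentinel, and the use of an `AtomicReferenceArray` as the backing buffer are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReferenceArray;

final class SnapshotSketch {
    // Hypothetical stand-in for an internal "skip this slot" instruction.
    static final Object SKIP = new Object();

    // Copies whatever is visible in each slot at the moment that slot is read.
    // The result is unordered and gives no consistency guarantee under
    // concurrent offers and polls.
    static <E> Collection<E> unorderedSnapshot(AtomicReferenceArray<Object> buffer) {
        Collection<E> out = new ArrayList<>();
        for (int i = 0; i < buffer.length(); i++) {
            Object e = buffer.get(i); // volatile read of the slot
            if (e != null && e != SKIP) {
                @SuppressWarnings("unchecked")
                E element = (E) e;
                out.add(element);
            }
        }
        return out;
    }
}
```

The copy only reads slots, so it never modifies the queue, but it may include elements that are consumed a moment later.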

Just to justify why I need this, feel free to skip if it's not important:

Storm is a system for distributed processing of data streams. When a new element in a data stream is submitted for processing, it may go to many physical machines before it is considered fully processed.

Storm puts a timeout on how long the full processing sequence is allowed to take, in order to offer at-least-once processing guarantees in the presence of e.g. crashing machines or network issues.

I would like to be able to keep this timeout from expiring as long as any machines are still actively processing an element. One of the places an element is at risk of timing out is when it is stuck in the inputQueue or outputQueue, either due to slow user code, or due to backpressure from downstream processing steps.

To keep the timeout from expiring, I would like for a heartbeat thread to occasionally reset the timeout for messages that are stuck in the inputQueue or outputQueue. In order to do this, I need this thread to be able to read the queue contents.


nitsanw commented Jan 18, 2019

This could be implemented if the queues supported iterators in a weak fashion. The reasons iterators were not implemented for JCTools queues are:

  1. Implementing remove is terrible for array-backed queues
  2. Extra work for not much interest
  3. Inconsistencies with concurrent access

If we agree on not implementing remove and on offering no guarantees with regard to concurrent access, I think this is fairly straightforward. The weak iterator would only work properly on queues at rest (i.e. that are not being modified concurrently), but it would be hard to enforce no early termination in the face of consumers. Consider the case where a linked node is consumed in parallel to iteration: if we do not clear the linking reference we will cause GC nepotism, but if we do clear it we cause iterators to abort mid-queue. Alternatively, consider iteration on an array-backed buffer where consumers/producers may work their way around the buffer before the iterator is moved. One thing that is not acceptable to my mind is hampering the main use case (message passing) to support iterators.

I'm not keen on returning a collection here. I would prefer giving you an iterator, and you can dump it into a collection of your choice or not at all.
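A rough sketch of the iterator-based alternative, under the assumptions discussed above (no remove, no guarantees under concurrent access). `WeakArrayIterator` and `copyOf` are hypothetical names, and an `AtomicReferenceArray` stands in for the queue's buffer; this is not the JCTools implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReferenceArray;

final class WeakArrayIterator<E> implements Iterator<E> {
    private final AtomicReferenceArray<E> buffer;
    private int index;
    private E next; // looked-ahead element, or null when exhausted

    WeakArrayIterator(AtomicReferenceArray<E> buffer) {
        this.buffer = buffer;
        advance();
    }

    // Moves to the next non-null slot, if there is one.
    private void advance() {
        next = null;
        while (index < buffer.length() && next == null) {
            next = buffer.get(index++);
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public E next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        E result = next;
        advance();
        return result;
    }

    // remove() is deliberately not overridden, so the default
    // Iterator.remove() throws UnsupportedOperationException.

    // The caller decides whether to collect the elements at all.
    static <E> List<E> copyOf(AtomicReferenceArray<E> buffer) {
        List<E> out = new ArrayList<>();
        new WeakArrayIterator<>(buffer).forEachRemaining(out::add);
        return out;
    }
}
```

Walking the array from slot 0 ignores the producer and consumer indices, so the iteration order here is not the queue order.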

Thanks!


srdo commented Jan 18, 2019

Thanks for reviewing.

> it would be hard to enforce no early termination in the face of consumers

Yes. What I want to do with the method would be something like

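// Fields rather than locals, since the lambda reassigns them.
Collection<E> olderSnapshot = Collections.emptyList();
Collection<E> currentSnapshot = Collections.emptyList();
scheduleRecurring(interval, () -> {
  // Only elements seen in an earlier snapshot get their timeout reset.
  olderSnapshot.forEach(resetTimeout);
  olderSnapshot = currentSnapshot;
  currentSnapshot = queue.unorderedSnapshot();
});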

to avoid doing any reset work unless elements have been enqueued for a while. Having the snapshot be missing later parts of the queue because of unlucky racing with the consumer thread would make a mechanism like this very unreliable.
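For reference, a self-contained sketch of that two-generation scheme. The class name `TwoGenerationHeartbeat` and the `Supplier`/`Consumer` parameters are assumptions standing in for `queue.unorderedSnapshot()` and `resetTimeout`; nothing here is an existing Storm or JCTools API.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class TwoGenerationHeartbeat<E> {
    private final Supplier<Collection<E>> snapshotSource; // e.g. () -> queue.unorderedSnapshot()
    private final Consumer<E> resetTimeout;
    private Collection<E> olderSnapshot = Collections.emptyList();
    private Collection<E> currentSnapshot = Collections.emptyList();

    TwoGenerationHeartbeat(Supplier<Collection<E>> snapshotSource, Consumer<E> resetTimeout) {
        this.snapshotSource = snapshotSource;
        this.resetTimeout = resetTimeout;
    }

    // Must always be called from the same thread (e.g. a single-threaded scheduler).
    void tick() {
        // Reset only elements that were captured in an earlier snapshot.
        olderSnapshot.forEach(resetTimeout);
        olderSnapshot = currentSnapshot;
        currentSnapshot = snapshotSource.get();
    }
}
```

One way to drive it would be `Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(heartbeat::tick, interval, interval, TimeUnit.MILLISECONDS)`. Note that an element captured on one tick is first reset two ticks later, and that elements already consumed in the meantime still get reset once.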

I was not aware of the GC nepotism issue; I'll see if I can work around it. I think the important part is for the iterator to be able to distinguish two cases: the last element is null because the producer hasn't filled the buffer yet, so the iterator should stop, or the last element is null because the consumer is done consuming everything in the buffer.

Maybe we can insert a marker element or something in place of the nextBuffer reference once the consumer is done with a buffer, instead of nulling it. The iterator could react to this marker by reloading the buffer reference from the consumerBuffer field, instead of trying to follow the same links the consumer used.
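A simplified, single-threaded sketch of the marker idea, to show the intended control flow only. `LinkedChunks`, `Chunk`, `CONSUMED`, and `consumerChunk` are hypothetical names loosely modelled on the linked buffers described above, not the actual MpscUnboundedArrayQueue internals, and the sketch does not address the memory-ordering questions a real implementation would face.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LinkedChunks {
    // Marker left in place of the next-chunk link once the consumer has moved on.
    // It holds no reference to later chunks, so a dead chunk keeps nothing alive.
    static final Object CONSUMED = new Object();

    static final class Chunk {
        final Object[] slots;
        volatile Object next; // null: no next chunk yet; CONSUMED: consumer left; else a Chunk

        Chunk(Object... slots) {
            this.slots = slots;
        }
    }

    volatile Chunk consumerChunk;

    // Consumer is done with its current chunk: move on and leave the marker behind.
    void consumerAdvance() {
        Chunk old = consumerChunk;
        if (!(old.next instanceof Chunk)) {
            return; // nothing to advance to yet
        }
        consumerChunk = (Chunk) old.next;
        Arrays.fill(old.slots, null);
        old.next = CONSUMED;
    }

    // Iterator side: on seeing the marker, restart from the consumer's current chunk.
    List<Object> snapshotFrom(Chunk start) {
        List<Object> out = new ArrayList<>();
        Chunk c = start;
        while (c != null) {
            for (Object s : c.slots) {
                if (s != null) {
                    out.add(s);
                }
            }
            Object n = c.next;
            if (n == CONSUMED) {
                out.clear(); // everything seen so far came from consumed chunks
                c = consumerChunk;
            } else {
                c = (Chunk) n; // null ends the walk: producer has not linked further
            }
        }
        return out;
    }
}
```

Here a null link means "stop, the producer has not linked a further chunk", while the marker means "reload from `consumerChunk`", which is the distinction the iterator needs.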

Thanks for the advice, I'll keep working on it. I agree that a method like this isn't worth it if it makes message passing more expensive.
