Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use channel as a stremming object #895

Open
kvokka opened this issue Nov 13, 2020 · 0 comments
Open

Use channel as a stremming object #895

kvokka opened this issue Nov 13, 2020 · 0 comments

Comments

@kvokka
Copy link

kvokka commented Nov 13, 2020

I found that Concurrent::Channel is great for representing stream data.

Initial problem was, that I needed to stream data in JRuby with Sinatra (so i stuck with Puma and was not able to use embedded sinatra stream module, which require Rainbows).

Concurrent::Channel fits almost ideally, but it needs to have some protection from unstarted stream && some callbacks in the end. Idk if it is only my implementation related, it is required in general. Anyway, I feel that it is a good usage of a lib. Before making a PR wanted to ask you first if this is worth it, or it is my very specific use case related only from your perspective.

Sample code with usage examples:

require "concurrent-edge"

module Concurrent
  class StreamingChannel < Channel
    # Array, where each element must respond_to :call
    def after_each_callbacks
      @after_each_callbacks ||= []
    end

    # After first data out we mark the stream as started.
    # This allows to determine in other threads which channels should be killed as inactive
    def each
      raise ArgumentError.new('no block given') unless block_given?

      item, more = do_next
      yield(item) unless item == Concurrent::NULL
      return unless more

      started!
      super
    ensure
      after_each_callbacks.each { |cb| cb.call(self) }
    end

    # if we processed at least something, we assume, that the streaming process
    # initiated correctly, otherwise
    def started!
      @stream_started = true
    end

    def started?
      !!@stream_started
    end
  end
end

# This illustrates how we can use a callbacks with the streaming channel
def callbacks_usage_demo
  puts 'started callbacks usage demo'
  chan = Concurrent::StreamingChannel.new(capacity: 100)
  chan.after_each_callbacks << ->(channel) { puts "Hi from callback with #{channel}" }

  ticker = Concurrent::Channel.tick(0.2)
  boom = Concurrent::Channel.after(1.02)
  Thread.new { ticker.inject(0) { |a, _e| chan << "."; a > 100 ? break : a + 1 }; chan.close }
  Thread.new { boom.take; puts "boom"; chan.close }
  chan.each { |m| print m }
  puts "ended"
end

callbacks_usage_demo

def stuck_streams_detection_demo
  # This illustrates how we can determine and manage stuck streams
  puts 'started stack streams detection demo'
  chan = Concurrent::StreamingChannel.new(capacity: 100)
  chan.after_each_callbacks << ->(channel) { puts "Hi from callback with #{channel}" }

  Thread.new do
    ~Concurrent::Channel.timer(1)
    puts 'some timeout reached, need to check if the channel started'
    chan.close unless chan.started?
  end
  puts "Is channel closed? #{chan.closed?}"
  chan.each { |m| print m }
  puts "Is channel closed? #{chan.closed?}"
  puts "ended"
end

stuck_streams_detection_demo

This piece of code produce:

started callbacks usage demo
.....boom
Hi from callback with #<Concurrent::StreamingChannel:0x00007fd28d919508>
ended
started stack streams detection demo
Is channel closed? false
some timeout reached, need to check if the channel started
Hi from callback with #<Concurrent::StreamingChannel:0x00007fd28d8abb20>
Is channel closed? true
ended
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant