ruby-concurrency / concurrent-ruby

Modern concurrency tools including agents, futures, promises, thread pools, supervisors, and more. Inspired by Erlang, Clojure, Scala, Go, Java, JavaScript, and classic concurrency patterns.
https://ruby-concurrency.github.io/concurrent-ruby/
Other
5.71k stars 420 forks source link

Use channel as a stremming object #895

Open kvokka opened 4 years ago

kvokka commented 4 years ago

I found that Concurrent::Channel is great for representing stream data.

Initial problem was, that I needed to stream data in JRuby with Sinatra (so i stuck with Puma and was not able to use embedded sinatra stream module, which require Rainbows).

Concurrent::Channel fits almost ideally, but it needs to have some protection from unstarted stream && some callbacks in the end. Idk if it is only my implementation related, it is required in general. Anyway, I feel that it is a good usage of a lib. Before making a PR wanted to ask you first if this is worth it, or it is my very specific use case related only from your perspective.

Sample code with usage examples:

require "concurrent-edge"

module Concurrent
  class StreamingChannel < Channel
    # Array, where each element must respond_to :call
    def after_each_callbacks
      @after_each_callbacks ||= []
    end

    # After first data out we mark the stream as started.
    # This allows to determine in other threads which channels should be killed as inactive
    def each
      raise ArgumentError.new('no block given') unless block_given?

      item, more = do_next
      yield(item) unless item == Concurrent::NULL
      return unless more

      started!
      super
    ensure
      after_each_callbacks.each { |cb| cb.call(self) }
    end

    # if we processed at least something, we assume, that the streaming process
    # initiated correctly, otherwise
    def started!
      @stream_started = true
    end

    def started?
      !!@stream_started
    end
  end
end

# This illustrates how we can use a callbacks with the streaming channel
def callbacks_usage_demo
  puts 'started callbacks usage demo'
  chan = Concurrent::StreamingChannel.new(capacity: 100)
  chan.after_each_callbacks << ->(channel) { puts "Hi from callback with #{channel}" }

  ticker = Concurrent::Channel.tick(0.2)
  boom = Concurrent::Channel.after(1.02)
  Thread.new { ticker.inject(0) { |a, _e| chan << "."; a > 100 ? break : a + 1 }; chan.close }
  Thread.new { boom.take; puts "boom"; chan.close }
  chan.each { |m| print m }
  puts "ended"
end

callbacks_usage_demo

def stuck_streams_detection_demo
  # This illustrates how we can determine and manage stuck streams
  puts 'started stack streams detection demo'
  chan = Concurrent::StreamingChannel.new(capacity: 100)
  chan.after_each_callbacks << ->(channel) { puts "Hi from callback with #{channel}" }

  Thread.new do
    ~Concurrent::Channel.timer(1)
    puts 'some timeout reached, need to check if the channel started'
    chan.close unless chan.started?
  end
  puts "Is channel closed? #{chan.closed?}"
  chan.each { |m| print m }
  puts "Is channel closed? #{chan.closed?}"
  puts "ended"
end

stuck_streams_detection_demo

This piece of code produce:

started callbacks usage demo
.....boom
Hi from callback with #<Concurrent::StreamingChannel:0x00007fd28d919508>
ended
started stack streams detection demo
Is channel closed? false
some timeout reached, need to check if the channel started
Hi from callback with #<Concurrent::StreamingChannel:0x00007fd28d8abb20>
Is channel closed? true
ended