digital-fabric / polyphony

Fine-grained concurrency for Ruby
https://www.rubydoc.info/gems/polyphony
MIT License

Pub/sub with Polyphony instead of threads #127

Open gottlike opened 2 weeks ago

gottlike commented 2 weeks ago

I've looked at all the examples and a bunch of docs, but I can't wrap my head around how to do something like this the right way:

# frozen_string_literal: true

require 'bundler/setup'
require 'redis'
require 'polyphony/adapters/redis'

10_000.times do |i|
  spin do
    redis = Redis.new
    msg = redis.blpop("redis-channel-#{i}")
    puts msg
  end

  spin do
    redis = Redis.new

    # Do some IO work (few milliseconds or many seconds)

    redis.lpush("redis-channel-#{i}", Time.now.to_i)
  end
end

suspend

My use case is reading from a Redis stream with XREADGROUP and then spinning up workers that process the retrieved stream entries as fast as possible. The "Do some IO work" part could itself be another blpop/lpush combination that should block, kind of like a REST call over Redis. Can someone help? I'm also open to booking consulting hours for this. I just can't get this to work 🙈

gottlike commented 2 weeks ago

I just found a tool that does exactly what I needed: https://docs.nats.io/nats-concepts/core-nats/reqreply/reqreply_walkthrough

Maybe Polyphony can enhance some things here, too (Ruby client: https://github.com/nats-io/nats-pure.rb)? But in any case, I think this solves my question. Will leave the issue open, in case Polyphony can make things even better with NATS 😁

gottlike commented 2 weeks ago

Alright, so I am indeed pretty happy with NATS for my use case. Now I would like to use Polyphony instead of threads and ran into issues. Code for reproduction can be found here: https://gist.github.com/gottlike/e9cfed216ea7637c1c9a4ef031eb4c9e

You can run everything like this:

  1. docker run --rm -p 4222:4222 --name nats nats
  2. ruby worker_sub.rb
  3. ruby worker.rb
  4. ruby benchmark.rb

There's two issues:

When using threads, everything works fine.

noteflakes commented 2 weeks ago

OK, so you're integrating with nats.io. I saw your gist uses the nats-pure gem (I guess because the nats.rb gem is based on EventMachine?). Anyway, I took a look: https://github.com/nats-io/nats-pure.rb/blob/0f8ca2f4416f53864e663862c3a5b5def0ac2758/lib/nats/io/client.rb#L1922-L1938

      def read(max_bytes, deadline=nil)
        begin
          return @socket.read_nonblock(max_bytes)
        rescue ::IO::WaitReadable
          if ::IO.select([@socket], nil, nil, deadline)
            retry
          else
            raise NATS::IO::SocketTimeoutError
          end
        rescue ::IO::WaitWritable
          if ::IO.select(nil, [@socket], nil, deadline)
            retry
          else
            raise NATS::IO::SocketTimeoutError
          end
        ...

They do a non-blocking read, and if no data is ready for reading they use IO.select to wait for incoming data or time out. This most probably won't work properly with Polyphony. There might be other places in their code where they take liberties (so to speak) with the underlying concurrency model.

I'll try to take another look tonight and see if I can make an adapter for it.
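
To make the read pattern described above concrete, here is a minimal stdlib-only sketch that runs against a pipe instead of a NATS socket. `read_with_select` mirrors the shape of the quoted nats-pure method. `read_with_wait` is a hypothetical alternative that waits on the IO object itself via `IO#wait_readable`. Both helper names and the `SocketTimeoutError` class are made up for illustration, and whether the second form behaves any better under Polyphony is an assumption that has not been tested here.

```ruby
require 'io/wait'

# Hypothetical stand-in for NATS::IO::SocketTimeoutError.
class SocketTimeoutError < StandardError; end

# Same shape as the quoted nats-pure code: try a non-blocking read,
# and if nothing is available, block in IO.select until data arrives
# or the deadline (in seconds) passes.
def read_with_select(io, max_bytes, deadline = nil)
  io.read_nonblock(max_bytes)
rescue IO::WaitReadable
  raise SocketTimeoutError unless IO.select([io], nil, nil, deadline)

  retry
end

# Hypothetical alternative: wait for readability on the IO object
# itself, then read whatever is available.
def read_with_wait(io, max_bytes, deadline = nil)
  raise SocketTimeoutError unless io.wait_readable(deadline)

  io.readpartial(max_bytes)
end

reader, writer = IO.pipe

writer.write('PING')
puts read_with_select(reader, 1024, 1)

writer.write('PONG')
puts read_with_wait(reader, 1024, 1)

# Nothing left in the pipe, so both variants give up after the deadline.
[:read_with_select, :read_with_wait].each do |name|
  begin
    send(name, reader, 1024, 0.1)
  rescue SocketTimeoutError
    puts "#{name} timed out"
  end
end
```

Both variants block the calling thread while they wait; the sketch only shows where that wait happens, which is the part an adapter would need to replace.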

gottlike commented 2 weeks ago

Thanks @noteflakes! I use nats-pure, because the nats gem is not maintained anymore. In their Slack they said that nats-pure will replace nats at some point.

gottlike commented 2 weeks ago

Something else popped up, with regard to running my script in Docker:

/root/vendor/bundle/ruby/3.2.0/gems/polyphony-1.6/lib/polyphony.rb:8:in `initialize': Operation not permitted - Operation not permitted (Errno::EPERM)
    from /root/vendor/bundle/ruby/3.2.0/gems/polyphony-1.6/lib/polyphony.rb:8:in `new'
    from /root/vendor/bundle/ruby/3.2.0/gems/polyphony-1.6/lib/polyphony.rb:8:in `<top (required)>'
    from <internal:/usr/lib/ruby/3.2.0/rubygems/core_ext/kernel_require.rb>:38:in `require'
    from <internal:/usr/lib/ruby/3.2.0/rubygems/core_ext/kernel_require.rb>:38:in `require'
    from /root/mq.rb:6:in `<top (required)>'
    from function.rb:8:in `require_relative'
    from function.rb:8:in `<main>'

When I run the script locally via ruby script.rb it works, but in Docker with ENTRYPOINT ["ruby", "script.rb"] it doesn't.

noteflakes commented 2 weeks ago

What kernel version are you on?

noteflakes commented 2 weeks ago

Looking at https://www.man7.org/linux/man-pages/man2/io_uring_setup.2.html:

EPERM /proc/sys/kernel/io_uring_disabled has the value 2, or it has the value 1 and the calling process does not hold the CAP_SYS_ADMIN capability or is not a member of /proc/sys/kernel/io_uring_group.
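
The sysctl named in that excerpt can be checked from inside the container with a few lines of Ruby. This is only a sketch: `io_uring_status` is a hypothetical helper, not part of Polyphony, and the mapping of `0` to "allowed" is taken from the kernel documentation rather than from the excerpt. The file may not exist on older kernels, in which case the helper returns `:unknown`. An EPERM can also have other causes (for example a container seccomp profile), which this check does not detect.

```ruby
# Hypothetical helper: interpret /proc/sys/kernel/io_uring_disabled.
# The path is a parameter so the function can be tried against any file.
def io_uring_status(path = '/proc/sys/kernel/io_uring_disabled')
  return :unknown unless File.readable?(path)

  case File.read(path).strip
  when '0' then :allowed     # any process may create io_uring instances
  when '1' then :restricted  # needs CAP_SYS_ADMIN or io_uring_group membership
  when '2' then :disabled    # io_uring_setup fails with EPERM for everyone
  else :unknown
  end
end

puts "io_uring status: #{io_uring_status}"
```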

noteflakes commented 2 weeks ago

You may want to prevent Polyphony from using io_uring, by setting POLYPHONY_LIBEV=1 in your ENV.

gottlike commented 2 weeks ago

I'm using Alpine 3.18 (Kernel 5.15). I just passed this to my Docker process with --env POLYPHONY_LIBEV=1, but the error persists.

gottlike commented 2 weeks ago

Correction: it's kernel 6.1. I shouldn't have trusted the AI's response 😆

gottlike commented 2 weeks ago

Ah, never mind. I had to put this before the bundle install in the Dockerfile, like so: POLYPHONY_LIBEV=1 bundle install

Now it works 👍
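
For anyone hitting the same error, the fix above might look roughly like this as a Dockerfile fragment. It is a sketch, not the actual Dockerfile from this thread: the `COPY` lines and file layout are assumptions. Using `ENV` rather than an inline prefix sets the variable for the build steps that follow and for the running container, which covers both the `bundle install` step and the `--env` attempt mentioned earlier.

```dockerfile
# Assumed layout: Gemfile and Gemfile.lock sit next to the Dockerfile.
COPY Gemfile Gemfile.lock ./

# Must be set before the gem's native extension is built,
# so it has to come before bundle install.
ENV POLYPHONY_LIBEV=1
RUN bundle install

COPY . .
ENTRYPOINT ["ruby", "script.rb"]
```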