hanami / events

[Experimental] Events framework for Hanami

First-Class Support for Subscribers in Separate Processes #75

Open ianks opened 6 years ago

ianks commented 6 years ago

I have recently had the pleasure of building a Google Cloud Pub/Sub adapter for hanami-events. Overall, the experience was very straightforward, and I think hanami-events fits perfectly with the pub/sub model.

One thing I would like to add to hanami-events is:

First-Class Support for Subscribers in Separate Processes

In most production systems, workers/subscribers will need to scale independently of the web process. For example, on Monday we may need to process 10x the events we do on Sunday.

To address the need for independent scaling, I think a first-class API for a "Runner/Manager" should be provided. By adding this, we can build a general-purpose CLI for running these processes, which will handle low-level details that the adapter should not have to care about (signal handling, health checks, daemonization, etc.).

With this CLI, users could simply run hanami worker run --adapter sqs/redis/pubsub/etc.

In order to make this possible, we should begin designing the interface which adapters should implement to provide this functionality. My first (rough) sketch looks something like this:

# Singleton class for managing event subscribers
class Hanami::Events::Runner
  # Runners can define their own options (which can be passed via the CLI)
  option :threads, type: Types::Integer, desc: 'Size of the thread pool'

  def start(options)
    # Called to start the runner (subscribes to topics/etc)
  end

  def pause
    # Will be called on TSTP
    #
    # Stop processing new events (unsubscribe from topics, etc)
    #
    # For example, if you needed to restart a Redis server you could first
    # call pause to allow for a safe restart of the runner.
  end

  def gracefully_shutdown
    # Will be called on SIGINT, TERM
    #
    # Responsible for gracefully shutting down the runner. This may involve
    # waiting for messages to finish processing, etc. If this method successfully
    # runs, there should be no potential for undefined behavior.
  end

  def force_shutdown!
    # (optional) Kill all subscribers
    #
    # If a gracefully_shutdown times out or fails, this method will be called.
    # It is a last ditch effort to salvage resources and is used as a "damage
    # control" mechanism.
    #
    # Should we provide a mechanism to report what caused a forced shutdown?
  end

  def ready?
    # Is the runner ready to start processing events?
    #
    # Starting the runner may be asynchronous (spawning threads, etc)
    # Instead of making `start` blocking, expose a way to probe for readiness
    # After this check occurs, `healthy?` will be honored.
    #
    # This pattern is similar to Kubernetes' liveness and readiness probes
    # and is much more useful than only having a `healthy?` check
    #
    # See: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/
  end

  def healthy?
    # Is the runner healthy enough to keep going?
    #
    # Indicates whether or not the runner is healthy, which is useful for
    # determining whether or not the process should be restarted
  end

  def print_debug_info(stream = STDOUT)
    # Print out some useful debugging information
    #
    # Called on TTIN to inspect the state of the runner in a terminal friendly
    # output. This provides a simple debugging if the runner gets stuck for
    # some reason.
    #
    # See: https://github.com/mperham/sidekiq/blob/e447dae961ebc894f12848d9f33446a07ffc67dc/bin/sidekiqload#L74
  end
end
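
To make the lifecycle above concrete, here is a rough, purely illustrative sketch of the loop such a CLI could run around a Runner. The self-pipe signal handling mirrors what Sidekiq does; the constructor, the way options are passed, and the 25-second grace period are all assumptions, not an existing API:

require 'timeout'

# Hypothetical runner instance; how options flow in is hand-waved here
runner = Hanami::Events::Runner.new

# Trap handlers only write to a pipe (the "self-pipe" trick Sidekiq uses),
# since very little is allowed inside a Ruby trap context
self_read, self_write = IO.pipe
%w[INT TERM TSTP TTIN].each do |sig|
  Signal.trap(sig) { self_write.puts(sig) }
end

runner.start(threads: 5)
sleep 0.1 until runner.ready?

# Translate incoming signals into the lifecycle calls sketched above
loop do
  case self_read.gets.strip
  when 'TSTP' then runner.pause
  when 'TTIN' then runner.print_debug_info
  when 'INT', 'TERM'
    begin
      Timeout.timeout(25) { runner.gracefully_shutdown }
    rescue Timeout::Error
      runner.force_shutdown!
    end
    break
  end
end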

By abstracting out some conventions, we can create powerful tooling to empower Hanami users (maybe even a pretty dashboard for managing things one day :wink:).

I would love to help make this a reality, so please provide as much feedback as possible!

davydovanton commented 6 years ago

hey, thanks for this issue. I really love this idea and want to see it in the 0.3 version. We can create a Hanami::Events::Runner class which takes an events object and starts a server, but I'm not sure that putting all the config information into the CLI is a good idea. Also, what will we do if we have more than one events instance in the project?

ianks commented 6 years ago

what we will do if we have more than one events instance in the project?

That's a good question. I wonder if that's a common use case? Currently, I have it configured so a runner is only responsible for a single events instance:

https://github.com/adHawk/hanami-events-cloud_pubsub

davydovanton commented 6 years ago

So, we don't have global state, which means you can create one or more instances of hanami-events and use them. I would be happy to keep it that way. We can create a runner without global state by simply injecting the events object into the constructor:

runner = Hanami::Events::Runner.new(event_object, **options)
runner.start

It will help us use different events instances in different ways (local in-memory pub/sub, persistent pub/sub, etc.). WDYT?
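
For example, two events instances could each get their own runner (illustrative only: the Hanami::Events.initialize calls follow the adapters named in the hanami-events README, and the option values are made up):

require 'connection_pool'
require 'redis'

# Two events instances, each driven by its own runner with its own options
memory_events = Hanami::Events.initialize(:memory_async)
redis_events  = Hanami::Events.initialize(:redis, redis: ConnectionPool.new { Redis.new })

Hanami::Events::Runner.new(memory_events, threads: 2).start
Hanami::Events::Runner.new(redis_events, threads: 10).start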

davydovanton commented 6 years ago

also, one more thing here: I think we need to use different runners for different types of events. Something like a thread runner for the memory async adapter and a server runner for Redis events (like Sidekiq).

I'm not sure, but I think we could also use one runner with different adapters, both in a thread and in the CLI.

ianks commented 6 years ago

Yeah, I think we should decouple events from the runner. events should just register the subscribers, while the runner provides the events instance with an adapter that actually subscribes to the events.
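
A rough sketch of that split (every class name below is hypothetical, invented only to illustrate the shape of the decoupling):

# The events object only collects subscriptions; no adapter is attached yet
events = Hanami::Events::Base.new
events.subscribe('user.created') { |payload| WelcomeMailer.deliver(payload) }

# The runner owns the adapter and wires every registered subscriber to it
adapter = Hanami::Events::Adapter::CloudPubsub.new(project_id: 'my-project')
runner  = Hanami::Events::Runner.new(events, adapter: adapter)
runner.start # each registered handler now receives events via Pub/Sub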

ianks commented 6 years ago

I think we need to use different runners for different types of events.

Would this mean you would need two runners if you had users.created and users.deleted events?

davydovanton commented 6 years ago

@ianks I created a PoC for the runner and merged it. Do you have any ideas about what we need to do next? From my perspective: middlewares, and testing it with other adapters.

Also, I checked your runner and your events fork. Maybe we need to think about a Listener in the current implementation, but I'm not sure that it's a great idea for the PoC right now.