shinyscorpion / task_bunny

TaskBunny is a background processing application written in Elixir that uses RabbitMQ as its messaging backend.
MIT License

Reply to handling #49

Closed norbu09 closed 6 years ago

norbu09 commented 6 years ago

This is the first version of reply_to handling. It uses the AMQP reply_to property to tell the worker where to write the outcome of a job.

It also has stubs for correlation_id handling, but they are not yet fully passed through the system.
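The reply_to and correlation_id idea can be illustrated without RabbitMQ. The following is a minimal sketch using plain Elixir processes, not the code from this pull request: `ReplySketch`, `start_worker/0` and the message shapes are hypothetical names chosen for illustration. In AMQP, reply_to would name a queue and correlation_id would be a string; here a pid and a reference stand in for them.

```elixir
defmodule ReplySketch do
  # Hypothetical stand-in for a worker: it does the work, then sends the
  # result to the address in :reply_to, echoing back :correlation_id.
  def start_worker do
    spawn(fn -> worker_loop() end)
  end

  defp worker_loop do
    receive do
      {:job, payload, meta} ->
        send(meta.reply_to, {:reply, meta.correlation_id, {:ok, String.upcase(payload)}})
        worker_loop()
    end
  end

  # Publish a job and wait only for the reply carrying our correlation id.
  def call(worker, payload, timeout \\ 1_000) do
    corr_id = make_ref()
    send(worker, {:job, payload, %{reply_to: self(), correlation_id: corr_id}})

    receive do
      {:reply, ^corr_id, result} -> result
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

Pinning `^corr_id` in the receive pattern is what lets a caller ignore replies that belong to other requests, which is the role correlation_id plays once it is carried through the whole system.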

norbu09 commented 6 years ago

I am getting closer to what I need. In my application I can now use TaskBunny this way:

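defmodule Worker do
  @timeout Application.get_env(:worker, :timeout, 5000)
  require Logger

  # Enqueue a job and block until Worker.Router forwards the reply, or time out.
  def call(worker, job) do
    corr_id = random_string()

    # The task only waits for the reply message; its pid acts as the reply address.
    task =
      Task.Supervisor.async(Worker.TaskSupervisor, fn ->
        receive do
          response -> response
        after
          @timeout -> {:error, :timeout}
        end
      end)

    # Pass the task pid as a string in app_id so the router can send the reply back.
    app_id = task.pid |> :erlang.pid_to_list() |> List.to_string()
    TaskBunny.Job.enqueue!(worker, job, reply_to: "Worker.Router", correlation_id: corr_id, app_id: app_id)
    Task.await(task, @timeout)
  end

  # Fire and forget: enqueue the job without waiting for a reply.
  def cast(worker, job) do
    TaskBunny.Job.enqueue!(worker, job)
  end

  # Random alphanumeric string used as the correlation id.
  defp random_string(len \\ 8) do
    :crypto.strong_rand_bytes(len)
    |> Base.encode64()
    |> String.replace(~r/\W/, "", global: true)
  end
end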

In combination with a worker like this:

defmodule Worker.Router do
  use TaskBunny.Job
  require Logger

  def perform(message) do
    case message["meta"]["app_id"] do
      "undefined" ->
        Logger.info("response: #{inspect message}")
      pid_string ->
        pid =
          pid_string
          |> String.to_charlist()
          |> :erlang.list_to_pid()

        send(pid, message)
    end
    :ok
  end
end
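The caller and the router above depend on a pid surviving a round trip through a string. Here is a small sketch of that round trip on its own; `PidCodec` is a hypothetical helper name, not part of TaskBunny or this pull request. It assumes both ends run on the same node, since the decoded string only yields a usable pid there. The Erlang documentation describes `list_to_pid/1` as intended for debugging, so this is best read as a proof of concept.

```elixir
defmodule PidCodec do
  # Encode a pid as a plain string such as "<0.123.0>".
  def encode(pid) when is_pid(pid) do
    pid |> :erlang.pid_to_list() |> List.to_string()
  end

  # Decode the string back into a pid. This only yields a usable pid on the
  # node that produced the string.
  def decode(pid_string) when is_binary(pid_string) do
    pid_string |> String.to_charlist() |> :erlang.list_to_pid()
  end
end

# Round trip: encode our own pid, decode it, and send ourselves a message.
encoded = PidCodec.encode(self())
send(PidCodec.decode(encoded), {:reply, "done"})

receive do
  {:reply, body} -> IO.puts("got #{body}")
after
  100 -> IO.puts("no reply")
end
```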

I can now call a worker and get the result from it this way:

res = Worker.call(Worker.Store, %{"job" => %{"foo" => "bar", "two" => 1}})

Is this going in a direction that you would be interested in supporting in TaskBunny?

ono commented 6 years ago

Hi @norbu09

Thanks for sharing your work. I can't really picture reply_to as a generic feature. The message you want to send from the job can vary, so I don't think enqueue is the best place for most users to specify it. In the end, you can just do it inside the job.

defmodule SampleJob do
  use TaskBunny.Job

  def perform(args) do
    case MyBusiness.do_it(args) do
      {:ok, something} -> ResponseJob.enqueue!(something)
      {:error, something} -> OtherResponseJob.report(something)
    end
    :ok
  end
end

You can also have your own job template to make it DRY.

defmodule BaseJob do
  @callback do_perform(args :: any) :: :ok | {:ok, any} | {:error, term}

  defmacro __using__(_options) do
    # The injected code must be quoted so it is defined in the module that calls `use BaseJob`.
    quote do
      use TaskBunny.Job
      @behaviour BaseJob

      def perform(args) do
        case do_perform(args) do
          :ok -> :ok
          {:ok, something} -> ResponseJob.enqueue!(something)
          {:error, something} -> OtherResponseJob.report(something)
        end

        :ok
      end
    end
  end
end

defmodule SampleJob do
  use BaseJob

  def do_perform(args) do
    ...
  end
end

There are many other ways to handle it in the job layer.

We might consider a more generic approach for a job reporter, similar to the way we implemented failure backends. However, I think it's still best to handle it in the job layer so that TaskBunny can handle any errors properly.
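To make the "job reporter" idea concrete, here is one possible shape it could take. This is only a sketch under assumptions: `ResultReporter`, `report_result/3`, `report_all/4` and `SendToCallerReporter` are hypothetical names, nothing like this exists in TaskBunny, and the sketch does not reproduce how failure backends are actually wired in.

```elixir
defmodule ResultReporter do
  # Hypothetical behaviour: each reporter receives the job module, its
  # payload and the value returned by perform/1.
  @callback report_result(job :: module, payload :: any, result :: any) :: any

  # Call every reporter in turn and return the job result unchanged.
  def report_all(reporters, job, payload, result) do
    Enum.each(reporters, fn reporter -> reporter.report_result(job, payload, result) end)
    result
  end
end

defmodule SendToCallerReporter do
  @behaviour ResultReporter

  # Example reporter: forwards the result to a pid found in the payload.
  @impl true
  def report_result(job, %{"caller" => pid}, result) when is_pid(pid) do
    send(pid, {:job_result, job, result})
  end

  # Payloads without a caller pid are ignored.
  def report_result(_job, _payload, _result), do: :ignored
end

# Usage: SampleJob is used here only as a label for the reporting job.
payload = %{"caller" => self()}
ResultReporter.report_all([SendToCallerReporter], SampleJob, payload, {:ok, 42})
```

A design like this keeps the reply logic out of enqueue options, but it would run outside the job's own perform/1, which is the trade-off raised above about error handling.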

norbu09 commented 6 years ago

I will give it some more thought and see if I can generalise it more or just run with what I have for now. Thanks for all the input and the amazing work on TaskBunny :)