bitwalker / libgraph

A graph data structure library for Elixir projects
MIT License
524 stars 75 forks source link

Add a new sorting function `batch_topsort/1` #70

Closed akoutmos closed 5 months ago

akoutmos commented 1 year ago

First of all, thanks for the great library! Secondly, not sure if this is something that you would be interested in merging into libgraph, but figured I would share it :).

This function, similarly to topsort/1, returns a topological sort of the provided graph with the added benefit that it groups non-dependent vertices together into sublists. I have used something like this in the past when I needed to run workflows constructed with libgraph and wanted to execute non-dependent workflow steps concurrently.

As a simple example, given the following workflow:

flowchart
    2110325["add_to_new_customer_email_sequence"]
    23862866["persist_cdn_id_to_db"]
    38442252["send_user_welcome_email"]
    243566562["create_user_in_db"]
    256209870["upload_image_to_cdn"]
    243566562 --> 2110325
    243566562 --> 38442252
    243566562 --> 256209870
    256209870 --> 23862866

When you run a batch_topsort/1 you would get the following:

Graph.new()
|> Graph.add_edges(Workflow.workflow_definition())
|> Graph.batch_topsort()
[
  [:create_user_in_db],
  [:add_to_new_customer_email_sequence, :send_user_welcome_email, :upload_image_to_cdn],
  [:persist_cdn_id_to_db]
]

And then you can run multiple vertices concurrently via Task.async/1 and Task.await_many/1 like in the dummy implementation of the workflow and enjoy a nice speed up in total execution time:

defmodule Workflow do
  def workflow_definition do
    [
      {:create_user_in_db, :upload_image_to_cdn},
      {:create_user_in_db, :send_user_welcome_email},
      {:create_user_in_db, :add_to_new_customer_email_sequence},
      {:upload_image_to_cdn, :persist_cdn_id_to_db}
    ]
  end

  # :timer.tc(fn -> Workflow.execute_workflow_with_topsort() end)
  # Running :create_user_in_db
  # Running :upload_image_to_cdn
  # Running :send_user_welcome_email
  # Running :persist_cdn_id_to_db
  # Running :add_to_new_customer_email_sequence
  # {1604693, :ok}
  def execute_workflow_with_topsort do
    Graph.new()
    |> Graph.add_edges(workflow_definition())
    |> Graph.topsort()
    |> Enum.each(fn step ->
      execute_workflow_step(step)
    end)
  end

  # :timer.tc(fn -> Workflow.execute_workflow_with_batch_topsort() end)
  # Running :create_user_in_db
  # Running :add_to_new_customer_email_sequence
  # Running :send_user_welcome_email
  # Running :upload_image_to_cdn
  # Running :persist_cdn_id_to_db
  # {1003964, :ok}
  def execute_workflow_with_batch_topsort do
    Graph.new()
    |> Graph.add_edges(workflow_definition())
    |> Graph.batch_topsort()
    |> Enum.each(fn steps ->
      steps
      |> Enum.map(fn step ->
        Task.async(fn -> execute_workflow_step(step) end)
      end)
      |> Task.await_many()
    end)
  end

  defp execute_workflow_step(:create_user_in_db) do
    IO.puts("Running :create_user_in_db")
    Process.sleep(100)
  end

  defp execute_workflow_step(:upload_image_to_cdn) do
    IO.puts("Running :upload_image_to_cdn")
    Process.sleep(800)
  end

  defp execute_workflow_step(:send_user_welcome_email) do
    IO.puts("Running :send_user_welcome_email")
    Process.sleep(300)
  end

  defp execute_workflow_step(:add_to_new_customer_email_sequence) do
    IO.puts("Running :add_to_new_customer_email_sequence")
    Process.sleep(300)
  end

  defp execute_workflow_step(:persist_cdn_id_to_db) do
    IO.puts("Running :persist_cdn_id_to_db")
    Process.sleep(100)
  end
end
akoutmos commented 9 months ago

Giving this PR a friendly bump :)

bitwalker commented 5 months ago

Thanks!