elixir-nx / nx

Multi-dimensional arrays (tensors) and numerical definitions for Elixir

Nx.Serving drops `batched_run` when using stream of `Nx.Batch` #1511

Closed. NduatiK closed this issue 4 months ago.

NduatiK commented 4 months ago

Nx.Serving drops concurrent requests to batched_run when using a stream of Nx.Batch.

Minimal example

# Install deps
Mix.install(
  [
    {:nx, "~> 0.7.2"},
    {:kino, "~> 0.13.1"},
    {:exla, "~> 0.7.2"}
  ],
  config: [
    nx: [
      default_backend: EXLA.Backend
    ]
  ]
)

Example

defmodule MyDefn do
  import Nx.Defn

  defn multiply(x) do
    x * 2
  end
end

serving = Nx.Serving.new(fn opts -> Nx.Defn.jit(&MyDefn.multiply/1, opts) end)
:ok
:ok
Kino.start_child({
  Nx.Serving,
  serving: serving, name: MultiplyServing, batch_size: 4, batch_timeout: 1000
})

Task.async_stream(
  1..4,
  fn f ->
    IO.puts("start #{f}")

    # If we don't create a stream, the code works as expected
    # (the non-stream variant is sketched after the output below).
    data = Stream.map([Nx.Batch.stack([Nx.tensor([1, 2, 3])])], & &1)

    Nx.Serving.batched_run(MultiplyServing, data)

    IO.puts("end #{f}")
  end,
  timeout: 60_000,
  # the hang only appears when max_concurrency is less than the number of inputs
  max_concurrency: 2
)
|> Enum.map(fn {:ok, results} -> results end)
|> Enum.to_list()
start 1
start 2
end 1
start 3
end 3
start 4
end 4
# Never terminates
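
For comparison, here is a minimal sketch (reusing the MultiplyServing started above) of the non-stream variant referenced in the code comment: the Nx.Batch is passed to batched_run directly instead of being wrapped in a single-element stream. Per the report, this variant prints start/end for all four tasks and terminates.

Task.async_stream(
  1..4,
  fn f ->
    IO.puts("start #{f}")

    # pass the batch directly rather than a stream of batches
    batch = Nx.Batch.stack([Nx.tensor([1, 2, 3])])
    Nx.Serving.batched_run(MultiplyServing, batch)

    IO.puts("end #{f}")
  end,
  timeout: 60_000,
  max_concurrency: 2
)
|> Enum.map(fn {:ok, results} -> results end)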
NduatiK commented 4 months ago

This issue was first identified by an Elixir Discord user (proximal), who found that concurrent batched_run calls to a Whisper serving from multiple processes would never terminate.

# Install deps
Mix.install(
  [
    {:exla, "~> 0.7.2"},
    {:bumblebee, "~> 0.5.3"},
    {:nx, "~> 0.7.2"},
    {:kino, "~> 0.13.1"}
  ],
  config: [
    nx: [
      default_backend: EXLA.Backend
    ]
  ]
)
# Configure serving & start
model_info = {:hf, "openai/whisper-tiny"}

{:ok, whisper} = Bumblebee.load_model(model_info)
{:ok, featurizer} = Bumblebee.load_featurizer(model_info)
{:ok, tokenizer} = Bumblebee.load_tokenizer(model_info)
{:ok, config} = Bumblebee.load_generation_config(model_info)

serving = Bumblebee.Audio.speech_to_text_whisper(
  whisper,
  featurizer,
  tokenizer,
  config,
  defn_options: [
    compiler: EXLA,
    partitions: false
  ],
  compile: [
    batch_size: 4
  ],
  stream: false,
  chunk_num_seconds: 30,
  timestamps: :segments,
  preallocate_params: true
)

Kino.start_child(
  {Nx.Serving,
   serving: serving,
   name: TestServing,
   batch_size: 4,
   batch_timeout: 100})
# Gets the path of 8 small (1 - 2 mb) mp4 files, these are all 30 - 70 second long videos with audio and dialogue
files = Path.wildcard("/Users/joss/Desktop/vids/*.mp4")
["/Users/joss/Desktop/vids/1.mp4", "/Users/joss/Desktop/vids/2.mp4",
 "/Users/joss/Desktop/vids/3.mp4", "/Users/joss/Desktop/vids/4.mp4",
 "/Users/joss/Desktop/vids/5.mp4", "/Users/joss/Desktop/vids/6.mp4",
 "/Users/joss/Desktop/vids/7.mp4", "/Users/joss/Desktop/vids/8.mp4"]
eval_all = fn -> 
  Task.async_stream(files, fn f ->
    IO.puts "start #{f}"

    {time, _caps} = :timer.tc(fn ->
      Nx.Serving.batched_run(TestServing, {:file, f}, &Nx.backend_transfer/1)
    end)

    time = time / 1_000_000

    IO.puts "end #{f} in #{time} seconds"
    {f, time}
  end, timeout: 600_000)
  |> Enum.map(fn {:ok, results} -> results end)
  |> Enum.to_list
end
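
The output below comes from invoking the function, presumably in a separate cell:

eval_all.()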
start /Users/joss/Desktop/vids/1.mp4
start /Users/joss/Desktop/vids/2.mp4
start /Users/joss/Desktop/vids/3.mp4
start /Users/joss/Desktop/vids/4.mp4
start /Users/joss/Desktop/vids/5.mp4
start /Users/joss/Desktop/vids/6.mp4
start /Users/joss/Desktop/vids/7.mp4
start /Users/joss/Desktop/vids/8.mp4
end /Users/joss/Desktop/vids/4.mp4 in 2.482991 seconds
end /Users/joss/Desktop/vids/5.mp4 in 11.540121 seconds
end /Users/joss/Desktop/vids/3.mp4 in 18.112282 seconds
end /Users/joss/Desktop/vids/8.mp4 in 21.167669 seconds
end /Users/joss/Desktop/vids/7.mp4 in 22.279491 seconds
# Processing freezes
josevalim commented 4 months ago

Thank you! We should definitely fix this, but keep in mind that putting a batch inside the stream will remove the benefits of batching. I will add some docs around this as well.

josevalim commented 4 months ago

> We should definitely fix this, but keep in mind that putting a batch inside the stream will remove the benefits of batching.

Ignore me. We officially support streams of batches. I will look into it.

NduatiK commented 4 months ago

Awesome! Thanks