cyberchitta / openai_ex

Community maintained Elixir library for OpenAI API
https://hexdocs.pm/openai_ex
Apache License 2.0
132 stars 18 forks source link

Add `stream_timeout` option for SSE streams #88

Closed aramallo closed 3 months ago

aramallo commented 5 months ago

Describe the feature or improvement you're requesting

Currently there is no timeout when receiving the SSE streams, this means that an LLM serving using the OpenAI with limited resources (unlike OpenAI) might accept the request, enqueue it but take a very long time to start streaming results. At the moment this means we can get stuck waiting for a very long time.

In the following example, we would like the call to Stream.flat_map to timeout.

chat_stream = openai |> Chat.Completions.create(chat_req, stream: true)
chat_stream.body_stream |> Stream.flat_map(& &1) |> Enum.each(fn x -> IO.puts(inspect(x)) end)

Unless Finch (or Mint) have something to offer here a fix coule be implemented in module OpenaiEx.HttpSse at line line 56 which currently reads like this:

defp next_sse({acc, ref, task}) do
    receive do
      {:chunk, {:data, evt_data}, ^ref} ->
        {tokens, next_acc} = tokenize_data(evt_data, acc)
        {[tokens], {next_acc, ref, task}}

      {:done, ^ref} ->
        if acc != "", do: Logger.warning(inspect(Jason.decode!(acc)))
        {:halt, {acc, ref, task}}

      {:canceled, ^ref} ->
        Logger.info("Request canceled by user")
        {:halt, {acc, ref, task}}
    end
  end

The proposal would be to have a new OpenaiEx stream_timeout option that can be configured via OpenaiEx.with_stream_timeout/2.

We can then replace post/3 with

def post(openai = %OpenaiEx{}, url, json: json) do
    request = OpenaiEx.Http.build_post(openai, url, json: json)

    me = self()
    ref = make_ref()

    task =
      Task.async(fn ->
        on_chunk = create_chunk_handler(me, ref)

        request
        |> Finch.stream(Map.get(openai, :finch_name), nil, on_chunk, Http.request_options(openai))

        send(me, {:done, ref})
      end)

    status = receive(do: ({:chunk, {:status, status}, ^ref} -> status))
    headers = receive(do: ({:chunk, {:headers, headers}, ^ref} -> headers))

    stream_timeout = Map.get(openai, : stream_timeout)

    body_stream =
      Stream.resource(fn -> {"", ref, task, stream_timeout} end, &next_sse/1, fn {_data, _ref, task, _ timeout} ->
        Task.shutdown(task)
      end)

    %{task_pid: task.pid, status: status, headers: headers, body_stream: body_stream}
  end

and the previous function with

defp next_sse({acc, ref, task, timeout}) do
    receive do
      {:chunk, {:data, evt_data}, ^ref} ->
        {tokens, next_acc} = tokenize_data(evt_data, acc)
        {[tokens], {next_acc, ref, task, timeout}}

      {:done, ^ref} ->
        if acc != "", do: Logger.warning(inspect(Jason.decode!(acc)))
        {:halt, {acc, ref, task, timeout}}

      {:canceled, ^ref} ->
        Logger.info("Request canceled by user")
        {:halt, {acc, ref, task, timeout}}
   after
       timeout ->
          Logger.info("Request timeout")
           {:halt, {acc, ref, task, timeout}}
    end
  end

The above changes will make the stream fail on timeout elegantly but the caller will never know the stream halted due to a timeout. We can solve this by raising an exception

defp next_sse({acc, ref, task, timeout}) do
    receive do
      {:chunk, {:data, evt_data}, ^ref} ->
        {tokens, next_acc} = tokenize_data(evt_data, acc)
        {[tokens], {next_acc, ref, task, timeout}}

      {:done, ^ref} ->
        if acc != "", do: Logger.warning(inspect(Jason.decode!(acc)))
        {:halt, {acc, ref, task, timeout}}

      {:canceled, ^ref} ->
        Logger.info("Request canceled by user")
        {:halt, {acc, ref, task, timeout}}
   after
       timeout ->
           exit({:timeout,[timeout]})

    end
  end

Additional context

No response

restlessronin commented 5 months ago

@aramallo Thanks for pointing this out. Let me think on it some, and get back to you.

aramallo commented 5 months ago

@restlessronin If you fancy taking a look, I just did an implementation of this in a branch on my fork and it works well. I did it our of my original change and not out of your latest.

restlessronin commented 5 months ago

@aramallo Will do. Thanks for taking the time to work on it.. I'll circle around to this in a few days, when I do the next library update.

Madd0g commented 4 months ago

I'm having this problem where sometimes a request would get stuck forever.

I'm executing a command from iex and it just never becomes interactive again, I have to restart the process and run it again.

Weird that the underlying http client doesn't have any default timeout, most of the clients I've ever used have some retry mechanism when it can't connect.

restlessronin commented 4 months ago

@Madd0g are you having the same problem as @aramallo? Which openai api proxy are you using?

Madd0g commented 4 months ago

not sure it's the same, but sounds like it? I'm using groq and often get stuck requests.

restlessronin commented 3 months ago

To be clear, this is happening with the streaming chat completion call? The call itself succeeds, but the SSE stream pauses without disconnecting?

Madd0g commented 3 months ago

I am using streaming and print every token to the console. The call does not succeed - it doesn't print a single response token to the console and iex never becomes interactive again. I don't know if a connection is actually established or not.

restlessronin commented 3 months ago

can you show the code that's causing the issue? and verify if it's the latest version? v0.6.5 had some changes to how stream api errors are handled.

Madd0g commented 3 months ago

Oh I was on a much older version (0.5.8)

muh code... (changed slightly after the upgrade to 0.6.5)

defmodule MuhModule do
  alias OpenaiEx.Chat

  def complete_stream(prompt, messages \\ nil) do
    openai =
      OpenaiEx.new("muh_key")
      |> OpenaiEx.with_receive_timeout(10_000)
      |> OpenaiEx.with_base_url("https://api.groq.com/openai/v1")

    messages =
      case messages do
        nil ->
          [
            %{role: "user", content: prompt}
          ]

        messages ->
          messages ++ [%{role: "user", content: prompt}]
      end

    chat_req =
      Chat.Completions.new(
        model: "llama3-70b-8192",
        max_tokens: 450,
        messages: messages,
        temperature: 0.15,
        stop: ["</s>"]
      )

    stream = openai |> Chat.Completions.create(chat_req, stream: true)
    %{body_stream: body_stream} = stream
    body_stream
  end

  def complete_stream_with_handlers(
        prompt,
        messages \\ nil,
        handlers
      ) do
    onToken = handlers["onToken"]

    all_deltas =
      complete_stream(prompt, messages)
      |> Stream.flat_map(& &1)
      |> Stream.map(fn %{data: d} ->
        d |> Map.get("choices") |> Enum.at(0) |> Map.get("delta")
      end)
      |> Stream.filter(fn map -> map |> Map.has_key?("content") end)
      |> Stream.map(fn map -> map |> Map.get("content") end)
      |> Enum.reduce([], fn delta, acc ->
        if onToken do
          onToken.(delta)
        end

        [delta | acc]
      end)

    full_response =
      all_deltas
      |> Enum.reverse()
      |> Enum.join()

    full_response
  end
end

## and then I call it like this

handlers = %{
  # just prints the token
  "onToken" => fn token -> handle_token(token) end
}

messages = [
  %{
    "role" => "system",
    "content" => get_system()
  },
]

MuhModule.complete_stream_with_handlers("muh prompt", messages, handlers)
restlessronin commented 3 months ago

@Madd0g

My understanding is that a request sometimes works, but get stuck at other times. Is that correct? That sounds like what @aramallo described above.

My guess about why finch isn't timing out is that the http call actually succeeds within the timeout period. It's likely that the SSEs don't have a timeout applied to them, so it never times out there.

Assuming this is the case, does @aramallo's solution of having a separate timeout applied for SSE's work for you?

Madd0g commented 3 months ago

My understanding is that a request sometimes works, but get stuck at other times. Is that correct?

It works most of the time, yes.

Also, not sure if it matters, I run batches in a loop - I almost never see these failures during the batch (a run could take 20 minutes). Is that significant? I don't know.

I'm not sure about the solution, when it gets stuck I never see a single token (so it never gets stuck during a response). I think I had this problem with 2 different providers and not just groq. Can't help but wonder if they would allow a connection to just hang forever without closing it from the server side?

Assuming this is the case, does @aramallo's solution of having a separate timeout applied for SSE's work for you?

The solution means we need to handle a new timeout event somewhere in the stream handling, right? there are lots of commits on that other branch and it's hard to follow since I'm a beginner in elixir

restlessronin commented 3 months ago

@Madd0g

Also, not sure if it matters, I run batches in a loop - I almost never see these failures during the batch (a run could take 20 minutes). Is that significant? I don't know.

This doesn't impact on the solution to the stream hanging issue, but if you're running batches in a loop, might you be better served by not using SSE? Just use the vanilla synchronous endpoint? You'd avoid any hangs due to SSE problems.

I'm not sure about the solution, when it gets stuck I never see a single token (so it never gets stuck during a response). I think I had this problem with 2 different providers and not just groq. Can't help but wonder if they would allow a connection to just hang forever without closing it from the server side?

I guess you could try doing IO.puts in the lambda defined in 'create_chunk_handler' to see if there are actually bytes being received that are somehow not being accumulated in 'next_sse'.

The solution means we need to handle a new timeout event somewhere in the stream handling, right? there are lots of commits on that other branch and it's hard to follow since I'm a beginner in elixir

I would actually modify @aramallo's solution by 'throw'ing instead of 'exit'ing, and that would have to be handled at the call point of the streaming request.

Of course, the same semantics should then apply when a user 'cancel's a streaming request. So I would probably change that as well.

This breaks existing streaming request code, and although I have been lax with semver numbering, i'll likely bump to 0.7.0 as a result.

WDYT?

Madd0g commented 3 months ago

This doesn't impact on the solution to the stream hanging issue, but if you're running batches in a loop, might you be better served by not using SSE? Just use the vanilla synchronous endpoint? You'd avoid any hangs due to SSE problems.

I mean, it makes perfect sense, but I run this task manually and I like to see the text coming in, heh. It's not a critical/production system so I don't care much.

I guess you could try doing IO.puts in the lambda defined in 'create_chunk_handler' to see if there are actually bytes being received that are somehow not being accumulated in 'next_sse'.

I may have time to play with it during the weekend.

[...] solution by 'throw'ing instead of 'exit'ing, and that would have to be handled at the call point of the streaming request. WDYT?

you mean it would require a try-rescue? I remember reading that try-rescue is not idiomatic in elixir and should be avoided, but idk 🤷‍♂️ I recognize that mixing streaming and non-streaming flow is messy in any lang. I'm not against your proposal, but like I said I'm a beginner in elixir and not well versed.

restlessronin commented 3 months ago

I mean, it makes perfect sense, but I run this task manually and I like to see the text coming in, heh. It's not a critical/production system so I don't care much.

Got it.

I guess you could try doing IO.puts in the lambda defined in 'create_chunk_handler' to see if there are actually bytes being received that are somehow not being accumulated in 'next_sse'.

I may have time to play with it during the weekend.

Let me know if you see bytes actually coming in and not getting passed through. For now, the fix is based on the assumption that the server has somehow frozen on sending the bytes.

you mean it would require a try-rescue? I remember reading that try-rescue is not idiomatic in elixir and should be avoided, but idk 🤷‍♂️ I recognize that mixing streaming and non-streaming flow is messy in any lang. I'm not against your proposal, but like I said I'm a beginner in elixir and not well versed.

I'm no expert on Elixir/Erlang idiom either, although I have gotten used to the "let it crash" methodology. I'm working on a non-livebook app for the api at the moment, and i'll use that to test out the "try/rescue" approach vs "exit".

To me it seems cleaner to treat the frozen(canceled)-streaming flow as an exception - instead of trying to return some kind of "error" value in the stream itself. Or perhaps there's some way of working this without "try/rescue" that I have overlooked?

restlessronin commented 3 months ago

@aramallo and @Madd0g I have implemented the stream timeout with exception raising along the lines proposed by @aramallo (I replaced exit with raise). I also raise on user cancellation.

The userguide has been updated, as well as the streaming completion orderbot, to document usage of the exceptions for cancellation.

Since I have not run into the timeout situation myself, it would be really helpful if you guys could test the timeout code and confirm that it works as expected.

This has been released as v0.7.0