r-lib / httr2

Make HTTP requests and process their responses. A modern reimagining of httr.
https://httr2.r-lib.org

Add req_perform_open, which makes resp$body the underlying stream #521

Closed jcheng5 closed 2 weeks ago

jcheng5 commented 2 weeks ago

Alternative, lower-level approach to https://github.com/r-lib/httr2/pull/520. SSE support could be built on top--I'll take a stab at that too.

Example usage:

library(httr2)

# Define the API endpoint and your API key
api_endpoint <- "https://api.openai.com/v1/chat/completions"
api_key <- Sys.getenv("OPENAI_API_KEY")

# Build the request
response <- request(api_endpoint) %>%
  req_headers(
    "Content-Type" = "application/json",
    "Authorization" = paste("Bearer", api_key)
  ) %>%
  req_body_json(list(
    model = "gpt-4o",
    temperature = 0.7,
    stream = TRUE,
    messages = list(
      list(
        role = "system",
        content = "You're an incredibly verbose assistant."
      ),
      list(
        role = "user",
        content = "When did the modern Olympics start?"
      )
    )
  )) %>%
  req_perform_open()

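# Read one line at a time for as long as the stream is still incomplete
while (isIncomplete(response$body)) {
  line <- readLines(response$body, n = 1)
  if (length(line) > 0) {
    message(line)
  } else {
    # If no data was available, wait a bit
    message("SLEEPING...\n")
    Sys.sleep(0.25)
  }
}

# Release the underlying connection once the stream is finished
close(response$body)

jcheng5 commented 2 weeks ago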

SSE equivalent usage:

library(httr2)

# Define the API endpoint and your API key
api_endpoint <- "https://api.openai.com/v1/chat/completions"
api_key <- Sys.getenv("OPENAI_API_KEY")

# Build the request
response <- request(api_endpoint) %>%
  req_headers(
    "Content-Type" = "application/json",
    "Authorization" = paste("Bearer", api_key)
  ) %>%
  req_body_json(list(
    model = "gpt-4o",
    temperature = 0.7,
    stream = TRUE,
    messages = list(
      list(
        role = "system",
        content = "You're an incredibly verbose assistant."
      ),
      list(
        role = "user",
        content = "When did the modern Olympics start?"
      )
    )
  )) %>%
  req_perform_open(blocking = FALSE)

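# Read one SSE message at a time for as long as the stream is still incomplete
while (isIncomplete(response$body)) {
  msg <- read_sse(response$body)
  if (!is.null(msg)) {
    cat(msg$data)
    cat("\n")
  } else {
    # No complete message was available yet, so wait a bit
    message("SLEEPING...\n")
    Sys.sleep(0.25)
  }
}

# Release the underlying connection once the stream is finished
close(response$body)

jcheng5 commented 2 weeks ago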

BTW, both req_perform_connection(blocking = TRUE) and req_perform_connection(blocking = FALSE) make sense in both the readLines and read_sse scenarios. Use the former when you have nothing better to do than wait: you'll get the answer back with the least possible delay and maximum efficiency (no "busy wait"). Use the latter when you want to do other things while you wait.

Ideally we'd eventually have a true callback-based async version that is both efficient and nonblocking.

jcheng5 commented 2 weeks ago

One issue we must address before merging: resp_stream_sse only works if response$body was opened in text mode, because it uses pushBack, which only works on text connections. We could lift this restriction by implementing our own pushBack. Alternatively, we could do what I did locally: add a mode argument to req_perform_connection and make sure to use rt whenever you know you're going to call resp_stream_sse. (BTW, SSE streams are always UTF-8 per the specification.)
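
To illustrate why pushBack matters here, below is a minimal base-R sketch of the read-then-push-back pattern on a text-mode connection. It is not the code in this PR: read_event_lines is a hypothetical helper, and a textConnection stands in for response$body. The idea is that lines read before an event's terminating blank line arrives get pushed back so a later call can retry.

```r
# Hypothetical helper (not part of httr2): read lines up to the blank line
# that terminates an SSE event. If the data runs out first, push the lines
# back onto the connection and return NULL so the caller can retry later.
read_event_lines <- function(con) {
  lines <- character()
  repeat {
    line <- readLines(con, n = 1)
    if (length(line) == 0) {
      # No blank line seen yet: restore what we consumed
      if (length(lines) > 0) pushBack(lines, con)
      return(NULL)
    }
    if (line == "") {
      return(lines)
    }
    lines <- c(lines, line)
  }
}

# A text-mode connection standing in for a partially received stream:
# one complete event, followed by an event with no terminating blank line
demo_con <- textConnection(c("data: a", "", "data: b"))
complete <- read_event_lines(demo_con)    # lines of the first event
incomplete <- read_event_lines(demo_con)  # NULL; "data: b" is pushed back
close(demo_con)
```

pushBack is documented as working on text-mode connections, which is why the connection's mode matters for this approach.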

hadley commented 2 weeks ago

And at a more meta level, we need to add some tests for resp_stream_sse() too. I think I should have time this afternoon.

jcheng5 commented 2 weeks ago

I'm not saying we need to implement this or, even if we want to, that it needs to be part of this PR, but I just wanted to point out two things about the SSE spec:

  1. There is a standard JavaScript API for consuming SSE; a transliteration into R would look something like:
es <- EventSource$new("http://localhost:5000")
es$addEventListener("message", \(e) { message("Message received: ", e$data) })
...
es$close()

I wonder if a high-level SSE wrapper like this (or a more idiomatic version) is something httr2 users would expect to see and/or find useful.

  2. The SSE spec has a mechanism for (client initiated) reconnecting of dropped connections, including tracking the last event that was successfully received and reporting that during reconnection. With this PR as-is, you could do it yourself, but it'd be a bit of an exercise.
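
For a sense of what the do-it-yourself route involves, here is a rough base-R sketch of the field parsing and last-event-ID tracking described in the SSE spec. It is not httr2 code: parse_sse_lines and its return shape are made up for illustration, it works on a character vector of already-received lines, and it ignores the retry field.

```r
# Hypothetical helper (not part of httr2): turn raw SSE lines into events
# and remember the most recent id, following the field rules in the SSE spec.
parse_sse_lines <- function(lines) {
  events <- list()
  last_id <- ""          # the spec starts with an empty last event ID
  event <- "message"     # default event type
  data <- character()

  for (line in lines) {
    if (line == "") {
      # A blank line dispatches the event, but only if it carries data
      if (length(data) > 0) {
        events[[length(events) + 1]] <- list(
          event = event,
          data = paste(data, collapse = "\n"),
          id = last_id
        )
      }
      event <- "message"
      data <- character()
      next
    }
    if (startsWith(line, ":")) next  # comment line, e.g. a keep-alive

    colon <- regexpr(":", line, fixed = TRUE)[1]
    if (colon == -1) {
      field <- line
      value <- ""
    } else {
      field <- substr(line, 1, colon - 1)
      value <- substr(line, colon + 1, nchar(line))
      # A single leading space after the colon is not part of the value
      if (startsWith(value, " ")) value <- substr(value, 2, nchar(value))
    }

    if (field == "event") {
      event <- value
    } else if (field == "data") {
      data <- c(data, value)
    } else if (field == "id") {
      last_id <- value
    }
    # Other fields (including retry) are ignored in this sketch
  }

  list(events = events, last_event_id = last_id)
}

parsed <- parse_sse_lines(c(
  ": keep-alive",
  "id: 1", "data: first", "",
  "event: update", "id: 2", "data: a", "data: b", ""
))
```

On reconnection, a client following the spec would send the tracked value back in a Last-Event-ID request header, for example through req_headers(), so the server can resume from that point.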

hadley commented 2 weeks ago

No one else has asked for SSE support, so I don't think we need to do prospective work, but I'm certainly happy to continue to build on this API as we discover more about what we need.

I'm planning on merging this PR today.

jcheng5 commented 2 weeks ago

Looks great, thanks!