openai / openai-go

The official Go library for the OpenAI API
Apache License 2.0
294 stars, 19 forks

Assistant SubmitToolOutputsStreaming breaks with certain payloads #99

Open joe-yeomans opened 4 days ago

joe-yeomans commented 4 days ago

I've come across an issue when submitting tool outputs and streaming the response back to the client. The stream ends immediately and doesn't send any data back. When I submit the same tool outputs via a standard HTTP call, the response streams back just fine.

This is the tool output I'm sending back. It's just test data, so feel free to use it: products.json

This is the code I use to submit the tool outputs using the SDK:

toolStream := a.client.Beta.Threads.Runs.SubmitToolOutputsStreaming(ctx, threadID, data.ID, openai.BetaThreadRunSubmitToolOutputsParams{
    ToolOutputs: openai.F(toolOutputs),
})

if toolStream.Err() != nil {
    l.Error(fmt.Errorf("error submitting tool outputs: %w", toolStream.Err()).Error())
    continue
}

a.handleStreamSSE(ctx, l, threadID, dal, toolStream, w, flusher)

There is no error from the stream, it just ends straight away.

This is the HTTP code I'm using, where streaming works:

client := http.Client{}

payload2 := ToolReq{
    ToolOutputs: toolOutputs,
    Stream:      true,
}

payloadBytes, err := json.Marshal(payload2)
if err != nil {
    l.Error(fmt.Errorf("error marshalling payload: %w", err).Error())
    continue
}

req, err := http.NewRequest("POST", fmt.Sprintf("https://api.openai.com/v1/threads/%s/runs/%s/submit_tool_outputs", threadID, data.ID), bytes.NewBuffer(payloadBytes))
if err != nil {
    l.Error(fmt.Errorf("error creating request: %w", err).Error())
    continue
}

req.Header.Add("OpenAI-Beta", "assistants=v2")
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Authorization", "Bearer KEY")

res, err := client.Do(req)
if err != nil {
    l.Error(fmt.Errorf("error submitting tool outputs: %w", err).Error())
    continue
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
    l.Error(fmt.Errorf("error submitting tool outputs: %d", res.StatusCode).Error())
    continue
}

reader := io.Reader(res.Body)
bufReader := bufio.NewReader(reader)

for {
    line, err := bufReader.ReadString('\n')
    if err != nil {
        if err != io.EOF {
            l.Error(fmt.Errorf("error reading response: %w", err).Error())
        }
        // Stop on EOF and on any other read error, so a persistent error cannot spin the loop forever.
        break
    }

    line = strings.TrimSpace(line)
    if line == "" {
        continue
    }

    // Check if the line contains an event or data
    if strings.HasPrefix(line, "event:") {
        event := strings.TrimSpace(line[len("event:"):])
        l.Debug(fmt.Sprintf("Event: %s", event))
    }
}

I'm not sure what other information you'd require, but if you do need anything else please let me know. I'm happy to help out.

jacobzim-stl commented 4 days ago

Thanks so much for the in-depth bug report! I'll do some investigating.

joe-yeomans commented 4 days ago

I did some more digging. After adding a print statement in packages/ssestream/stream.go on line 71:

    for s.scn.Scan() {
        txt := s.scn.Bytes()

        fmt.Println("txt: ", string(txt))

I get the following output:

txt:  event: thread.run.step.completed
txt:  event: done
txt:  data: [DONE]

So it looks like it's completing the stream straight away.