openai / openai-node

The official Node.js / TypeScript library for the OpenAI API
https://www.npmjs.com/package/openai
Apache License 2.0

How to use stream: true? #18

Closed raphaelrk closed 1 year ago

raphaelrk commented 1 year ago

I'm a bit lost as to how to actually use stream: true in this library.

Example incorrect syntax:

const res = await openai.createCompletion({
  model: "text-davinci-002",
  prompt: "Say this is a test",
  max_tokens: 6,
  temperature: 0,
  stream: true,
});

res.onmessage = (event) => {
  console.log(event.data);
}
schnerd commented 1 year ago

Unfortunately streaming is not currently supported by this library 😢

I'm not sure if the SDK auto-generation tool we use (openai-generator) is able to support event streams. Will have to do more research.

The python openai package does support it: https://pypi.org/project/openai/

If anyone knows of a good way to consume server-sent events in Node (that also supports POST requests), please share!

keraf commented 1 year ago

If anyone knows of a good way to consume server-sent events in Node (that also supports POST requests), please share!

This can be done with the request method of Node's https API. You can create a request with the options you want (such as POST as a method) and then read the streamed data using the data event on the response. You can also use the close event to know when the request has finished.
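
For illustration, a minimal sketch of that approach (built-in https module, OPENAI_API_KEY taken from the environment, error handling omitted):

const https = require('https');

const req = https.request(
  {
    hostname: 'api.openai.com',
    path: '/v1/completions',
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
    },
  },
  (res) => {
    // each SSE chunk arrives here as it is streamed back
    res.on('data', (chunk) => console.log(chunk.toString()));
    res.on('close', () => console.log('stream finished'));
  }
);

req.write(JSON.stringify({
  model: 'text-davinci-003',
  prompt: 'Say this is a test',
  max_tokens: 16,
  stream: true,
}));
req.end();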

schnerd commented 1 year ago

Thanks @keraf, we'll try to look into getting this working soon.

smervs commented 1 year ago

You can use the axios stream response type, but you still need to parse the returned data.

const res = await openai.createCompletion({
  model: "text-davinci-002",
  prompt: "Say this is a test",
  max_tokens: 6,
  temperature: 0,
  stream: true,
}, { responseType: 'stream' });

res.on('data', console.log)
LasseSander commented 1 year ago

Thanks! @smervs currently getting: Property 'on' does not exist on type 'AxiosResponse<CreateCompletionResponse, any>' when trying though - have you had any luck?

smervs commented 1 year ago

Thanks! @smervs currently getting: Property 'on' does not exist on type 'AxiosResponse<CreateCompletionResponse, any>' when trying though - have you had any luck?

can you try this?

res.data.on('data', console.log)
mattgabor commented 1 year ago

@smervs your code is working for me, but it logs as

<Buffer 64 61 74 61 3a 20 7b 22 69 64 22 3a 20 22 63 6d 70 6c 2d 36 4a 6e 56 35 4d 70 4d 41 44 4f 41 61 56 74 50 64 30 56 50 72 45 42 4f 62 34 48 54 6c 22 2c ... 155 more bytes>

Do you know how to parse this response?

smervs commented 1 year ago

@smervs your code is working for me, but it logs as

<Buffer 64 61 74 61 3a 20 7b 22 69 64 22 3a 20 22 63 6d 70 6c 2d 36 4a 6e 56 35 4d 70 4d 41 44 4f 41 61 56 74 50 64 30 56 50 72 45 42 4f 62 34 48 54 6c 22 2c ... 155 more bytes>

Do you know how to parse this response?

here

res.data.on('data', data => console.log(data.toString()))
brianfoody commented 1 year ago

This format still waits and gives you the entire response at the end though no? Is there not a way to get the results as they stream back as per the OpenAI frontend?

Awendel commented 1 year ago

I second this; the streaming experience is currently not good and only seems to return all chunks in bulk instead of as they come in.

This is especially problematic with large responses, where it takes a long time to finish - a much better user experience would be to show early tokens as they come in - really just being able to match Playground UX.

A pure HTTP example using request / curl would also be fine for now; I'd be happy to create a higher-level utility function once I see a working example.

Awendel commented 1 year ago

I solved it using the inbuilt node http / https module:

const prompt = "Sample prompt. What's 2+2?"

const req = https.request({
    hostname:"api.openai.com",
    port:443,
    path:"/v1/completions",
    method:"POST",
    headers:{
        "Content-Type":"application/json",
        "Authorization":"Bearer "+ KEY_API  
    }
}, function(res){
    res.on('data', (chunk) => {
        console.log("BODY: "+chunk);
    });
    res.on('end', () => {
        console.log('No more data in response.');
    });
})

const body = JSON.stringify({
    model:"text-davinci-003",
    prompt:prompt,
    temperature:0.6,
    max_tokens:512,
    top_p:1.0,
    frequency_penalty:0.5,
    presence_penalty:0.7,
    stream:true
})

req.on('error', (e) => {
    console.error("problem with request:"+e.message);
        });

req.write(body)

req.end()
FaceMr commented 1 year ago

Java OkHttpClient:

BufferedSource source = response.body().source();
Buffer buffer = new Buffer();
StringBuilder result = new StringBuilder();
while (!source.exhausted()) {
    long count = response.body().source().read(buffer, 8192);
    // handle data in buffer.
    String r = buffer.readUtf8();
    log.info("result:" + r);
    result.append(r);
    buffer.clear();
}

Example result (there are many lines like this):

data: {"id": "cmpl-xxxx", "object": "text_completion", "created": 1672230176, "choices": [{"text": "\u672f", "index": 0, "logprobs": null, "finish_reason": null}], "model": "text-davinci-003"}
data: {"id": "cmpl-xxxx", "object": "text_completion", "created": 1672230176, "choices": [{"text": "\uff1a", "index": 0, "logprobs": null, "finish_reason": null}], "model": "text-davinci-003"}

Awendel commented 1 year ago

Yes, I also found this strange: sometimes the OpenAI API returns multiple data: {} segments that are not comma-separated and hence hard to parse as JSON. What I did: string-replace every "data: {" with ", {", except for the first occurrence (there, just use "{").

Then it can be parsed via JSON.parse, and one can extract all the text parts via .choices[0].text.
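
A minimal sketch of that string-replace trick (the parseChunk name is only illustrative, and it assumes each chunk contains whole data: lines):

const parseChunk = (chunk) => {
  // drop the terminal "data: [DONE]" marker if it is present in this chunk
  const raw = chunk.toString().replace(/\s*data:\s*\[DONE\]\s*/g, '');
  // every "data: {" becomes ", {" ...
  let joined = raw.replace(/data:\s*{/g, ', {');
  // ...except the first occurrence, which just becomes "{"
  joined = joined.replace(', {', '{');
  // wrap the result in [] so the whole chunk is one parseable JSON array
  return JSON.parse('[' + joined + ']').map((obj) => obj.choices[0].text);
};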

ghost commented 1 year ago

In my use case, streams are more useful for the request data, though, so that you can concatenate the results from different requests.

There is no dependency here apart from dotenv.

This is for the response anyway. It uses fetch, which is now built into Node v19 (and previous versions behind experimental flags).

See code:

import * as dotenv from 'dotenv';
// I just used a story as a string with backticks
import { text } from './string.mjs';

dotenv.config();

const apiUrl = 'https://api.openai.com/v1/completions';
const apiKey = process.env.OPENAI_API_KEY;

const fetchOptions = {
  method: 'POST',
  headers: {
    Accept: 'application/json',
    'Content-Type': 'application/json',
    Authorization: `Bearer ${apiKey}`,
  },
  body: JSON.stringify({
    model: 'text-davinci-003',
    // cues the model to return a summary, works fine.
    prompt: `Full Text: ${text}
Summary:`,
    temperature: 0,
    max_tokens: 1000,
    presence_penalty: 0.0,
    stream: true,
    // stop: ['\n'],
  }),
};

fetch(apiUrl, fetchOptions).then(async (response) => {
  const r = response.body;
  if (!r) throw new Error('No response body');

  const d = new TextDecoder('utf8');
  const reader = await r.getReader();
  let fullText = '';

  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      console.log('done');
      break;
    } else {
      const decodedString = d.decode(value);
      console.log(decodedString);
      try {
        // fixes string not being JSON-parseable otherwise
        fullText += JSON.parse(decodedString.slice(6)).choices[0].text;
      } catch (e) {
        // the last line is data: [DONE], which is not parseable either, so we catch that.
        console.log(e, '\n\n\n\nBut parsed string is below\n\n\n\n');
        console.log(fullText);
      }
    }
  }
});

Also simplest code without any library:

See code:

/* eslint-disable camelcase */
import * as dotenv from 'dotenv';
import { text } from './string.mjs';

// populates `process.env` with .env variables
dotenv.config();

const apiUrl = 'https://api.openai.com/v1/completions';
const apiKey = process.env.OPENAI_API_KEY;

const fetchOptions = {
  method: 'POST',
  headers: {
    Accept: 'application/json',
    'Content-Type': 'application/json',
    Authorization: `Bearer ${apiKey}`,
  },
  body: JSON.stringify({
    model: 'text-davinci-003',
    prompt: `Full Text: ${text}
Summary:`,
    temperature: 0,
    max_tokens: 1000,
    presence_penalty: 0.0,
    // stream: true,
    // stop: ['\n'],
  }),
};

fetch(apiUrl, fetchOptions).then(async (response) => {
  const r = await response.json();
  console.log(r);
});
gfortaine commented 1 year ago

Many thanks for this very insightful discussion 👍

As a side note, it looks like one could consume Server-Sent Events in Node while also supporting POST requests (even if that is not spec-compliant, given that only GET requests should be allowed) cc @schnerd :

@microsoft/fetch-event-source

launchdarkly-eventsource

However, it appears that we would lose all the benefits of the SDK auto-generation tool. Moreover, it seems that the only TS generator supporting streams at the time of writing is the axios one (typescript-fetch doesn't expose a method to consume the body as a stream).

Hence, @smervs' answer is perfectly valid and should be the accepted one. However, we could enhance it, especially regarding the parser, because a few options exist. For example, if we take the one from a customized @microsoft/fetch-event-source (note: the package has been specially retrofitted for the purpose by exporting ./parse), here is the result:

http://www.github.com/gfortaine/fortbot


import { Configuration, OpenAIApi } from "openai";
import * as parse from "@fortaine/fetch-event-source/parse";

const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

const prompt = "Hello world";
// https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-them
const max_tokens = 4097 - prompt.length;

const completion = await openai.createCompletion(
  {
    model: "text-davinci-003",
    max_tokens,
    prompt,
    stream: true,
  },
  { responseType: "stream" }
);

completion.data.on(
  "data",
  parse.getLines(
    parse.getMessages((event) => {
      const { data } = event;

      // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
      if (data === "[DONE]") {
        process.stdout.write("\n");
        return;
      }

      const { text } = JSON.parse(data).choices[0];
      process.stdout.write(text);
    })
  )
);
schnerd commented 1 year ago

@gfortaine we actually use @microsoft/fetch-event-source for the playground to do streaming with POST 👍

Thank you all for sharing your solutions here! I agree that @smervs solution currently looks like the best option available for the openai-node package. Here's a more complete example with proper error handling and no extra dependencies:

try {
    const res = await openai.createCompletion({
        model: "text-davinci-002",
        prompt: "It was the best of times",
        max_tokens: 100,
        temperature: 0,
        stream: true,
    }, { responseType: 'stream' });

    res.data.on('data', data => {
        const lines = data.toString().split('\n').filter(line => line.trim() !== '');
        for (const line of lines) {
            const message = line.replace(/^data: /, '');
            if (message === '[DONE]') {
                return; // Stream finished
            }
            try {
                const parsed = JSON.parse(message);
                console.log(parsed.choices[0].text);
            } catch(error) {
                console.error('Could not JSON parse stream message', message, error);
            }
        }
    });
} catch (error) {
    if (error.response?.status) {
        console.error(error.response.status, error.message);
        error.response.data.on('data', data => {
            const message = data.toString();
            try {
                const parsed = JSON.parse(message);
                console.error('An error occurred during OpenAI request: ', parsed);
            } catch(error) {
                console.error('An error occurred during OpenAI request: ', message);
            }
        });
    } else {
        console.error('An error occurred during OpenAI request', error);
    }
}

This could probably be refactored into a streamCompletion helper function (that uses either callbacks or es6 generators to emit new messages).

Apologies there's not an easier way to do this within the SDK itself – the team will continue evaluating how to get this added natively, despite the lack of support in the current sdk generator tool we're using.

gfortaine commented 1 year ago

@schnerd Please find a PR (https://github.com/openai/openai-node/pull/45) as well as an updated example. Comments are welcome 👍:

http://www.github.com/gfortaine/fortbot


import { Configuration, OpenAIApi } from "@fortaine/openai";
import { streamCompletion } from "@fortaine/openai/stream";

const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

try {
  const completion = await openai.createCompletion(
    {
      model: "text-davinci-003",
      max_tokens: 100,
      prompt: "It was the best of times",
      stream: true,
    },
    { responseType: "stream" }
  );

  for await (const message of streamCompletion(completion.data)) {
    try {
      const parsed = JSON.parse(message);
      const { text } = parsed.choices[0];

      process.stdout.write(text);
    } catch (error) {
      console.error("Could not JSON parse stream message", message, error);
    }
  }

  process.stdout.write("\n");
} catch (error) {
  if (error.response?.status) {
    console.error(error.response.status, error.message);

    for await (const data of error.response.data) {
      const message = data.toString();

      try {
        const parsed = JSON.parse(message);

        console.error("An error occurred during OpenAI request: ", parsed);
      } catch (error) {
        console.error("An error occurred during OpenAI request: ", message);
      }
    }
  } else {
    console.error("An error occurred during OpenAI request", error);
  }
}
gfortaine commented 1 year ago

@gfortaine we actually use @microsoft/fetch-event-source for the playground to do streaming with POST 👍

This could probably be refactored into a streamCompletion helper function (that uses either callbacks or es6 generators to emit new messages).

Apologies there's not an easier way to do this within the SDK itself – the team will continue evaluating how to get this added natively, despite the lack of support in the current sdk generator tool we're using.

@schnerd Here it is (streamCompletion helper function code inspired by this snippet, courtesy of @rauschma) 👍 :


// https://2ality.com/2018/04/async-iter-nodejs.html#generator-%231%3A-from-chunks-to-lines
async function* chunksToLines(chunksAsync) {
  let previous = "";
  for await (const chunk of chunksAsync) {
    const bufferChunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    previous += bufferChunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf("\n")) >= 0) {
      // line includes the EOL
      const line = previous.slice(0, eolIndex + 1).trimEnd();
      if (line === "data: [DONE]") break;
      if (line.startsWith("data: ")) yield line;
      previous = previous.slice(eolIndex + 1);
    }
  }
}

async function* linesToMessages(linesAsync) {
  for await (const line of linesAsync) {
    const message = line.substring("data: ".length);

    yield message;
  }
}

async function* streamCompletion(data) {
  yield* linesToMessages(chunksToLines(data));
}

try {
  const completion = await openai.createCompletion(
    {
      model: "text-davinci-003",
      max_tokens: 100,
      prompt: "It was the best of times",
      stream: true,
    },
    { responseType: "stream" }
  );

  for await (const message of streamCompletion(completion.data)) {
    try {
      const parsed = JSON.parse(message);
      const { text } = parsed.choices[0];

      process.stdout.write(text);
    } catch (error) {
      console.error("Could not JSON parse stream message", message, error);
    }
  }

  process.stdout.write("\n");
} catch (error) {
  if (error.response?.status) {
    console.error(error.response.status, error.message);

    for await (const data of error.response.data) {
      const message = data.toString();

      try {
        const parsed = JSON.parse(message);

        console.error("An error occurred during OpenAI request: ", parsed);
      } catch (error) {
        console.error("An error occurred during OpenAI request: ", message);
      }
    }
  } else {
    console.error("An error occurred during OpenAI request", error);
  }
}
blakeross commented 1 year ago

@gfortaine This solution works great with next.js API endpoints running on localhost. But once you deploy to Vercel, streaming responses via serverless functions are prohibited by AWS Lambda. You can get around this limitation by switching to next.js' experimental new Edge runtime, but then as far as I can tell that doesn't work with axios... which your solution relies on. So I still haven't found a way to actually stream openAI responses via next.js in production. Any ideas?

blakeross commented 1 year ago

@gfortaine I've got it working using fetch directly instead of the OpenAI lib, but I believe there's a bug with chunksToLines. It appears to assume that chunks will be >= 1 line, but chunks can actually be part of a line. @rauschma's original implementation addresses this.

gtokman commented 1 year ago

@blakeross do you have any sample code on how you got it to work with next.js and vercel? Wouldn't the lambda finish if you sent a response back to the client?

blakeross commented 1 year ago

@gtokman it works if you use Vercel's new Edge runtime functions

dan-kwiat commented 1 year ago

@gtokman @blakeross may be useful: https://github.com/dan-kwiat/openai-edge

gfortaine commented 1 year ago

Here is a fetch-based client fully generated from the SDK auto-generation tool 🎉 cc @schnerd @santimirandarp @blakeross @gtokman @dan-kwiat: https://github.com/openai/openai-node/pull/45#issuecomment-1371569799

(Bonus: it is wrapped by @vercel/fetch to provide retries (429, network errors, ...) & DNS caching)


import { createConfiguration, OpenAIApi } from "@fortaine/openai";
import { streamCompletion } from "@fortaine/openai/stream";

import dotenv from "dotenv-flow";
dotenv.config({
  node_env: process.env.APP_ENV || process.env.NODE_ENV || "development",
  silent: true,
});

const configurationOpts = {
  authMethods: {
    apiKeyAuth: {
      accessToken: process.env.OPENAI_API_KEY,
    },
  },
};

const configuration = createConfiguration(configurationOpts);

const openai = new OpenAIApi(configuration);

try {
  const completion = await openai.createCompletion({
    model: "text-davinci-003",
    prompt: "1,2,3,",
    max_tokens: 193,
    temperature: 0,
    stream: true,
  });

  for await (const message of streamCompletion(completion)) {
    try {
      const parsed = JSON.parse(message);
      const { text } = parsed.choices[0];

      process.stdout.write(text);
    } catch (error) {
      console.error("Could not JSON parse stream message", message, error);
    }
  }
  process.stdout.write("\n");
} catch (error) {
  if (error.code) {
    try {
      const parsed = JSON.parse(error.body);
      console.error("An error occurred during OpenAI request: ", parsed);
    } catch (error) {
      console.error("An error occurred during OpenAI request: ", error);
    }
  } else {
    console.error("An error occurred during OpenAI request", error);
  }
}
shawnswed commented 1 year ago

@gfortaine we actually use @microsoft/fetch-event-source for the playground to do streaming with POST 👍

Hi. Thanks for the great code. It works great in straight Node.js, but in React it throws a 'res.data.on is not a function' error. Maybe something to do with Webpack. Any insight would be appreciated. Thanks again.

shawnswed commented 1 year ago

Hi everyone. @smervs' solution works great in straight Node.js, but in React it throws a 'res.data.on() is not a function' error. Maybe something to do with Webpack. Any insight would be appreciated. Thanks again.

DerBasler commented 1 year ago

@shawnswed I am facing the same issue: Property 'on' does not exist on type 'CreateCompletionResponse' 🤔 I assume we are all using "openai": "^3.1.0". I saw the PR from @gfortaine (https://github.com/openai/openai-node/pull/45), so hopefully it will be merged soon. In the meantime I will try to somehow trick TS into ignoring the type and see if it works anyway. I hope I remember to update you ^^
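
For reference, a minimal sketch of that cast-based workaround (assuming an openai v3 client instance as in the examples above; at runtime res.data really is the underlying HTTP response stream when responseType is 'stream', only the generated typings disagree):

import { IncomingMessage } from "http";

const res = await openai.createCompletion(
  { model: "text-davinci-003", prompt: "Hello", max_tokens: 16, stream: true },
  { responseType: "stream" }
);

// cast past the generated CreateCompletionResponse typing
const stream = res.data as unknown as IncomingMessage;
stream.on("data", (chunk: Buffer) => console.log(chunk.toString()));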

shawnswed commented 1 year ago

Thanks, DerBasler. Please keep me in the loop.

Munkyfoot commented 1 year ago

Here's a quick and dirty workaround.

Edit: If you are using NextJS, a better solution can be found here https://vercel.com/blog/gpt-3-app-next-js-vercel-edge-functions.

Server-Side:

// Import the Readable stream module
import { Readable } from "stream";

// Set the response headers
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");

// Generate the response using the OpenAI API
const response = await openai.createCompletion({
    prompt: "It was the best of times",
    stream: true,
    ...
}, { responseType: 'stream' });

// Convert the response to a Readable stream (this is a temporary workaround)
const stream = response.data as any as Readable;

// Process the data stream
let streamHead = true; // Flag to indicate whether a message begins the stream or is a continuation
stream.on("data", (chunk) => {
    try {
        // Parse the chunk as a JSON object
        const data = JSON.parse(chunk.toString().trim().replace("data: ", ""));
        console.log(data);
        // Write the text from the response to the output stream
        res.write(JSON.stringify({text: data.choices[0].text, streamHead: streamHead}));
        streamHead = false;
        // Send immediately to allow chunks to be sent as they arrive
        res.flush();
    } catch (error) {
        // End the stream but do not send the error, as this is likely the DONE message from createCompletion
        console.error(error);
        res.end();
    }
});

// Send the end of the stream on stream end
stream.on("end", () => {
    res.end();
});

// If an error is received from the completion stream, send an error message and end the response stream
stream.on("error", (error) => {
    console.error(error);
    res.end(JSON.stringify({ error: true, message: "Error generating response." }));
});

Client-Side:

// Query your endpoint
const res = await fetch('/yourapi/', {...})
// Create a reader for the response body
const reader = res.body.getReader();
// Create a decoder for UTF-8 encoded text
const decoder = new TextDecoder("utf-8");
let result = "";
// Function to read chunks of the response body
const readChunk = async () => {
    return reader.read().then(({ value, done }) => {
        if (!done) {
            const dataString = decoder.decode(value);
            const data = JSON.parse(dataString);
            console.log(data);

            if (data.error) {
                console.error("Error while generating content: " + data.message);
            } else {
                result = data.streamHead ? data.text : result + data.text;
                return readChunk();
            }
        } else {
            console.log("done");
        }
    });
};

await readChunk();

The result variable is updated as the content arrives.

microsoftbuild commented 1 year ago

Thanks for the neat implementation @schnerd

I am using this with listFineTuneEvents() and getting a similar error to the one reported by @DerBasler: Property 'on' does not exist on type 'ListFineTuneEventsResponse'.

Currently on "openai": "^3.1.0"

xithalius commented 1 year ago

@shawnswed You can use onDownloadProgress, which you can pass in the createCompletion options. You can then apply @schnerd's snippet to progressEvent.currentTarget.response.
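
A rough sketch of that in the browser (assuming the openai v3 client, which passes these options through to axios/XHR; note that it re-parses everything received so far on every progress event):

const res = await openai.createCompletion(
  {
    model: 'text-davinci-003',
    prompt: 'It was the best of times',
    max_tokens: 100,
    stream: true,
  },
  {
    onDownloadProgress: (progressEvent) => {
      // everything received so far, as a growing block of "data: {...}" lines
      const text = progressEvent.currentTarget.response;
      const lines = text.split('\n').filter((line) => line.startsWith('data: '));
      for (const line of lines) {
        const message = line.replace(/^data: /, '');
        if (message === '[DONE]') continue;
        try {
          console.log(JSON.parse(message).choices[0].text);
        } catch {
          // ignore lines that are still partially received
        }
      }
    },
  }
);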

smadikanti commented 1 year ago

Running into The provided value 'stream' is not a valid enum value of type XMLHttpRequestResponseType.

Munkyfoot commented 1 year ago

Here is a much better solution than the hacky one I posted earlier: https://vercel.com/blog/gpt-3-app-next-js-vercel-edge-functions

I found that you need to modify the return new Response(stream) line to return new Response(stream, { headers: { 'Content-Type': 'text/event-stream' } }) in the code from the Edge example to get it to stream the text as it comes in.
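
For context, a minimal sketch of such an Edge route; OpenAIStream stands in for the stream-building helper from the Vercel post and is not part of openai-node:

export const config = { runtime: 'edge' }; // 'experimental-edge' on older Next.js versions

export default async function handler(req) {
  const { prompt } = await req.json();
  const stream = await OpenAIStream({
    model: 'text-davinci-003',
    prompt,
    max_tokens: 200,
    stream: true,
  });
  // the Content-Type header is the modification described above; without it
  // some setups buffer the whole body instead of streaming it
  return new Response(stream, {
    headers: { 'Content-Type': 'text/event-stream' },
  });
}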

MarkoTelek commented 1 year ago

Here's a quick and dirty workaround.

Edit: If you are using NextJS, a better solution can be found here https://vercel.com/blog/gpt-3-app-next-js-vercel-edge-functions.

Hey. How do I get the flush function in the NextApiReponse? It works when I run it on my machine, but when I deploy to Vercel, then it throws an Error because there's no flush function in the NextApiReponse.

Munkyfoot commented 1 year ago

Hey. How do I get the flush function in the NextApiReponse? It works when I run it on my machine, but when I deploy to Vercel, then it throws an Error because there's no flush function in the NextApiReponse.

I highly recommend using this implementation instead. https://vercel.com/blog/gpt-3-app-next-js-vercel-edge-functions

There's a version for a standard Next api route and one that uses Edge. If your project allows, I highly recommend using the Edge version as it will save on resources and comes with some other advantages.

MarkoTelek commented 1 year ago

Hey. How do I get the flush function in the NextApiReponse? It works when I run it on my machine, but when I deploy to Vercel, then it throws an Error because there's no flush function in the NextApiReponse.

I highly recommend using this implementation instead. https://vercel.com/blog/gpt-3-app-next-js-vercel-edge-functions

There's a version for a standard Next api route and one that uses Edge. If your project allows, I highly recommend using the Edge version as it will save on resources and comes with some other advantages.

I've been trying that one as well. But it seems like these edge functions run on the client side? Because I can't get it to run Prisma/database access, nor do I see a way to use useSession with it. I couldn't even get it to run LevelDB. How do I distinguish between users of the API without a database in an edge function, or am I missing something? I remember reading somewhere that a lot of npm packages are not supported in edge functions, but I wasn't able to find the source of it.

maceip commented 1 year ago

But it seems like these edge functions run on client side? Because I can't get it to run prisma/database, nor do I see a way to use useSession with it.

Use https://www.npmjs.com/package/@auth/core, the web-compatible version of NextAuth (useSession is not included).

Edge functions are technically server-side but don't support most Node APIs.

yairhaimo commented 1 year ago

I created an MIT library that helps you compose and run AI pipelines (including GPT streaming). It supports Vercel Edge Functions out of the box. Check it out at https://client.aigur.dev

leducgiachoang commented 1 year ago

I created an MIT library that helps you compose and run AI pipelines (including GPT streaming). It supports Vercel Edge Functions out of the box. Check it out at https://client.aigur.dev

@yairhaimo How do I integrate this into a pure Vue.js application, using JavaScript?

yairhaimo commented 1 year ago

@yairhaimo How do I integrate this into a pure Vue.js application, using JavaScript?

I'm not really familiar with Vue, so I can't give exact instructions, but if you want to run the Pipelines on the frontend you just need to compose them (check out the "getting started" section in the docs) and call them using the invoke method (also in "getting started"). If you want to move it to the server side (invoking from the client but running on the server), it depends which server and infrastructure you're using.

Just be aware that if you're running it on the client side your OpenAI key will be exposed and might be abused by people.

shitianfang commented 1 year ago

I created an MIT library that helps you compose and run AI pipelines (including GPT streaming). It supports Vercel Edge Functions out of the box. Check it out at https://client.aigur.dev

@yairhaimo Will there be a timeout limit on the Vercel free plan, such as 10 seconds, or does streaming reduce the time enough to get around the limit?

yairhaimo commented 1 year ago

I created an MIT library that helps you compose and run AI pipelines (including GPT streaming). It supports Vercel Edge Functions out of the box. Check it out at https://client.aigur.dev

@yairhaimo Will there be a timeout limit on the Vercel free plan, such as 10 seconds, or does streaming reduce the time enough to get around the limit?

Right now the Vercel helper function returns the response only when the Pipeline finishes executing, so the 10-second limit on the Hobby plan might be an issue. I'll change it to return the response right away if it's a streaming Pipeline so the 10-second timeout won't be an issue. Meanwhile you can do that yourself if you don't use the Vercel helper and don't want to wait for a fix.

xithalius commented 1 year ago

Please move the conversation about the Aigur Client elsewhere.

Note for those using the new ChatGPT model in combination with streaming: the snippet from @schnerd has to be updated from parsed.choices[0].text to parsed.choices[0].delta.content. The typings of CreateChatCompletionResponseChoicesInner do not match when using streaming.
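
For reference, a sketch of @schnerd's handler adapted for chat completions (assuming an openai version that exposes createChatCompletion; delta.content can be undefined on the first and last chunks):

const res = await openai.createChatCompletion(
  {
    model: 'gpt-3.5-turbo',
    messages: [{ role: 'user', content: 'It was the best of times' }],
    stream: true,
  },
  { responseType: 'stream' }
);

res.data.on('data', (data) => {
  const lines = data.toString().split('\n').filter((line) => line.trim() !== '');
  for (const line of lines) {
    const message = line.replace(/^data: /, '');
    if (message === '[DONE]') return; // stream finished
    try {
      const parsed = JSON.parse(message);
      process.stdout.write(parsed.choices[0].delta.content ?? '');
    } catch (error) {
      console.error('Could not JSON parse stream message', message, error);
    }
  }
});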

lennartle commented 1 year ago

My take on it, with punctuation detection to prevent response over-spamming

const openAiCompletion = async (messages, onText) => {
    try {
        const response = await fetch('https://api.openai.com/v1/chat/completions', {
            method: 'POST',
            headers: {
                'Authorization': `Bearer ${OPENAI_TOKEN}`,
                'Content-Type': 'application/json',
            },
            body: JSON.stringify({
                messages,
                model: "gpt-3.5-turbo",
                max_tokens: 2048,
                stream: true
            })
        });

        const decoder = new TextDecoder('utf8');
        const reader = response.body.getReader();

        let fullText = ''
        let lastFire = 0

        async function read() {
            const { value, done } = await reader.read();

            if (done) return onText(fullText)

            const delta = decoder.decode(value).match(/"delta":\s*({.*?"content":\s*".*?"})/)?.[1]

            if (delta) {
                const content = JSON.parse(delta).content

                fullText += content

                //Detects punctuation, if yes, fires onText once per .5 sec
                if (/[\p{P}\p{S}]/u.test(content)) {
                    const now = Date.now();

                    if (now - lastFire > 500) {
                        lastFire = now
                        onText(fullText)
                    }
                }
            }

            await read()

        }

        await read()

        return fullText
    } catch (error) {
        return error;
    }
}

use it like this

// gpt-3.5-turbo expects a chat-style messages array rather than a raw prompt string
const messages = [{ role: 'user', content: prompt }];
const aiResponse = await openAiCompletion(messages, (text) => {
  // update UI or whatever
});

//do something with full response
console.log(aiResponse)
zachkirsch commented 1 year ago

Here's another SDK that supports streaming but in a more first-class way: https://github.com/fern-openai/openai-node

It also has callbacks for onError and onFinish

If you have feedback, please file an issue on that repo! You can make a PR as a proof of concept, but the SDK is autogenerated by Fern so any code changes will have to go in the generator.

revmischa commented 1 year ago

I just published a really basic completion streamer module: https://www.npmjs.com/package/openai-stream-mini. It has no dependencies and uses the built-in Node 18 or browser fetch.

Give it a try

raphaelrk commented 1 year ago

This post really blew up 😆 Here's the code I actually wound up using for this; it works on both frontend and backend, and isn't yet updated for chat. It's okay code. Rewriting with generators could be nice.

Usage:

await streamOne(modelName, prompt, onToken, onDone, onError, options);

Source

import { Configuration, OpenAIApi } from 'openai';
const OPENAI_API_KEY = 'sk-...';
const configuration = new Configuration({ apiKey: OPENAI_API_KEY });
const api = new OpenAIApi(configuration);

type OtherOptions = {
  maxTokens?: number;
  temp?: number;
  n?: number;
  stop?: string | string[];
}

type CompleteResponse = {
  responses: string[];
  cost: number;
  tokenUsage: number;
  promptTokens: number;
  completionTokens: number;
};

type CompleteOneResponse = {
  response: string;
  cost: number;
  tokenUsage: number;
  promptTokens: number;
  completionTokens: number;
};

export type OpenAIModel =
  'ada' |
  'babbage' |
  'curie' |
  'davinci' |
  'text-ada-001' |
  'text-babbage-001' |
  'text-curie-001' |
  'text-davinci-001' |
  'text-davinci-002' |
  'text-davinci-003' |
  'code-cushman-001' |
  'code-davinci-002';

export const modelToPrice: Record<OpenAIModel, number> = {
  'ada': 0.0004,
  'babbage': 0.0005,
  'curie': 0.002,
  'davinci': 0.02,
  'text-ada-001': 0.0004,
  'text-babbage-001': 0.0005,
  'text-curie-001': 0.002,
  'text-davinci-001': 0.02,
  'text-davinci-002': 0.02,
  'text-davinci-003': 0.02,
  'code-cushman-001': 0.0,
  'code-davinci-002': 0.0,
};

export async function complete(model: OpenAIModel, prompt: string, otherOptions?: OtherOptions): Promise<CompleteResponse> {
  const price = modelToPrice[model];
  if (price === undefined) throw new Error('Unknown model: ' + model);

  let max_tokens = otherOptions?.maxTokens || 80;
  let temperature = otherOptions?.temp || 0.3;
  let n = otherOptions?.n || 1;
  let best_of = n;
  let stop = otherOptions?.stop || '\n';
  let frequency_penalty = 0.2;
  let presence_penalty = 0.1;

  const res = await api.createCompletion({ model, prompt, max_tokens, temperature, n, best_of, stop, frequency_penalty, presence_penalty, stream: false });

  const tokenUsage = res.data.usage.total_tokens;
  const cost = tokenUsage / 1000 * price;
  const responses = res.data.choices.map(choice => choice.text);
  const promptTokens = res.data.usage.prompt_tokens;
  const completionTokens = res.data.usage.completion_tokens;

  return { responses, cost, tokenUsage, promptTokens, completionTokens };
}

export async function completeOne(model: OpenAIModel, prompt: string, otherOptions?: OtherOptions): Promise<CompleteOneResponse> {
  const res = await complete(model, prompt, otherOptions);
  return { response: res.responses[0], cost: res.cost, tokenUsage: res.tokenUsage, promptTokens: res.promptTokens, completionTokens: res.completionTokens };
}

type OnToken = (token: string) => any;
type OnDone = () => any;
type OnError = (msg: string) => any;

// TODO: estimate prompt tokens / cost including prompt tokens
export async function streamOne(model: OpenAIModel, prompt: string, onToken: OnToken, onDone: OnDone, onError: OnError, otherOptions?: OtherOptions): Promise<void> {
  // verify model and get price
  const price = modelToPrice[model];
  if (price === undefined) throw new Error('Unknown model: ' + model);

  // set options
  let max_tokens = otherOptions?.maxTokens || 80;
  let temperature = otherOptions?.temp || 0.3;
  let n = otherOptions?.n || 1;
  let best_of = n;
  let stop = otherOptions?.stop || '\n';
  let frequency_penalty = 0.2;
  let presence_penalty = 0.1;

  // create stream
  const fetchPromise = fetch(`https://api.openai.com/v1/completions`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${OPENAI_API_KEY}`,
    },
    body: JSON.stringify({ model, prompt, max_tokens, temperature, n, best_of, stop, frequency_penalty, presence_penalty, stream: true }),
  });
  const response = await fetchPromise;
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  // keep track of tokens
  let concat = '';
  let completionTokenCount = 0;

  // read stream
  let gotReaderDone = false;
  let gotDoneMessage = false;
  while (true) {
    // get next chunk
    const { done, value } = await reader.read();
    if (done) {
      gotReaderDone = true;
      break;
    }
    const text = decoder.decode(value);

    // split chunk into lines
    // todo: there's probs a better way to do this
    const lines = text.split('\n').filter(line => line.trim() !== '');
    for (const line of lines) {
      // remove the data: prefix
      const lineMessage = line.replace(/^data: /, '');

      // if we got the done message, stop
      if (lineMessage === '[DONE]') {
        gotDoneMessage = true;
        break; // return;
      }

      // try to parse the line as JSON, and if it works, get the token and call the callback
      try {
        const parsed = JSON.parse(lineMessage);
        const token = parsed.choices[0].text;
        concat += token;
        completionTokenCount++;
        onToken(token);
      } catch (error) {
        // todo: handle error better -- retry? inform caller?
        console.error(`Could not JSON parse stream message`, { text, lines, line, lineMessage, error });

        try {
          let errorInfo = JSON.parse(text);
          console.error(`Error info`, errorInfo);
          if (errorInfo.message) return onError(errorInfo.message);
          if (errorInfo.error.message) return onError(errorInfo.error.message);
        } catch (error) {
          // ignore
        }
      }
    }
    if (gotDoneMessage) break;
  }

  const cost = completionTokenCount / 1000 * price;
  console.log(`Streamed ${completionTokenCount} tokens. $${cost}`);
  console.log('Final text:', concat);
  onDone();
}
raphaelrk commented 1 year ago

Also for token estimation I've been using:

tokenizer.ts

// Name:
//   tokenizer.ts
//
// Description:
//   Tokenizes a string into a list of tokens
//   Uses the python tokenizer.py script via the python_util.ts wrapper
//   Supports string and string[] inputs
//
// Example usage:
//   import { tokenize } from './tokenizer';
//
//   let result1 = await tokenize('hello world');
//   console.log(result1); // ['hello', ' world']
//
//   let result2 = await tokenize(['hello world', 'goodbye world']);
//   console.log(result2); // [['hello', ' world'], ['good', 'bye', ' world']]

import { PythonShellInstance } from './python_util';

const tokenizerPyshell = new PythonShellInstance('tokenizer.py');
const chatGPTtokenizerPyshell = new PythonShellInstance('chat_gpt_tokenizer.py');
export type TokenizerEnum = 'cl100k_base' | 'chat_gpt';

export async function tokenizeOne(msg: string, tokenizer: TokenizerEnum = "cl100k_base"): Promise<string[]> {
  let pyshell = tokenizer === 'cl100k_base' ? tokenizerPyshell : chatGPTtokenizerPyshell;
  const { tokens } = await pyshell.sendJsonToPythonAndParseOne<{ tokens: string[] }>({ input: msg });
  return tokens;
}

export async function tokenizeMany(msgs: string[], tokenizer: TokenizerEnum ="cl100k_base"): Promise<string[][]> {
  let results : string[][] = [];
  for (let msg of msgs) {
    results.push(await tokenizeOne(msg, tokenizer));
  }
  return results;
}

export async function tokenize<T extends string | string[]>(msg: T, tokenizer: TokenizerEnum ="cl100k_base"): Promise<T extends string ? string[] : string[][]> {
  if (typeof msg === 'string') {
    return await tokenizeOne(msg, tokenizer) as T extends string ? string[] : string[][];
  } else {
    return await tokenizeMany(msg, tokenizer) as T extends string ? string[] : string[][];
  }
}

python_util.ts (requires python-shell -- npm i python-shell)

// Name:
//   python_util.ts
//
// Description:
//   Utility functions for interacting with python scripts
//   Wraps the python-shell library
//   Given a python script, creates a singleton python shell
//   Sends messages to the python shell and waits for a response
//   Parses the response as JSON
//
// Example usage:
//   import { PythonShellInstance } from './python_util';
//   let pyshell = new PythonShellInstance('tokenizer.py');
//   let result = await pyshell.sendToPythonAndParseOne('hello world');
//   console.log(result);
//
// Note:
//   at some point need to automate this,
//   but be sure to run these commands in the ./py directory:
//     source venv/bin/activate.fish
//     pip install -r requirements.txt

import { PythonShell } from 'python-shell';

export class PythonShellInstance {
  private pyshell: PythonShell;
  private pyshellResolveQueue: ((msg: string) => void)[] = [];

  constructor(scriptName: string) {
    this.pyshell = new PythonShell(scriptName, {
      mode: 'text',
      pythonPath: './py/venv/bin/python',
      pythonOptions: ['-u'],
      scriptPath: './py',
    });

    // when we get a message from python, run the corresponding callback
    this.pyshell.on('message', msg => {
      console.log("Got message from python:", { msg });
      let resolve = this.pyshellResolveQueue.shift();
      if (resolve) resolve(msg);
    });

    // log other events:
    // - close
    // - stderr
    // - pythonError
    // - error (error spawning, killing, or messaging the process)
    this.pyshell.on('close', () => console.log("Python shell closed"));
    this.pyshell.on('stderr', err => console.log("Got stderr from python:", { err }));
    this.pyshell.on('pythonError', err => console.log("Got stderr from python:", { err }));
    this.pyshell.on('error', err => console.log("Got error from python:", { err }));
  }

  // function to send a json message to python and wait for a response
  async sendJsonToPythonAndParseOne<T>(json: object): Promise<T> {
    console.log("Sending json to python:", { json });
    return new Promise(resolve => {
      this.pyshellResolveQueue.push(msg => resolve(JSON.parse(msg)));
      this.pyshell.send(JSON.stringify(json));
    });
  }

  // function to send a string message to python and wait for a response
  async sendTextToPythonAndParseOne<T>(msg: string): Promise<T> {
    console.log("Sending message to python:", { msg });
    return new Promise(resolve => {
      this.pyshellResolveQueue.push(msg => resolve(JSON.parse(msg)));
      this.pyshell.send(msg);
    });
  }
}

requirements.txt

tiktoken==0.2.0

tokenizer.py

#######################################################
# Name:
#   tokenizer.py
#
# Description:
#   tokenizer.py reads a string from stdin, encodes it, and prints the tokens to stdout
#   the input string is expected to be a JSON object with a single key "input"
#   the output is a JSON object with a single key "tokens"
#   the value of "tokens" is an array of strings, each string is a token
#
# Example:
#   $ echo '{"input": "hello world"}' | python tokenizer.py
#   {"tokens": ["hello", " world"]}
#
#######################################################

# import openai's tiktoken library and json
import tiktoken
import json

# initialize the tokenizer
enc = tiktoken.get_encoding("cl100k_base")

# loop: read from stdin, tokenize, print to stdout
while True:
    # try to read a line from stdin
    try:
        raw_line = input()
    except EOFError:
        break

    # parse the line as JSON
    line_json = json.loads(raw_line)

    # get the input string
    input_str = line_json["input"]

    # tokenize the input string
    tokens = enc.encode(input_str)

    # convert the tokens to a list of strings
    tokens_str = [enc.decode([t]) for t in tokens]

    # create dict with the tokens, and print it as JSON
    output_json = {"tokens": tokens_str}
    print(json.dumps(output_json))

chat_gpt_tokenizer.py

#######################################################
# Name:
#   chat_gpt_tokenizer.py
#
# Description:
#   chat_gpt_tokenizer.py reads a string from stdin, encodes it, and prints the tokens to stdout
#   the input string is expected to be a JSON object with a single key "input"
#   the output is a JSON object with a single key "tokens"
#   the value of "tokens" is an array of strings, each string is a token
#
# Example:
#   $ echo '{"input": "hello world"}' | python tokenizer.py
#   {"tokens": ["hello", " world"]}
#
#######################################################

# import openai's tiktoken library and json
import tiktoken
import json

# initialize the tokenizer
cl100k_base = tiktoken.get_encoding("cl100k_base")
enc = tiktoken.Encoding(
    name="chat-davinci-003",
    pat_str=cl100k_base._pat_str,
    mergeable_ranks=cl100k_base._mergeable_ranks,
    special_tokens={
        **cl100k_base._special_tokens,
        "<|im_start|>": 100264,
        "<|im_end|>": 100265,
        "<|im_sep|>": 100266,
    }
)

# test
tokens = enc.encode(
    "<|im_start|>user\nHello<|im_end|><|im_start|>assistant",
    allowed_special={"<|im_start|>", "<|im_end|>"},
)
assert len(tokens) == 7
assert tokens == [100264, 882, 198, 9906, 100265, 100264, 78191]

# loop: read from stdin, tokenize, print to stdout
while True:
    # try to read a line from stdin
    try:
        raw_line = input()
    except EOFError:
        break

    # parse the line as JSON
    line_json = json.loads(raw_line)

    # get the input string
    input_str = line_json["input"]

    # tokenize the input string
    tokens = enc.encode(
        input_str,
        allowed_special={"<|im_start|>", "<|im_end|>"},
    )

    # convert the tokens to a list of strings
    tokens_str = [enc.decode([t]) for t in tokens]

    # create dict with the tokens, and print it as JSON
    output_json = {"tokens": tokens_str}
    print(json.dumps(output_json))
syonfox commented 1 year ago

Sick, @raphaelrk, thanks; this is what I was looking for. Check out https://github.com/syonfox/GPT-3-Encoder/tree/GPToken for a JS/TS implementation of a GPT token encoder. One todo item is to validate it against the reference Python implementation, but it works for estimation in the browser :)

Feel free to open an issue there. Eventually the project will surpass that repo, but I want the demo_app to be a decent base.

I am putting together a more proper demo.

One open question: does anyone know how n / best_of work with streaming? I have found that logprobs: 3 does work, but the others don't seem to have much of an effect.

Also, it seems that different models have slight variations on how the response is constructed.

Is this expected, or do the models all use a standardized, common API interface for the completions API?

Give me the code! Node 18 fetch with readable streams is needed. You could find a proper polyfill (it needs readable-stream support; a polyfill sketch follows the sample output below), but using a new Node was easiest.

First of all, thanks for the great starting point. I think this small lib might be a good base for some other stuff; just rip out this complexity. It still needs a little refining, but I can update it here, or keep an eye on the demo_app.

### `nano streamOne.js`

```js
// https://github.com/openai/openai-node/issues/18#issuecomment-1463774674
// import { Configuration, OpenAIApi } from 'openai';
// const OPENAI_API_KEY = 'sk-...';
// const configuration = new Configuration({ apiKey: OPENAI_API_KEY });
// const api = new OpenAIApi(configuration);

// type OtherOptions = {
//   maxTokens?: number;
//   temp?: number;
//   n?: number;
//   stop?: string | string[];
// }

// price per 1K tokens, used for the cost estimate logged after each stream
const modelToPrice/*: Record*/ = {
  'ada': 0.0004,
  'babbage': 0.0005,
  'curie': 0.002,
  'davinci': 0.02,
  'text-ada-001': 0.0004,
  'text-babbage-001': 0.0005,
  'text-curie-001': 0.002,
  'text-davinci-001': 0.02,
  'text-davinci-002': 0.02,
  'text-davinci-003': 0.02,
  'code-cushman-001': 0.0,
  'code-davinci-002': 0.0,
};

const gptoken = require("gptoken");

class TextCompletion {
  constructor(prompt, config) {
    this.done = false;
    streamOne(); // todo: this class might be a way of tracking a response and streaming it to the user's browser by listening and creating socket events or something.
    // just for inspiration, not used :)
  }
}

class Token {
  /**
   * Represents all we can know about a token from a streamed response
   * {
   *   "text": "\n",
   *   "index": 0,
   *   "logprobs": {
   *     "tokens": ["\n"],
   *     "token_logprobs": [-0.4092595],
   *     "top_logprobs": null,
   *     "text_offset": [68]
   *   },
   *   "finish_reason": null
   * }
   * @param choice
   */
  constructor(choice) {
    this.text = choice.text;
    this.token = gptoken.encode(this.text);
    this.choice = choice;
    this.log_index = choice.index;
    this.logprobs = choice.logprobs;
    this.text_offset = this.logprobs.text_offset[this.log_index];
    this.prob = this.logprobs.token_logprobs[this.log_index];
  }
}

function estimateTokens(prompt, response_limit = 0) {
  return gptoken.countTokens(prompt) + response_limit;
}

async function streamOne(model, prompt, onToken, onDone, onError, otherOptions) {
  // verify model and get price
  const price = modelToPrice[model];
  if (price === undefined) throw new Error('Unknown model: ' + model);

  // set options
  otherOptions = otherOptions || {};
  let max_tokens = otherOptions.maxTokens || 250;
  let temperature = otherOptions.temp || 0.5; // 0 to 1
  let top_p = otherOptions.topP || 0.8; // 0 to 1
  let n = otherOptions.n || 1; // not sure this works with stream
  let logprobs = otherOptions.logProbs || 0; // 0 - 5
  let best_of = otherOptions.bestOf || 1; // not sure this works with stream
  let stop = otherOptions.stop || null; // string or array, optional, defaults to null. Up to 4 sequences where the API will stop generating further tokens. The returned text will not contain the stop sequence.
  let frequency_penalty = 0.2;
  let presence_penalty = 0.1;
  let endpoint = otherOptions.endpoint || "/v1/completions";

  // create stream
  const fetchPromise = fetch(`https://api.openai.com${endpoint}`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
    },
    body: JSON.stringify({
      model, prompt, max_tokens, temperature, n, best_of, logprobs, stop,
      frequency_penalty, presence_penalty, stream: true
    }),
  }).catch(e => {
    // catch network error / fetch bugs
    console.error("Fetch Error: What is wrong? ", e);
    onError(e);
  });

  // handle response
  const response = await fetchPromise;
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  // keep track of tokens
  // let full_text = prompt; // in C we could make this a buffer of size prompt + limit
  // and insert tokens by index for max performance.
  let tokens = [];
  let concat = '';
  let completionTokenCount = 0;

  // read stream
  let gotReaderDone = false;
  let gotDoneMessage = false;
  console.debug("ttfb... openai-processing-time: ", response.headers.get("openai-processing-ms"));
  while (true) {
    // get next chunk
    const {done, value} = await reader.read();
    if (done) {
      gotReaderDone = true;
      break;
    }
    const text = decoder.decode(value);
    // console.log(text);

    // split chunk into lines
    // todo: there's probably a better way to do this, idk, seems ok.
    const lines = text.split('\n').filter(line => line.trim() !== '');
    // console.log("Number of tokens in chunk: ", lines.length);
    for (const line of lines) {
      // remove the data: prefix
      const lineMessage = line.replace(/^data: /, '');

      // if we got the done message, stop
      if (lineMessage === '[DONE]') {
        gotDoneMessage = true;
        break; // return;
      }

      // try to parse the line as JSON, and if it works, get the token and call the callback
      try {
        const parsed = JSON.parse(lineMessage);
        const choice = parsed.choices[0];
        // completion.addToken(choice);
        let myToken = new Token(choice);
        completionTokenCount++; // tokens.length??
        tokens.push(myToken);
        // const logprobs = choice.logprobs;
        // const token = choice.text;
        // let i = choice.index;
        // let prob = logprobs.token_logprobs[i];
        // let text_offset = logprobs.text_offset[i];
        // let text = logprobs.tokens[i];
        // let top_p = logprobs.top_logprobs;
        concat += myToken.text;
        onToken(myToken.text, myToken, parsed, response);
      } catch (error) {
        // todo: handle error better -- retry? inform caller?
        console.error(`Could not JSON parse stream message`, {text, lines, line, lineMessage, error});
        try {
          let errorInfo = JSON.parse(text);
          console.error(`Error info`, errorInfo);
          if (errorInfo.message) return onError(errorInfo.message);
          if (errorInfo.error.message) return onError(errorInfo.error.message);
        } catch (error) {
          console.error("Failed to parse error response from stream.");
          // ignore if we can't read the error
        }
      }
    }
    if (gotDoneMessage) break;
  }

  const fullCount = estimateTokens(prompt + concat);
  const cost = completionTokenCount / 1000 * price;
  const fullCost = fullCount / 1000 * price;
  console.log(` Streamed ${completionTokenCount} tokens. $${cost} ... full cost with prompt(${fullCount}): $${fullCost}`);
  console.log('Final text:', prompt + concat);
  console.log("Full req+res token count: ", );
  // let promptCount = gptoken.countTokens(prompt)
  onDone(prompt, concat, tokens, fullCost, response);
}

streamOne.modelToPrice = modelToPrice;

/**
 * A test function to ask gpt for a quote / example usage
 * @returns {Promise}
 */
streamOne.test = async function () {
  function onToken(text, token, parsed, response) {
    console.log("Got Token: ", text, token.prob, token.text_offset);
  }

  function onDone(prompt, data, tokens, cost, response) {
    console.log("Got Data: ", data);
    let fulltext = prompt + data;
    console.log(fulltext);
    console.log("full text token count: ", gptoken.countTokens(fulltext));
  }

  function onError(err) {
    console.error("Got Error:)");
  }

  let data = await streamOne("text-davinci-002", "What is the inspirational quote of the day in one sentence? \n QOTD: ", onToken, onDone, onError, {stop: ['.'], logProbs: 3});
  // You can't be a real country unless you have a beer and an airline- it helps if you have some kind of a football team, or some nuclear weapons, but at the very least you need a beer
  console.log("Data Resolved:", data);
}

module.exports = streamOne;

/**
 * Hola Mi Amie,
 *
 * Today I will tell you all you need to know about text completion streaming.
 *
 * Each token is returned one at a time. You theoretically have the option to select a different word:
 * to do this, turn on logprobs (3-5 max) and then you can offer editing on a per-word basis after completion.
 *
 * Each chunk is returned as data: {... choices: [{text, index, logprobs}]}
 * From each choice the info we want is:
 *   text: choice[0].text
 *   prob: choice[0].logprobs.token_logprobs[0]
 *   full_text_offset: logprobs.text_offset[0]
 */
```

### `nano demo.js`

```
const gptoken = require('gptoken');
require('dotenv').config();
const streamOne = require('./streamOne');
const {encode, decode, countTokens, tokenStats} = gptoken;
const aiia = require("./aiia"); // import aiia from "./aiia"

streamOne.test();

const str = 'This is an example sentence to try encoding out on!'
const encoded = encode(str)
console.log('Encoded this string looks like: ', encoded)

console.log('We can look at each token and what it represents')
for (let token of encoded) {
  console.log({token, string: decode([token])})
}

// example count tokens usage
if (countTokens(str) > 5) {
  console.log("String is over five tokens, inconcevable");
}
```

### `nano .env`

```
OPENAI_API_KEY="sk-...."
OPENAI_ENDPOINT="https://api.openai.com/"
```

## Running it

```
mkdir app
cd app
npm init
npm install gptoken dotenv
nano streamOne.js
nano demo.js
nano .env
nvm install v18.15.0 # or find a legit good polyfill for fetch :) .. and lmk
node demo.js
```
Sample output

```js
Encoded this string looks like:  [ 1212, 318, 281, 1672, 6827, 284, 1949, 21004, 503, 319, 0 ]
We can look at each token and what it represents
{ token: 1212, string: 'This' }
{ token: 318, string: ' is' }
{ token: 281, string: ' an' }
{ token: 1672, string: ' example' }
{ token: 6827, string: ' sentence' }
{ token: 284, string: ' to' }
{ token: 1949, string: ' try' }
{ token: 21004, string: ' encoding' }
{ token: 503, string: ' out' }
{ token: 319, string: ' on' }
{ token: 0, string: '!' }
String is over five tokens, inconcevable
String Token Stats:  {
  count: 6,
  unique: 5,
  frequency: { '275': 1, '1031': 1, '2318': 2, '21943': 1, '22944': 1 },
  positions: { '275': [ 4 ], '1031': [ 5 ], '2318': [ 2, 3 ], '21943': [ 0 ], '22944': [ 1 ] },
  tokens: [ 21943, 22944, 2318, 2318, 275, 1031 ]
}
We can decode it back into: This is an example sentence to try encoding out on!
Fetching... get : https://api.openai.com/v1/models
Server listening on port 3000
ttfb... openai-processing-time:  260
Got Token:   -0.41367477 68
Got Token:   -0.14588133 69
Got Token:  " -0.37068415 70
Got Token:  You -1.591554 71
Got Token:  can -0.598935 74
Got Token:  't -0.011316275 78
Got Token:  be -0.4461555 80
Got Token:  a -0.00020513259 83
Got Token:  real -0.0061629214 85
Got Token:  country -0.0022323753 90
Fine tuning Models: [ 'cushman:2020-05-03', 'if-davinci:3.0.0', 'davinci-if:3.0.0', 'davinci-instruct-beta:2.0.0' ]
Got Token:  unless -0.000015567284 98
Got Token:  you -3.076318e-7 105
Got Token:  have -0.0000020966954 109
Got Token:  a -0.000014736571 114
Got Token:  beer -0.00026218753 116
Got Token:  and -0.000015805701 121
Got Token:  an -0.00003583558 125
Got Token:  airline -0.00001855031 128
Got Token:  - -0.009223087 136
Got Token:  it -0.015826264 137
Got Token:  helps -0.00012548709 140
Got Token:  if -0.0000034089344 146
Got Token:  you -0.000016282536 149
Got Token:  have -0.000059085025 153
Got Token:  some -0.00004144026 158
Got Token:  kind -0.00048095893 163
Got Token:  of -4.277735e-7 168
Got Token:  a -0.06332793 171
Got Token:  football -0.00010092916 173
Got Token:  team -0.000097351025 182
Got Token:  , -0.0008433579 187
Got Token:  or -0.000088051806 188
Got Token:  some -0.00004871012 191
Got Token:  nuclear -0.0014420545 196
Got Token:  weapons -0.00001306671 204
Got Token:  , -0.000055269407 212
Got Token:  but -0.000027133337 213
Got Token:  at 0 217
Got Token:  the -0.0000061516675 220
Got Token:  very -0.0000071062755 224
Got Token:  least -0.000009251094 229
Got Token:  you -0.000012468796 235
Got Token:  need -0.0000029311614 239
Got Token:  a -0.000009606849 244
Got Token:  beer -0.000016642034 246
 Streamed 45 tokens. $0.0009 ... full cost with prompt(64): $0.00128
Final text: What is the inspirational quote of the day in one sentence? 
 QOTD: "You can't be a real country unless you have a beer and an airline- it helps if you have some kind of a football team, or some nuclear weapons, but at the very least you need a beer
Full req+res token count: 
Got Data:  "You can't be a real country unless you have a beer and an airline- it helps if you have some kind of a football team, or some nuclear weapons, but at the very least you need a beer
What is the inspirational quote of the day in one sentence? 
 QOTD: "You can't be a real country unless you have a beer and an airline- it helps if you have some kind of a football team, or some nuclear weapons, but at the very least you need a beer
full text token count:  64
Data Resolved: undefined
```
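For anyone stuck below Node 18, one way to get a `fetch` whose `response.body` supports `getReader()` is to fall back to undici (a minimal sketch, not part of the snippet above; it assumes the `undici` package is installed):

```js
// Minimal sketch: provide a WHATWG-stream-capable fetch on older Node versions.
// Assumes `undici` is available (npm install undici); Node >= 18 already ships fetch.
if (typeof globalThis.fetch !== 'function') {
  globalThis.fetch = require('undici').fetch;
}
```

Note that `node-fetch` is not a drop-in replacement here, since it exposes the response body as a Node `Readable` (`.on('data')`) rather than a web `ReadableStream` with `getReader()`.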
ponytojas commented 1 year ago

My take on it, with punctuation detection to prevent response over-spamming

const openAiCompletion = async (messages, onText) => {
    try {
        const response = await fetch('https://api.openai.com/v1/chat/completions', {
            method: 'POST',
            headers: {
                'Authorization': `Bearer ${OPENAI_TOKEN}`,
                'Content-Type': 'application/json',
            },
            body: JSON.stringify({
                messages,
                model: "gpt-3.5-turbo",
                max_tokens: 2048,
                stream: true
            })
        });

        const decoder = new TextDecoder('utf8');
        const reader = response.body.getReader();

        let fullText = ''
        let lastFire = 0

        async function read() {
            const { value, done } = await reader.read();

            if (done) return onText(fullText)

            const delta = decoder.decode(value).match(/"delta":\s*({.*?"content":\s*".*?"})/)?.[1]

            if (delta) {
                const content = JSON.parse(delta).content

                fullText += content

                //Detect punctuation; if found, fire onText at most once per 0.5 sec
                if (/[\p{P}\p{S}]/u.test(content)) {
                    const now = Date.now();

                    if (now - lastFire > 500) {
                        lastFire = now
                        onText(fullText)
                    }
                }
            }

            await read()

        }

        await read()

        return fullText
    } catch (error) {
        return error;
    }
}

Use it like this:

const aiResponse = await openAiCompletion(prompt, (text) => { 
//update UI or whatever
})

//do something with full response
console.log(aiResponse)

I have found some errors when executing the code above because of the regex: a single chunk can sometimes contain multiple deltas, and the regex only captures the first one.

I'll leave my solution here in case it's useful to someone.


  __cleanResponse = (res) => {
          const deltas = [];
          const splitted = res.split('\n');
          let finished = false;
          for (let i = 0; i < splitted.length; i += 1) {
              try {
                  let test = splitted[i];
                  if (test === 'data: [DONE]' || test.indexOf('data: [DONE]') >= 0) {
                      finished = true;
                      continue;
                  }
                  if (test.length === 0) continue;
                  if (test.startsWith('data:')) test = test.slice(5);
                  test = JSON.parse(test);
                  if (test.choices[0].delta.content) deltas.push(test.choices[0].delta.content);
              } catch (err) {
                  console.error("Error during message clean: " + err);
                  continue;
              }
          }
          return { deltas, finished };
      }

    __readResponse = async (reader, previousText = '', cbStream = () => { }) => {
        let fullText = previousText;
        const decoder = new TextDecoder('utf8');

        const { value, done } = await reader.read();

        if (done) { cbStream(fullText); return fullText; }
        const { deltas, finished } = this.__cleanResponse(decoder.decode(value));
        let inserted = false;
        if (deltas.length > 0) {
            for (let deltaIndex = 0; deltaIndex < deltas.length; deltaIndex += 1) {
                if (!inserted) inserted = true;
                fullText += deltas[deltaIndex];
            }
            if (inserted) cbStream(fullText);
        }
        if (!finished) fullText = await this.__readResponse(reader, fullText, cbStream);
        return fullText;
    }

  function ask() {
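    // assumption: messages, cbStream, error and this.apiKey come from the surrounding class/scope and are not shown in this excerpt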
    let fullText = '';
    fetch('https://api.openai.com/v1/chat/completions', {
                method: 'POST',
                headers: {
                    Authorization: "Bearer" + this.apiKey,
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({
                    messages,
                    model: 'gpt-3.5-turbo',
                    stream: true,
                }),
            }).then(async (res) => {
                const reader = res.body.getReader();
                fullText += await this.__readResponse(reader, fullText, cbStream);
            }).catch((err) => {
                error = true;
                console.error(err);
            });
  }
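
One caveat that applies to both approaches above: a single `read()` chunk is not guaranteed to contain whole `data:` lines, so a JSON message can be split across two chunks. A minimal sketch of a parser that buffers partial lines before parsing (assuming the same `/v1/chat/completions` streaming shape; `readStream` and `onDelta` are just illustrative names):

```js
// Minimal sketch: buffer partial SSE lines across reads so JSON.parse never sees half a message.
// Assumes `response` is a fetch Response from /v1/chat/completions with stream: true.
async function readStream(response, onDelta) {
  const reader = response.body.getReader();
  const decoder = new TextDecoder('utf8');
  let buffer = '';
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    // { stream: true } keeps multi-byte characters intact across chunk boundaries
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // keep the last (possibly incomplete) line for the next read
    for (const line of lines) {
      const message = line.replace(/^data: /, '').trim();
      if (!message || message === '[DONE]') continue;
      const content = JSON.parse(message).choices[0].delta?.content;
      if (content) onDelta(content);
    }
  }
}
```

Because every complete line is parsed as soon as it arrives, this also handles the multiple-deltas-per-chunk case without a regex.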