simonw / datasette-openai

SQL functions for calling OpenAI APIs
https://datasette.io/plugins/datasette-openai
Apache License 2.0

Support streaming API to help avoid timeouts #3

Open simonw opened 1 year ago

simonw commented 1 year ago

I'm seeing a lot of timeouts where queries that call the openai_davinci() SQL function take so long that the page fails to load.
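For example, a query along these lines (a sketch, assuming an openai_davinci(prompt, max_tokens, temperature, key) argument order) has to wait for the entire completion before the page can render:

select openai_davinci(
  'Write a poem about an otter and owl who are friends',
  512,  -- max_tokens
  0.7,  -- temperature
  :key  -- argument order assumed here
) as poem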

The streaming API - adding "stream": true to the JSON body of the API call - could be a neat way to avoid this... but it's obviously not compatible with running inside a SQL query execution.

So how about a separate mechanism where you can define a SQL query that returns a prompt and Datasette then gives you a separate UI which executes that prompt in a streaming manner and streams it to your browser?

This might also provide a neat way to add features like rate-limiting, and to hide the API key while still allowing users to use it.

simonw commented 1 year ago

One curious way to implement this would be as an output renderer: https://docs.datasette.io/en/stable/plugin_hooks.html#register-output-renderer-datasette

It could fire on queries that return a single row with columns model, prompt, max_tokens and temperature (and maybe some more).
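A sketch of the kind of query that would qualify:

select 'Tell me about otters' as prompt,
       'text-davinci-003' as model,
       256 as max_tokens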

simonw commented 1 year ago

It would need to return a streaming response, which I think can be done by overriding this asgi_send method: https://github.com/simonw/datasette/blob/8e7073404379d79a2d269167a12bbb58439edd39/datasette/utils/asgi.py#L344

simonw commented 1 year ago

Built a working prototype:

diff --git a/datasette_openai/__init__.py b/datasette_openai/__init__.py
index b8cca70..1a3091a 100644
--- a/datasette_openai/__init__.py
+++ b/datasette_openai/__init__.py
@@ -1,5 +1,6 @@
-from datasette import hookimpl
+from datasette import hookimpl, Response
 import httpx
+import json
 import struct

@@ -56,3 +57,69 @@ def decode(blob):

 def encode(values):
     return struct.pack("f" * 1536, *values)
+
+
+def can_render(columns):
+    return {"prompt", "model", "max_tokens"}.issubset(columns)
+
+
+async def render(rows):
+    row = dict(rows[0])
+    prompt = row["prompt"]
+    model = row["model"]
+    max_tokens = row["max_tokens"]
+    api_key = "sk-..."
+
+    class GptResponse(Response):
+        async def asgi_send(self, send):
+            headers = {}
+            headers["content-type"] = "text/plain"
+            raw_headers = [
+                [key.encode("utf-8"), value.encode("utf-8")]
+                for key, value in headers.items()
+            ]
+            await send(
+                {
+                    "type": "http.response.start",
+                    "status": 200,
+                    "headers": raw_headers,
+                }
+            )
+            client = httpx.AsyncClient()
+            async with client.stream(
+                "POST",
+                "https://api.openai.com/v1/completions",
+                headers={"Authorization": f"Bearer {api_key}"},
+                json={
+                    "model": model,
+                    "prompt": prompt,
+                    "max_tokens": max_tokens,
+                    # "temperature": temperature,
+                    "stream": True,
+                },
+                timeout=15.0,
+            ) as response:
+                async for line in response.aiter_lines():
+                    if line.startswith("data: {"):
+                        decoded = json.loads(line.split("data: ", 1)[1])
+                        bit = decoded["choices"][0]["text"]
+                        await send(
+                            {
+                                "type": "http.response.body",
+                                "body": bit.encode("utf-8"),
+                                "more_body": True,
+                            }
+                        )
+
+            await send({"type": "http.response.body", "body": b""})
+
+    return GptResponse()
+
+
+@hookimpl
+def register_output_renderer(datasette):
+    return {
+        "extension": "openai",
+        "render": render,
+        "can_render": can_render,
+    }

This streamed a line at a time:

% curl 'http://127.0.0.1:8001/simonwillisonblog.openai?sql=select+%27write+a+poem+about+an+otter+and+owl+who+are+friends%27+as+prompt%2C+%27text-davinci-003%27+as+model%2C+512+as+max_tokens'

Two soulmates united,
An Otter and an Owl
A friendship so devoted,
It's a must for them to cuddle. 

A dutiful duo that loves life,
Adventuring along with no strife. 
Lighthearted, silly and comical,
Striding into the unknown—ecstatic and full.

The Otter and Owl cross paths so divine,
As if there were only one line,
Connecting their hearts through the air,
Creating a bond they'll be sure to share.

The Owl soaring high, 
His intelligence shines a light.
The Otter plunging deep,
Her love so fierce, so bright.

They'll stay together, close and tight
Keeping each other warm day and night.
An Otter and an Owl, friends 'till the end
What a special bond, for them to defend.
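For reference, the querystring SQL in that curl decodes to:

select 'write a poem about an otter and owl who are friends' as prompt,
       'text-davinci-003' as model,
       512 as max_tokens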
simonw commented 1 year ago

Good proof of concept. Some decisions I need to make:

simonw commented 1 year ago

I got ChatGPT to generate this:

const eventSource = new EventSource("API_ENDPOINT_URL");
const textarea = document.getElementById("textarea-id");

eventSource.onmessage = (event) => {
  textarea.value += event.data + '\n';
};

It said that this won't handle errors, so I got it to produce this instead:

let eventSource;
const textarea = document.getElementById("textarea-id");
let connectionAttempts = 0;
const maxConnectionAttempts = 5;

function connect() {
  eventSource = new EventSource("API_ENDPOINT_URL");

  eventSource.onmessage = (event) => {
    textarea.value += event.data + '\n';
  };

  eventSource.onerror = (event) => {
    if (event.target.readyState === EventSource.CLOSED) {
      if (connectionAttempts < maxConnectionAttempts) {
        connectionAttempts++;
        console.log(`Connection lost. Attempting to reconnect (attempt ${connectionAttempts})...`);
        // Re-creating the EventSource via connect() re-attaches these
        // handlers, which a bare `new EventSource(...)` would not
        setTimeout(connect, 5000);
      } else {
        console.log(`Maximum connection attempts reached. Giving up.`);
      }
    }
  };
}

connect();
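One caveat: EventSource only works against responses served with a text/event-stream content type and data:-prefixed messages, so the prototype renderer above would need to emit SSE framing instead of text/plain before this client code could talk to it.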

My prompts to get that final result:

simonw commented 1 year ago

That prototype, in an easier-to-copy-and-paste format:


from datasette import hookimpl, Response
import httpx
import json


def can_render(columns):
    return {"prompt", "model", "max_tokens"}.issubset(columns)

async def render(rows):
    row = dict(rows[0])
    prompt = row["prompt"]
    model = row["model"]
    max_tokens = row["max_tokens"]
    api_key = "sk-..."

    class GptResponse(Response):
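        # Subclassing Response to override asgi_send lets the body be
        # streamed to the browser a chunk at a time instead of buffered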
        async def asgi_send(self, send):
            headers = {}
            headers["content-type"] = "text/plain"
            raw_headers = [
                [key.encode("utf-8"), value.encode("utf-8")]
                for key, value in headers.items()
            ]
            await send(
                {
                    "type": "http.response.start",
                    "status": 200,
                    "headers": raw_headers,
                }
            )
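            # Proxy the OpenAI completions endpoint with "stream": true and
            # forward each chunk to the client as soon as it arrives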
            client = httpx.AsyncClient()
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/completions",
                headers={"Authorization": f"Bearer {api_key}"},
                json={
                    "model": model,
                    "prompt": prompt,
                    "max_tokens": max_tokens,
                    # "temperature": temperature,
                    "stream": True,
                },
                timeout=15.0,
            ) as response:
                # The API streams SSE-style "data: {...}" lines; skip anything
                # else, including the terminating "data: [DONE]" line
                async for line in response.aiter_lines():
                    if line.startswith("data: {"):
                        decoded = json.loads(line.split("data: ", 1)[1])
                        bit = decoded["choices"][0]["text"]
                        await send(
                            {
                                "type": "http.response.body",
                                "body": bit.encode("utf-8"),
                                "more_body": True,
                            }
                        )

            # A final empty body with no "more_body" flag ends the response
            await send({"type": "http.response.body", "body": b""})

    return GptResponse()

@hookimpl
def register_output_renderer(datasette):
    return {
        "extension": "openai",
        "render": render,
        "can_render": can_render,
    }
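
With this in place, any query returning prompt, model and max_tokens columns can be requested with the .openai extension to stream the completion back, as in the curl example above.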