simonw / datasette

An open source multi-tool for exploring and publishing data
https://datasette.io

Research option for returning all rows from arbitrary query #1550

Open simonw opened 2 years ago

simonw commented 2 years ago

Inspired by thinking about #1549 - returning ALL rows from an arbitrary query is a lot easier if you just run that query and keep iterating over the cursor.

I've avoided doing that in the past because it could tie up a connection for a long time - but in private instances this wouldn't be such a problem.
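The core idea is simple enough as a synchronous sketch - fetch from the cursor in batches and yield as you go, rather than fetchall()-ing the entire result into memory. (example.db, the query and the batch size here are placeholders:)

import sqlite3

def iter_all_rows(db_path, sql, batch_size=100):
    # Stream rows in batches instead of materializing the full result set
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute(sql)
    try:
        while True:
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            yield from batch
    finally:
        conn.close()

for row in iter_all_rows("example.db", "select * from mytable"):
    print(row)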

simonw commented 2 years ago

I wonder if this could work for public instances too with some kind of queuing mechanism?

I really need to do some benchmarking to figure out the right maximum number of SQLite connections. I'm just guessing at the moment.
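A rough sketch of what that queuing mechanism could look like, assuming asyncio: a semaphore caps the number of simultaneous streaming queries, so extra requests wait in line instead of each tying up its own connection. The cap of 3 is an arbitrary placeholder pending that benchmarking.

import asyncio

# Arbitrary placeholder cap - the right number needs benchmarking
MAX_STREAMING_QUERIES = 3
streaming_slots = asyncio.Semaphore(MAX_STREAMING_QUERIES)

async def run_streaming_query(execute_query):
    # Requests beyond the cap queue up here until a slot frees
    async with streaming_slots:
        return await execute_query()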

simonw commented 2 years ago

I built a tiny Starlette app to experiment with this a bit:

import asyncio
import janus
from starlette.applications import Starlette
from starlette.responses import JSONResponse, HTMLResponse, StreamingResponse
from starlette.routing import Route
import sqlite3
from concurrent import futures

# Note: run_in_executor(None, ...) further down uses the loop's default
# executor, so this explicit pool is currently unused
executor = futures.ThreadPoolExecutor(max_workers=10)

async def homepage(request):
    return HTMLResponse(
        """
        <html>
        <head>
        <title>SQL CSV Server</title>
        <style>body { width: 40rem; font-family: helvetica; margin: 2em auto; }</style>
        </head>
        <body>
        <h1>SQL CSV Server</h1>
        <form action="/csv">
        <label style="display: block">SQL query:
        <textarea style="width: 90%; height: 20em" name="sql"></textarea>
        </label>
        <input type="submit" value="Run query">
        </form>
        </body>
        </html>
        """
    )

def run_query_in_thread(sql, sync_q):
    db = sqlite3.connect("../datasette/covid.db")
    cursor = db.cursor()
    cursor.arraysize = 100  # fetchmany() batch size; the default is just 1
    cursor.execute(sql)
    columns = [d[0] for d in cursor.description]
    # First queue item is the column names, shaped like a one-row batch
    sync_q.put([columns])
    # Now start putting batches of rows
    while True:
        rows = cursor.fetchmany()
        if rows:
            sync_q.put(rows)
        else:
            break
    # Let the queue know we are finished
    sync_q.put(None)

async def csv_query(request):
    sql = request.query_params["sql"]

    queue = janus.Queue()
    loop = asyncio.get_running_loop()

    async def csv_generator():
        loop.run_in_executor(None, run_query_in_thread, sql, queue.sync_q)
        while True:
            rows = await queue.async_q.get()
            if rows is not None:
                for row in rows:
                    yield ",".join(map(str, row)) + "\n "
                queue.async_q.task_done()
            else:
                # Cleanup
                queue.close()
                await queue.wait_closed()
                break

    return StreamingResponse(csv_generator(), media_type='text/plain')

app = Starlette(
    debug=True,
    routes=[
        Route("/", homepage),
        Route("/csv", csv_query),
    ],
)

But... if I run this in a terminal window:

/tmp % wget 'http://127.0.0.1:8000/csv?sql=select+*+from+ny_times_us_counties'

it takes about 20 seconds to run and returns a 50MB file - but while it is running no other requests can be served by that server - not even the homepage! So something is blocking the event loop.
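One way to confirm that the event loop itself is stalled (rather than, say, the client or connection handling) would be to run a heartbeat coroutine alongside the app and watch for missed wakeups. This is a generic debugging sketch, not part of the app above, and the 0.1s threshold is arbitrary:

import asyncio
import time

async def monitor_event_loop(interval=0.5):
    # If the loop is healthy, each sleep wakes up roughly on time;
    # a large gap means some coroutine or callback is blocking it
    while True:
        started = time.monotonic()
        await asyncio.sleep(interval)
        lag = time.monotonic() - started - interval
        if lag > 0.1:
            print(f"event loop blocked for ~{lag:.2f}s")

Kicking this off with asyncio.create_task(monitor_event_loop()) at startup would make any event-loop blocking show up in the logs while the wget is running.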

Maybe I should be using fut = loop.run_in_executor(None, run_query_in_thread, sql, queue.sync_q) and then awaiting fut somewhere, like in the Janus documentation? I don't think that's needed, though. Needs more work to figure out why this is blocking.
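For reference, a sketch of what that awaited-future variant of csv_generator would look like, following the pattern from the Janus docs (not a confirmed fix) - the main practical difference is that exceptions raised inside run_query_in_thread would propagate instead of being silently dropped:

    async def csv_generator():
        # Keep the future so it can be awaited once the queue drains,
        # which also surfaces any exception raised in the worker thread
        fut = loop.run_in_executor(None, run_query_in_thread, sql, queue.sync_q)
        while True:
            rows = await queue.async_q.get()
            if rows is None:
                break
            for row in rows:
                yield ",".join(map(str, row)) + "\n"
            queue.async_q.task_done()
        await fut
        queue.close()
        await queue.wait_closed()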