Open simonw opened 3 years ago
Current design: https://docs.datasette.io/en/stable/plugin_hooks.html#register-output-renderer-datasette
@hookimpl
def register_output_renderer(datasette):
    return {
        "extension": "test",
        "render": render_demo,
        "can_render": can_render_demo,  # Optional
    }
Where `render_demo` looks something like this:
from datasette.utils.asgi import Response


async def render_demo(datasette, columns, rows):
    db = datasette.get_database()
    result = await db.execute("select sqlite_version()")
    first_row = " | ".join(columns)
    lines = [first_row]
    lines.append("=" * len(first_row))
    for row in rows:
        lines.append(" | ".join(row))
    return Response(
        "\n".join(lines),
        content_type="text/plain; charset=utf-8",
        headers={"x-sqlite-version": result.first()[0]},
    )
Meanwhile here's where the CSV streaming mode is implemented: https://github.com/simonw/datasette/blob/4bac9f18f9d04e5ed10f072502bcc508e365438e/datasette/views/base.py#L297-L380
The trick I'm using here is to follow the `next_url` in order to paginate through all of the matching results. The loop calls the `data()` method multiple times, once for each page of results: https://github.com/simonw/datasette/blob/4bac9f18f9d04e5ed10f072502bcc508e365438e/datasette/views/base.py#L304-L307
Yet another use-case for this: I want to be able to stream newline-delimited JSON in order to better import into Pandas:
pandas.read_json("https://latest.datasette.io/fixtures/compound_three_primary_keys.json?_shape=array&_nl=on", lines=True)
Idea: instead of returning a dictionary, `register_output_renderer` could return an object. The object could have the following properties:

* `.extension` - the extension to use
* `.can_render(...)` - says if it can render this
* `.can_stream(...)` - says if streaming is supported
* `async .stream_rows(rows_iterator, send)` - method that loops through all rows and uses `send` to send them to the response in the correct format

I can then deprecate the existing `dict` return type for 1.0.
With this structure it will become possible to stream non-newline-delimited JSON array-of-objects too - the `stream_rows()` method could output `[` first, then each row followed by a comma, then `]` after the very last row.
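A minimal sketch of what such a renderer object might look like, using newline-delimited JSON because it needs no separators between rows. Everything here is an assumption for illustration: the class name, the arguments passed to `can_render()` and `can_stream()`, and the shape of `send` are hypothetical and not an existing Datasette API.

```python
import asyncio
import json


class NdjsonRenderer:
    """Hypothetical renderer object; not a confirmed Datasette interface."""

    extension = "ndjson"

    def can_render(self, columns):
        # Assumption: the host passes the column names.
        return True

    def can_stream(self, columns):
        return True

    async def stream_rows(self, rows_iterator, send):
        # Write one JSON document per line as each row arrives.
        async for row in rows_iterator:
            await send(json.dumps(row) + "\n")


async def demo():
    async def rows():
        # Stand-in for a lazily paginated query result.
        yield {"id": 1, "name": "one"}
        yield {"id": 2, "name": "two"}

    sent = []

    async def send(chunk):
        # Stand-in for writing a chunk to the HTTP response.
        sent.append(chunk)

    await NdjsonRenderer().stream_rows(rows(), send)
    return sent


if __name__ == "__main__":
    print("".join(asyncio.run(demo())), end="")
```

Because `stream_rows()` only ever holds one row at a time, memory use stays flat however many rows the iterator produces.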
This would really help with this issue: https://github.com/eyeseast/datasette-geojson/issues/7
Relevant blog post: https://simonwillison.net/2021/Jun/25/streaming-large-api-responses/ - including notes on efficiently streaming formats with some kind of separator in between the records (regular JSON).
Some export formats are friendlier for streaming than others. CSV and TSV are pretty easy to stream, as is newline-delimited JSON.
Regular JSON requires a bit more thought: you can output a `[` character, then output each row in a stream with a comma suffix, then skip the comma for the last row and output a `]`. Doing that requires peeking ahead (looping two at a time) to verify that you haven't yet reached the end.

Or... Martin De Wulf pointed out that you can output the first row, then output each subsequent row with a preceding comma, which avoids the whole "iterate two at a time" problem entirely.
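The leading-comma approach could be sketched like this. The function and helper names are hypothetical, and the rows are plain dictionaries standing in for real query results.

```python
import asyncio
import json


async def stream_json_array(rows):
    """Yield the pieces of a JSON array without ever looking ahead."""
    yield "["
    first = True
    async for row in rows:
        # Every row except the first gets a comma *before* it.
        yield json.dumps(row) if first else "," + json.dumps(row)
        first = False
    yield "]"


async def example_rows(n):
    # Stand-in for an async source of rows.
    for i in range(n):
        yield {"id": i}


async def collect(rows):
    # Join the streamed chunks so the result can be inspected as one string.
    return "".join([chunk async for chunk in stream_json_array(rows)])


if __name__ == "__main__":
    print(asyncio.run(collect(example_rows(3))))
```

An empty iterator still produces valid JSON (`[]`), since the opening and closing brackets are emitted unconditionally.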
Maybe the simplest design for this is to add an optional `can_stream` to the contract:
@hookimpl
def register_output_renderer(datasette):
    return {
        "extension": "tsv",
        "render": render_tsv,
        "can_render": lambda: True,
        "can_stream": lambda: True,
    }
When streaming, a new parameter could be passed to the render function - maybe `chunks` - which is an iterator/generator over a sequence of chunks of rows.

Or it could use the existing `rows` parameter but treat that as an iterator?
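A minimal sketch of the `chunks` idea for TSV, assuming the render function receives the column names plus an iterator of row lists. The function name and signature are hypothetical; nothing here is an existing Datasette contract.

```python
import csv
import io


def render_tsv_chunks(columns, chunks):
    """Yield TSV text for the header, then once per chunk of rows."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter="\t", lineterminator="\n")
    writer.writerow(columns)
    yield buffer.getvalue()
    for chunk in chunks:
        # Reuse the buffer so only one chunk of text is held at a time.
        buffer.seek(0)
        buffer.truncate(0)
        writer.writerows(chunk)
        yield buffer.getvalue()


if __name__ == "__main__":
    demo_chunks = iter([[(1, "a"), (2, "b")], [(3, "c")]])
    for piece in render_tsv_chunks(["id", "name"], demo_chunks):
        print(piece, end="")
```

Using the `csv` module rather than joining strings by hand means values containing tabs or newlines get quoted correctly.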
What if you split rendering and streaming into two things:

* `render` is a function that returns a response
* `stream` is a function that sends chunks, or yields chunks passed to an ASGI `send` callback

That way current plugins still work, and streaming is purely additive. A `stream` function could get a cursor or iterator of rows, instead of a list, so it could more efficiently handle large queries.
I'm questioning if the mechanisms should be separate at all now - a single response rendering is really just a case of a streaming response that only pulls the first N records from the iterator.
It probably needs to be an `async for` iterator, which I've not worked with much before. Good opportunity to learn.
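To illustrate the point that a single response is a streaming response that stops early, here is a small sketch using an async generator and `async for`. The row source and the `take()` helper are hypothetical stand-ins, not Datasette internals.

```python
import asyncio


async def iter_rows(total):
    """Hypothetical stand-in for an async row source."""
    for i in range(total):
        await asyncio.sleep(0)  # yield control, as a real database call would
        yield (i, f"row-{i}")


async def take(rows, limit):
    """Pull at most `limit` rows, then stop consuming the iterator."""
    result = []
    if limit <= 0:
        return result
    async for row in rows:
        result.append(row)
        if len(result) >= limit:
            break
    return result


async def main():
    # "Single response" mode: same iterator, capped at one page of rows.
    page = await take(iter_rows(1000), limit=3)
    # Streaming mode: consume everything the iterator produces.
    everything = [row async for row in iter_rows(5)]
    return page, everything


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the generator is lazy, the capped call only ever produces the rows it returns; the remaining 997 are never generated.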
This actually gets a fair bit more complicated due to the work I'm doing right now to improve the default JSON API:
I want to do things like make faceting results optionally available to custom renderers - which is a separate concern from streaming rows.
I'm going to poke around with a bunch of prototypes and see what sticks.
The `datasette-geojson` plugin is actually an interesting case here, because of the way it converts SpatiaLite geometries into GeoJSON: https://github.com/eyeseast/datasette-geojson/blob/602c4477dc7ddadb1c0a156cbcd2ef6688a5921d/datasette_geojson/__init__.py#L61-L66
if isinstance(geometry, bytes):
    results = await db.execute(
        "SELECT AsGeoJSON(:geometry)", {"geometry": geometry}
    )
    return geojson.loads(results.single_value())
That actually seems to work really well as-is, but it does worry me a bit that it ends up having to execute an extra `SELECT` query for every single returned row - especially in streaming mode where it might be asked to return 1m rows at once.
My PostgreSQL/MySQL engineering brain says that this would be better handled by doing a chunk of these (maybe 100) at once, to avoid the per-query-overhead - but with SQLite that might not be necessary.
At any rate, this is one of the reasons I'm interested in "iterate over this sequence of chunks of 100 rows at a time" as a potential option here.
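A minimal sketch of the "chunks of 100 rows" idea, assuming rows arrive from an async iterator. The `chunked()` helper is hypothetical and not part of Datasette.

```python
import asyncio


async def chunked(rows, size=100):
    """Group an async iterator of rows into lists of up to `size` rows."""
    chunk = []
    async for row in rows:
        chunk.append(row)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        # Emit the final, possibly shorter, chunk.
        yield chunk


async def fake_rows(n):
    # Stand-in for rows coming back from a query.
    for i in range(n):
        yield i


async def chunk_sizes(n, size):
    return [len(chunk) async for chunk in chunked(fake_rows(n), size)]


if __name__ == "__main__":
    print(asyncio.run(chunk_sizes(250, 100)))
```

A renderer that needs per-row database work could then issue one query per chunk instead of one per row.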
Of course, a better solution would be for `datasette-geojson` to have a way to influence the SQL query before it is executed, adding an `AsGeoJSON(geometry)` clause to it - so that's something I'm open to as well.
Ha! That was your idea (and a good one).
But it's probably worth measuring to see what overhead it adds. It did require both passing in the database and making the whole thing `async`.
Just timing the queries themselves:

* `AsGeoJSON(geometry) as geometry` takes 10.235 ms

Looking at the network panel:

* `fetch` request

I'm not sure how best to time the GeoJSON generation, but it would be interesting to check. Maybe I'll write a plugin to add query times to response headers.
The other thing to consider with async streaming is that it might be well-suited for a slower response. When I have to get the whole result and send a response in a fixed amount of time, I need the most efficient query possible. If I can hang onto a connection and get things one chunk at a time, maybe it's ok if there's some overhead.
Idea for supporting streaming with the `register_output_renderer` hook:
@hookimpl
def register_output_renderer(datasette):
    return {
        "extension": "test",
        "render": render_demo,
        "can_render": can_render_demo,
        "render_stream": render_demo_stream,  # This is new
    }
So there's a new `"render_stream"` key which can be returned, which if present means that the output renderer supports streaming.
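On the host side, detecting the new key might look something like the following sketch. The `pick_render_function()` helper and its `streaming_requested` flag are hypothetical; this is not how Datasette is known to dispatch renderers.

```python
def pick_render_function(renderer, streaming_requested):
    """Choose which callable from the renderer dict should handle a request."""
    stream_fn = renderer.get("render_stream")
    if streaming_requested and stream_fn is not None:
        return stream_fn
    # Renderers without "render_stream" keep working exactly as before.
    return renderer["render"]


def render_demo():
    return "whole response"


def render_demo_stream():
    return "streamed response"


old_style = {"extension": "test", "render": render_demo}
new_style = {**old_style, "render_stream": render_demo_stream}

if __name__ == "__main__":
    print(pick_render_function(new_style, True)())
    print(pick_render_function(old_style, True)())
```

Falling back to `"render"` keeps the change additive: existing plugins that never return the new key are unaffected.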
I'll play around with the design of that function signature in:
Idea: instead of returning a dictionary, `register_output_renderer` could return an object. The object could have the following properties:

* `.extension` - the extension to use
* `.can_render(...)` - says if it can render this
* `.can_stream(...)` - says if streaming is supported
* `async .stream_rows(rows_iterator, send)` - method that loops through all rows and uses `send` to send them to the response in the correct format

I can then deprecate the existing `dict` return type for 1.0.
I really like this. One thing that would be great to add would be the ability to cancel the `rows_iterator`. The use case I have is a datatables extension that would wrap the original SQL query in a new SQL query like `select * from ({original_query}) as t where [..... conditions from datatables server side api]`.

If the streaming data was truly lazy, then this would be easy: I would just throw away the original `rows_iterator` and then construct my own. This could handle many of the use cases of #2000.
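The query-wrapping part of that idea can be sketched with the standard library's `sqlite3` module. The `wrap_query()` helper, the table, and the filter condition are all hypothetical; the where clause is assumed to be a trusted SQL fragment, with user-supplied values passed as named parameters.

```python
import sqlite3


def wrap_query(original_query, where_clause):
    """Wrap a query as a subquery so extra conditions apply to its result."""
    inner = original_query.strip().rstrip(";")
    return f"select * from ({inner}) as t where {where_clause}"


conn = sqlite3.connect(":memory:")
conn.execute("create table items (id integer, name text)")
conn.executemany(
    "insert into items values (?, ?)",
    [(1, "apple"), (2, "banana"), (3, "avocado")],
)

# Values go in as named parameters rather than being formatted into the SQL.
sql = wrap_query("select id, name from items;", "name like :prefix")
filtered = conn.execute(sql, {"prefix": "a%"}).fetchall()

if __name__ == "__main__":
    print(sql)
    print(sorted(filtered))
```

Since the wrapped query is only executed when the new iterator is consumed, discarding an unconsumed original iterator costs nothing if the row source is genuinely lazy.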
Originally posted by @simonw in https://github.com/simonw/datasette/issues/1096#issuecomment-732542285