datasette / datasette-enrichments-quickjs

Enrich data with a custom JavaScript function
Apache License 2.0

Run operations in a separate thread #2

Open · simonw opened this issue 7 months ago

simonw commented 7 months ago

Currently I'm using Function: https://github.com/datasette/datasette-enrichments-quickjs/blob/d329c4afb2f59e25017017e957095c8456ee5aec/datasette_enrichments_quickjs/__init__.py#L82-L86

It turns out this already runs in a separate thread: that's a feature of the quickjs library, intended to avoid threading issues within QuickJS itself.

But... that library also provides a Context() object, which does NOT have its own thread.

I don't think Function() is what I want: even though it runs in a separate thread, the call still blocks the main event loop, because it waits synchronously for that thread to return a result.
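A stdlib-only sketch of why "runs in a separate thread" isn't enough on its own. call_and_wait() is a hypothetical stand-in that, as Function() is assumed to here, hands the work to another thread and then blocks on .result(); slow_stand_in() just sleeps in place of running JavaScript. A heartbeat task counts how often the event loop gets to run while each call is in flight.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)


def slow_stand_in(x):
    # Hypothetical stand-in for a slow JavaScript call.
    time.sleep(0.2)
    return x * 2


def call_and_wait(x):
    # Work happens on another thread, but the caller waits synchronously
    # (assumed to resemble how Function() behaves).
    return executor.submit(slow_stand_in, x).result()


async def ticks_during(make_call):
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat start
    result = await make_call()
    observed = ticks
    task.cancel()
    return result, observed


async def blocking():
    # The event loop is stuck here for ~0.2s; nothing else can run.
    return call_and_wait(21)


async def non_blocking():
    # Await the thread's future instead of blocking on .result().
    return await asyncio.wrap_future(executor.submit(slow_stand_in, 21))


print(asyncio.run(ticks_during(blocking)))      # (42, 0): heartbeat never ran
print(asyncio.run(ticks_during(non_blocking)))  # (42, <positive count>)
```

Both versions use a worker thread; only the second lets the event loop keep serving other tasks while it waits.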

The quickjs documentation is effectively its test suite: https://github.com/PetterS/quickjs/blob/master/test_quickjs.py

simonw commented 6 months ago

curl 'https://raw.githubusercontent.com/datasette/datasette-enrichments-quickjs/main/datasette_enrichments_quickjs/__init__.py' \
  | llm -m claude-3-opus -s 'update this code to run the quickjs operations in a separate thread'

It suggested:

import asyncio
from concurrent.futures import ThreadPoolExecutor

from quickjs import Function

# ...
    def __init__(self):
        # One dedicated worker thread for running the JavaScript
        self.executor = ThreadPoolExecutor(max_workers=1)
# ...

    async def enrich_batch(
        self,
        db,
        table,
        rows,
        pks,
        config,
    ):
        function = Function("enrich", config["javascript"])
        function.set_time_limit(0.1)  # 0.1s
        function.set_memory_limit(4 * 1024 * 1024)  # 4MB
        output_column = config["output_column"]

        def process_row(row):
            # Runs on the executor thread, not on the event loop
            try:
                output = function(row)
            except Exception as ex:
                print(ex, repr(ex))
                raise
            return output

        for row in rows:
            # submit() returns a concurrent.futures.Future; wrap_future()
            # turns it into something the event loop can await
            future = self.executor.submit(process_row, row)
            output = await asyncio.wrap_future(future)
            await db.execute_write(
                "update [{table}] set [{output_column}] = ? where {wheres}".format(
                    table=table,
                    output_column=output_column,
                    wheres=" and ".join('"{}" = ?'.format(pk) for pk in pks),
                ),
                [output] + list(row[pk] for pk in pks),
            )

I didn't know about asyncio.wrap_future(), which wraps a concurrent.futures.Future in an asyncio Future so it can be awaited. It's described here: https://docs.python.org/3/library/asyncio-future.html#asyncio.wrap_future
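A tiny standalone example of what wrap_future() does, next to loop.run_in_executor(), which performs the same submit-then-wrap step in one call. double() is just an illustrative function.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def double(x):
    return x * 2


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        # executor.submit() returns a concurrent.futures.Future,
        # which can't be awaited directly...
        cf_future = executor.submit(double, 21)
        # ...so wrap_future() adapts it into an awaitable asyncio Future.
        a = await asyncio.wrap_future(cf_future)
        # run_in_executor() does the submit + wrap in a single call.
        b = await loop.run_in_executor(executor, double, 21)
    return a, b


print(asyncio.run(main()))  # (42, 42)
```

run_in_executor() is the more common spelling when you control the submit; wrap_future() is useful when some other code hands you a concurrent.futures.Future that you then want to await.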