datasette / datasette-enrichments

Tools for running enrichments against data stored in Datasette
https://enrichments.datasette.io
Apache License 2.0

Mechanism for monitoring, pausing, cancelling and continuing a run #17

Open simonw opened 7 months ago

simonw commented 7 months ago

The _enrichment_jobs table is designed to help support reporting on enrichment runs:

https://github.com/datasette/datasette-enrichments/blob/33d0e4b0f9a415d4842d041378f3cd5414635c93/datasette_enrichments/__init__.py#L26-L43

Since it persists the next_cursor it would also be possible to use this table to both pause and continue a run. This could happen automatically if a server restart interrupts a run.

This could also power pause, continue and cancel buttons on a dashboard, plus a progress bar.

simonw commented 7 months ago

Related:

simonw commented 7 months ago

This query gets the jobs back in the correct order ("running" at the top):

select *
from _enrichment_jobs
order by 
    case status
        when 'running' then 1
        when 'pending' then 2
        when 'finished' then 3
        when 'cancelled' then 4
    end,
    started_at desc;

I probably need to run it against every connected database though, since the _enrichment_jobs table ends up in multiple files.

That's an indication that maybe I should move it to the _internal database instead.
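A rough sketch of what "run it against every connected database" could look like, using plain sqlite3 connections as stand-ins for Datasette databases. The `collect_jobs` helper and the `STATUS_ORDER` mapping are hypothetical. Files without the table are skipped, and unknown statuses sort last, which is a choice of this sketch rather than what the SQL above does.

```python
import sqlite3

# Same priority as the CASE expression in the query above
STATUS_ORDER = {"running": 1, "pending": 2, "finished": 3, "cancelled": 4}


def collect_jobs(connections):
    """Gather _enrichment_jobs rows from several databases into one sorted list.

    connections maps a database name to an open sqlite3 connection.
    """
    jobs = []
    for db_name, conn in connections.items():
        conn.row_factory = sqlite3.Row
        try:
            rows = conn.execute("select * from _enrichment_jobs").fetchall()
        except sqlite3.OperationalError:
            # Most likely "no such table": this file has never run an enrichment
            continue
        for row in rows:
            jobs.append({"database": db_name, **dict(row)})
    # Two stable sorts: newest first, then grouped by status priority
    jobs.sort(key=lambda job: job["started_at"] or "", reverse=True)
    jobs.sort(key=lambda job: STATUS_ORDER.get(job["status"], 99))
    return jobs
```

Moving the table to a single database would make this merging step unnecessary, since one query would see every job.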

simonw commented 7 months ago

I'm tempted to get an initial version of this working using the render_cell() plugin hook on that status column - I could add inline buttons and a progress bar at the same time.
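A small sketch of the HTML such a hook might produce for the status column. The `status_cell_html` helper, the `data-*` attributes and the `paused` status are hypothetical. In a real plugin the string would presumably be wrapped in `markupsafe.Markup` and returned from a `render_cell()` implementation only when the column is `status` on the `_enrichment_jobs` table.

```python
import html


def status_cell_html(status, done_count, row_count, job_id):
    """Build the inline HTML for one status cell: label, progress bar, buttons."""
    parts = [f"<strong>{html.escape(str(status))}</strong>"]
    if row_count:
        # Native <progress> element, no JavaScript needed for a static view
        parts.append(
            f'<progress value="{int(done_count)}" max="{int(row_count)}"></progress>'
        )
    # Which actions make sense depends on the current status
    if status == "running":
        actions = ["pause", "cancel"]
    elif status == "paused":  # hypothetical status, not in the current table
        actions = ["continue", "cancel"]
    else:
        actions = []
    for action in actions:
        parts.append(
            f'<button type="button" data-job-id="{int(job_id)}" '
            f'data-action="{action}">{action.capitalize()}</button>'
        )
    return " ".join(parts)
```

The buttons carry only data attributes here; wiring them up to an endpoint that actually pauses or cancels a job is left open.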

simonw commented 7 months ago

The user is now redirected to /db/table?_enrichment_job=X after they start an enrichment, which can support adding UI to that page showing what is going on.

simonw commented 2 months ago

Bit of a prototype here:

diff --git a/datasette_enrichments/__init__.py b/datasette_enrichments/__init__.py
index 37ec479..c8ad53b 100644
--- a/datasette_enrichments/__init__.py
+++ b/datasette_enrichments/__init__.py
@@ -383,3 +383,134 @@ def actor_from_request(datasette, request):
         secret_token, datasette._secret_enrichments_token
     ):
         return {"_datasette_enrichments": True}
+
+
+PROGRESS_JS = """
+const endpoint = 'ENDPOINT';
+const pollInterval = 2000;
+
+let lastPolledTime = Date.now();
+let lastDone = 0;
+let intervalId;
+
+// Function to create and insert progress bar elements
+function setupProgressBar() {
+    // Create elements
+    const progressBarWrapper = document.createElement('div');
+    const progressBar = document.createElement('div');
+    const etaText = document.createElement('p');
+    const etaSpan = document.createElement('span');
+
+    // Set attributes and styles
+    progressBarWrapper.id = 'progressBarWrapper';
+    progressBar.id = 'progressBar';
+    progressBar.style.width = '0%';
+    progressBar.style.height = '20px';
+    progressBar.style.backgroundColor = 'green';
+    etaSpan.id = 'eta';
+    etaText.innerText = 'ETA: ';
+    etaText.appendChild(etaSpan);
+
+    // Append elements
+    progressBarWrapper.appendChild(progressBar);
+    progressBarWrapper.appendChild(etaText);
+
+    // Insert elements into the DOM
+    const table = document.querySelector('table.rows-and-columns');
+    if (table) {
+        table.parentNode.insertBefore(progressBarWrapper, table);
+    } else {
+        console.error('Table not found.');
+    }
+}
+
+function updateProgress() {
+    fetch(endpoint)
+        .then(response => response.json())
+        .then(data => {
+            const row = data.rows[0]
+            // row_count is the job's total; done_count is how many are processed
+            const total = row.row_count;
+            const done = row.done_count;
+            const progressPercent = (done / total) * 100;
+
+            // Update progress bar
+            document.getElementById('progressBar').style.width = `${progressPercent}%`;
+
+            // Check if there are remaining tasks
+            const tasksRemaining = total - done;
+            if (tasksRemaining <= 0) {
+                // Stop polling when no tasks remain and update ETA to "Completed"
+                clearInterval(intervalId);
+                document.getElementById('eta').innerText = 'Completed';
+                return;
+            }
+
+            // Calculate ETA
+            const currentTime = Date.now();
+            const timeElapsed = currentTime - lastPolledTime;
+            const tasksCompleted = done - lastDone;
+
+            if (tasksCompleted > 0) {
+                const rate = tasksCompleted / timeElapsed; // tasks per millisecond
+                const timeRemaining = tasksRemaining / rate;
+                const eta = new Date(currentTime + timeRemaining);
+
+                // Update ETA display
+                document.getElementById('eta').innerText = eta.toLocaleTimeString();
+            }
+
+            lastPolledTime = currentTime;
+            lastDone = done;
+        })
+        .catch(error => console.error('Error fetching data:', error));
+}
+
+// Setup progress bar and initiate polling
+setupProgressBar();
+updateProgress();
+intervalId = setInterval(updateProgress, pollInterval);
+""".strip()
+
+
+@hookimpl
+def extra_body_script(request, view_name, table, database, datasette):
+    if view_name != "table":
+        return
+    job_id = request.args.get("_enrichment_job_id")
+    if not job_id:
+        return
+
+    async def inner():
+        # Are there any incomplete jobs for this table?
+        db = datasette.get_database(database)
+        try:
+            jobs = await db.execute(
+                """
+                select id, status, done_count, row_count
+                from _enrichment_jobs
+                where table_name = ?
+            """,
+                (table,),
+            )
+            row = jobs.first()
+            if not row:
+                return
+        except Exception:
+            return
+        if row["done_count"] < row["row_count"]:
+            return PROGRESS_JS.replace(
+                "ENDPOINT",
+                datasette.urls.path(
+                    datasette.urls.table(database, "_enrichment_jobs", format="json")
+                    + "?"
+                    + urllib.parse.urlencode(
+                        {
+                            "database_name": database,
+                            "table_name": table,
+                        }
+                    )
+                ),
+            )
+
+    return inner
diff --git a/datasette_enrichments/templates/enrichment_picker.html b/datasette_enrichments/templates/enrichment_picker.html
index 9854739..5c29c91 100644
--- a/datasette_enrichments/templates/enrichment_picker.html
+++ b/datasette_enrichments/templates/enrichment_picker.html
@@ -39,4 +39,32 @@
 </dl>
 {% endif %}

+{% if previous_runs %}
+  <h2>Previous runs against this table</h2>
+  <ul>
+    {% for run in previous_runs %}
+      <li>
+        <strong>Run ID:</strong> {{ run.id }} <br>
+        <strong>Status:</strong> {{ run.status }} <br>
+        <strong>Progress:</strong>
+        <progress value="{{ run.done_count }}" max="{{ run.row_count }}"></progress><br>
+        <strong>Enrichment:</strong> {{ run.enrichment }} <br>
+        <strong>Database:</strong> {{ run.database_name }} <br>
+        <strong>Table:</strong> {{ run.table_name }} <br>
+        <strong>Filter Query:</strong> {{ run.filter_querystring }} <br>
+        <strong>Config:</strong> {{ run.config }} <br>
+        <strong>Started at:</strong> {{ run.started_at }} <br>
+        <strong>Finished at:</strong> {{ run.finished_at }} <br>
+        <strong>Cancel Reason:</strong> {{ run.cancel_reason }} <br>
+        <strong>Next Cursor:</strong> {{ run.next_cursor }} <br>
+        <strong>Row Count:</strong> {{ run.row_count }} <br>
+        <strong>Error Count:</strong> {{ run.error_count }} <br>
+        <strong>Done Count:</strong> {{ run.done_count }} <br>
+        <strong>Actor ID:</strong> {{ run.actor_id }} <br>
+        <strong>Cost (100ths Cent):</strong> {{ run.cost_100ths_cent }}
+      </li>
+    {% endfor %}
+  </ul>
+{% endif %}
+
 {% endblock %}
diff --git a/datasette_enrichments/views.py b/datasette_enrichments/views.py
index a435d67..3d9cee7 100644
--- a/datasette_enrichments/views.py
+++ b/datasette_enrichments/views.py
@@ -133,6 +133,16 @@ async def enrichment_picker(datasette, request):
             }
         )

+    previous_runs = []
+    sql = """
+    select * from _enrichment_jobs
+    where database_name = :database and table_name = :table
+    order by started_at desc
+    """
+    db = datasette.get_database(database)
+    for job in (await db.execute(sql, {"database": database, "table": table})).rows:
+        previous_runs.append(dict(job))
+
     return Response.html(
         await datasette.render_template(
             "enrichment_picker.html",
@@ -141,6 +151,7 @@ async def enrichment_picker(datasette, request):
                 "table": table,
                 "filtered_data": filtered_data,
                 "enrichments_and_paths": enrichments_and_paths,
+                "previous_runs": previous_runs,
             },
             request,
         )

I'll push that as prototype-run-tracking.
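For reference, the ETA arithmetic from the polling script can be written out in Python. This is a sketch that assumes `row_count` is the job's total, as the `done_count < row_count` check in the hook suggests. The `estimate_eta` function and its parameter names are hypothetical.

```python
from datetime import datetime, timedelta


def estimate_eta(now, last_polled, done, last_done, total):
    """Estimate the finish time from the progress made since the previous poll.

    Returns now if nothing remains, or None if no progress was observed
    in this window (so no rate can be computed).
    """
    remaining = total - done
    if remaining <= 0:
        return now
    completed = done - last_done
    elapsed = (now - last_polled).total_seconds()
    if completed <= 0 or elapsed <= 0:
        return None
    rate = completed / elapsed  # rows per second over the last poll window
    return now + timedelta(seconds=remaining / rate)
```

With 20 rows completed during a 2 second poll window and 60 rows remaining, the rate is 10 rows per second and the estimate lands 6 seconds after the current poll. Because the rate only looks at the most recent window, the estimate can jump around between polls; a moving average would smooth it.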