simonw closed this issue 10 months ago
Currently batches are executed in a loop that hands each batch of rows to `enrich_batch()`. That `enrich_batch()` method is provided by the embedding subclass.
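A minimal, self-contained sketch of that batch loop. Everything here beyond `enrich_batch()` and the `job["table_name"]`/`job["config"]` keys is an assumption for illustration, including `run_job()` and `fetch_next_rows()`:

```python
import asyncio
import json


class ExampleEnrichment:
    # Sketch only: run_job(), fetch_next_rows() and the job dict shape
    # are assumptions, not the actual implementation.
    def __init__(self, rows):
        self._pending = list(rows)
        self.enriched = []

    async def fetch_next_rows(self, db, job, batch_size):
        # Hand back the next slice of pending rows
        batch = self._pending[:batch_size]
        self._pending = self._pending[batch_size:]
        return batch

    async def enrich_batch(self, db, table_name, rows, config):
        # A real subclass would call an embedding API here
        self.enriched.extend(rows)

    async def run_job(self, db, job, batch_size=2):
        rows = await self.fetch_next_rows(db, job, batch_size)
        while rows:
            await self.enrich_batch(
                db, job["table_name"], rows, json.loads(job["config"])
            )
            rows = await self.fetch_next_rows(db, job, batch_size)
```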
Having that method raise an exception isn't ideal: if you hand it 100 rows and it works fine for 99 of them but errors on one, we don't want to assume the entire batch failed.
Instead, it could return a list of errors - something like this:
```python
errors = await self.enrich_batch(
    db, job["table_name"], rows, json.loads(job["config"])
)
```
That returned list would need to map row primary keys to errors though - and the information about what a primary key is has not really been dealt with yet.
Maybe that list is a `[None] * 100` for the case when everything worked, otherwise the errors are strings that correspond to the incoming list of rows? Then the calling code could count them and write them to the database.
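Pairing such an errors list back up with its rows is straightforward. A toy example, assuming an `id` primary key:

```python
# Hypothetical calling code: pair each returned error with its row's
# primary key and keep only the failures.
rows = [{"id": 1, "title": "One"}, {"id": 2, "title": "Two"}]
errors = [None, "Rate limited"]  # parallel to rows

failures = [
    (row["id"], error)
    for row, error in zip(rows, errors)
    if error is not None
]
error_count = len(failures)
# failures == [(2, "Rate limited")]
```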
Bit of an ugly thing to have to implement in each embedding implementation though.
Unless... there could be an `Embedding` subclass called `OneAtATimeEmbedding` or similar, where you implement just a single `enrich_single(self, row)` method which is allowed to raise any errors it likes - those errors will then be stringified automatically by the default `enrich_batch()` method.

Then if you want to do fancy batches - like with the OpenAI embeddings API - you can implement `enrich_batch()`, but most embedding implementations can just implement `enrich_single()` instead.
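A sketch of how that could fit together, using the method names proposed above. Everything else, including the toy subclass, is illustrative:

```python
import asyncio


class OneAtATimeEmbedding:
    # Sketch of the proposed subclass; signatures beyond
    # enrich_single()/enrich_batch() are assumptions.
    async def enrich_single(self, row):
        raise NotImplementedError

    async def enrich_batch(self, db, table_name, rows, config):
        errors = []
        for row in rows:
            try:
                await self.enrich_single(row)
                errors.append(None)
            except Exception as ex:
                # Stringify automatically so subclasses can raise freely
                errors.append(str(ex))
        return errors


class UppercaseTitle(OneAtATimeEmbedding):
    # Toy subclass for illustration
    async def enrich_single(self, row):
        if "title" not in row:
            raise ValueError("missing title")
        row["title"] = row["title"].upper()
```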
Then I can have a table called `_enrichment_errors` that looks like this:
```sql
create table _enrichment_errors (
    id integer primary key,
    job_id integer references _enrichment_jobs(id),
    row_pk text, -- JSON encoded, can be integer or string or ["compound", "pk"]
    error text,
    created_at text
)
```
I think it's on the plugins themselves to catch errors and write them to this table. I'll add an `await self.log_error(job_id, row_ids, error)` method.
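A synchronous `sqlite3` sketch of what that logging could do against the table above. The real method would be async and go through the database wrapper; this stand-in just shows one error row per primary key, with the pk JSON-encoded to support compound keys:

```python
import datetime
import json
import sqlite3


def log_error(conn, job_id, row_pks, error):
    # Stand-in, not the actual implementation: write one row per
    # primary key, JSON-encoding each pk so compound keys work too.
    created_at = datetime.datetime.utcnow().isoformat()
    conn.executemany(
        "insert into _enrichment_errors (job_id, row_pk, error, created_at)"
        " values (?, ?, ?, ?)",
        [(job_id, json.dumps(pk), error, created_at) for pk in row_pks],
    )
    conn.commit()
```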
I've been thinking about having an optional `RowWiseEnrichment` subclass which applies the enrichment a row at a time using an `enrich_row()` method - I could implement that and have any exceptions it raises logged automatically.

Or I could just have a default `.enrich_batch()` method which calls `.enrich_row()` - then the class author can decide which of those methods to override. If you override `.enrich_batch()`, it's on you to record errors.
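A sketch of that default-method approach, with an in-memory stand-in for error logging. The `id` primary key, the signatures, and the toy subclass are all assumptions:

```python
import asyncio


class Enrichment:
    # Sketch: default enrich_batch() delegates to enrich_row() and logs
    # any exception instead of letting it kill the batch.
    def __init__(self):
        self.errors = []

    async def enrich_row(self, db, table_name, row, config):
        raise NotImplementedError

    async def enrich_batch(self, db, job_id, table_name, rows, config):
        for row in rows:
            try:
                await self.enrich_row(db, table_name, row, config)
            except Exception as ex:
                await self.log_error(db, job_id, [row["id"]], str(ex))

    async def log_error(self, db, job_id, ids, error):
        # Stand-in: the real version writes to _enrichment_errors
        self.errors.append((job_id, ids, error))


class FailOnEven(Enrichment):
    # Toy subclass for illustration
    async def enrich_row(self, db, table_name, row, config):
        if row["id"] % 2 == 0:
            raise ValueError("even id")
```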
New idea: instead of this:

```python
async def log_error(self, db: "Database", job_id: int, id: Any, error: str):
```

I'll do this:

```python
async def log_error(
    self,
    db: "Database",
    job_id: int,
    ids: List[Union[int, str, Tuple[Union[int, str], ...]]],
    error: str,
):
```
So any time you record an error you can record it against a LIST of row primary keys:

```python
from typing import Union, Tuple, List

# Define the type for the elements in the list
ElementType = Union[int, str, Tuple[Union[int, str], ...]]


async def log_error(..., ids: List[ElementType]):
    pass
```
That way I can have a default error logging mechanism: if your `enrich_batch()` method throws an error, I can record that error along with the IDs of all of the items in that batch.
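That default mechanism might look something like this sketch, where `run_batch()` and the `id` key are assumptions:

```python
import asyncio


class BatchEnrichment:
    # Sketch of the default mechanism: if enrich_batch() throws, record
    # the error once against every primary key in the batch.
    def __init__(self):
        self.logged = []

    async def enrich_batch(self, db, table_name, rows, config):
        # Simulate a whole-batch failure for demonstration
        raise RuntimeError("API unavailable")

    async def log_error(self, db, job_id, ids, error):
        # Stand-in for the real _enrichment_errors write
        self.logged.append((job_id, ids, error))

    async def run_batch(self, db, job_id, table_name, rows, config):
        try:
            await self.enrich_batch(db, table_name, rows, config)
        except Exception as ex:
            await self.log_error(
                db, job_id, [row["id"] for row in rows], str(ex)
            )
```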
Refs:
I added an `error_count` column, and I have the idea that the job should automatically be cancelled if more than X errors occur (default X = 5). But... how does that actually work at the code level?
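One possible answer, sketched with plain `sqlite3`: bump `error_count` on each logged error, then flip the job's status once the count exceeds the threshold. The `status` column and its values are assumptions:

```python
import sqlite3

CANCEL_AFTER = 5  # assumed default for X


def record_error_and_maybe_cancel(conn, job_id):
    # Sketch: increment error_count, then cancel the job once the
    # count passes the threshold.
    conn.execute(
        "update _enrichment_jobs set error_count = error_count + 1"
        " where id = ?",
        (job_id,),
    )
    conn.execute(
        "update _enrichment_jobs set status = 'cancelled'"
        " where id = ? and error_count > ? and status != 'cancelled'",
        (job_id, CANCEL_AFTER),
    )
    conn.commit()
```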