Open asaf opened 3 months ago
We can go with a simple AsyncJob model and a non-scheduled, ad-hoc executor for the time being to keep things simple; alternatives are discussed below:
class AsyncJobStatus(str, Enum):
    """Lifecycle states of an async job."""

    pending = "pending"      # created, not yet picked up by the executor
    running = "running"      # handler currently executing
    completed = "completed"  # handler finished and result persisted
    failed = "failed"        # errored; retryable until max_retries is reached


# Backward-compatible alias: the AsyncJob model and JobExecutor below refer
# to this enum as ``JobStatus``, which was otherwise undefined.
JobStatus = AsyncJobStatus
class JobType(str, Enum):
    """Closed set of background-job kinds; currently an empty placeholder."""

    # TODO: add concrete members here, one per registered handler
    # (same str-valued pattern as AsyncJobStatus).
    pass
class AsyncJob(BaseModel):
    """Persistent record of a background job and its execution state.

    Fixes over the draft: the ``alias`` arguments were identical to the
    field names (no-ops) and are dropped; fields that are logically empty
    when a job is first created (``status``, ``retry_count``,
    ``result_data``, ``error``) now have backward-compatible defaults
    instead of being required.
    """

    id: str
    type: JobType
    # New jobs start pending; AsyncJobStatus is the enum defined above.
    status: AsyncJobStatus = AsyncJobStatus.pending
    retry_count: int = 0              # attempts made so far
    max_retries: int                  # give up once retry_count reaches this
    last_attempted_at: Optional[datetime] = None
    scheduled_at: Optional[datetime] = None  # None => run as soon as picked up
    parameters: Dict = Field(default_factory=dict)   # handler input
    result_data: Dict = Field(default_factory=dict)  # handler output; empty until completed
    error: Optional[str] = None       # last failure message, if any
    # NOTE(review): naive local timestamps — consider tz-aware UTC; confirm
    # against how scheduled_at is produced by callers.
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)
with a simple executor (this is just pseudo code):
class JobExecutor:
    """Runs a single AsyncJob through its registered handler, persisting
    every state transition via the job service."""

    def __init__(self, registry: Registry, service: AsyncJobService):
        self.registry = registry
        self.s = service  # persistence service used to save job state

    def process_job(self, ctx, job: AsyncJob):
        """Execute *job* once: guard preconditions, run its handler, store the result.

        Fixes over the draft:
        - uses ``datetime.now()`` to match the naive local timestamps the
          AsyncJob model produces (the draft compared ``scheduled_at``
          against ``utcnow()``, i.e. a different clock);
        - a job scheduled for the future is skipped, not failed — failing
          it burned retries on jobs that were simply not due yet;
        - ``completed`` status is only set after result conversion succeeds.
        """
        handler = self.registry.get_handler(job.type)

        # Naive local time, consistent with AsyncJob.created_at.
        now = datetime.now()

        # Not due yet: leave the job untouched so a later pass picks it up.
        if job.scheduled_at and job.scheduled_at > now:
            logger.info(f"Job {job.id} is scheduled for {job.scheduled_at}; skipping")
            return

        # Refuse jobs that are already in flight, finished, or out of retries.
        if job.status in {JobStatus.running, JobStatus.completed} or \
                (job.status == JobStatus.failed and job.retry_count >= job.max_retries):
            err = Exception(f"Job is already in {job.status} status or has reached max retries")
            logger.error(f"Job {job.id} failed: {err}")
            self.fail_job(ctx, job, err)
            return

        # Claim the job: persist the running state before executing the handler.
        job.status = JobStatus.running
        job.last_attempted_at = now
        try:
            self.s.replace(ctx, job)
        except Exception as err:
            err = Exception(f"Error updating job status to running: {err}")
            logger.error(f"Job {job.id} failed: {err}")
            self.fail_job(ctx, job, err)
            return

        # Execute the job handler.
        try:
            res = handler(ctx, job)
        except Exception as err:
            logger.error(f"Error executing job {job.id}: {err}")
            # fail_job sets the failed status; no need to pre-set it here.
            self.fail_job(ctx, job, err)
            return

        # Serialize the handler result before declaring success, so a
        # conversion error never leaves the job marked completed.
        try:
            res_map = convert_to_json_map(res)
        except Exception as err:
            err = Exception(f"Error converting result to map: {err}")
            logger.error(f"Job {job.id} failed: {err}")
            self.fail_job(ctx, job, err)
            return

        job.status = JobStatus.completed
        job.result_data = res_map
        try:
            self.s.replace(ctx, job)
        except Exception as err:
            logger.error(f"Error updating final job status for job {job.id}: {err}")

    def fail_job(self, ctx, job: AsyncJob, err: Exception):
        """Record a failure: mark the job failed, count the attempt, persist."""
        job.status = JobStatus.failed
        # The draft never advanced retry_count, so the max-retries guard in
        # process_job could never trip; count this attempt toward the limit.
        job.retry_count += 1
        job.error = str(err)
        try:
            self.s.replace(ctx, job)
        except Exception as e:
            logger.error(f"Error updating job status to failed for job {job.id}: {e}")
I put up an alternative implementation via Celery: https://github.com/crossid/accessbot/tree/draft/celery
pros:
cons:
We have to audit the actual granted access. In general, not having a retry mechanism is a bad idea, but at this point it is not mandatory.
We should be able to create a report of: recommendation summary -> (optional: data owner decision summary) -> granted access.