)
@dataclass(frozen=True, init=True)
class SplitTaskJob(TaskJob):
    """A ``TaskJob`` extended with per-segment fields.

    On top of whatever ``TaskJob`` declares, each instance carries a
    segment number and a segment in/out range (presumably positions
    within the source media -- confirm against the caller).
    """

    segment_number: int
    segment_range_in: int
    segment_range_out: int

    def __post_init__(self):
        """Validate the job right after dataclass initialisation.

        Raises:
            FileNotFoundError: ``self.source.file_path`` does not exist.
            ValueError: ``self.input_level`` is neither ``"in_range=full"``
                nor ``"in_range=limited"``.
        """
        # TODO: Custom exceptions for task job validation

        # Source must be accessible.
        source_path = self.source.file_path
        if not os.path.exists(source_path):
            raise FileNotFoundError(
                f"Provided source file '{source_path}' does not exist"
            )

        # NOTE: a no-overwrite guard (raise FileExistsError when a file
        # already exists at the output path) is currently disabled; the
        # attribute holding the output path was never filled in.

        # Video levels must be one of the two accepted strings.
        accepted_levels = ("in_range=full", "in_range=limited")
        level = self.input_level
        if level in accepted_levels:
            return
        raise ValueError(f"Calculated video levels are invalid: '{level}'")
def ffmpeg_video_flip(job: TaskJob):
flip_string = ""
if job.source.h_flip:
Custom exceptions for task job validation
raise FileExistsError(
f"File already exists at provided output path {self.output_file_path}"
)
https://github.com/in03/proxima/blob/f30a0bf4098ad871fbafe5658c295dc847ec97d1/src/proxima/celery/tasks.py#L93
ca1bee3f16b2f1af9c8177011359bf60f5d3ecbb