Barski-lab / cwl-airflow

Python package to extend Airflow functionality with CWL1.1 support
https://barski-lab.github.io/cwl-airflow
Apache License 2.0

dag start time #16

WRYFans closed this issue 5 years ago

WRYFans commented 6 years ago
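import os
from datetime import datetime

from airflow.models import DagRun
from airflow.utils.state import State

# list_files, gen_dag_id and load_job are cwl-airflow helpers; their import path is not shown in this issue.


def get_active_jobs(jobs_folder, limit=10):
    """
    :param jobs_folder: absolute path to the folder with job files (.json, .yml, .yaml)
    :param limit: max number of jobs to return per state
    :return: list of job dicts grouped by state (success, running, failed, unknown), newest first within each group
    """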
    all_jobs = []
    for job_path in list_files(abs_path=jobs_folder, ext=[".json", ".yml", ".yaml"]):
        dag_id = gen_dag_id(job_path)
        dag_runs = DagRun.find(dag_id)
        all_jobs.append({"path": job_path,
                         "creation_date": datetime.fromtimestamp(os.path.getctime(job_path)),
                         "content": load_job(job_path),
                         "dag_id": dag_id,
                         "state": dag_runs[0].state if len(dag_runs) > 0 else State.NONE})
    success_jobs = sorted([j for j in all_jobs if j["state"] == State.SUCCESS], key=lambda k: k["creation_date"], reverse=True)[:limit]
    running_jobs = sorted([j for j in all_jobs if j["state"] == State.RUNNING], key=lambda k: k["creation_date"], reverse=True)[:limit]
    failed_jobs =  sorted([j for j in all_jobs if j["state"] == State.FAILED],  key=lambda k: k["creation_date"], reverse=True)[:limit]
    unknown_jobs = sorted([j for j in all_jobs if j["state"] == State.NONE],    key=lambda k: k["creation_date"], reverse=True)[:limit]
    return success_jobs + running_jobs + failed_jobs + unknown_jobs

Airflow uses timezone.utcnow() as the current time, so why not use utcfromtimestamp in this function?

"creation_date": datetime.utcfromtimestamp(os.path.getctime(job_path)),
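To illustrate why the conversion matters, here is a minimal sketch using only the standard library that converts a file's ctime three ways. The helper creation_dates and the temporary file are hypothetical, for illustration only, and datetime.now(timezone.utc) stands in for Airflow's timezone.utcnow(), assuming that returns a timezone-aware UTC datetime.

```python
import os
import tempfile
from datetime import datetime, timezone


def creation_dates(path):
    """Hypothetical helper: return a file's ctime converted three ways."""
    ts = os.path.getctime(path)
    return {
        # Naive datetime in the machine's local time zone (the original code).
        "local_naive": datetime.fromtimestamp(ts),
        # Naive datetime in UTC (the change proposed in this issue).
        # Note: utcfromtimestamp is deprecated since Python 3.12.
        "utc_naive": datetime.utcfromtimestamp(ts),
        # Timezone-aware datetime in UTC.
        "utc_aware": datetime.fromtimestamp(ts, tz=timezone.utc),
    }


# Create a throwaway file so there is a real ctime to convert.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    path = tmp.name

try:
    dates = creation_dates(path)
    # Stand-in (assumption) for Airflow's timezone.utcnow(): an aware "now" in UTC.
    now_utc = datetime.now(timezone.utc)

    # The naive local value is shifted away from UTC by the local UTC offset.
    local_offset = dates["utc_aware"].astimezone().utcoffset()
    print("local offset:", local_offset)
    print("local - utc :", dates["local_naive"] - dates["utc_naive"])

    # An aware UTC value can be compared with an aware clock directly...
    print("age:", now_utc - dates["utc_aware"])

    # ...while mixing naive and aware datetimes raises TypeError.
    try:
        now_utc - dates["utc_naive"]
    except TypeError as exc:
        print("naive vs aware:", exc)
finally:
    os.remove(path)
```

On a machine whose local time zone is not UTC, the first two values differ by exactly the local offset, which is the skew the issue points out. utcfromtimestamp removes that skew but still returns a naive datetime; datetime.fromtimestamp(ts, tz=timezone.utc) gives the same UTC wall time and can also be compared with an aware clock.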

michael-kotliar commented 6 years ago

Thanks, utcfromtimestamp is indeed more appropriate here.

michael-kotliar commented 5 years ago

Fixed in https://github.com/Barski-lab/cwl-airflow/pull/22