broadinstitute / lrma-cloud-utils

A python library for interacting with GCS, Cromwell, and Terra
BSD 3-Clause "New" or "Revised" License
2 stars 1 forks source link

DO NOT CLOSE #27

Open SHuang-Broad opened 11 months ago

SHuang-Broad commented 11 months ago

This ticket is for keeping some utilities that should be incorporated into the next release

SHuang-Broad commented 11 months ago
def is_not_available(column: pd.Series):
    """Return a boolean Series flagging entries whose attribute is missing.

    An entry counts as missing when pandas reports it as NA, when it is a
    float NaN, or when its string form (case-insensitively) is '', 'nan',
    or 'na'.
    """
    def _looks_missing(value) -> bool:
        # Float NaN needs np.isnan; everything else is compared by its
        # lowercased string representation.
        if isinstance(value, float):
            return np.isnan(value)
        return str(value).lower() in ('', 'nan', 'na')

    return column.isna() | column.apply(_looks_missing)
SHuang-Broad commented 11 months ago
def get_QC_ready_entities_table(input_table: pd.DataFrame,
                                upstream_indicator_columns: List[str],
                                downstream_indicator_columns: List[str]) -> pd.DataFrame:
    """
    Select the entities that are ready for QC workflows.

    An entity is ready when all upstream_indicator_columns are available
    (its prerequisites finished) and at least one of the
    downstream_indicator_columns is unavailable (some QC output is still
    missing).

    :param input_table: a (parsed) Terra entities table; column 0 holds the entity names
    :param upstream_indicator_columns: columns that must all be filled in
    :param downstream_indicator_columns: columns of which at least one must be empty
    :return: the matching rows, with the index reset
    """
    # upstream are ready: start from all-True and AND in each requirement.
    # (The previous version initialized the mask from `~iloc[:,0].isna()`,
    # which silently mis-initializes when the first column itself has NAs.)
    idx = pd.Series(True, index=input_table.index)
    for attr in upstream_indicator_columns:
        if attr not in input_table.columns:  # a column isn't available, definitely not ready
            idx = pd.Series(False, index=input_table.index)
            break
        idx &= ~is_not_available(input_table[attr])

    # downstream aren't ready: start from all-False and OR in each missing result.
    idy = pd.Series(False, index=input_table.index)
    for attr in downstream_indicator_columns:
        if attr not in input_table.columns:  # a column isn't available, definitely ready
            idy = pd.Series(True, index=input_table.index)
            break
        idy |= is_not_available(input_table[attr])

    return input_table.loc[idx & idy, :].reset_index(drop=True)
SHuang-Broad commented 11 months ago
def launch_QC_workflows(input_table: pd.DataFrame,
                        upstream_indicator_columns: List[str],
                        downstream_indicator_columns: List[str],
                        qc_workflows: List[str]) -> None:
    """
    For a (parsed) Terra table, launch QC workflows on entities that are
    ready to be QCed but have not been QCed yet.

    Failures to submit an individual workflow are reported and recorded
    (in the module-level ``failed_to_launch_workflow_somewhere`` list)
    but do not abort the remaining submissions.

    NOTE(review): relies on module-level ``terra_namespace``,
    ``terra_workspace`` and ``verify_before_submit`` being defined.
    """
    ready_table = get_QC_ready_entities_table(input_table,
                                              upstream_indicator_columns,
                                              downstream_indicator_columns)

    if len(ready_table) == 0:
        print("Nothing to QC in this batch.")
        return
    print(f"{len(ready_table)} entities to QC in this batch.")

    # Column 0 of a parsed Terra table is the entity type / name column.
    etype = input_table.columns[0]
    entity_names = ready_table.iloc[:, 0].tolist()
    for wdl_name in qc_workflows:
        try:
            verify_before_submit(terra_namespace, terra_workspace,
                                 workflow_name=wdl_name,
                                 etype=etype,
                                 enames=entity_names,
                                 use_callcache=True,
                                 batch_type_name=f"zzBatch_{wdl_name.replace('.','_')}",
                                 expression=f'this.{etype}s',
                                 days_back=7, count=3)
        except Exception as e:
            # Best-effort: record the failure and keep submitting the rest.
            print(f"Failed to launch workflow {wdl_name} for {etype}")
            print("I'll continue to the following, but humans, please check.")
            print(repr(e))
            failed_to_launch_workflow_somewhere.append(f"Failed to launch {wdl_name} for {etype}.")
SHuang-Broad commented 11 months ago
def get_QCed_entities_table(input_table: pd.DataFrame,
                            output_table_uuid_colname: str,
                            qc_result_columns: List[str],
                            other_columns_to_carryover: List[str]) -> pd.DataFrame:
    """
    Get entities in a (parsed) Terra table that have QC results ready.

    An entity qualifies when every column in qc_result_columns holds a value.

    :param input_table: a (parsed) Terra entities table; column 0 holds the entity names
    :param output_table_uuid_colname: new name for the entity-name column in the output
    :param qc_result_columns: columns that must all be available
    :param other_columns_to_carryover: extra columns copied into the output
    :return: the qualifying rows, restricted to entity name + QC results +
             carryover columns, index reset, entity column renamed
    """
    # Start from all-True and AND in each requirement.
    # (The previous version initialized the mask from `~iloc[:,0].isna()`,
    # which silently mis-initializes when the first column itself has NAs.)
    idx = pd.Series(True, index=input_table.index)
    for attr in qc_result_columns:
        if attr not in input_table.columns:  # a column isn't available, definitely not ready
            idx = pd.Series(False, index=input_table.index)
            break
        idx &= ~is_not_available(input_table[attr])

    etype = input_table.columns[0]
    output_col = [etype]
    output_col.extend(qc_result_columns)
    output_col.extend(other_columns_to_carryover)

    return input_table.loc[idx, output_col]\
            .reset_index(drop=True)\
            .rename({etype: output_table_uuid_colname}, axis=1)
SHuang-Broad commented 11 months ago
def compute_cost_this_month(ns: str, ws: str) -> float:
    """
    Return the WDL-related compute cost of a workspace for the current month.

    :param ns: Terra namespace of the workspace
    :param ws: Terra workspace name
    :return: total cost (USD) summed over this month's submissions
    :raises RuntimeError: when the submission list cannot be fetched

    Fixes over the previous version: the month's submission list was built
    as ``this_mongth_subs`` but the loop read the undefined ``relevant_subs``
    (NameError); the ``ns``/``ws`` parameters were ignored in favor of
    module globals; API failures were silently ignored; the builtin ``sum``
    was shadowed.
    """
    response = retry_fiss_api_call('list_submissions', 2,
                                   namespace=ns, workspace=ws)
    if not response.ok:
        raise RuntimeError(f"Failed to list submissions for {ns}/{ws}.")

    sorted_subs = sorted(response.json(), key=lambda sub: parser.parse(sub['submissionDate']))

    now = datetime.datetime.now()
    # NOTE(review): the 05:00 offset presumably maps US-Eastern midnight to
    # UTC — confirm this matches the workspace's intended timezone.
    month_beg = parser.parse(f"{now.year}-{now.month}-01T05:00:00.000Z")
    relevant_subs = [sub for sub in sorted_subs
                     if parser.parse(sub['submissionDate']) > month_beg]

    total_cost = 0.0
    for sub in tqdm(relevant_subs):
        response = retry_fiss_api_call('get_submission', 2,
                                       namespace=ns, workspace=ws,
                                       submission_id=sub['submissionId'])
        if not response.ok:
            # Best-effort: flag the gap and keep summing the rest.
            print(f"Failed to fetch submission {sub['submissionId']}; its cost is not included.")
            continue
        total_cost += response.json()['cost']

    print(f"For the month {now.strftime('%B')} of {now.year},"
          f" a total of {len(relevant_subs)} WDL submissions have costed you ${total_cost} so far for"
          f" {ns}/{ws} ")
    return total_cost
SHuang-Broad commented 7 months ago

This is an idea for making retry_fiss_api_call even more resilient to random dropouts from the Firecloud API

        try:
            # call FISS API function by its name
            fiss_call = getattr(fapi, func_name)
            response = fiss_call(*args, **kwargs)
            if 200 == response.status_code:
                connection_reset = False