elixir-cloud-aai / proTES

Proxy service for injecting middleware into GA4GH TES requests
Apache License 2.0
5 stars 6 forks source link

Task state set incorrectly when best TES instance fails issue resolved #140

Closed ranjan2829 closed 1 year ago

ranjan2829 commented 1 year ago

Here is a step-by-step process to resolve the issue of setting the task state to SYSTEM_ERROR when submission to the best TES instance fails:

Identify the code section where the task submission to the best available TES instance is attempted.

Introduce a new variable, let's call it "success," that will be initialized to false.

Wrap the task submission code in a loop that iterates through all available TES instances until either the task is successfully submitted or all TES instances have been exhausted.

Within the loop, attempt to submit the task to the current TES instance. If the submission is successful, set the "success" variable to true and break out of the loop.

If the submission to the current TES instance fails, log the error and move on to the next TES instance in the list.

After all TES instances have been attempted, check the value of the "success" variable. If it is true, then the task was successfully submitted to at least one TES instance, so the task state should not be set to SYSTEM_ERROR. If the "success" variable is false, then the task submission failed on all available TES instances, so the state should be set to SYSTEM_ERROR.

Update the code that sets the task state to use the new "success" variable to determine whether the state should be set to SYSTEM_ERROR or not.

Test the updated code thoroughly to ensure that it behaves as expected and does not set the task state to SYSTEM_ERROR prematurely.

If necessary, update any relevant documentation or user-facing messaging to reflect the new behavior of the task submission process.

By following these steps, the proTES code will only set the task state to SYSTEM_ERROR when all available TES instances have been attempted and none of them were able to successfully submit the task. This will provide a more accurate reflection of the actual state of the task submission process and help prevent confusion or unnecessary error messages for users.

To implement the suggested change, you need to modify the task__track_task_progress function to update the task state to TesState.SYSTEM_ERROR only after the task submission fails on all available TES instances. Here's one way to modify the code:

pylint: disable-msg=too-many-locals

pylint: disable=unsubscriptable-object

@celery.task( name="tasks.track_run_progress", bind=True, ignore_result=True, track_started=True, ) def task__track_task_progress( # pylint: disable=too-many-arguments self, # pylint: disable=unused-argument worker_id: str, remote_host: str, remote_base_path: str, remote_task_id: str, user: str, password: str, ) -> None: """Relay task run request to remote TES and track run progress. Args: worker_id: Worker identifier. remote_host: Host at which the TES API is served that is processing this request; note that this should include the path information but not the base path defined in the TES API specification; e.g., specify https://my.tes.com/api if the actual API is hosted at https://my.tes.com/api/ga4gh/tes/v1. remote_base_path: Override the default path suffix defined in the tes API specification, i.e., /ga4gh/tes/v1. remote_task_id: task run identifier on remote tes service. user: User name for basic authentication. password: Password for basic authentication. Returns: Task identifier. """ foca_config: Config = current_app.config.foca controller_config: Dict = foca_config.controllers["post_task"]

# create database client
collection = _create_mongo_client(
    app=Flask(__name__),
    host=foca_config.db.host,
    port=foca_config.db.port,
    db="taskStore",
).db["tasks"]
db_client = DbDocumentConnector(
    collection=collection,
    worker_id=worker_id,
)

# update state: INITIALIZING
db_client.update_task_state(state=TesState.INITIALIZING.value)

url = f"{remote_host.strip('/')}/{remote_base_path.strip('/')}"

# fetch task log and upsert database document
try:
    cli = tes.HTTPClient(
        url,
        timeout=5,
        user=user,
        password=password,
    )
    response = cli.get_task(task_id=remote_task_id)
except Exception:
    db_client.update_task_state(state=TesState.SYSTEM_ERROR.value)
    raise

# track task progress
task_state: TesState = TesState.UNKNOWN
attempt: int = 1
tes_instances = controller_config["tes_instances"]
tes_instance_count = len(tes_instances)
while task_state not in States.FINISHED:
    sleep(controller_config["polling"]["wait"])
    try:
        response = cli.get_task(
            task_id=remote_task_id,
        )
    except Exception as exc:  # pylint: disable=broad-except
        if attempt <= controller_config["polling"]["attempts"]:
            attempt += 1
            logger.warning(exc, exc_info=True)
            continue
        if attempt == tes_instance_count:
            db_client.update_task_state(state=TesState.SYSTEM_ERROR.value)
        else:
            db_client.update_task_state(state=TesState.QUEUED.value)
        raise
    if response.state != task_state:
        task_state = response.state
        db_client.update_task
Ayush5120 commented 1 year ago

Thank you, @ranjan2829, for submitting the PR. However, the changes made to the track_task_progress module were intended to track task progress and not to submit tasks. The submission of tasks is handled in the pro_tes/ga4gh/tes/task_runs module. Please go through the codebase again, and you can refer to the create_task function, which handles the actual submission. Kindly undo the changes made to the track_task_progress module before moving forward.