dpguthrie / dbtc

39 stars 12 forks source link

Optional restart jobs from failure #28

Closed matt-winkler closed 2 years ago

matt-winkler commented 2 years ago

What do you think about adding an optional config to the cloud.client.trigger_job and trigger_job_and_poll methods to attempt restart_from_failure?

Something like this:

def trigger_job_and_poll(
    self,
    account_id: int,
    job_id: int,
    payload: Dict,
    poll_interval: int = 10,
    restart_from_failure: bool = False,
) -> Dict:
    """Trigger a job by its ID and poll until completion: one of
      SUCCESS, ERROR, or CANCELLED.

    When ``restart_from_failure`` is set, the most recent run of the job is
    inspected first.  If that run ended in ``Error`` and its
    ``run_results.json`` lists records with status ``error``, the job is
    triggered with a ``steps_override`` that rebuilds only those nodes and
    everything downstream of them.  In every other case (no previous run,
    previous run not in error, no failed records) the job is triggered with
    the payload as given.

    Args:
        account_id (int): Numeric ID of the account to retrieve
        job_id (int): Numeric ID of the job to trigger
        payload (dict): Payload required for post request.  It is not
            modified; an override is applied to a copy.
        poll_interval (int, optional): Number of seconds to wait in between
            polling
        restart_from_failure (bool, optional): Attempt to re-run only the
            nodes that failed in the job's most recent run

    Returns:
        Dict: The last run response fetched while polling.
    """
    if restart_from_failure:
        print(f'restarting job {job_id} from last failed state')
        rerun_models = ''

        # Newest run first, one result only.  A job that has never run
        # yields an empty list, which falls through to a base run.
        previous_runs = self.list_runs(
            account_id=account_id,
            include_related='run_steps',
            job_definition_id=job_id,
            order_by='-id',
            limit=1,
        )['data']

        if previous_runs:
            last_run = previous_runs[0]
            last_run_status = last_run['status_humanized']
            print(f'status from the previous run: {last_run_status}')

            if last_run_status == 'Error':
                print('identifying failed models from the most recent run')
                run_results = self.get_run_artifact(
                    account_id=account_id,
                    run_id=last_run['id'],
                    path='run_results.json',
                )['results']
                # "name+" selects the failed node plus its descendants.
                rerun_models = ' '.join(
                    record['unique_id'].split('.')[-1] + '+'
                    for record in run_results
                    if record['status'] == 'error'
                )

        if rerun_models:
            # TODO: how to map the correct run verb (build vs. run) here?
            # Copy rather than update in place so a payload object reused
            # by the caller does not keep the override on later triggers.
            payload = {
                **payload,
                'steps_override': [f'dbt build -s {rerun_models}'],
            }
            print(
                f'triggering modified job to re-run failed models: {rerun_models}'
            )
        else:
            print('no failed models found - triggering base run')

    run_id = self.trigger_job(account_id, job_id, payload)['data']['id']

    terminal_statuses = [
        JobRunStatus.SUCCESS,
        JobRunStatus.CANCELLED,
        JobRunStatus.ERROR,
    ]
    while True:
        time.sleep(poll_interval)
        run = self.get_run(account_id, run_id)
        status_humanized = run['data']['status_humanized']
        print(f'run status: {status_humanized}')
        if run['data']['status'] in terminal_statuses:
            break

    return run
dpguthrie commented 2 years ago

Yeah, this seems like really cool functionality to add to this library. And now I'm starting to wonder if we shouldn't consolidate this into one method, something like:

    def trigger_job(
        self,
        account_id: int,
        job_id: int,
        payload: Dict,
        should_poll: bool = True,
        poll_interval: int = 10,
        restart_from_failure: bool = False,
        restart_command: str = 'build'
    ):
        """Trigger a job by its ID, optionally restarting from failure and polling.

        Args:
            account_id (int): Numeric ID of the account
            job_id (int): Numeric ID of the job to trigger
            payload (dict): Payload required for post request.  It is not
                modified; a restart override is applied to a copy.
            should_poll (bool, optional): Poll the triggered run until it
                reaches SUCCESS, CANCELLED, or ERROR
            poll_interval (int, optional): Number of seconds to wait in between
                polling
            restart_from_failure (bool, optional): Re-run only the nodes with
                status ``error`` in the most recent run's ``run_results.json``
            restart_command (str, optional): dbt command used in the override
                step (``dbt <restart_command> -s ...``)

        Returns:
            The trigger response when ``should_poll`` is False, otherwise the
            last run response fetched while polling.

        Raises:
            Exception: ``restart_from_failure`` was requested but the most
                recent run has no records with status ``error``.
        """
        if restart_from_failure:
            run_results = self.get_most_recent_run_artifact(
                account_id, job_id, 'run_results.json'
            )['results']
            # "name+" selects the failed node plus its descendants.
            rerun_models = ' '.join(
                record['unique_id'].split('.')[-1] + '+'
                for record in run_results
                if record['status'] == 'error'
            )
            # An empty selection means nothing failed, so there is nothing to
            # restart from.
            if not rerun_models:
                raise Exception('No models found to restart from')

            # Copy rather than update in place so a payload object reused by
            # the caller does not keep the override on later triggers.
            payload = {
                **payload,
                'steps_override': [f'dbt {restart_command} -s {rerun_models}'],
            }

        run = self._simple_request(
            f'accounts/{account_id}/jobs/{job_id}/run/',
            method='post',
            json=payload,
        )
        if should_poll:
            # Capture the id once: `run` is rebound to the get_run response on
            # every iteration.
            # NOTE(review): assumes the trigger response nests the run under
            # 'data', like the get_run response read below - confirm.
            run_id = run['data']['id']
            terminal_statuses = [
                JobRunStatus.SUCCESS,
                JobRunStatus.CANCELLED,
                JobRunStatus.ERROR,
            ]
            while True:
                time.sleep(poll_interval)
                run = self.get_run(account_id, run_id)
                if run['data']['status'] in terminal_statuses:
                    break

        return run

Then, we can now just use arguments to control the behavior as opposed to switching out methods.

And then I like the idea of creating a convenience method that allows you to grab the artifact from the most recent run:

    def get_most_recent_run_artifact(
        self,
        account_id: int,
        job_id: int,
        path: str,
        *,
        step: int = None,
    ):
        """Fetch an artifact from the latest run of a job.

        Args:
            account_id (int): Numeric ID of the account
            job_id (int): Numeric ID of the job whose runs are listed
            path (str): Artifact path, passed through to ``get_run_artifact``
            step (int, optional): Passed through to ``get_run_artifact``

        Raises:
            Exception: The run listing for the job came back empty.
        """
        # Highest id first with a limit of one, so only the newest run is
        # requested.
        query = {
            'include_related': ['run_steps'],
            'job_definition_id': job_id,
            'order_by': '-id',
            'limit': 1,
        }
        listing = self.list_runs(account_id, **query)

        try:
            latest_run = listing['data'][0]
        except IndexError:
            raise Exception('No run information found')

        latest_run_id = latest_run['id']
        return self.get_run_artifact(account_id, latest_run_id, path, step)
matt-winkler commented 2 years ago

I'm on board with all of that except raising an error when there are no failed models found in the restart inspection process. I could easily see an end user wanting to leave the restart_from_failure check set to True always so that they reduce the amount of manual intervention needed.

matt-winkler commented 2 years ago

This is super clean!

dpguthrie commented 2 years ago

Such a good point with the exception, didn't think of it like that.

dpguthrie commented 2 years ago

I think that piece then just becomes this:

            # Only override the job's steps when at least one failed model was
            # collected; otherwise the payload is sent unchanged.
            if rerun_models:
                payload['steps_override'] = [
                    f'dbt {restart_command} -s {rerun_models}'
                ]
dpguthrie commented 2 years ago

Curious if you think there are other steps you'd also want to include in that override for any reason?

dpguthrie commented 2 years ago

Also, any preferences on the default for should_poll? My initial take was True, but could definitely be swayed.

matt-winkler commented 2 years ago

On other steps: we might need to think about what happens if the user wants to leverage a dbt run / dbt test pattern - maybe let them pass the run steps as a list? In THAT case, do we also have to think about existing run-operations, seeds etc. where the restart capability doesn't apply directly, but could be a required process for them.

on should_poll: I agree with you that True is a good default.

dpguthrie commented 2 years ago

What about this implementation? The steps_override argument is a dictionary that can contain three keys (before_steps, restart_steps, and after_steps), and the value of each is a list. The restart_steps value should be either ['build'] or ['run', 'test']. You could also do this with three separate arguments. Either way, the idea is that the user has maximum flexibility around their restarts.

    def trigger_job_v2(
        self,
        account_id: int,
        job_id: int,
        payload: Dict,
        should_poll: bool = True,
        poll_interval: int = 10,
        restart_from_failure: bool = False,
        steps_override: Dict[str, List] = None,
    ):
        """Prepare ``payload`` for a restart-from-failure trigger.

        As written, this only builds the ``steps_override`` entry of
        ``payload``; it sends no request and returns nothing, and
        ``should_poll`` / ``poll_interval`` are not used yet.

        Args:
            account_id (int): Numeric ID of the account
            job_id (int): Numeric ID of the job
            payload (dict): Payload for the post request; updated in place
                with ``steps_override`` when failed models are found
            should_poll (bool, optional): Currently unused
            poll_interval (int, optional): Currently unused
            restart_from_failure (bool, optional): Inspect the most recent
                run's ``run_results.json`` for records with status ``error``
            steps_override (dict, optional): May contain ``before_steps``,
                ``restart_steps`` and ``after_steps``, each a list.
                ``before_steps`` / ``after_steps`` are used verbatim; each
                entry of ``restart_steps`` is a dbt command name expanded to
                ``dbt <command> -s <failed models>``.  Defaults to
                ``{'restart_steps': ['build']}``.
        """
        # Use a None sentinel instead of a dict literal in the signature, so
        # the default is not one object shared by every call.
        if steps_override is None:
            steps_override = {'restart_steps': ['build']}

        if restart_from_failure:
            run_results = self.get_most_recent_run_artifact(
                account_id, job_id, 'run_results.json'
            )['results']
            # "name+" selects the failed node plus its descendants.
            rerun_models = ' '.join(
                record['unique_id'].split('.')[-1] + '+'
                for record in run_results
                if record['status'] == 'error'
            )
            # No failed models: leave the payload untouched.
            if rerun_models:
                steps = [
                    *steps_override.get('before_steps', []),
                    *(
                        f'dbt {command} -s {rerun_models}'
                        for command in steps_override.get(
                            'restart_steps', ['build']
                        )
                    ),
                    *steps_override.get('after_steps', []),
                ]
                payload.update({'steps_override': steps})
dpguthrie commented 2 years ago

Or, assuming we have three arguments (restart_commands, before_steps, and after_steps):

            # Only override the job's steps when failed models were collected.
            if rerun_models:
                # Exactly two command sequences are accepted.
                if restart_commands not in (['build'], ['run', 'test']):
                    raise Exception('Invalid restart commands specified')

                # One dbt invocation per restart command, each limited to the
                # failed-model selection.
                restart_steps = [
                    f'dbt {command} -s {rerun_models}' for command in restart_commands
                ]
                # A falsy before_steps / after_steps contributes no steps.
                leading = before_steps or []
                trailing = after_steps or []
                steps = leading + restart_steps + trailing
                payload['steps_override'] = steps
dpguthrie commented 2 years ago

@matt-winkler I started a PR that we could work from. Let me know what you think.

matt-winkler commented 2 years ago

Thanks @dpguthrie! I'll have a crack at it this week. Really like your approach above including before_steps and after_steps.