great-expectations / great_expectations

Always know what to expect from your data.
https://docs.greatexpectations.io/
Apache License 2.0

Broken inheritance from PR "Fluent datasource support for checkpoints" #7697 #7790

Closed lukedyer-peak closed 1 year ago

lukedyer-peak commented 1 year ago

The changes in #7697 have stopped inheritance from working for the Checkpoint classes. I've subclassed both Checkpoint and SimpleCheckpoint, but because not all arguments are passed through, the inheritance has broken. It would be great if there were another implementation mechanism that still allows inheritance.

austiezr commented 1 year ago

Hey @lukedyer-peak! Thanks for raising this.

Are you able to share an example of the error / complete stack trace / behavior you're seeing here, and the workflow generating that error? That will help us to better investigate a potential fix.

alexsherstinsky commented 1 year ago

@lukedyer-peak I am looking into it now.

alexsherstinsky commented 1 year ago

@lukedyer-peak While I am figuring this out, would it be possible to get from you an example that I can run in my environment and which will reproduce the issue? Thank you very much for highlighting this and your help.

alexsherstinsky commented 1 year ago

@lukedyer-peak I now have a hypothesis for what happened; hence, it would be really helpful to get from you not only the example, but also how you run this example, and the full exception stack trace (no matter how long it is). Thank you so much!

alexsherstinsky commented 1 year ago

Hello again, @lukedyer-peak -- I went ahead with my hypothesis and believe that I fixed the issue (and included a special test for extending Checkpoint classes) -- the pull request focused on this change is #7879 and it has been merged. Please give it a try and let us know whether or not the improvement is working for you. You can try it from the develop branch, or wait till Thursday, May 18, 2023 which is when the next Great Expectations release is scheduled to take place. Thank you very much!

lukedyer-peak commented 1 year ago

Hi @alexsherstinsky, sorry for the late reply; I've been away on holiday. Thanks for the prompt exploration and fix! I've taken a look at #7879 and I can make my extension work with that. My approach will be less elegant: I will subclass SimpleCheckpoint but keep the .run() function signature of Checkpoint. This is because I don't want runtime arguments to change the config or action_list of the checkpoint, as they do in SimpleCheckpoint.

I (probably unexpectedly) subclass a Checkpoint but use the notify_on and slack_webhook arguments. I've created a SeverityLevelCheckpoint that extends the concept of error and warning expectation suites to the full range of severity levels from the Python logging library (e.g. critical, error, warning, info, debug). It also has some sane defaults for our company's use cases (with regard to the action_list). This means that users can reduce alerting fatigue. From a code point of view, I'd ideally prefer composition over the inheritance approach I've taken, but I don't think there's a simple way to do that with the current design, which is out of my hands, as I need to modify the checkpoint_result to change the success status 🙈.
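
A minimal sketch of how such a severity scale might be defined, assuming it simply mirrors the numeric levels of Python's logging module. The names SeverityLevel and str_to_severity_level are used in the class further down, but their actual definitions are not shown in this thread, so everything here (including the parse_severity helper) is an illustrative assumption rather than the real implementation.

```python
import enum
import logging


class SeverityLevel(enum.IntEnum):
    """Hypothetical severity scale reusing the numeric levels from `logging`."""

    DEBUG = logging.DEBUG
    INFO = logging.INFO
    WARNING = logging.WARNING
    ERROR = logging.ERROR
    CRITICAL = logging.CRITICAL


# Lookup from lower-case names (as they might appear in a YAML config) to levels.
str_to_severity_level = {level.name.lower(): level for level in SeverityLevel}


def parse_severity(name: str) -> SeverityLevel:
    """Return the matching level, falling back to DEBUG for unknown names."""
    return str_to_severity_level.get(name.lower(), SeverityLevel.DEBUG)
```

Because IntEnum members compare like integers, threshold checks such as `level >= SeverityLevel.WARNING` work without extra code.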

When we create the SeverityLevelCheckpoint directly in Python it works fine, but the issue arose when we were creating it from a YAML config (e.g. from a checkpoint.yaml file). Here's the main bulk of the class; since we're in a public forum, I've blurred out the imports from our codebase.

import datetime
from typing import Dict, List, Optional, Union

from great_expectations.checkpoint.checkpoint import BaseCheckpoint, Checkpoint
from great_expectations.checkpoint.configurator import ActionDict, ActionDicts
from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult
from great_expectations.data_context import AbstractDataContext
from great_expectations.data_context.types.base import CheckpointConfig
from ... import SeverityLevel, str_to_severity_level
from ... import (
    DEFAULT_SEVERITY_LEVEL,
    parse_checkpoint_result,
)
from ... import SlackSeverityLevelRenderer
from ... import post_safely_to_webhook

class SeverityLevelCheckpoint(Checkpoint):
    """
    A Checkpoint that (batch) notifies on severity levels.

    All the parameters are the same as `Checkpoint` except the following:

    Parameters:
    ----------
    notify_on:
        The minimum severity level to send Slack notifications for.
        Defaults to "info". An unrecognized value is treated as "debug".
    slack_renderer:
        The slack renderer to use to render the Slack message after the checkpoint has
        finished validating all batches.
        Defaults to `SlackSeverityLevelRenderer`.
    action_list:
        A list of actions to perform after each batch is validated.
        Defaults to STORE_VALIDATION_RESULT, STORE_EVALUATION_PARAMS and
        UPDATE_DATA_DOCS for convenience.

    Notes
    -----
    - The Checkpoint will still succeed if any expectation suites of debug or info fail.
    """

    FAIL_ON = DEFAULT_SEVERITY_LEVEL

    def __init__(
        self,
        name: str,
        data_context: AbstractDataContext,
        config_version: Optional[Union[int, float]] = None,
        template_name: Optional[str] = None,
        run_name_template: Optional[str] = None,
        expectation_suite_name: Optional[str] = None,
        batch_request=None,
        action_list: Optional[List[dict]] = None,
        evaluation_parameters: Optional[dict] = None,
        runtime_configuration: Optional[dict] = None,
        validations: Optional[List[dict]] = None,
        profilers: Optional[List[dict]] = None,
        validation_operator_name: Optional[str] = None,
        batches: Optional[List[dict]] = None,
        ge_cloud_id=None,
        expectation_suite_ge_cloud_id=None,
        default_validation_id: Optional[str] = None,
        slack_webhook: Optional[str] = None,
        notify_on: str = "info",
        slack_renderer=None,
        **kwargs,
    ) -> None:
        if slack_renderer is None:
            slack_renderer = SlackSeverityLevelRenderer
        self.slack_renderer = slack_renderer()

        if action_list is None:
            action_list = self._default_action_list()

        checkpoint_config = CheckpointConfig(
            name=name,
            class_name=self.__class__.__name__,
            module_name=self.__class__.__module__,
            config_version=config_version,
            template_name=template_name,
            run_name_template=run_name_template,
            expectation_suite_name=expectation_suite_name,
            batch_request=batch_request,
            action_list=action_list,
            evaluation_parameters=evaluation_parameters,
            runtime_configuration=runtime_configuration,
            validations=validations,
            profilers=profilers,
            validation_operator_name=validation_operator_name,
            batches=batches,
            ge_cloud_id=ge_cloud_id,
            expectation_suite_ge_cloud_id=expectation_suite_ge_cloud_id,
            default_validation_id=default_validation_id,
            slack_webhook=slack_webhook,
            notify_on=notify_on,
            **kwargs,
        )
        BaseCheckpoint.__init__(
            self,
            checkpoint_config=checkpoint_config,
            data_context=data_context,
        )

    @staticmethod
    def _default_action_list() -> List[ActionDict]:
        return [
            ActionDicts.STORE_VALIDATION_RESULT,
            ActionDicts.STORE_EVALUATION_PARAMS,
            ActionDicts.UPDATE_DATA_DOCS,
        ]

    def run(
        self,
        template_name: Optional[str] = None,
        run_name_template: Optional[str] = None,
        expectation_suite_name: Optional[str] = None,
        batch_request=None,
        action_list: Optional[List[dict]] = None,
        evaluation_parameters: Optional[dict] = None,
        runtime_configuration: Optional[dict] = None,
        validations: Optional[List[dict]] = None,
        profilers: Optional[List[dict]] = None,
        run_id=None,
        run_name: Optional[str] = None,
        run_time: Optional[Union[str, datetime.datetime]] = None,
        result_format: Optional[Union[str, dict]] = None,
        expectation_suite_ge_cloud_id: Optional[str] = None,
    ) -> CheckpointResult:
        checkpoint_result: CheckpointResult = super().run(
            template_name=template_name,
            run_name_template=run_name_template,
            expectation_suite_name=expectation_suite_name,
            batch_request=batch_request,
            action_list=action_list,
            evaluation_parameters=evaluation_parameters,
            runtime_configuration=runtime_configuration,
            validations=validations,
            profilers=profilers,
            run_id=run_id,
            run_name=run_name,
            run_time=run_time,
            result_format=result_format,
            expectation_suite_ge_cloud_id=expectation_suite_ge_cloud_id,
        )
        self._update_result_success_status(checkpoint_result)

        if self.config.slack_webhook:
            self._maybe_send_slack_notification(checkpoint_result)

        return checkpoint_result

    def _update_result_success_status(self, checkpoint_result):
        checkpoint_severity_result = parse_checkpoint_result(
            checkpoint_result
        ).severity_result
        if checkpoint_severity_result is None:
            checkpoint_result._success = True
        else:
            checkpoint_result._success = checkpoint_severity_result < self.FAIL_ON

    def _maybe_send_slack_notification(self, result: CheckpointResult):
        if self._check_slack_notify(result):
            message: Dict = self.slack_renderer.render(parse_checkpoint_result(result))
            # read a ${SLACK_URL} from config providers -- usually env vars
            substituted_config = substitute_config(self.data_context, self.config)
            slack_webhook = substituted_config["slack_webhook"]
            post_safely_to_webhook(
                slack_webhook,
                payload=message,
                headers={"Content-Type": "application/json"},
            )

    def _check_slack_notify(self, result: CheckpointResult) -> bool:
        """Checks to see if a Slack notification should be sent."""
        parsed_result = parse_checkpoint_result(result)
        validation_success = parsed_result.severity_result is None
        notify_on = self.config.notify_on.lower()

        # deal with legacy conditions
        if notify_on == "all":
            return True
        if notify_on == "success":
            return validation_success
        if notify_on == "failure":
            return not validation_success

        if validation_success:
            return False
        return parsed_result.severity_result >= str_to_severity_level.get(
            notify_on, SeverityLevel.DEBUG
        )

def substitute_config(context: AbstractDataContext, config) -> Dict:
    return context.config_provider.substitute_config(config)

Hopefully this is insightful at least to see what strange behaviour users concoct in the wild ☠️
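
The notification rule in _check_slack_notify above can also be read in isolation. The sketch below restates it as a pure function over plain integers. The numeric levels, the LEVELS table and the name should_notify are assumptions made for illustration and are not part of the class above.

```python
from typing import Optional

# Assumed numeric severity levels, borrowed from Python's logging module.
LEVELS = {"debug": 10, "info": 20, "warning": 30, "error": 40, "critical": 50}


def should_notify(severity_result: Optional[int], notify_on: str) -> bool:
    """Decide whether to notify; `severity_result` is None when nothing failed."""
    notify_on = notify_on.lower()
    validation_success = severity_result is None

    # Legacy values are handled first.
    if notify_on == "all":
        return True
    if notify_on == "success":
        return validation_success
    if notify_on == "failure":
        return not validation_success

    # Severity thresholds only apply when something failed.
    if validation_success:
        return False
    # Unknown thresholds fall back to the lowest level (debug).
    return severity_result >= LEVELS.get(notify_on, LEVELS["debug"])
```

With these assumed levels, a warning-level failure does not notify when the threshold is "error", while an error-level failure does notify when the threshold is "warning".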

alexsherstinsky commented 1 year ago

@lukedyer-peak Thank you very much for your explanation and the example. Indeed, inheritance is supported (but not composition), and unless you already got this working by extending the SimpleCheckpoint, extending the regular Checkpoint would probably afford you more flexibility. In any case, is everything working on your end the way you needed? If so, could you please close the issue; otherwise, please let me know if anything specific is still not working. Thank you very much again for your participation in this!

anthonyburdi commented 1 year ago

Hi @lukedyer-peak! To echo Alex, thank you for raising the issue and for your explanation! I'll close the issue as resolved, but we'd love to hear whether this worked for you, and if it did not, please feel free to reopen or create a new issue!

lukedyer-peak commented 1 year ago

Hi, yes, it's fixed my issue. My resolution is slightly more "hacky": I have to subclass SimpleCheckpoint so that slack_webhook and notify_on are passed in when the checkpoint is constructed from a YAML config, but I then have to adjust the inheritance with super() so that the methods from SimpleCheckpoint are not used. That's a somewhat complicated explanation, so here it is in code:

class SeverityLevelCheckpoint(SimpleCheckpoint):
    # We want to subclass Checkpoint but need to subclass SimpleCheckpoint so that
    # loading from yaml config works.
    # If we subclassed Checkpoint then not all run configs get passed.
    # https://github.com/great-expectations/great_expectations/issues/7790
    """
    ...
    """

    FAIL_ON = DEFAULT_SEVERITY_LEVEL

    def __init__(
        self,
  ....
    ) -> None:
        ... # same as before
        super(Checkpoint, self).__init__(
            checkpoint_config=checkpoint_config,
            data_context=data_context,
        )

    def run(
        self,
        template_name: Optional[str] = None,
        run_name_template: Optional[str] = None,
        expectation_suite_name: Optional[str] = None,
        batch_request=None,
        validator=None,
        action_list: Optional[List[dict]] = None,
        evaluation_parameters: Optional[dict] = None,
        runtime_configuration: Optional[dict] = None,
        validations: Optional[List[dict]] = None,
        profilers: Optional[List[dict]] = None,
        run_id=None,
        run_name: Optional[str] = None,
        run_time: Optional[datetime.datetime] = None,
        result_format: Optional[Union[str, dict]] = None,
        expectation_suite_ge_cloud_id: Optional[str] = None,
    ) -> CheckpointResult:
        checkpoint_result: CheckpointResult = super(Checkpoint, self).run(
            template_name=template_name,
            run_name_template=run_name_template,
            expectation_suite_name=expectation_suite_name,
            batch_request=batch_request,
            validator=validator,
            action_list=action_list,
            evaluation_parameters=evaluation_parameters,
            runtime_configuration=runtime_configuration,
            validations=validations,
            profilers=profilers,
            run_id=run_id,
            run_name=run_name,
            run_time=run_time,
            result_format=result_format,
            expectation_suite_ge_cloud_id=expectation_suite_ge_cloud_id,
        )
        ..... # rest of class is same as before
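
For anyone unfamiliar with the two-argument form of super() used above: super(SomeClass, self) starts the method lookup at the class that comes after SomeClass in the instance's method resolution order (MRO), which is how an intermediate override can be skipped. The toy classes below are stand-ins invented for this sketch and are not the Great Expectations classes.

```python
class Base:
    def run(self) -> str:
        return "Base.run"


class Middle(Base):
    # Inherits run() from Base unchanged.
    pass


class Simple(Middle):
    def run(self) -> str:
        return "Simple.run"


class Custom(Simple):
    def run(self) -> str:
        # Start the lookup after Middle in the MRO, which skips Simple.run.
        return super(Middle, self).run()


# MRO of Custom: Custom -> Simple -> Middle -> Base -> object
print(Custom().run())
```

A plain super().run() inside Custom would instead resolve to Simple.run, since Simple is the next class after Custom in the MRO.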