microsoft / presidio

Context aware, pluggable and customizable data protection and de-identification SDK for text and images
https://microsoft.github.io/presidio
MIT License

Custom Operator that extends a built-in Operator is not recognized by OperatorsFactory in presidio_anonymizer and presidio_deanonymizer #1093

Open alex-ber opened 1 year ago

alex-ber commented 1 year ago

Describe the bug

I've created two custom operators:

from typing import Dict

from presidio_anonymizer.operators import Decrypt

class Decrypt2(Decrypt):
    """Decrypt text from its encrypted form."""

    NAME = "decrypt2"
    PATTERN = r'@@@([\w\s\\]+)@@@'  # alternative: r'@@@(.*?)@@@'

    def operate(self, text: str = None, params: Dict = None) -> str:
        """
        Decrypt the text.

        :param text: The text for decryption.
        :param params:
            * *key* The key supplied by the user for the encryption.
        :return: The decrypted text
        """
        # Strip the @@@ markers added by Encrypt2 (str.removeprefix needs Python 3.9+).
        text = text.removeprefix('@@@')
        text = text.removesuffix('@@@')
        decrypted_text = super().operate(text, params)
        return decrypted_text

and

from typing import Dict

from presidio_anonymizer.operators import Encrypt

class Encrypt2(Encrypt):
    """Encrypt text and wrap the result in @@@ markers."""

    def operate(self, text: str = None, params: Dict = None) -> str:
        """Encrypt the text with the built-in Encrypt, then add the markers."""
        encrypted_text = super().operate(text, params)
        return f"@@@{encrypted_text}@@@"

    def operator_name(self) -> str:
        """Return operator name."""
        return "encrypt2"

I've imported both of them. Encrypt2 marks each encrypted word with @@@ at the beginning and at the end. Decrypt2 removes the @@@ at the beginning and at the end of the word before decrypting it.
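The marking scheme on its own can be sketched without Presidio. This is a minimal illustration, not the operators' actual code: `mark` and `unmark` are hypothetical helper names, and the token is a placeholder string rather than real AES output. The regular expression is the PATTERN from Decrypt2.

```python
import re

MARKER = "@@@"
# Same expression as Decrypt2.PATTERN in the issue.
PATTERN = re.compile(r'@@@([\w\s\\]+)@@@')


def mark(token: str) -> str:
    """Wrap a token in markers, as Encrypt2.operate does with the ciphertext."""
    return f"{MARKER}{token}{MARKER}"


def unmark(token: str) -> str:
    """Strip the markers, as Decrypt2.operate does before decrypting."""
    # str.removeprefix / str.removesuffix require Python 3.9 or newer.
    return token.removeprefix(MARKER).removesuffix(MARKER)


# "c2VjcmV0" is a placeholder token, not real ciphertext.
text = f"My name is {mark('c2VjcmV0')}."
match = PATTERN.search(text)
if match:
    print(match.group(0))          # the marked span found in the text
    print(unmark(match.group(0)))  # the bare token handed to decryption
```

Whether this pattern matches every character that real ciphertext can contain is not checked here.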

The problem is in the class presidio_anonymizer.operators.operators_factory.OperatorsFactory, in this private static method:

    @staticmethod
    def __get_operators_by_type(operator_type: OperatorType):
        operators = Operator.__subclasses__()
        return {
            cls.operator_name(cls): cls
            for cls in operators
            if cls.operator_type(cls) == operator_type
        }

More precisely, the problem is the line operators = Operator.__subclasses__(). This call returns only the immediate subclasses. So it finds the Encrypt and Decrypt classes, because they inherit directly from Operator, but it fails to find my Encrypt2 and Decrypt2 classes, because Operator is only their grandparent (Encrypt2->Encrypt->Operator, Decrypt2->Decrypt->Operator).
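The behavior of `__subclasses__()` can be checked in isolation. The classes below are empty stand-ins that only mirror the inheritance chain from this issue; they are not the Presidio classes.

```python
class Operator:
    """Stand-in for the Presidio Operator base class."""


class Encrypt(Operator):
    """Direct subclass of Operator."""


class Encrypt2(Encrypt):
    """Grandchild of Operator, like the custom operator in this issue."""


direct = Operator.__subclasses__()
print([cls.__name__ for cls in direct])                     # ['Encrypt']
print([cls.__name__ for cls in Encrypt.__subclasses__()])   # ['Encrypt2']
print(issubclass(Encrypt2, Operator))                       # True
```

Encrypt2 is a subclass of Operator in the `issubclass` sense, but it only shows up when `__subclasses__()` is called on Encrypt.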

To Reproduce

Define the Encrypt2 and Decrypt2 classes as written above, then run the following code (adjust the relative imports to your package layout):

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

from .encrypt2 import Encrypt2  # required before running OperatorsFactory code
from .decrypt2 import Decrypt2  # required before running OperatorsFactory code

crypto_key = "WmZq4t7w!z%C&F)J"  # placeholder 128-bit key, substitute your own

text = "Hello world, my name is Jane Doe. My number is: 034453334"
analyzer = AnalyzerEngine(log_decision_process=True)
recognizerResults = analyzer.analyze(text=text,
                                     entities=None,
                                     return_decision_process=True,
                                     score_threshold=0.1,
                                     language='en')

engine = AnonymizerEngine()
anonymize_result = engine.anonymize(
    text=text,
    analyzer_results=recognizerResults,
    # Alternatively, pass hand-written results:
    #[
        #RecognizerResult(entity_type="PERSON", start=11, end=15, score=0.8),
        #RecognizerResult(entity_type="PERSON", start=17, end=27, score=0.8),
    #],
    operators={"DEFAULT": OperatorConfig("encrypt2", {"key": crypto_key})}
)

You will get 'presidio_anonymizer.entities.invalid_exception.InvalidParamException: Invalid operator class 'encrypt2'.'

Expected behavior

No exception; the text is anonymized with the custom encrypt2 operator.


Additional context

As a workaround, my Encrypt2 and Decrypt2 classes inherit from Operator directly, and I copied and pasted the code they would otherwise inherit. So, for now, they look like this:

from typing import Dict

from presidio_anonymizer.entities import InvalidParamException
from presidio_anonymizer.operators import Operator, OperatorType
from presidio_anonymizer.operators.aes_cipher import AESCipher
from presidio_anonymizer.services.validators import validate_parameter

class Encrypt2(Operator):
    KEY = "key"

    def operate(self, text: str = None, params: Dict = None) -> str:
        encoded_key = params.get(self.KEY).encode("utf8")
        encrypted_text = AESCipher.encrypt(encoded_key, text)
        return f"@@@{encrypted_text}@@@"

    def validate(self, params: Dict = None) -> None:
        """
        Validate Encrypt parameters.

        :param params:
            * *key* The key supplied by the user for the encryption.
                    Should be a string of 128, 192 or 256 bits length.
        :raises InvalidParamException in case on an invalid parameter.
        """
        key = params.get(self.KEY)
        validate_parameter(key, self.KEY, str)
        if not AESCipher.is_valid_key_size(key.encode("utf8")):
            raise InvalidParamException(
                f"Invalid input, {self.KEY} must be of length 128, 192 or 256 bits"
            )

    def operator_name(self) -> str:
        """Return operator name."""
        return "encrypt2"

    def operator_type(self) -> OperatorType:
        """Return operator type."""
        return OperatorType.Anonymize

and

from typing import Dict

from presidio_anonymizer.entities import InvalidParamException
from presidio_anonymizer.operators import Operator
from presidio_anonymizer.operators import OperatorType
from presidio_anonymizer.operators.aes_cipher import AESCipher
from presidio_anonymizer.services.validators import validate_parameter

class Decrypt2(Operator):
    """Decrypt text from its encrypted form."""

    NAME = "decrypt2"
    KEY = "key"
    PATTERN = r'@@@([\w\s\\]+)@@@'  # r'@@@(.*?)@@@' #

    def operate(self, text: str = None, params: Dict = None) -> str:
        """
        Decrypt the text.

        :param text: The text for decryption.
        :param params:
            * *key* The key supplied by the user for the encryption.
        :return: The decrypted text
        """
        encoded_key = params.get(self.KEY).encode("utf8")
        text = text.removeprefix('@@@')
        text = text.removesuffix('@@@')
        decrypted_text = AESCipher.decrypt(key=encoded_key, text=text)
        return decrypted_text

    def validate(self, params: Dict = None) -> None:
        """
        Validate Decrypt parameters.

        :param params:
            * *key* The key supplied by the user for the encryption.
                    Should be a string of 128, 192 or 256 bits length.
        :raises InvalidParamException in case on an invalid parameter.
        """
        key = params.get(self.KEY)
        validate_parameter(key, self.KEY, str)
        if not AESCipher.is_valid_key_size(key.encode("utf8")):
            raise InvalidParamException(
                f"Invalid input, {self.KEY} must be of length 128, 192 or 256 bits"
            )

    def operator_name(self) -> str:
        """Return operator name."""
        return self.NAME

    def operator_type(self) -> OperatorType:
        """Return operator type."""
        return OperatorType.Deanonymize

The above is a workaround, and it works.

Proposed code fix

In OperatorsFactory, change __get_operators_by_type() to take into account all subclasses of Operator, not only the immediate ones. This is easy to do with recursion. Add and change the following code in OperatorsFactory:


    @staticmethod
    def __get_all_subclasses(cls):                           # add this method
        subclasses = []
        for subclass in cls.__subclasses__():
            subclasses.append(subclass)
            subclasses.extend(OperatorsFactory.__get_all_subclasses(subclass))
        return subclasses

    @staticmethod
    def __get_operators_by_type(operator_type: OperatorType):
        # operators = Operator.__subclasses__()            # remove this line
        operators = OperatorsFactory.__get_all_subclasses(Operator)
        return {
            cls.operator_name(cls): cls
            for cls in operators
            if cls.operator_type(cls) == operator_type
        }
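
The recursive lookup can be tried outside Presidio with a small self-contained sketch. Everything below is a stand-in written for illustration: the `OperatorType` enum, the toy operator classes, and the module-level functions `all_subclasses` and `operators_by_type` are assumptions, not the library's code. Only the recursion and the dictionary comprehension follow the proposal above.

```python
from enum import Enum
from typing import Dict, List


class OperatorType(Enum):
    """Toy stand-in for the Presidio OperatorType enum."""
    Anonymize = 1
    Deanonymize = 2


class Operator:
    """Toy stand-in for the Presidio Operator base class."""


class Encrypt(Operator):
    def operator_name(self) -> str:
        return "encrypt"

    def operator_type(self) -> OperatorType:
        return OperatorType.Anonymize


class Encrypt2(Encrypt):
    # Inherits operator_type from Encrypt; only the name changes.
    def operator_name(self) -> str:
        return "encrypt2"


class Decrypt(Operator):
    def operator_name(self) -> str:
        return "decrypt"

    def operator_type(self) -> OperatorType:
        return OperatorType.Deanonymize


def all_subclasses(cls: type) -> List[type]:
    """Collect direct and indirect subclasses of cls, depth first."""
    found = []
    for subclass in cls.__subclasses__():
        found.append(subclass)
        found.extend(all_subclasses(subclass))
    return found


def operators_by_type(operator_type: OperatorType) -> Dict[str, type]:
    """Map operator name to class, mirroring the proposed factory method."""
    return {
        cls.operator_name(cls): cls
        for cls in all_subclasses(Operator)
        if cls.operator_type(cls) == operator_type
    }


anonymizers = operators_by_type(OperatorType.Anonymize)
print(sorted(anonymizers))  # ['encrypt', 'encrypt2']
```

With multiple inheritance the same class can be collected more than once; because the result is keyed by operator name, such duplicates collapse in the dictionary.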

omri374 commented 1 year ago

Hi @alex-ber, thanks for raising this issue, we will look into this. If you're interested in creating a PR, we would be happy to review it. This is also somewhat related to #1052, as Presidio does not support the addition of external operators due to the way the OperatorsFactory is implemented.