piotrlaczkowski / keras-data-processor

Data Preprocessing model based on Keras preprocessing layers that can be used as a standalone model or incorporated into a Keras model as its first layers.
https://piotrlaczkowski.github.io/keras-data-processor/latest
MIT License

Custom Preprocessing Steps Revamp #3

Open piotrlaczkowski opened 2 months ago

piotrlaczkowski commented 2 months ago

When defining custom preprocessing steps we need to find a way to pass the initialized output of the first layer to the input of the next layer, e.g.:

preprocessors=[
    PreprocessorLayerFactory.rescaling_layer,
    PreprocessorLayerFactory.normalization_layer,
],

When defining a custom preprocessor for text columns, we would need to be careful about how the first layer can impact the input of the second layer; we do not necessarily take this into account currently (in an automatic way). For example, if the first layer lowercases everything but the second layer was initialized with an uppercase vocabulary, the second layer will always produce the "UNK" token, since the first layer completely modifies its input.
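
A minimal sketch of that mismatch, using tf.keras.layers.StringLookup purely for illustration (the concrete layers in KDP may differ): a lookup layer built with an uppercase vocabulary sends every lowercased token to the OOV bucket:

import tensorflow as tf

# Illustration only: a lookup layer initialized with an UPPERCASE vocabulary
lowercase = tf.keras.layers.Lambda(lambda x: tf.strings.lower(x))
lookup = tf.keras.layers.StringLookup(vocabulary=["CAT", "DOG", "MOUSE"])

tokens = tf.constant(["DOG", "CAT", "MOUSE"])
print(lookup(tokens))             # known tokens map to vocabulary indices
print(lookup(lowercase(tokens)))  # every token falls into the OOV ("UNK") bucket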

piotrlaczkowski commented 1 month ago

The idea for a potential improvement would be to go over the sequence of preprocessing layers, check what each one needs for initialisation, and then transform the input data through all of the initialised layers.

Dummy code demonstrating the approach could look like this:

import tensorflow as tf

class DynamicPreprocessingPipeline:
    """
    Dynamically initializes a sequence of Keras preprocessing layers based on the output
    from each previous layer, allowing each layer to access the outputs of all prior layers where relevant.
    """

    def __init__(self, layers):
        """
        Initializes the DynamicPreprocessingPipeline with a list of layers.

        Args:
            layers (list): A list of Keras preprocessing layers, each potentially named for reference.
        """
        self.layers = layers

    def initialize_and_transform(self, init_data):
        """
        Sequentially processes each layer, applying transformations selectively based on each layer's
        input requirements and ensuring efficient data usage and processing. Each layer can access the output
        of all previous layers.

        Args:
            init_data (dict): A dictionary with initialization data, dynamically keyed.

        Returns:
            dict: The dictionary containing selectively transformed data for each layer.

        Example:
            ```python
            layers = [
                tf.keras.layers.Lambda(lambda x: tf.strings.lower(x), name="lowercase"),  # Make text lowercase
                tf.keras.layers.Lambda(lambda x: tf.strings.regex_replace(x, '[^a-z]', ''), name="remove_non_alpha")  # Remove non-alphabetic characters
            ]
            pipeline = DynamicPreprocessingPipeline(layers)
            init_data = {
                'text': tf.constant(["DOG!", "CAT", "DOG", "MOUSE"])
            }
            transformed_data = pipeline.initialize_and_transform(init_data)
            ```
        """
        current_data = dict(init_data)
        previous_key = None

        for layer in self.layers:
            # Determine the necessary keys for the current layer based on its input spec
            if getattr(layer, "input_spec", None) is not None:
                required_keys = set(tf.nest.flatten(tf.nest.map_structure(lambda x: x.name, layer.input_spec)))
            else:
                # Without an input_spec, fall back to the previous layer's output so the
                # layers chain sequentially; the first layer uses the raw init_data directly
                required_keys = {previous_key} if previous_key else set(current_data.keys())

            # Prepare input for the current layer based on the determined keys
            current_input = {k: current_data[k] for k in required_keys if k in current_data}

            # Apply the layer: pass a single tensor directly, otherwise pass the whole dict
            if len(current_input) == 1:
                layer_input = next(iter(current_input.values()))
            else:
                layer_input = current_input
            transformed_output = {layer.name: layer(layer_input)}

            # Update current data with the transformed output for use by the next layer
            current_data.update(transformed_output)
            previous_key = layer.name

        return current_data

# Example usage
layers = [
    tf.keras.layers.Lambda(lambda x: tf.strings.lower(x), name="lowercase"),  # Make text lowercase
    tf.keras.layers.Lambda(lambda x: tf.strings.regex_replace(x, '[^a-z]', ''), name="remove_non_alpha")  # Remove non-alphabetic characters
]
pipeline = DynamicPreprocessingPipeline(layers)
init_data = {
    'text': tf.constant(["DOG!", "CAT", "DOG", "MOUSE"])
}
transformed_data = pipeline.initialize_and_transform(init_data)
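
With these two layers, each intermediate result is stored under its layer's name, so (under the fallback behaviour sketched above) the returned dict holds the original feature plus one entry per layer and can be inspected directly:

# Inspect the result of the sketch above
for key, value in transformed_data.items():
    print(key, value.numpy())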

piotrlaczkowski commented 1 month ago

A more complex, but more optimized, implementation could be:

import tensorflow as tf

class DynamicPreprocessingPipeline:
    """
    Dynamically initializes and manages a sequence of Keras preprocessing layers, with selective retention of outputs
    based on dependencies among layers, and supports streaming data through the pipeline.
    """

    def __init__(self, layers):
        """
        Initializes the pipeline with a list of preprocessing layers.

        Args:
            layers (list): A list of TensorFlow preprocessing layers, each potentially named for reference.
        """
        self.layers = layers
        self.dependency_map = self._analyze_dependencies()

    def _analyze_dependencies(self):
        """
        Analyzes and determines the dependencies of each layer on the outputs of previous layers.

        Returns:
            dict: A dictionary mapping each layer to the set of layer outputs it depends on.
        """
        dependencies = {}
        all_outputs = set()

        for layer in self.layers:
            if getattr(layer, 'input_spec', None) is not None:
                required_inputs = set(tf.nest.flatten(tf.nest.map_structure(lambda x: x.name, layer.input_spec)))
            else:
                # If no specific input_spec, depend on all previous outputs; copy the set so
                # later additions to all_outputs do not leak into this layer's dependencies
                required_inputs = set(all_outputs)

            dependencies[layer.name] = required_inputs
            all_outputs.update(required_inputs)

            # Ensure that each layer's output is available as potential input to subsequent layers
            all_outputs.add(layer.name)

        return dependencies

    def process(self, dataset):
        """
        Processes the dataset through the pipeline using TensorFlow's tf.data API for efficient streaming.

        Args:
            dataset (tf.data.Dataset): The dataset to process.

        Returns:
            tf.data.Dataset: The processed dataset.
        """
        def _apply_transformations(features):
            current_data = dict(features)
            for layer in self.layers:
                # Prepare input for the current layer from the dependency map; a layer with no
                # recorded dependencies (e.g. the first one) falls back to the raw features
                required_keys = self.dependency_map[layer.name] or set(current_data.keys())
                current_input = {k: current_data[k] for k in required_keys if k in current_data}

                # Apply the transformation; each layer is assumed to consume a single tensor,
                # and its output is stored under the layer's name
                transformed_output = {layer.name: layer(current_input[k]) for k in required_keys if k in current_input}

                # Update current data with transformed output
                current_data.update(transformed_output)

            return current_data

        return dataset.map(_apply_transformations)

# Example usage
layers = [
    tf.keras.layers.Lambda(lambda x: tf.strings.lower(x), name="lowercase"),
    tf.keras.layers.Lambda(lambda x: tf.strings.regex_replace(x, '[^a-z]', ''), name="remove_non_alpha")
]
pipeline = DynamicPreprocessingPipeline(layers)
dataset = tf.data.Dataset.from_tensor_slices({
    'text': ["DOG!", "CAT", "DOG", "MOUSE"]
}).batch(2)  # Example of streaming in batches
processed_dataset = pipeline.process(dataset)

# Print results
for item in processed_dataset:
    print(item)
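
Since the transformations run inside dataset.map, the pipeline composes with the usual tf.data performance options; for example (optional, not part of the pipeline itself):

processed_dataset = pipeline.process(dataset).prefetch(tf.data.AUTOTUNE)  # tf.data.experimental.AUTOTUNE on older TF versions
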
piotrlaczkowski commented 1 month ago

Another idea:

import tensorflow as tf

class DynamicPreprocessingPipeline:
    """
    Dynamically initializes and manages a sequence of Keras preprocessing layers, with selective retention of outputs
    based on dependencies among layers, and supports streaming data through the pipeline.
    """
    def __init__(self, layers):
        self.layers = layers
        self.dependency_map = self._analyze_dependencies()

    def _analyze_dependencies(self):
        dependencies = {}
        all_outputs = set()
        for layer in self.layers:
            if getattr(layer, 'input_spec', None) is not None:
                required_inputs = set(tf.nest.flatten(tf.nest.map_structure(lambda x: x.name, layer.input_spec)))
            else:
                required_inputs = set(all_outputs)  # copy, so later outputs do not leak back into earlier dependencies
            dependencies[layer.name] = required_inputs
            all_outputs.update(required_inputs)
            all_outputs.add(layer.name)
        return dependencies

    def process(self, dataset):
        def _apply_transformations(features):
            current_data = dict(features)
            for layer in self.layers:
                # Fall back to the raw features when a layer has no recorded dependencies
                required_keys = self.dependency_map[layer.name] or set(current_data.keys())
                current_input = {k: current_data[k] for k in required_keys if k in current_data}
                transformed_output = {layer.name: layer(current_input[k]) for k in required_keys if k in current_input}
                current_data.update(transformed_output)
            return current_data
        return dataset.map(_apply_transformations)

# Example usage assuming layer factory methods are compatible with tf.keras.layers
layers = [
    PreprocessorLayerFactory.rescaling_layer(),  # Assuming this is a callable returning a TensorFlow layer
    PreprocessorLayerFactory.normalization_layer()  # Assuming this is a callable returning a TensorFlow layer
]
pipeline = DynamicPreprocessingPipeline(layers)
dataset = tf.data.Dataset.from_tensor_slices({
    'image_data': tf.random.uniform([10, 256, 256, 3])  # Example data
}).batch(2)
processed_dataset = pipeline.process(dataset)

# Print results
for item in processed_dataset:
    print(item)

The combination of these ideas should give us some nice code to start from.