nv-morpheus / Morpheus

Morpheus SDK
Apache License 2.0
347 stars 130 forks source link

Discrepancy in Digital Fingerprinting Model Training Duration between Jupyter Notebook and Morpheus Pipeline #1690

Open ylnhari opened 5 months ago

ylnhari commented 5 months ago

https://github.com/nv-morpheus/Morpheus/blob/808c52ca1c0ec4a74695c68fff06d67c5fad7e83/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py#L86C1-L102C1

Description: We've been experiencing an issue related to the duration of our Digital Fingerprinting model training. The training times are significantly longer when the training is executed as part of a Morpheus stage/module in our pipeline, compared to when the identical code is run in a Jupyter notebook as a standalone script. The training time is reduced by a factor of three in the Jupyter notebook. This discrepancy persists even when the training is run as a single stage, without any other stages/modules in the pipeline.

Environment:

Expected Behavior: The training times in the pipeline and the Jupyter notebook should be comparable.

Actual Behavior: The training process takes approximately 30 minutes in a Jupyter notebook, but between 2.5 to 3 hours in the pipeline. This pattern of the pipeline taking roughly three times longer than the Jupyter notebook is consistent while training users individually.

ylnhari commented 4 months ago

Reproducible code

training_pipeline.py

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "5"
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline

from aml.digital_fingerprinting.rough.training_stage import (LoadTrainingDataStage, generate_sample_df, train_autoencoder, trainingstage)

def run_pipeline():
    config = Config()
    pipeline = LinearPipeline(config)
    pipeline.set_source(LoadTrainingDataStage(config))
    pipeline.add_stage(trainingstage(config))
    pipeline.run()

if __name__ == "__main__":

    print("\nTraining model without morpheus.")

    train_autoencoder(generate_sample_df())

    print("\nTraining model with morpheus.")
    run_pipeline()

training_stages.py

import time
import typing
import numpy as np
import mrc
import pandas as pd
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages import MessageMeta
from morpheus.messages.multi_ae_message import MultiAEMessage
from morpheus.models.dfencoder import AutoEncoder
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from mrc.core import operators as ops

def generate_sample_df() -> pd.DataFrame:
    # Define the size of the DataFrame
    size = 8000669

    # Create the DataFrame
    df = pd.DataFrame({
        'eventName': np.random.choice(['event1', 'event2', 'event3'], size),
        'userAgent': np.random.choice(['agent1', 'agent2', 'agent3'], size),
        'userIdentitytype': np.random.choice(['type1', 'type2', 'type3'], size),
        'apiVersion': np.random.choice(['v1', 'v2', 'v3'], size),
        'userIdentityprincipalId': np.random.choice(['id1', 'id2', 'id3'], size),
        'ResourceIDType': np.random.choice(['resource1', 'resource2', 'resource3'], size),
        'errorCode': np.random.choice(['error1', 'error2', 'error3'], size),
        'sourceIPAddress': np.random.choice(['192.168.1.1', '192.168.1.2', '192.168.1.3'], size),
        'sourceASN': np.random.choice(['asn1', 'asn2', 'asn3'], size),
    })
    return df

def train_autoencoder(df: pd.DataFrame):
    start_time = time.time()  # Start the timer

    print("Started training , Data shape : ", df.shape)

    feature_columns = ['userIdentityaccountId',
    'eventName',
    'userAgent',
    'userIdentitytype',
    'apiVersion',
    'userIdentityprincipalId',
    'userIdentityarn',
    'ResourceIDType',
    'errorCode',
    'sourceIPAddress',
    'sourceASN']

    train_df = df[df.columns.intersection(feature_columns)]

    model_kwargs= {
        "encoder_layers": [512, 500],  # layers of the encoding part
        "decoder_layers": [512],  # layers of the decoding part
        "activation": "relu",  # activation function
        "swap_p": 0.2,  # noise parameter
        "lr": 0.001,  # learning rate
        "lr_decay": 0.99,  # learning decay
        "batch_size": 8192,
        "verbose": False,
        "optimizer": "adam",  # adam optimizer is selected
        "scaler": "standard",  # feature scaling method
        "min_cats": 1,  # cut off for minority categories
        "patience": 2,
        "progress_bar": True,
        "device": "cuda",
    }
    epochs = 5

    model = AutoEncoder(**model_kwargs)
    model.fit(train_df, epochs=epochs)

    end_time = time.time()  # Stop the timer
    execution_time = end_time - start_time  # Calculate the execution time

    # Convert execution time to hours, minutes, and seconds
    hours = int(execution_time // 3600)
    minutes = int((execution_time % 3600) // 60)
    seconds = int(execution_time % 60)

    print(f"\n Execution Time : {hours} hours, {minutes} minutes, {seconds} seconds")

@register_stage("loadtrainingdatastage")
class LoadTrainingDataStage(SingleOutputSource):
    """
    Parameters
    ----------
    config : morpheus.config.Config
        Pipeline configuration instance
    """
    def __init__(self, config: Config):
        super().__init__(config)
        self.config = config

    @property
    def name(self) -> str:
        return "trainingdatastage"

    def supports_cpp_node(self) -> bool:
        """Signals whether this stage utilizes a C language implementation."""
        return False

    def release_data_control_messages(self):
        for _ in range(0,1):
            print("yeilding data from morpheus source stage.")
            yield generate_sample_df()

    def _build_source(self, builder: mrc.Builder) -> StreamPair:
        node = builder.make_source(self.unique_name, self.release_data_control_messages)
        return node, pd.DataFrame

@register_stage("trainingstage")
class trainingstage(SinglePortStage):
    """
    Parameters
    ----------
    config : morpheus.config.Config
        Pipeline configuration instance
    """
    def __init__(self, config: Config):
        super().__init__(config)
        self.config = config

    @property
    def name(self) -> str:
        return "trainingstage"

    def accepted_types(self) -> pd.DataFrame:
        return (pd.DataFrame,)

    def supports_cpp_node(self) -> bool:
        """Signals whether this stage utilizes a C language implementation."""
        return False

    def train_model(self, df) -> typing.Generator[MultiAEMessage, None, None]:
        print("training stage started execution.")
        train_autoencoder(df)

    def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
        node = builder.make_node(self.unique_name, ops.map(self.train_model))
        builder.make_edge(input_stream[0], node)

        return node, input_stream[1]
efajardo-nv commented 4 months ago

@ylnhari I was able to reproduce the same discrepancy between notebook and Morpheus 23.07. I saw a 4.8x difference. Good news is that this is mostly corrected in our upcoming 24.06 release where I see it come down to around 1.4x. The remaining discrepancy is likely some overhead from how Morpheus manages threads across multiple stages to ensure stages do not get starved of processing power and don't step on each other.

ylnhari commented 4 months ago

@efajardo-nv , May I know the reason causing the issue.