nv-morpheus / Morpheus

Morpheus SDK
Apache License 2.0

[BUG]: In DFP, for a large training set, the "generic_user" model training kills the jupyter notebook kernel #933

Open tgrunzweig-cpacket opened 1 year ago

tgrunzweig-cpacket commented 1 year ago

Version

23.07

Which installation method(s) does this occur on?

Docker

Describe the bug.

I am training DFP in a Jupyter notebook very similar to dfp_duo_training.ipynb from examples/digital_fingerprinting/production.

The data set is ~20 million rows, with ~20 features and ~400 distinct usernames (i.e. models).

After training of all the user-specific models completes, the last model to train is "generic_user". While that model is training, the JupyterLab kernel of the training notebook dies. The training job does not complete, and no generic_user model is registered on the MLflow server.

See "Relevant log output" below. The whole log is very long, so only the sections associated with "generic_user" are pasted together there. Specifically:

"Training AE model for user: 'generic_user'..." is the last output of the log, the kernel errors out at some point after that entry.

Minimum reproducible example

# The config details for this run, mostly unchanged from the example in dfp_duo_training.ipynb

import typing

# Global options
train_users = "all"

# Enter any users to skip here
skip_users: typing.List[str] = []

# Location where cache objects will be saved
cache_dir = "/workspace/.cache/dfp"

# Input files to read from
input_files = ['/workspace/examples/data/dfp/my_training_set/*.JSON']

# The format to use for models
model_name_formatter = "source-ip-{user_id}"

# The format to use for experiment names
experiment_name_formatter = "dfp/cpacket/{reg_model_name}"

# === Derived Options ===
# To include the generic, we must be training all or generic
include_generic = train_users == "all" or train_users == "generic"

# To include individual, we must be either training or inferring
include_individual = train_users != "generic"

# "none" indicates we aren't training anything
is_training = train_users != "none"

# the pipeline creation and running is unchanged from the dfp_duo_training.ipynb

Relevant log output

Added stage: <dfp-split-users-3; DFPSplitUsersStage(include_generic=True, include_individual=True, skip_users=[], only_users=None)>

...

Rolling window complete for generic_user in 109477.13 ms. Input: 21254290 rows from 2022-10-25 23:50:56+00:00 to 2022-11-01 08:00:54+00:00. Output: 21254290 rows from 2022-10-25 23:50:56+00:00 to 2022-11-01 08:00:54+00:00

...

2023/04/29 00:40:39 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: source-ip-91.229.45.91, version 1
ML Flow model upload complete: 91.229.45.91:source-ip-91.229.45.91:1
Training AE model for user: '91.229.45.93'... Complete.
2023/04/29 00:40:40 INFO mlflow.tracking.fluent: Experiment with name 'dfp/tcp-open/training/source-ip-91.229.45.93' does not exist. Creating a new experiment.
Successfully registered model 'source-ip-91.229.45.93'.
2023/04/29 00:40:40 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: source-ip-91.229.45.93, version 1
ML Flow model upload complete: 91.229.45.93:source-ip-91.229.45.93:1
Training AE model for user: 'generic_user'...

Full env printout

No response

Other/Misc.

This uses the same data set as described in #932. Maybe it's an extreme example of the same issue (i.e. the kernel dies instead of hanging for 15 minutes)?


jarmak-nv commented 1 year ago

Hi @Tzahi-cpacket!

Thanks for submitting this issue - our team has been notified and we'll get back to you as soon as we can! In the meantime, feel free to add any relevant information to this issue.

dagardner-nv commented 1 year ago

kills the jupyter notebook kernel

Does this mean it would otherwise run successfully from the Python interpreter?

tgrunzweig-cpacket commented 1 year ago

I can't get a comparable script to work from the (morpheus) container prompt, but this is unrelated and could be because I'm doing something wrong.

dagardner-nv commented 1 year ago

The documentation assumes the reader is using an official release version. If you're working directly from branch-23.07 or your own feature branch, you will want to do the following.

From the root of the morpheus repo:

./docker/build_container_release.sh

This should create an image named/tagged as nvcr.io/nvidia/morpheus/morpheus:v23.07.00a-runtime.

Then:

cd examples/digital_fingerprinting/production
export MORPHEUS_CONTAINER_VERSION="v23.07.00a-runtime"
docker compose build
docker compose run morpheus_pipeline bash

Note: depending on your version of docker, you'll need to use either docker-compose or docker compose.

tgrunzweig-cpacket commented 1 year ago

I created a dfp_tcp_open_pipeline.py, modeled after dfp_duo_pipeline.py but with my feature definitions. My data is in parquet format, which is now supported (thank you).

Following the steps above with branch-23.07 (commit 0c99762293446b4178b6dd9782c8929af2c8e479), I'm now in a Morpheus container. I run the following:

python dfp_tcp_open_pipeline.py --train_users generic --start_time "2022-10-01" --input_file="../../../data/dfp/tcp-open-training-data-processed-big-parquet-n2/*.parquet" --log_level DEBUG

and get the following output (I've replaced the actual schema in the output below with (...)):

Running training pipeline with the following options:
Train generic_user: True
Skipping users: []
Start Time: 2022-10-01 00:00:00
Input data rate: 0 messages [00:00, ? messages/Duration: 60 days, 0:00:00
Training rate: 0 messages [00:00, ? messages/s]Cache Dir: /workspace/.cache/dfp
Tracking URI: http://mlflow:5000, ? messages/s]
input file: ('../../../data/dfp/tcp-open-training-data-processed-big-parquet-n2/*.parquet',)
====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
Starting! Time: 1686267490.8929222
====Registering Pipeline Complete!====
====Starting Pipeline====
====Building Segment: linear_segment_0====
Added source: <from-multi-file-0; MultiFileSource(filenames=['../../../data/dfp/tcp-open-training-data-processed-big-parquet-n2/*.parquet'])>
  └─> fsspec.OpenFiles
Added stage: <dfp-file-batcher-1; DFPFileBatcherStage(date_conversion_func=functools.partial(<function date_extractor at 0x7f3cfc5989d0>, filename_regex=re.compile('(?P<year>\\d{4})-(?P<month>\\d{1,2})-(?P<day>\\d{1,2})T(?P<hour>\\d{1,2})(:|_)(?P<minute>\\d{1,2})(:|_)(?P<second>\\d{1,2})(?P<microsecond>\\.\\d{1,6})?Z')), period=None, sampling_rate_s=None, start_time=None, end_time=None, sampling=None)>
  └─ fsspec.OpenFiles -> List[fsspec.core.OpenFiles]
Added source: <dfp-s3-to-df-2; DFPFileToDataFrameStage(schema=DataFrameInputSchema(...)], preserve_columns=None, row_filter=None), filter_null=True, file_type=FileTypes.PARQUET, parser_kwargs=None, cache_dir=/workspace/.cache/dfp)>
  └─> cudf.DataFrame
Added stage: <dfp-s3-to-df-2; DFPFileToDataFrameStage(schema=DataFrameInputSchema(...)], preserve_columns=None, row_filter=None), filter_null=True, file_type=FileTypes.PARQUET, parser_kwargs=None, cache_dir=/workspace/.cache/dfp)>
  └─ List[fsspec.core.OpenFiles] -> cudf.DataFrame
Added stage: <monitor-3; MonitorStage(description=Input data rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ cudf.DataFrame -> cudf.DataFrame
Added stage: <dfp-split-users-4; DFPSplitUsersStage(include_generic=True, include_individual=False, skip_users=[], only_users=[])>
  └─ cudf.DataFrame -> dfp.DFPMessageMeta
Added stage: <dfp-rolling-window-5; DFPRollingWindowStage(min_history=300, min_increment=300, max_history=60d, cache_dir=/workspace/.cache/dfp)>
  └─ dfp.DFPMessageMeta -> dfp.MultiDFPMessage
Added stage: <dfp-preproc-6; DFPPreprocessingStage(input_schema=DataFrameInputSchema(...)], preserve_columns=re.compile('(_batch_id)'), row_filter=None))>
  └─ dfp.MultiDFPMessage -> dfp.MultiDFPMessage
Added stage: <dfp-training-7; DFPTraining(model_kwargs=None, epochs=30, validation_size=0.1)>
  └─ dfp.MultiDFPMessage -> morpheus.MultiAEMessage
Added stage: <monitor-8; MonitorStage(description=Training rate, smoothing=0.001, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.MultiAEMessage -> morpheus.MultiAEMessage
Added stage: <dfp-mlflow-model-writer-9; DFPMLFlowModelWriterStage(model_name_formatter=DFP-server-ip-{user_id}, experiment_name_formatter=dfp/tcp_open/training/{reg_model_name}, databricks_permissions=None)>
  └─ morpheus.MultiAEMessage -> morpheus.MultiAEMessage
====Building Segment Complete!====
====Pipeline Started====
Input data rate: 21199134 messages [00:11, 1795903.99 messages/s]S3 objects to DF complete. Rows: 21199134, Cache: hit, Duration: 11627.13646888733 ms [00:11, ? messages/s]
Input data rate[Complete]: 21199134 messages [0Batch split users complete. Input: 21199134 rows from 2022-10-25 23:50:56.001000+00:00 to 2022-11-01 08:00:54.029000+00:00. Output: 1 users, rows/user min: 21199134, max: 21199134, avg: 21199134.00. Duration: 10686.98 ms
Input data rate[Complete]: 21199134 messages [0Rolling window complete for generic_user in 91196.29 ms. Input: 21199134 rows from 2022-10-25 23:50:56.001000+00:00 to 2022-11-01 08:00:54.029000+00:00. Output: 21199134 rows from 2022-10-25 23:50:56.001000+00:00 to 2022-11-01 08:00:54.029000+00:00
Input data rate[Complete]: 21199134 messages [0Preprocessed 21199134 data for logs in 2022-10-25 23:50:56.001000 to 2022-11-01 08:00:54.029000 in 40053.675413131714 mses/s]
Input data rate[Complete]: 21199134 messages [0Training AE model for user: 'generic_user'...
Input data rate[Complete]: 21199134 messages [0Killed1795903.99 messages/s]

Running the same with the --train_users flag set to all, I see that all the individual users get their models trained.

What I think is happening is that, for this large data set, the "generic_user" model is trained on all of the data, which is too much to fit in memory, while each individual user model is trained on only part of the data, which is small enough not to crash.

dagardner-nv commented 1 year ago

The Killed bit in the output makes me think the system is running out of memory. Check syslog to see whether there is something along the lines of:

kernel: [601525.247671] oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=user.slice,mems_allowed=0,global_oom,task_memcg=/user.slice/user-1000.slice/session-2.scope,task=python,pid=1727621,uid=1000
kernel: [601525.247686] Out of memory: Killed process 1727621 (python) total-vm:75798300kB, anon-rss:46688116kB, file-rss:72kB, shmem-rss:0kB, UID:1000 pgtables:147692kB oom_score_adj:0

Assuming the issue is memory related, try setting the --sample_rate_s flag. Start with something like --sample_rate_s=10 and keep increasing the value until the pipeline is able to complete.
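To illustrate the kind of thinning a sampling interval implies, here is a minimal sketch in plain Python. It assumes rows are (timestamp, payload) tuples sorted by time. The downsample_by_time helper and its min_step_s parameter are hypothetical names for this example, and the code does not reproduce how Morpheus applies --sample_rate_s internally.

```python
from datetime import datetime, timedelta


def downsample_by_time(rows, min_step_s):
    """Keep a row only if at least `min_step_s` seconds have passed
    since the previously kept row.

    `rows` is an iterable of (timestamp, payload) tuples sorted by timestamp.
    This is a hypothetical helper, not Morpheus code.
    """
    kept = []
    last_kept = None
    for ts, payload in rows:
        if last_kept is None or (ts - last_kept).total_seconds() >= min_step_s:
            kept.append((ts, payload))
            last_kept = ts
    return kept


# Synthetic input: one row per second for 100 seconds.
start = datetime(2022, 10, 25, 23, 50, 56)
rows = [(start + timedelta(seconds=i), i) for i in range(100)]

sampled = downsample_by_time(rows, min_step_s=10)
print(len(rows), "->", len(sampled))
```

A larger interval keeps fewer rows, which lowers peak memory at the cost of time resolution.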

tgrunzweig-cpacket commented 1 year ago

Looks like this is it.

 kernel: [26245.076726] oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=user.slice,mems_allowed=0,global_oom,task_memcg=/system.slice/docker-5f30a2988395baf9f83f1d9356f980e8ae2de8c605b54bf18610a439c4740ecc.scope,task=python,pid=87731,uid=0

 kernel: [26245.076880] Out of memory: Killed process 87731 (python) total-vm:78408140kB, anon-rss:62826060kB, file-rss:72536kB, shmem-rss:12292kB, UID:0 pgtables:127040kB oom_score_adj:0
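For readers who want to turn such a kernel message into a memory figure, the following sketch parses the "Out of memory" line quoted above using only the standard library. The parse_oom_kill helper is a hypothetical name for this example, and the regular expressions are written against this one line format, so other kernel versions may need adjustments.

```python
import re

# The "Out of memory" line quoted in this comment.
OOM_LINE = (
    "kernel: [26245.076880] Out of memory: Killed process 87731 (python) "
    "total-vm:78408140kB, anon-rss:62826060kB, file-rss:72536kB, "
    "shmem-rss:12292kB, UID:0 pgtables:127040kB oom_score_adj:0"
)


def parse_oom_kill(line):
    """Extract the pid, process name and kB-sized fields from an OOM-kill line.

    Hypothetical helper; returns None if the line is not an OOM-kill message.
    """
    match = re.search(r"Killed process (\d+) \(([^)]+)\)", line)
    if match is None:
        return None
    # Collect every "name:<number>kB" field, e.g. anon-rss, total-vm.
    sizes_kb = {name: int(value) for name, value in re.findall(r"([\w-]+):(\d+)kB", line)}
    return {"pid": int(match.group(1)), "name": match.group(2), "kb": sizes_kb}


info = parse_oom_kill(OOM_LINE)
anon_rss_gib = info["kb"]["anon-rss"] / 1024 ** 2
print(f"pid {info['pid']} ({info['name']}): anon-rss = {anon_rss_gib:.1f} GiB")
```

For the line above, the resident anonymous memory of the killed python process works out to roughly 59.9 GiB.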

The sample rate change could be an immediate fix, but is it the desired fundamental fix? I have a couple of arguments against it:

  1. A data spike might happen, or the host might have some unrelated memory issue, so an automated training script could still bring a production system down.
  2. With this fix, the more data we have to train with, the less time resolution we have for the rolling window, right? But the frequencies of interest should be set by the underlying anomaly rate, not by system parameters.

Maybe it would be advantageous to use partial fit in training instead?
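As a rough illustration of the partial-fit pattern, the sketch below updates per-feature statistics one chunk at a time, so that only a single chunk is held in memory. It is not the DFP autoencoder: the RunningStats class and the chunks generator are hypothetical stand-ins, and the thread does not establish whether the DFP training stage supports incremental training.

```python
class RunningStats:
    """Per-feature mean and variance, updated chunk by chunk (Welford's method)."""

    def __init__(self, n_features):
        self.count = 0
        self.mean = [0.0] * n_features
        self.m2 = [0.0] * n_features  # running sum of squared deviations

    def partial_fit(self, chunk):
        """Fold one chunk of rows into the running statistics."""
        for row in chunk:
            self.count += 1
            for j, x in enumerate(row):
                delta = x - self.mean[j]
                self.mean[j] += delta / self.count
                self.m2[j] += delta * (x - self.mean[j])
        return self

    def variance(self):
        """Population variance of each feature seen so far."""
        return [m2 / self.count for m2 in self.m2]


def chunks(n_rows, chunk_size):
    """Hypothetical stand-in for reading the training set piece by piece."""
    for start in range(0, n_rows, chunk_size):
        stop = min(start + chunk_size, n_rows)
        yield [[float(i), float(i % 2)] for i in range(start, stop)]


stats = RunningStats(n_features=2)
for chunk in chunks(n_rows=1000, chunk_size=100):
    stats.partial_fit(chunk)  # memory use is bounded by the chunk size

print(stats.count, stats.mean, stats.variance())
```

The same loop shape applies to any model that exposes an incremental update step: peak memory then depends on the chunk size rather than on the total number of rows.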

dagardner-nv commented 1 year ago

Another thing to try would be lowering the period argument to the DFPFileBatcherStage. The default is one day (D); you might try lowering it to one hour (H), which groups the incoming dataframes into smaller batches.

tgrunzweig-cpacket commented 1 year ago

I tried playing with the period argument, but I think that leads to the following issue. We have a combination of loud and quiet users. Loud users have many rows in the input, so to accommodate them we need to set the period to something small, like "3T". But then, for the quiet users, dfp_rolling_window_stage.py breaks at line #150:

# Find the index of the first and last row
match = train_df[train_df["_row_hash"] == incoming_hash.iloc[0]]

if (len(match) == 0):
    raise RuntimeError("Invalid rolling window")

That is, quiet users can have a gap that's bigger than the period needed for the loud users, which breaks the pipeline.
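One way to check this hypothesis against a data set is to list the users whose largest gap between consecutive events exceeds the chosen period. The sketch below does that in plain Python. The users_with_gaps helper and the synthetic "loud" and "quiet" users are hypothetical, and the code does not confirm that such gaps are what trigger the RuntimeError.

```python
from datetime import datetime, timedelta


def users_with_gaps(events, period):
    """Return {user: largest gap} for users whose largest gap between
    consecutive events exceeds `period`.

    `events` is an iterable of (user, timestamp) tuples in any order.
    Hypothetical helper, not part of Morpheus.
    """
    by_user = {}
    for user, ts in events:
        by_user.setdefault(user, []).append(ts)

    flagged = {}
    for user, stamps in by_user.items():
        stamps.sort()
        gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
        if gaps and max(gaps) > period:
            flagged[user] = max(gaps)
    return flagged


t0 = datetime(2022, 10, 25, 23, 50, 56)
# A loud user with one event per second, and a quiet user with a 45 minute gap.
events = [("loud", t0 + timedelta(seconds=i)) for i in range(600)]
events += [("quiet", t0), ("quiet", t0 + timedelta(minutes=45))]

flagged = users_with_gaps(events, period=timedelta(minutes=3))
print(flagged)
```

With a 3 minute period, only the quiet user is flagged.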

jarmak-nv commented 1 year ago

Removing the Needs Triage label as we have targeted work on this for the 23.11 release.

drobison00 commented 1 year ago

Should be mitigated by https://github.com/nv-morpheus/Morpheus/issues/1206