nv-morpheus / Morpheus


[BUG] Run DFP Duo Training Pipeline #1048

Closed. cwharris closed this issue 1 year ago.

cwharris commented 1 year ago

I get the following error when running the DFP Duo training pipeline according to the DFP production example documentation:

docker-compose run morpheus_pipeline bash
Creating production_morpheus_pipeline_run ... done
(morpheus) root@d8b635b38e71:/workspace/examples/digital_fingerprinting/production/morpheus# python dfp_duo_pipeline.py --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/duo-training-data/*.json"
2023-07-12 17:16:57,593 - distributed.worker - WARNING - Compute Failed
Key:       _single_object_to_dataframe-d8c0ef384954980b9fcc8d2b0b4c2802
Function:  _single_object_to_dataframe
args:      (<OpenFile '/workspace/examples/digital_fingerprinting/production/morpheus/../../../data/dfp/duo-training-data/DUO_2022-08-02T21_10_29.396Z.json'>)
kwargs:    {}
Exception: "RuntimeError('`dask_cudf.DataFrame` is incompatible with `distributed.LocalCluster`. Please setup a `dask_cuda.LocalCUDACluster` instead. Or to run on CPU instead, provide the parameter `cpu=True` when creating the `Dataset`. ')"

2023-07-12 17:16:57,710 - distributed.worker - WARNING - Compute Failed
Key:       _single_object_to_dataframe-70cab0411a6a6da7a0dbe73283121272
Function:  _single_object_to_dataframe
args:      (<OpenFile '/workspace/examples/digital_fingerprinting/production/morpheus/../../../data/dfp/duo-training-data/DUO_2022-08-09T00_02_38.932Z.json'>)
kwargs:    {}
Exception: "RuntimeError('`dask_cudf.DataFrame` is incompatible with `distributed.LocalCluster`. Please setup a `dask_cuda.LocalCUDACluster` instead. Or to run on CPU instead, provide the parameter `cpu=True` when creating the `Dataset`. ')"

2023-07-12 17:16:57,923 - distributed.worker - WARNING - Compute Failed
Key:       _single_object_to_dataframe-69e49edd18c8af4e457f8448a81afbd7
Function:  _single_object_to_dataframe
args:      (<OpenFile '/workspace/examples/digital_fingerprinting/production/morpheus/../../../data/dfp/duo-training-data/DUO_2022-08-04T18_13_07.708Z.json'>)
kwargs:    {}
Exception: 'CommClosedError()'

E20230712 17:16:57.924346   136 context.cpp:124] /linear_segment_0/dfp-s3-to-df-2; rank: 0; size: 1; tid: 140058170467904: set_exception issued; issuing kill to current runnable. Exception msg: TypeError: object of type 'NoneType' has no len()

At:
  /workspace/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py(218): convert_to_dataframe
Failed to download logs. Error: 
Traceback (most recent call last):
  File "/workspace/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py", line 168, in _get_or_create_dataframe_from_s3_batch
    dfs = self._downloader.download(download_buckets, download_method)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/utils/downloader.py", line 157, in download
    dfs = client.gather(dfs)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/client.py", line 2345, in gather
    return self.sync(
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/utils.py", line 349, in sync
    return sync(
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/utils.py", line 416, in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/utils.py", line 389, in f
    result = yield future
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/client.py", line 2208, in _gather
    raise exception.with_traceback(traceback)
  File "/workspace/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py", line 69, in _single_object_to_dataframe
    s3_df = process_dataframe(df_in=s3_df, input_schema=schema)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/utils/column_info.py", line 50, in process_dataframe
    return schema_transforms.process_dataframe(df_in, input_schema)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/utils/schema_transforms.py", line 91, in process_dataframe
    result = workflow.fit_transform(dataset).to_ddf().compute()
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/nvtabular/workflow/workflow.py", line 236, in fit_transform
    self.fit(dataset)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/nvtabular/workflow/workflow.py", line 213, in fit
    self.executor.fit(dataset, self.graph)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/merlin/dag/executors.py", line 457, in fit
    dataset.to_ddf(),
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/merlin/io/dataset.py", line 439, in to_ddf
    raise RuntimeError(
RuntimeError: `dask_cudf.DataFrame` is incompatible with `distributed.LocalCluster`. Please setup a `dask_cuda.LocalCUDACluster` instead. Or to run on CPU instead, provide the parameter `cpu=True` when creating the `Dataset`. 
2023-07-12 17:16:57,926 - distributed.worker - WARNING - Compute Failed
Key:       _single_object_to_dataframe-7c0d60306cbd704abf077a467f446b6a
Function:  _single_object_to_dataframe
args:      (<OpenFile '/workspace/examples/digital_fingerprinting/production/morpheus/../../../data/dfp/duo-training-data/DUO_2022-08-09T06_15_35.216Z.json'>)
kwargs:    {}
Exception: 'CancelledError("(\'head-1-1-blocks-bb674466568fe0cff96855a7dd350b16\', 0)")'

2023-07-12 17:16:57,927 - distributed.worker - WARNING - Compute Failed
Key:       _single_object_to_dataframe-5fd39a7a7c19508f34446906b6f77409
Function:  _single_object_to_dataframe
args:      (<OpenFile '/workspace/examples/digital_fingerprinting/production/morpheus/../../../data/dfp/duo-training-data/DUO_2022-08-09T03_06_46.584Z.json'>)
kwargs:    {}
Exception: 'CommClosedError()'

Error while converting S3 buckets to DF.
Traceback (most recent call last):
  File "/workspace/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py", line 211, in convert_to_dataframe
    len(output_df),
TypeError: object of type 'NoneType' has no len()
E20230712 17:16:57.939733    28 runner.cpp:189] Runner::await_join - an exception was caught while awaiting on one or more contexts/instances - rethrowing
E20230712 17:16:57.939822    28 segment_instance.cpp:270] segment::SegmentInstance - an exception was caught while awaiting on one or more nodes - rethrowing
E20230712 17:16:57.939857    28 pipeline_instance.cpp:225] pipeline::PipelineInstance - an exception was caught while awaiting on segments - rethrowing
Traceback (most recent call last):
  File "/workspace/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py", line 351, in <module>
Exception occurred in pipeline. Rethrowing
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/pipeline/pipeline.py", line 327, in join
    await self._mrc_executor.join_async()
  File "/workspace/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py", line 211, in convert_to_dataframe
    len(output_df),
TypeError: object of type 'NoneType' has no len()
    run_pipeline(obj={}, auto_envvar_prefix='DFP', show_default=True, prog_name="dfp")
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/workspace/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py", line 346, in run_pipeline
    pipeline.run()
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/pipeline/pipeline.py", line 598, in run
    asyncio.run(self.run_async())
  File "/opt/conda/envs/morpheus/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/opt/conda/envs/morpheus/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/pipeline/pipeline.py", line 576, in run_async
    await self.join()
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/pipeline/pipeline.py", line 327, in join
    await self._mrc_executor.join_async()
  File "/workspace/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py", line 211, in convert_to_dataframe
    len(output_df),
TypeError: object of type 'NoneType' has no len()
2023-07-12 17:16:58,036 - distributed.worker - WARNING - Compute Failed
Key:       _single_object_to_dataframe-3f3fbc57be4d89e5fe9cb2169c2055e9
Function:  _single_object_to_dataframe
args:      (<OpenFile '/workspace/examples/digital_fingerprinting/production/morpheus/../../../data/dfp/duo-training-data/DUO_2022-08-09T09_42_11.514Z.json'>)
kwargs:    {}
Exception: 'CancelledError("(\'head-1-1-blocks-659f0f82d50575b061ddaf28878e4cfa\', 0)")'

2023-07-12 17:16:58,036 - distributed.worker - WARNING - Compute Failed
Key:       _single_object_to_dataframe-3755bfa1c5666cedfa0c547e0b7bc117
Function:  _single_object_to_dataframe
args:      (<OpenFile '/workspace/examples/digital_fingerprinting/production/morpheus/../../../data/dfp/duo-training-data/DUO_2022-08-08T06_08_16.424Z.json'>)
kwargs:    {}
Exception: 'Exception(\'Tried sending message after closing.  Status: closed\\nMessage: {\\\'op\\\': \\\'update-graph-hlg\\\', \\\'hlg\\\': {\\\'layers\\\': [{\\\'__module__\\\': \\\'dask.highlevelgraph\\\', \\\'__name__\\\': \\\'MaterializedLayer\\\', \\\'state\\\': {\\\'dsk\\\': {"(\\\'from_cudf-19b20708786fbec0eca05e06552268c1\\\', 0)": <Serialize:                                        access_device  ...                                               user\\n0  {\\\'browser\\\': None, \\\'browser_version\\\': None, \\\'ep...  ...  {\\\'groups\\\': [], \\\'key\\\': \\\'PS47XDCOJFTXQX30RMXA\\\', ...\\n1  {\\\'browser\\\': \\\'Internet Explorer\\\', \\\'browser_vers...  ...  {\\\'groups\\\': [], \\\'key\\\': \\\'O6WJWDG0N1UKDVP6Z95M\\\', ...\\n2  {\\\'browser\\\': \\\'Google Chrome\\\', \\\'browser_version\\\'...  ...  {\\\'groups\\\': [], \\\'key\\\': \\\'P99ZB1IRCW3IP9RTXP9I\\\', ...\\n3  {\\\'browser\\\': None, \\\'browser_version\\\': None, \\\'ep...  ...  {\\\'groups\\\': [], \\\'key\\\': \\\'PS47XDCOJFTXQX30RMXA\\\', ...\\n4  {\\\'browser\\\': \\\'Internet Explorer\\\', \\\'browser_vers...  ...  {\\\'groups\\\': [], \\\'key\\\': \\\'0T301KJIYXY5F20RXBMV\\\', ...\\n5  {\\\'browser\\\': None, \\\'browser_version\\\': None, \\\'ep...  ...  {\\\'groups\\\': [], \\\'key\\\': \\\'2VNZ8V287EZ4A1E0W5RQ\\\', ...\\n6  {\\\'browser\\\': None, \\\'browser_version\\\': None, \\\'ep...  ...  {\\\'groups\\\': [], \\\'key\\\': \\\'LKM42QBWX8LHMATHXIT4\\\', ...\\n7  {\\\'browser\\\': None, \\\'browser_version\\\': None, \\\'ep...  ...  {\\\'groups\\\': [], \\\'key\\\': \\\'88NYNQAQQ1KN08ENHI8P\\\', ...\\n8  {\\\'browser\\\': None, \\\'browser_version\\\': None, \\\'ep...  ...  {\\\'groups\\\': [], \\\'key\\\': \\\'0O0ZLW0MDFTJS5GV3R1G\\\', ...\\n\\n[9 rows x 13 columns]>}, \\\'dependencies\\\': {"(\\\'from_cudf-19b20708786fbec0eca05e06552268c1\\\', 0)": set()}}, \\\'annotations\\\': {}}, {\\\'__module__\\\': \\\'dask.highlevelgraph\\\', \\\'__name__\\\': \\\'MaterializedLayer\\\', \\\'state\\\': {\\\'dsk\\\': {"(\\\'blocks-42e473b7479831f0647f0217262c5727\\\', 0)": <Serialize: (\\\'from_cudf-19b20708786fbec0eca05e06552268c1\\\', 0)>}, \\\'dependencies\\\': {"(\\\'blocks-42e473b7479831f0647f0217262c5727\\\', 0)": {"(\\\'from_cudf-19b20708786fbec0eca05e06552268c1\\\', 0)"}}}, \\\'annotations\\\': {}}, {\\\'__module__\\\': \\\'dask.highlevelgraph\\\', \\\'__name__\\\': \\\'MaterializedLayer\\\', \\\'state\\\': {\\\'dsk\\\': {"(\\\'head-1-1-blocks-42e473b7479831f0647f0217262c5727\\\', 0)": {\\\'function\\\': b\\\'\\\\x80\\\\x05\\\\x95*\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x8c\\\\ndask.utils\\\\x94\\\\x8c\\\\x0cmethodcaller\\\\x94\\\\x93\\\\x94\\\\x8c\\\\x04head\\\\x94\\\\x85\\\\x94R\\\\x94.\\\', \\\'args\\\': b"\\\\x80\\\\x05\\\\x956\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x8c.(\\\'blocks-42e473b7479831f0647f0217262c5727\\\', 0)\\\\x94K\\\\x01\\\\x86\\\\x94."}}, \\\'dependencies\\\': {"(\\\'head-1-1-blocks-42e473b7479831f0647f0217262c5727\\\', 0)": {"(\\\'blocks-42e473b7479831f0647f0217262c5727\\\', 0)"}}}, \\\'annotations\\\': {}}]}, \\\'keys\\\': ["(\\\'head-1-1-blocks-42e473b7479831f0647f0217262c5727\\\', 0)"], \\\'priority\\\': None, \\\'submitting_task\\\': \\\'_single_object_to_dataframe-3755bfa1c5666cedfa0c547e0b7bc117\\\', \\\'fifo_timeout\\\': \\\'60s\\\', \\\'actors\\\': None, \\\'code\\\': \\\'    def sample_data(self, n=1):\\\\n        """Return a sample of real data from the dataset\\\\n\\\\n        Sample the partitions of the underlying Dask collection\\\\n        
until a non-empty partition is found. Then, use the first\\\\n        ``n`` rows of that partition to infer dtype info. If no\\\\n        non-empty partitions are found, use the Dask metadata.\\\\n        """\\\\n        _ddf = self.to_ddf()\\\\n        for partition_index in range(_ddf.npartitions):\\\\n            _head = _ddf.partitions[partition_index].head(n)\\\\n            if len(_head):\\\\n                return _head\\\\n        return _ddf._meta\\\\n\\\'}\')'

*** Aborted at 1689182218 (unix time) try "date -d @1689182218" if you are using GNU date ***
PC: @                0x0 (unknown)
*** SIGSEGV (@0x0) received by PID 28 (TID 0x7f60e7fff640) from PID 0; stack trace: ***
    @     0x7f64bf7d9197 google::(anonymous namespace)::FailureSignalHandler()
    @     0x7f64d2c2c520 (unknown)
    @     0x7f64d2c8c8d7 (unknown)
    @     0x7f64d2c8f4d3 free
    @     0x7f643acc3159 (unknown)
    @     0x7f643acc3680 (unknown)
    @     0x7f643ace68c2 (unknown)
    @     0x7f643a9d843c (unknown)
    @     0x7f643a9dac0c (unknown)
    @     0x7f643a9cb544 (unknown)
    @     0x7f643a9adbdc (unknown)
    @     0x7f643a9a7923 (unknown)
    @     0x7f643a99fed4 nvrtcCompileProgram
    @     0x7f646f6d33fa __pyx_f_13cupy_backends_4cuda_4libs_5nvrtc_compileProgram()
    @     0x7f646f6d38e2 __pyx_pw_13cupy_backends_4cuda_4libs_5nvrtc_11compileProgram()
    @     0x5636f8cc0516 cfunction_call
    @     0x5636f8cb9a6b _PyObject_MakeTpCall.localalias
    @     0x5636f8cb59d6 _PyEval_EvalFrameDefault
    @     0x5636f8cc099c _PyFunction_Vectorcall
    @     0x5636f8cb0c5c _PyEval_EvalFrameDefault
    @     0x5636f8cc099c _PyFunction_Vectorcall
    @     0x5636f8cb0850 _PyEval_EvalFrameDefault
    @     0x5636f8cc099c _PyFunction_Vectorcall
    @     0x5636f8cb0850 _PyEval_EvalFrameDefault
    @     0x5636f8cc099c _PyFunction_Vectorcall
    @     0x5636f8cb0850 _PyEval_EvalFrameDefault
    @     0x5636f8cc099c _PyFunction_Vectorcall
    @     0x5636f8ccd205 PyVectorcall_Call.localalias
    @     0x7f64803bedbf __Pyx_PyObject_Call()
    @     0x7f6480403d79 __pyx_f_4cupy_5_core_4core_compile_with_cache()
    @     0x7f646d1c45b9 __pyx_f_4cupy_5_core_10_reduction__create_reduction_function_from_code()
    @     0x7f646d1cf697 __pyx_f_4cupy_5_core_10_reduction__create_reduction_function()
Segmentation fault (core dumped)

Similar errors occur for dfp_azure_pipeline.py.
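
For what it's worth, the repeated `TypeError: object of type 'NoneType' has no len()` looks like a downstream symptom: once the Dask workers fail, the downloader hands `convert_to_dataframe` a `None` result, and the length check in `dfp_file_to_df.py` blows up on it. The root cause appears to be the `RuntimeError` raised from `merlin.io.Dataset.to_ddf()`, whose message itself names two workarounds. A minimal sketch of both, using a toy DataFrame rather than anything from the pipeline code:

import cudf
import dask_cudf
import pandas as pd
from dask_cuda import LocalCUDACluster  # GPU-aware replacement for distributed.LocalCluster
from distributed import Client
from merlin.io import Dataset

# Workaround 1: attach a GPU-aware cluster so that dask_cudf collections
# are valid on the workers.
client = Client(LocalCUDACluster())
gpu_ddf = dask_cudf.from_cudf(cudf.DataFrame({"x": [1, 2, 3]}), npartitions=1)
ds_gpu = Dataset(gpu_ddf)  # no longer raises under a LocalCUDACluster

# Workaround 2: stay on the default distributed.LocalCluster and keep the
# data on CPU, as the error message suggests.
ds_cpu = Dataset(pd.DataFrame({"x": [1, 2, 3]}), cpu=True)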

drobison00 commented 1 year ago

This was related to our switch to using Merlin's Distributed construct in the downloader; allowing Merlin to handle closing the cluster resolves the problem.
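
For context, a sketch of that shape of fix, assuming merlin-core exposes the construct as `merlin.core.utils.Distributed` (the context-manager form); the toy workflow below is illustrative only, not the actual downloader code:

import pandas as pd
import nvtabular as nvt
from merlin.core.utils import Distributed
from merlin.io import Dataset

df = pd.DataFrame({"user": ["a", "b", "a"]})
workflow = nvt.Workflow(["user"] >> nvt.ops.Categorify())

# Entering the context lets Merlin create a suitable Dask cluster (a
# dask_cuda.LocalCUDACluster when GPUs are available), and exiting it lets
# Merlin tear the cluster down itself, rather than the caller closing a
# distributed.LocalCluster out from under Merlin's GPU collections.
with Distributed():
    result = workflow.fit_transform(Dataset(df)).to_ddf().compute()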