nv-morpheus / Morpheus

Morpheus SDK
Apache License 2.0

[BUG]: Import Issue inside stages #692

Open tanmoyio opened 1 year ago

tanmoyio commented 1 year ago

Version

22.11

Which installation method(s) does this occur on?

Source

Describe the bug.

I want to run UMAP inside my custom Morpheus stage, and it requires a lazy import (specifically umap) that takes far too long. Outside Morpheus the import takes roughly 2 seconds, but inside a Morpheus stage it takes more than 120 seconds. Is there anything I am doing wrong?

Minimum reproducible example

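@profile  # injected by kernprof (line_profiler) when the script is run with `kernprof -l`
def on_data(self, message):
    import umap  # lazy import: only the first call pays the import cost
    return message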
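A lazy import only pays its cost the first time it runs. After that, Python returns the cached module from `sys.modules`. To measure that first-call cost without line_profiler in the loop, one option is a minimal sketch like the one below. It could be run once in a plain script and once as the first statement of `on_data`. The helper name `timed_import` is hypothetical.

```python
import importlib
import sys
import time


def timed_import(name):
    """Import `name` and return (module, elapsed_seconds, was_already_cached)."""
    # A module already present in sys.modules is returned almost for free.
    was_cached = name in sys.modules
    start = time.perf_counter()
    module = importlib.import_module(name)
    elapsed = time.perf_counter() - start
    return module, elapsed, was_cached


if __name__ == "__main__":
    # Swap "colorsys" for "umap" to measure the import this issue is about.
    mod, secs, cached = timed_import("colorsys")
    print(f"{mod.__name__}: {secs:.3f}s (cached={cached})")
```

If the `cached` flag is already `True` inside the stage, the import cost was paid somewhere else and the 120 s is being spent elsewhere.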

Relevant log output

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    87                                               @profile
    88                                               def on_data(self, message):
    89         1  125171785.3 125171785.3    100.0       import umap
    90         1          0.2         0.2      0.0       return message

Full env printout

No response

Other/Misc.

other imports are fine (like torch).


tanmoyio commented 1 year ago

I ran a code snippet inside on_data. The message was a MessageMeta object, and message.df was a pandas DataFrame, not a cuDF one. I then ran the same function with and without Morpheus. Here are the profiler results:

######################## WITH MORPHEUS #######################
Timer unit: 1e-06 s

Total time: 81.5568 s
File: umap_example_morpheus.py
Function: on_data at line 65
====Pipeline Complete====

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    65                                               @profile
    66                                               def on_data(self, message: typing.Any):
    67         1         15.5     15.5      0.0          df = message.df
    68         1       1901.1   1901.1      0.0          df = df.dropna()
    69         1         17.9     17.9      0.0          reducer = umap.UMAP()
    70         1        547.0    547.0      0.0          df = df[["bill_length_mm","bill_depth_mm","flipper_length_mm","body_mass_g",]].values
    71         1        595.1    595.1      0.0          df = StandardScaler().fit_transform(df)
    72         1   81553705.6 81553705.6    100.0          embedding = reducer.fit_transform(df)
    73         1          0.2      0.2      0.0          return message

###################### WITHOUT MORPHEUS ######################
Timer unit: 1e-06 s

Total time: 5.80986 s
File: umap_example.py
Function: f at line 10

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    11                                           @profile
    12                                           def f():
    13         1       1393.7   1393.7      0.0      df = df.dropna()
    14                                           
    15         1         12.1     12.1      0.0      reducer = umap.UMAP()
    16                                           
    17         1        469.2    469.2      0.0      df = df[["bill_length_mm","bill_depth_mm","flipper_length_mm","body_mass_g",]].values
    18         1        481.7    481.7      0.0      df = StandardScaler().fit_transform(df)
    19                                           
    20         1    5212678.1 5212678.1     89.7      embedding = reducer.fit_transform(df)

Morpheus seems to take slightly longer on every line. For umap.fit_transform(), however, the gap is huge: about 81.6 s with Morpheus versus 5.2 s without.
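Putting the two reports side by side makes the pattern concrete. The sketch below copies the per-line times (in microseconds) from the two line_profiler outputs above and divides them. The dictionary keys are shorthand labels chosen here, not names from the script. Each line was hit only once, so the small lines are noisy. Only the fit_transform ratio (about 15.6x) is clearly beyond measurement noise.

```python
# Per-line times in microseconds, copied from the two line_profiler reports above.
with_morpheus = {
    "dropna": 1901.1,
    "UMAP()": 17.9,
    ".values": 547.0,
    "StandardScaler": 595.1,
    "fit_transform": 81553705.6,
}
without_morpheus = {
    "dropna": 1393.7,
    "UMAP()": 12.1,
    ".values": 469.2,
    "StandardScaler": 481.7,
    "fit_transform": 5212678.1,
}


def slowdown(inside, outside):
    """Return inside/outside time ratios for every line present in both reports."""
    return {line: inside[line] / outside[line] for line in inside if line in outside}


for line, ratio in slowdown(with_morpheus, without_morpheus).items():
    print(f"{line:>15}: {ratio:5.2f}x")
```

The small lines come out roughly 1.2x to 1.5x slower, while fit_transform is an order of magnitude slower. That points at something specific to the parallel UMAP work rather than a uniform per-call overhead.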

mdemoret-nv commented 1 year ago

Without seeing the entire pipeline, it's difficult to say what could be causing the slowdown when running inside of Morpheus vs. outside. Is it possible for you to upload a small reproduction example so we can run it on our end?

Morpheus should be executing your on_data method like any other Python function, so there shouldn't be any performance difference when running inside Morpheus. I suspect the issue is caused by differences in the environment or in the data. Here are a few things I would check:

  1. Does fit_transform use parallel processing at all (i.e. multiple threads or multiple processes)?
    1. If fit_transform runs in parallel, it could compete with the Morpheus runtime for CPU resources, reducing performance.
  2. You mentioned that "message.df was object of pd.dataframe not cudf". Is there one that you are expecting? Are you using Pandas or cuDF when running outside of Morpheus?
    1. For any performance comparison, it's crucial to use the same library both inside and outside of Morpheus.
    2. If Morpheus was using Pandas but you were using cuDF outside of Morpheus, that could account for the performance difference.
    3. It should be easy to force both to use Pandas (or both to use cuDF) and rerun the performance tests.
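For point 2, one way to make sure both runs use the same library is to normalize the frame in both places before timing. This is a minimal sketch that assumes the input is either a pandas or a cuDF DataFrame; the helper name `as_pandas` is hypothetical. It relies on cuDF's `DataFrame.to_pandas()` method and uses duck typing, so it also runs where cuDF is not installed.

```python
def as_pandas(df):
    """Return a pandas DataFrame, converting from cuDF when needed.

    cuDF DataFrames expose a to_pandas() method and pandas DataFrames do not,
    so checking for that method avoids importing cudf at all.
    """
    to_pandas = getattr(df, "to_pandas", None)
    if callable(to_pandas):
        return to_pandas()
    return df  # already pandas (or another object we leave untouched)


def frame_library(df):
    """Return the top-level package of the frame's class, e.g. 'pandas' or 'cudf'."""
    return type(df).__module__.split(".")[0]
```

Logging `frame_library(message.df)` at the top of `on_data` and in the standalone script would confirm that both runs hand the same kind of object to `StandardScaler` and UMAP.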
tanmoyio commented 1 year ago

@mdemoret-nv Yes, fit_transform uses parallel processing. I converted the message to pandas intentionally, to compare performance against the script running without Morpheus. Since on_data is executed like any other Python function, I expect at least similar execution times for each line inside on_data compared to the script that doesn't use Morpheus. For fit_transform, however, the execution time with Morpheus is more than 10x longer.

For reproducibility, here is the full script:

import os
import srf
import logging
import typing
import umap
import pandas as pd
from sklearn.preprocessing import StandardScaler

from morpheus.config import Config
from morpheus.utils.logger import configure_logging
from morpheus.messages.message_meta import MessageMeta

from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage

from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.pipeline.single_output_source import SingleOutputSource

class GitSourceStage(SingleOutputSource):
    def __init__(self, c: Config, url):
        super().__init__(c)
        self.url = url

    @property
    def name(self) -> str:
        return "from-git"

    def supports_cpp_node(self):
        return False

    def _build_source(self, builder: srf.Builder) -> StreamPair:
        out_stream = builder.make_source(self.unique_name, self._generate_frames())
        out_type = MessageMeta
        return out_stream, out_type

    def _generate_frames(self):
        yield MessageMeta(pd.read_csv(self.url))

class UmapStage(SinglePortStage):
    @property
    def name(self) -> str:
        return "umap"

    def accepted_types(self) -> typing.Tuple:
        return (typing.Any, )

    def supports_cpp_node(self) -> bool:
        return False

    def on_data(self, message: typing.Any):
        df = message.df
        df = df.dropna()
        reducer = umap.UMAP()
        df = df[["bill_length_mm","bill_depth_mm","flipper_length_mm","body_mass_g",]].values
        df = StandardScaler().fit_transform(df)
        embedding = reducer.fit_transform(df)
        return message

    def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
        node = builder.make_node(self.unique_name, self.on_data)
        builder.make_edge(input_stream[0], node)
        return node, input_stream[1]

def pipeline():
    configure_logging(log_level=logging.DEBUG)

    config = Config()
    config.num_threads = os.cpu_count()
    pipeline = LinearPipeline(config)

    pipeline.set_source(GitSourceStage(
        config, 
        url="https://raw.githubusercontent.com/allisonhorst/palmerpenguins/c19a904462482430170bfe2c718775ddb7dbb885/inst/extdata/penguins.csv"))
    pipeline.add_stage(UmapStage(config))
    pipeline.run()

if __name__ == '__main__':
    pipeline()
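The script above sets `config.num_threads = os.cpu_count()`, and fit_transform is itself parallel. One hypothesis worth testing is oversubscription. This is an assumption, not something established in this thread: the Morpheus runtime and UMAP's native thread pools would each be sized to every core. The minimal sketch below caps the common native pools through their standard environment variables. It must run before numpy, numba or umap are first imported, because those pools read the variables when they initialize. The helper name `cap_native_threads` and the half-the-cores split are hypothetical choices.

```python
import os

# Environment variables read by common native thread pools when they initialize.
THREAD_ENV_VARS = (
    "OMP_NUM_THREADS",       # OpenMP runtimes (e.g. libgomp)
    "OPENBLAS_NUM_THREADS",  # OpenBLAS
    "MKL_NUM_THREADS",       # Intel MKL
    "NUMBA_NUM_THREADS",     # Numba parallel kernels (used by e.g. umap-learn)
)


def cap_native_threads(n, override=False):
    """Limit native thread pools to `n` threads via environment variables.

    Must be called before numpy/numba/umap are imported; pools that are
    already initialized will not see the new values. Existing settings are
    kept unless `override` is True.
    """
    if n < 1:
        raise ValueError("n must be >= 1")
    for var in THREAD_ENV_VARS:
        if override or var not in os.environ:
            os.environ[var] = str(n)
    return {var: os.environ[var] for var in THREAD_ENV_VARS}


if __name__ == "__main__":
    # Hypothetical split: leave roughly half the cores for the Morpheus runtime.
    cores = os.cpu_count() or 1
    print(cap_native_threads(max(1, cores // 2)))
```

Capping the pools might bring the in-pipeline fit_transform time back in line with the standalone run. That would point at contention; if it does not, the cause is elsewhere. Numba also provides `numba.set_num_threads()` for lowering the count at run time, up to the `NUMBA_NUM_THREADS` limit.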
tanmoyio commented 1 year ago

env

name: morpheus-22.11
channels:
  - rapidsai
  - nvidia
  - conda-forge
  - defaults
dependencies:
  - _libgcc_mutex=0.1=conda_forge
  - _openmp_mutex=4.5=2_gnu
  - _sysroot_linux-64_curr_repodata_hack=3=h5bd9786_13
  - alabaster=0.7.13=pyhd8ed1ab_0
  - alembic=1.9.1=pyhd8ed1ab_0
  - appdirs=1.4.4=pyh9f0ad1d_0
  - arrow-cpp=8.0.1=py38h998ac4b_2_cpu
  - asn1crypto=1.5.1=pyhd8ed1ab_0
  - atk-1.0=2.38.0=hd4edc92_1
  - attrs=22.2.0=pyh71513ae_0
  - autoconf=2.69=pl5321hd708f79_11
  - automake=1.16.5=pl5321ha770c72_0
  - aws-c-cal=0.5.11=h95a6274_0
  - aws-c-common=0.6.2=h7f98852_0
  - aws-c-event-stream=0.2.7=h3541f99_13
  - aws-c-io=0.10.5=hfb6a706_0
  - aws-checksums=0.1.11=ha31a3da_7
  - aws-sdk-cpp=1.8.186=hecaee15_4
  - babel=2.11.0=pyhd8ed1ab_0
  - benchmark=1.6.1=h9c3ff4c_0
  - binutils_impl_linux-64=2.36.1=h193b22a_2
  - binutils_linux-64=2.36=hf3e587d_10
  - blinker=1.5=pyhd8ed1ab_0
  - boost-cpp=1.74.0=h6cacc03_7
  - brotlipy=0.7.0=py38h0a891b7_1005
  - bzip2=1.0.8=h7f98852_4
  - c-ares=1.18.1=h7f98852_0
  - ca-certificates=2022.12.7=ha878542_0
  - cachetools=5.0.0=pyhd8ed1ab_0
  - cairo=1.16.0=ha12eb4b_1010
  - ccache=4.7.3=h2599c5e_0
  - certifi=2022.12.7=pyhd8ed1ab_0
  - cffi=1.15.1=py38h4a40e3a_3
  - charset-normalizer=2.1.1=pyhd8ed1ab_0
  - clang=14.0.6=ha770c72_0
  - clang-14=14.0.6=default_h2e3cab8_0
  - clang-format=14.0.6=default_h2e3cab8_0
  - clang-format-14=14.0.6=default_h2e3cab8_0
  - clang-tools=14.0.6=default_h2e3cab8_0
  - clangdev=14.0.6=default_h2e3cab8_0
  - clangxx=14.0.6=default_ha074a59_0
  - click=8.1.3=unix_pyhd8ed1ab_2
  - cloudpickle=2.2.1=pyhd8ed1ab_0
  - cmake=3.22.3=h5432695_0
  - colorama=0.4.6=pyhd8ed1ab_0
  - configargparse=1.5.3=pyhd8ed1ab_0
  - configparser=5.3.0=pyhd8ed1ab_0
  - coverage=7.1.0=py38h1de0b5d_0
  - cryptography=39.0.0=py38h1724139_0
  - cuda-nvml-dev=11.5.50=h511b398_0
  - cuda-python=11.7.0=py38h3fd9d12_0
  - cudatoolkit=11.5.1=hcf5317a_9
  - cudf=22.08.00=cuda_11_py38_gb71873c701_0
  - cupy=9.5.0=py38h7818112_1
  - curl=7.87.0=h6312ad2_0
  - cyrus-sasl=2.1.27=h957375c_6
  - cython=0.29.24=py38h709712a_1
  - databricks-cli=0.17.4=pyhd8ed1ab_0
  - datacompy=0.8.4=pyhd8ed1ab_0
  - distro=1.8.0=pyhd8ed1ab_0
  - dlpack=0.5=h9c3ff4c_0
  - docker-compose=1.29.2=py38h578d9bd_2
  - docker-py=5.0.3=py38h578d9bd_2
  - docker-pycreds=0.4.0=py_0
  - dockerpty=0.4.1=py_0
  - docopt=0.6.2=py_1
  - entrypoints=0.4=pyhd8ed1ab_0
  - exceptiongroup=1.1.0=pyhd8ed1ab_0
  - expat=2.5.0=h27087fc_0
  - faker=12.3.0=pyhd8ed1ab_0
  - fastavro=1.7.1=py38h1de0b5d_0
  - fastrlock=0.8=py38hfa26641_3
  - flake8=6.0.0=pyhd8ed1ab_0
  - flask=2.2.2=pyhd8ed1ab_0
  - flatbuffers=2.0.8=hcb278e6_1
  - font-ttf-dejavu-sans-mono=2.37=hab24e00_0
  - font-ttf-inconsolata=3.000=h77eed37_0
  - font-ttf-source-code-pro=2.038=h77eed37_0
  - font-ttf-ubuntu=0.83=hab24e00_0
  - fontconfig=2.14.2=h14ed4e7_0
  - fonts-conda-ecosystem=1=0
  - fonts-conda-forge=1=0
  - freetype=2.12.1=hca18f0e_1
  - fribidi=1.0.10=h36c2ea0_0
  - fsspec=2023.1.0=pyhd8ed1ab_0
  - gcc_impl_linux-64=9.4.0=h03d3576_16
  - gcc_linux-64=9.4.0=h391b98a_10
  - gdk-pixbuf=2.42.10=h05c8ddd_0
  - gettext=0.21.1=h27087fc_0
  - gflags=2.2.2=he1b5a44_1004
  - git=2.39.1=pl5321ha3eba64_0
  - git-lfs=3.2.0=ha770c72_0
  - gitdb=4.0.10=pyhd8ed1ab_0
  - gitpython=3.1.30=pyhd8ed1ab_0
  - glog=0.6.0=h6f12383_0
  - gmock=1.10.0=h4bd325d_7
  - gputil=1.4.0=pyh9f0ad1d_0
  - graphite2=1.3.13=h58526e2_1001
  - graphviz=4.0.0=h5abf519_0
  - greenlet=2.0.2=py38h8dc9893_0
  - grpc-cpp=1.46.4=hbad87ad_7
  - grpcio=1.46.4=py38h5b6373e_7
  - gtest=1.10.0=h4bd325d_7
  - gtk2=2.24.33=h90689f9_2
  - gts=0.7.6=h64030ff_2
  - gunicorn=20.1.0=py38h578d9bd_3
  - gxx_impl_linux-64=9.4.0=h03d3576_16
  - gxx_linux-64=9.4.0=h0316aca_10
  - harfbuzz=4.2.0=h40b6f09_0
  - icu=69.1=h9c3ff4c_0
  - idna=3.4=pyhd8ed1ab_0
  - imagesize=1.4.1=pyhd8ed1ab_0
  - importlib-metadata=5.2.0=pyha770c72_0
  - importlib_resources=5.10.2=pyhd8ed1ab_0
  - include-what-you-use=0.18=h27087fc_0
  - iniconfig=2.0.0=pyhd8ed1ab_0
  - isort=5.12.0=pyhd8ed1ab_1
  - itsdangerous=2.1.2=pyhd8ed1ab_0
  - jinja2=3.1.2=pyhd8ed1ab_1
  - jpeg=9e=h166bdaf_2
  - kernel-headers_linux-64=3.10.0=h4a8ded7_13
  - keyutils=1.6.1=h166bdaf_0
  - krb5=1.20.1=hf9c8cef_0
  - ld_impl_linux-64=2.36.1=hea4e1c9_2
  - lerc=4.0.0=h27087fc_0
  - libabseil=20220623.0=cxx17_h05df665_6
  - libblas=3.9.0=16_linux64_openblas
  - libbrotlicommon=1.0.9=h166bdaf_8
  - libbrotlidec=1.0.9=h166bdaf_8
  - libbrotlienc=1.0.9=h166bdaf_8
  - libcblas=3.9.0=16_linux64_openblas
  - libclang=14.0.6=default_h2e3cab8_0
  - libclang-cpp=14.0.6=default_h2e3cab8_0
  - libclang-cpp14=14.0.6=default_h2e3cab8_0
  - libclang13=14.0.6=default_h3a83d3e_0
  - libcrc32c=1.1.2=h9c3ff4c_0
  - libcudf=22.08.00=cuda11_gb71873c701_0
  - libcurl=7.87.0=h6312ad2_0
  - libdeflate=1.17=h0b41bf4_0
  - libedit=3.1.20191231=he28a2e2_2
  - libev=4.33=h516909a_1
  - libevent=2.1.10=h9b69904_4
  - libffi=3.4.2=h7f98852_5
  - libgcc-devel_linux-64=9.4.0=hd854feb_16
  - libgcc-ng=12.2.0=h65d4601_19
  - libgcrypt=1.10.1=h166bdaf_0
  - libgd=2.3.3=h695aa2c_1
  - libgfortran-ng=12.2.0=h69a702a_19
  - libgfortran5=12.2.0=h337968e_19
  - libglib=2.74.1=h606061b_1
  - libgomp=12.2.0=h65d4601_19
  - libgoogle-cloud=2.1.0=hf2e47f9_1
  - libgpg-error=1.46=h620e276_0
  - libgsasl=1.8.0=2
  - libhiredis=1.0.2=h2cc385e_0
  - libhwloc=2.5.0=h6746aa3_0
  - libiconv=1.17=h166bdaf_0
  - liblapack=3.9.0=16_linux64_openblas
  - libllvm11=11.1.0=he0ac6c6_5
  - libllvm14=14.0.6=he0ac6c6_1
  - libnghttp2=1.51.0=hdcd2b5c_0
  - libnsl=2.0.0=h7f98852_0
  - libntlm=1.4=h7f98852_1002
  - libopenblas=0.3.21=pthreads_h78a6416_3
  - libpng=1.6.39=h753d276_0
  - libprotobuf=3.20.2=h6239696_0
  - librdkafka=1.7.0=hc49e61c_1
  - librmm=22.08.00=cuda11_gd212232c_0
  - librsvg=2.54.4=h7abd40a_0
  - libsanitizer=9.4.0=h79bfe98_16
  - libsqlite=3.40.0=h753d276_0
  - libsrf=22.11.00=cuda_11.4_h505641c_0
  - libssh2=1.10.0=haa6b8db_3
  - libstdcxx-devel_linux-64=9.4.0=hd854feb_16
  - libstdcxx-ng=12.2.0=h46fd767_19
  - libthrift=0.16.0=h491838f_2
  - libtiff=4.5.0=h6adf6a1_2
  - libtool=2.4.7=h27087fc_0
  - libutf8proc=2.8.0=h166bdaf_0
  - libuuid=2.32.1=h7f98852_1000
  - libuv=1.43.0=h7f98852_0
  - libwebp-base=1.2.4=h166bdaf_0
  - libxcb=1.13=h7f98852_1004
  - libxml2=2.9.14=haae042b_4
  - libzlib=1.2.13=h166bdaf_4
  - llvm-tools=14.0.6=he0ac6c6_1
  - llvmdev=14.0.6=he0ac6c6_1
  - llvmlite=0.38.1=py38h38d86a4_0
  - lz4-c=1.9.3=h9c3ff4c_1
  - m4=1.4.18=h516909a_1001
  - mako=1.2.4=pyhd8ed1ab_0
  - markdown-it-py=2.1.0=pyhd8ed1ab_0
  - markupsafe=2.1.2=py38h1de0b5d_0
  - mccabe=0.7.0=pyhd8ed1ab_0
  - mdit-py-plugins=0.3.3=pyhd8ed1ab_0
  - mdurl=0.1.0=pyhd8ed1ab_0
  - mlflow=1.30.0=py38he918c71_0
  - myst-parser=0.17.0=pyhd8ed1ab_0
  - ncurses=6.3=h27087fc_1
  - networkx=2.8.8=pyhd8ed1ab_0
  - ninja=1.10.2=h4bd325d_1
  - nlohmann_json=3.9.1=h9c3ff4c_1
  - nodejs=17.4.0=h8ca31f7_0
  - numba=0.55.0=py38h4bf6c61_0
  - numpy=1.21.6=py38h1d589f8_0
  - numpydoc=1.4.0=pyhd8ed1ab_1
  - nvtx=0.2.3=py38h0a891b7_2
  - oauthlib=3.2.2=pyhd8ed1ab_0
  - openssl=1.1.1s=h0b41bf4_1
  - orc=1.7.6=h6c59b99_0
  - ordered-set=4.1.0=pyhd8ed1ab_0
  - packaging=21.3=pyhd8ed1ab_0
  - pandas=1.3.5=py38h43a58ef_0
  - pango=1.50.7=hbd2fdc8_0
  - parquet-cpp=1.5.1=2
  - pcre2=10.40=hc3806b6_0
  - perl=5.32.1=2_h7f98852_perl5
  - pip=23.0=pyhd8ed1ab_0
  - pixman=0.40.0=h36c2ea0_0
  - pkg-config=0.29.2=h36c2ea0_1008
  - pluggy=1.0.0=pyhd8ed1ab_5
  - pooch=1.6.0=pyhd8ed1ab_0
  - prometheus_client=0.16.0=pyhd8ed1ab_0
  - prometheus_flask_exporter=0.21.0=pyhd8ed1ab_0
  - pthread-stubs=0.4=h36c2ea0_1001
  - ptxcompiler=0.7.0=py38h241159d_3
  - py-cpuinfo=9.0.0=pyhd8ed1ab_0
  - pyarrow=8.0.1=py38h097c49a_2_cpu
  - pybind11-stubgen=0.10.5=pyhd8ed1ab_0
  - pycodestyle=2.10.0=pyhd8ed1ab_0
  - pycparser=2.21=pyhd8ed1ab_0
  - pydot=1.4.2=py38h578d9bd_3
  - pyflakes=3.0.1=pyhd8ed1ab_0
  - pygments=2.14.0=pyhd8ed1ab_0
  - pyjwt=2.6.0=pyhd8ed1ab_0
  - pyopenssl=23.0.0=pyhd8ed1ab_0
  - pyparsing=3.0.9=pyhd8ed1ab_0
  - pyrsistent=0.19.3=py38h1de0b5d_0
  - pysocks=1.7.1=pyha2e5f31_6
  - pytest=7.2.1=pyhd8ed1ab_0
  - pytest-benchmark=4.0.0=pyhd8ed1ab_0
  - pytest-cov=4.0.0=pyhd8ed1ab_0
  - python=3.8.15=h257c98d_0_cpython
  - python-confluent-kafka=1.7.0=py38h497a2fe_2
  - python-dateutil=2.8.2=pyhd8ed1ab_0
  - python-dotenv=0.21.1=pyhd8ed1ab_0
  - python-graphviz=0.20.1=pyh22cad53_0
  - python_abi=3.8=3_cp38
  - pytz=2022.7.1=pyhd8ed1ab_0
  - pyyaml=5.4.1=py38h0a891b7_4
  - querystring_parser=1.2.4=py_0
  - rapidjson=1.1.0=he1b5a44_1002
  - re2=2022.06.01=h27087fc_1
  - readline=8.1.2=h0f457ee_0
  - requests=2.28.2=pyhd8ed1ab_0
  - rhash=1.4.3=h166bdaf_0
  - rmm=22.08.00=cuda11_py38_gd212232c_0
  - s2n=1.0.10=h9b69904_0
  - scikit-build=0.13.1=pyhca92ed8_0
  - scipy=1.10.0=py38h10c12cc_0
  - setuptools=59.8.0=py38h578d9bd_1
  - six=1.16.0=pyh6c4a22f_0
  - smmap=3.0.5=pyh44b312d_0
  - snappy=1.1.9=hbd366e4_2
  - snowballstemmer=2.2.0=pyhd8ed1ab_0
  - spdlog=1.8.5=h4bd325d_1
  - sphinx=4.5.0=pyh6c4a22f_0
  - sphinx_rtd_theme=1.1.1=pyha770c72_1
  - sphinxcontrib-applehelp=1.0.4=pyhd8ed1ab_0
  - sphinxcontrib-devhelp=1.0.2=py_0
  - sphinxcontrib-htmlhelp=2.0.0=pyhd8ed1ab_0
  - sphinxcontrib-jsmath=1.0.1=py_0
  - sphinxcontrib-qthelp=1.0.3=py_0
  - sphinxcontrib-serializinghtml=1.1.5=pyhd8ed1ab_2
  - sqlalchemy=1.4.46=py38h1de0b5d_0
  - sqlparse=0.4.3=pyhd8ed1ab_0
  - srf=22.11.00=cuda_11.4_py38_h91afa34_0
  - sysroot_linux-64=2.17=h4a8ded7_13
  - tabulate=0.9.0=pyhd8ed1ab_1
  - texttable=1.6.7=pyhd8ed1ab_0
  - tk=8.6.12=h27826a3_0
  - toml=0.10.2=pyhd8ed1ab_0
  - tomli=2.0.1=pyhd8ed1ab_0
  - tqdm=4.64.1=pyhd8ed1ab_0
  - typing-extensions=4.4.0=hd8ed1ab_0
  - typing_extensions=4.4.0=pyha770c72_0
  - typing_utils=0.1.0=pyhd8ed1ab_0
  - ucx=1.13.1=h538f049_1
  - urllib3=1.26.14=pyhd8ed1ab_0
  - watchdog=2.1.9=py38h578d9bd_1
  - websocket-client=0.57.0=py38h578d9bd_6
  - werkzeug=2.2.2=pyhd8ed1ab_0
  - wheel=0.38.4=pyhd8ed1ab_0
  - xorg-kbproto=1.0.7=h7f98852_1002
  - xorg-libice=1.0.10=h7f98852_0
  - xorg-libsm=1.2.3=hd9c2040_1000
  - xorg-libx11=1.7.2=h7f98852_0
  - xorg-libxau=1.0.9=h7f98852_0
  - xorg-libxdmcp=1.1.3=h7f98852_0
  - xorg-libxext=1.3.4=h7f98852_1
  - xorg-libxrender=0.9.10=h7f98852_1003
  - xorg-renderproto=0.11.1=h7f98852_1002
  - xorg-xextproto=7.3.0=h7f98852_1002
  - xorg-xproto=7.0.31=h7f98852_1007
  - xz=5.2.6=h166bdaf_0
  - yaml=0.2.5=h7f98852_2
  - yapf=0.32.0=pyhd8ed1ab_0
  - zipp=3.12.0=pyhd8ed1ab_0
  - zlib=1.2.13=h166bdaf_4
  - zstd=1.5.2=h3eb15da_6
  - pip:
      - aiohttp==3.8.4
      - aiosignal==1.3.1
      - annoy==1.17.1
      - anyio==3.6.2
      - argon2-cffi==21.3.0
      - argon2-cffi-bindings==21.2.0
      - arrow==1.2.3
      - asttokens==2.2.1
      - async-timeout==4.0.2
      - backcall==0.2.0
      - beautifulsoup4==4.11.2
      - bleach==6.0.0
      - bokeh==2.4.3
      - brotli==1.0.9
      - colorcet==3.0.1
      - comm==0.1.2
      - contourpy==1.0.7
      - cycler==0.11.0
      - dask==2023.1.1
      - datashader==0.14.4
      - datashape==0.5.2
      - debugpy==1.6.6
      - decorator==5.1.1
      - defusedxml==0.7.1
      - dfencoder==0.0.37
      - dgl==1.0.0
      - dill==0.3.6
      - dirty-cat==0.2.0
      - docutils==0.19
      - executing==1.2.0
      - fastjsonschema==2.16.2
      - filelock==3.9.0
      - fonttools==4.38.0
      - fqdn==1.5.1
      - frozenlist==1.3.3
      - gevent==22.10.2
      - geventhttpclient==2.0.8
      - holoviews==1.15.4
      - huggingface-hub==0.12.0
      - imageio==2.25.0
      - ipykernel==6.20.2
      - ipython==8.9.0
      - ipython-genutils==0.2.0
      - ipywidgets==8.0.4
      - isoduration==20.11.0
      - jedi==0.18.2
      - joblib==1.2.0
      - json5==0.9.11
      - jsonpointer==2.3
      - jsonschema==4.17.3
      - jupyter==1.0.0
      - jupyter-client==8.0.2
      - jupyter-console==6.5.0
      - jupyter-core==4.12.0
      - jupyter-events==0.6.3
      - jupyter-server==2.2.0
      - jupyter-server-terminals==0.4.4
      - jupyterlab==3.5.3
      - jupyterlab-pygments==0.2.2
      - jupyterlab-server==2.19.0
      - jupyterlab-widgets==3.0.5
      - kiwisolver==1.4.4
      - line-profiler==4.0.2
      - locket==1.0.0
      - markdown==3.4.1
      - matplotlib==3.6.3
      - matplotlib-inline==0.1.6
      - mistune==2.0.4
      - morpheus==22.11.0
      - multidict==6.0.4
      - multipledispatch==0.6.0
      - multiprocess==0.70.11.1
      - multiprocesspandas==1.1.5
      - nbclassic==0.5.1
      - nbclient==0.7.2
      - nbconvert==7.2.9
      - nbformat==5.7.3
      - nest-asyncio==1.5.6
      - nltk==3.8.1
      - notebook==6.5.2
      - notebook-shim==0.2.2
      - nvidia-cublas-cu11==11.10.3.66
      - nvidia-cuda-nvrtc-cu11==11.7.99
      - nvidia-cuda-runtime-cu11==11.7.99
      - nvidia-cudnn-cu11==8.5.0.96
      - nvidia-pyindex==1.0.9
      - palettable==3.3.0
      - pandocfilters==1.5.0
      - panel==0.14.3
      - param==1.12.3
      - parso==0.8.3
      - partd==1.3.0
      - pexpect==4.8.0
      - pickleshare==0.7.5
      - pillow==9.4.0
      - pkgutil-resolve-name==1.3.10
      - prompt-toolkit==3.0.36
      - protobuf==3.20.1
      - psutil==5.9.4
      - ptyprocess==0.7.0
      - pure-eval==0.2.2
      - pycrypto==2.6.1
      - pyct==0.5.0
      - pynndescent==0.5.8
      - python-json-logger==2.0.4
      - python-rapidjson==1.9
      - pyviz-comms==2.2.1
      - pywavelets==1.4.1
      - pyzmq==25.0.0
      - qtconsole==5.4.0
      - qtpy==2.3.0
      - regex==2022.10.31
      - rfc3339-validator==0.1.4
      - rfc3986-validator==0.1.1
      - scikit-image==0.19.3
      - scikit-learn==1.2.1
      - seaborn==0.12.2
      - send2trash==1.8.0
      - sentence-transformers==2.2.2
      - sentencepiece==0.1.97
      - sniffio==1.3.0
      - soupsieve==2.3.2.post1
      - splunk-sdk==1.7.2
      - squarify==0.4.3
      - stack-data==0.6.2
      - tensorboardx==2.5.1
      - terminado==0.17.1
      - threadpoolctl==3.1.0
      - tifffile==2023.2.3
      - tinycss2==1.2.1
      - tokenizers==0.13.2
      - toolz==0.12.0
      - torch==1.13.1
      - torchaudio==0.13.1
      - torchvision==0.14.1
      - tornado==6.2
      - traitlets==5.9.0
      - transformers==4.26.0
      - tritonclient==2.17.0
      - umap-learn==0.5.3
      - uri-template==1.2.0
      - wcwidth==0.2.6
      - webcolors==1.12
      - webencodings==0.5.1
      - websockets==10.4
      - widgetsnbextension==4.0.5
      - xarray==2023.1.0
      - yarl==1.8.2
      - zope-event==4.6
      - zope-interface==5.5.2
tanmoyio commented 1 year ago

@mdemoret-nv I also tried running the same code in the nvcr.io/nvidia/morpheus/morpheus:22.11-runtime container, with umap-learn==0.5.3 added via pip install, and saw the same delay.

tanmoyio commented 1 year ago

@mdemoret-nv Any updates on this?

lmeyerov commented 1 year ago

@mdemoret-nv Is there another way you'd like us to provide a reproducer?

mdemoret-nv commented 1 year ago

@tanmoyio I was able to run your reproducer, but I wasn't able to replicate your results. I see nearly the same performance inside and outside of Morpheus: 7.82533 s inside versus 7.51365 s outside.

For reference, here is the entire file I was using:

import logging
import os
import profile
import sys
import typing
from time import time

import pandas as pd
import srf
import umap
from line_profiler import LineProfiler
from sklearn.preprocessing import StandardScaler

from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

profiler = LineProfiler()

def profile(func):

    def inner(*args, **kwargs):
        profiler.add_function(func)
        profiler.enable_by_count()
        return func(*args, **kwargs)

    return inner

def timer_func(func):
    # This function shows the execution time of
    # the function object passed
    def wrap_func(*args, **kwargs):
        t1 = time()
        result = func(*args, **kwargs)
        t2 = time()
        print(f'Function {func.__name__!r} executed in {(t2-t1):.4f}s')
        return result

    return wrap_func

class GitSourceStage(SingleOutputSource):

    def __init__(self, c: Config, url):
        super().__init__(c)
        self.url = url

    @property
    def name(self) -> str:
        return "from-git"

    def supports_cpp_node(self):
        return False

    def _build_source(self, builder: srf.Builder) -> StreamPair:
        out_stream = builder.make_source(self.unique_name, self._generate_frames())
        out_type = MessageMeta
        return out_stream, out_type

    def _generate_frames(self):
        yield MessageMeta(pd.read_csv(self.url))

class UmapStage(SinglePortStage):

    @property
    def name(self) -> str:
        return "umap"

    def accepted_types(self) -> typing.Tuple:
        return (typing.Any, )

    def supports_cpp_node(self) -> bool:
        return False

    @profile
    def on_data(self, message: MessageMeta):

        df = message.df
        df = df.dropna()
        reducer = umap.UMAP()
        df = df[[
            "bill_length_mm",
            "bill_depth_mm",
            "flipper_length_mm",
            "body_mass_g",
        ]].values
        df = StandardScaler().fit_transform(df)
        embedding = reducer.fit_transform(df)

        return message

    def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
        node = builder.make_node(self.unique_name, self.on_data)
        builder.make_edge(input_stream[0], node)
        return node, input_stream[1]

def pipeline():
    configure_logging(log_level=logging.DEBUG)

    config = Config()
    config.num_threads = os.cpu_count()
    pipeline = LinearPipeline(config)

    pipeline.set_source(
        GitSourceStage(
            config,
            url=
            "https://raw.githubusercontent.com/allisonhorst/palmerpenguins/c19a904462482430170bfe2c718775ddb7dbb885/inst/extdata/penguins.csv"
        ))
    pipeline.add_stage(UmapStage(config))
    pipeline.run()

def no_morpheus():

    @profile
    def normal_umap(df):
        df = df.dropna()
        reducer = umap.UMAP()
        df = df[[
            "bill_length_mm",
            "bill_depth_mm",
            "flipper_length_mm",
            "body_mass_g",
        ]].values
        df = StandardScaler().fit_transform(df)
        embedding = reducer.fit_transform(df)

    normal_umap(
        pd.read_csv(
            "https://raw.githubusercontent.com/allisonhorst/palmerpenguins/c19a904462482430170bfe2c718775ddb7dbb885/inst/extdata/penguins.csv"
        ))

if __name__ == '__main__':

    if (len(sys.argv) > 1 and sys.argv[1] == "morpheus"):
        pipeline()
    else:
        no_morpheus()

    profiler.print_stats(output_unit=1e-6)

And here is the output I saw:

(morpheus) coder ➜ /workspaces/morpheus-dev (branch-22.11 ✗) $ python examples/umap/pipeline.py
Timer unit: 1e-06 s

Total time: 7.51365 s
File: examples/umap/pipeline.py
Function: normal_umap at line 126

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   126                                               @profile
   127                                               def normal_umap(df):
   128         1       2550.3   2550.3      0.0          df = df.dropna()
   129         1         29.4     29.4      0.0          reducer = umap.UMAP()
   130         1        790.2    790.2      0.0          df = df[[
   131         1          0.2      0.2      0.0              "bill_length_mm",
   132         1          0.1      0.1      0.0              "bill_depth_mm",
   133         1          0.2      0.2      0.0              "flipper_length_mm",
   134         1          0.1      0.1      0.0              "body_mass_g",
   135                                                   ]].values
   136         1        735.7    735.7      0.0          df = StandardScaler().fit_transform(df)
   137         1    7509543.6 7509543.6     99.9          embedding = reducer.fit_transform(df)

(morpheus) coder ➜ /workspaces/morpheus-dev (branch-22.11 ✗) $ python examples/umap/pipeline.py morpheus
====Registering Pipeline====
====Building Pipeline====
====Building Segment: linear_segment_0====
====Building Segment Complete!====
====Building Pipeline Complete!====
Starting! Time: 1677611334.7001576
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
Added source: <from-git-0; GitSourceStage(url=https://raw.githubusercontent.com/allisonhorst/palmerpenguins/c19a904462482430170bfe2c718775ddb7dbb885/inst/extdata/penguins.csv)>
  └─> morpheus.MessageMeta
Added stage: <umap-1; UmapStage()>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta
Timer unit: 1e-06 s

====Pipeline Complete====
Total time: 7.82533 s
File: examples/umap/pipeline.py
Function: on_data at line 84

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    84                                               @profile
    85                                               def on_data(self, message: MessageMeta):
    86                                           
    87         1         46.3     46.3      0.0          df = message.df
    88         1       4600.9   4600.9      0.1          df = df.dropna()
    89         1         26.5     26.5      0.0          reducer = umap.UMAP()
    90         1        770.9    770.9      0.0          df = df[[
    91         1          0.1      0.1      0.0              "bill_length_mm",
    92         1          0.1      0.1      0.0              "bill_depth_mm",
    93         1          0.1      0.1      0.0              "flipper_length_mm",
    94         1          0.1      0.1      0.0              "body_mass_g",
    95                                                   ]].values
    96         1        795.9    795.9      0.0          df = StandardScaler().fit_transform(df)
    97         1    7819093.0 7819093.0     99.9          embedding = reducer.fit_transform(df)
    98                                           
    99         1          0.2      0.2      0.0          return message

I didn't see any profiling code in your example. Just to eliminate that as a variable, can you try running my example and let me know whether you get the same output?
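To take the profiler out of the picture entirely, the fit_transform call can be timed with nothing but `time.perf_counter`. This is a minimal sketch; `stopwatch` and the module-level `timings` dict are hypothetical names, not part of Morpheus or line_profiler.

```python
import time
from contextlib import contextmanager

# Collected wall-clock durations in seconds, keyed by label.
timings = {}


@contextmanager
def stopwatch(label):
    """Record the wall-clock time of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


# Inside on_data (or the standalone function) one would write:
#     with stopwatch("fit_transform"):
#         embedding = reducer.fit_transform(df)
if __name__ == "__main__":
    with stopwatch("sleep"):
        time.sleep(0.05)
    print(f"sleep took {timings['sleep']:.3f}s")
```

Running the same script with and without the pipeline then compares the two numbers without any line-level instrumentation.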

tanmoyio commented 1 year ago

@mdemoret-nv I ran your code, and on my system the results were not the same:

With Morpheus:

File: test.py
Function: on_data at line 84
====Pipeline Complete====

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    84                                               @profile
    85                                               def on_data(self, message: MessageMeta):
    86                                           
    87         1          8.7      8.7      0.0          df = message.df
    88         1       1737.7   1737.7      0.0          df = df.dropna()
    89         1         18.0     18.0      0.0          reducer = umap.UMAP()
    90         1        510.5    510.5      0.0          df = df[[
    91         1          0.1      0.1      0.0              "bill_length_mm",
    92         1          0.1      0.1      0.0              "bill_depth_mm",
    93         1          0.1      0.1      0.0              "flipper_length_mm",
    94         1          0.1      0.1      0.0              "body_mass_g",
    95                                                   ]].values
    96         1        499.6    499.6      0.0          df = StandardScaler().fit_transform(df)
    97         1  112685573.8 112685573.8    100.0          embedding = reducer.fit_transform(df)
    98                                           
    99         1          0.2      0.2      0.0          return message

Wrote profile results to test.py.lprof
Timer unit: 1e-06 s

without morpheus

File: test.py
Function: normal_umap at line 126

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   126                                               @profile
   127                                               def normal_umap(df):
   128         1      58622.4  58622.4      1.0          df = df.dropna()
   129         1         22.5     22.5      0.0          reducer = umap.UMAP()
   130         1       1550.0   1550.0      0.0          df = df[[
   131         1          0.1      0.1      0.0              "bill_length_mm",
   132         1          0.1      0.1      0.0              "bill_depth_mm",
   133         1          0.1      0.1      0.0              "flipper_length_mm",
   134         1          0.1      0.1      0.0              "body_mass_g",
   135                                                   ]].values
   136         1        645.6    645.6      0.0          df = StandardScaler().fit_transform(df)
   137         1    5536535.4 5536535.4     98.9          embedding = reducer.fit_transform(df)

Wrote profile results to test.py.lprof
Timer unit: 1e-06 s
tanmoyio commented 1 year ago

@mdemoret-nv Even inside the container I am facing the same issue. Is anything wrong with my system?

lmeyerov commented 1 year ago

Being more careful with reproducibility. The plot thickens: with a Docker-based repro on different hardware, using the NVIDIA base images:

Seeing about a 5-20X slowdown


Test HW 1:

No morpheus

sudo docker run --rm -ti -v ${PWD}/test.py:/workspace/test.py --runtime=nvidia --gpus=all --net=host --cap-add ALL nvcr.io/nvidia/morpheus/morpheus:22.11-runtime bash -c "pip install line-profiler umap-learn && python3 -m test"

=>

Total time: 12.5596 s
File: /workspace/test.py
Function: normal_umap at line 125

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   125                                               @profile
   126                                               def normal_umap(df):
   127         1    7360932.8 7360932.8     58.6          import umap
   128         1       1716.9   1716.9      0.0          df = df.dropna()
   129         1          9.8      9.8      0.0          reducer = umap.UMAP()
   130         1        505.3    505.3      0.0          df = df[[
   131         1          0.1      0.1      0.0              "bill_length_mm",
   132         1          0.1      0.1      0.0              "bill_depth_mm",
   133         1          0.2      0.2      0.0              "flipper_length_mm",
   134         1          0.1      0.1      0.0              "body_mass_g",
   135                                                   ]].values
   136         1        426.0    426.0      0.0          df = StandardScaler().fit_transform(df)
   137         1    5196021.6 5196021.6     41.4          embedding = reducer.fit_transform(df)

Morpheus

sudo docker run --rm -ti -v ${PWD}/test.py:/workspace/test.py --runtime=nvidia --gpus=all --net=host --cap-add ALL nvcr.io/nvidia/morpheus/morpheus:22.11-runtime bash -c "pip install line-profiler umap-learn && python3 -m test morpheus"

=>

Total time: 106.221 s
====Pipeline Complete====
File: /workspace/test.py
Function: on_data at line 83

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    83                                               @profile
    84                                               def on_data(self, message: MessageMeta):
    85         1    8145791.5 8145791.5      7.7          import umap
    86         1          8.1      8.1      0.0          df = message.df
    87         1       2053.2   2053.2      0.0          df = df.dropna()
    88         1          9.8      9.8      0.0          reducer = umap.UMAP()
    89         1        508.4    508.4      0.0          df = df[[
    90         1          0.2      0.2      0.0              "bill_length_mm",
    91         1          0.1      0.1      0.0              "bill_depth_mm",
    92         1          0.1      0.1      0.0              "flipper_length_mm",
    93         1          0.1      0.1      0.0              "body_mass_g",
    94                                                   ]].values
    95         1        676.9    676.9      0.0          df = StandardScaler().fit_transform(df)
    96         1   98072260.3 98072260.3     92.3          embedding = reducer.fit_transform(df)
    97                                           
    98         1          0.1      0.1      0.0          return message

Test 2: AWS T4

No morpheus

sudo docker run --rm -ti -v ${PWD}/test.py:/workspace/test.py --runtime=nvidia --gpus=all --net=host --cap-add ALL nvcr.io/nvidia/morpheus/morpheus:22.11-runtime bash -c "pip install line-profiler umap-learn && python3 -m test"

=>

Total time: 21.2879 s
File: /workspace/test.py
Function: normal_umap at line 125

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   125                                               @profile
   126                                               def normal_umap(df):
   127         1   12286915.3 12286915.3     57.7          import umap
   128         1       2989.1   2989.1      0.0          df = df.dropna()
   129         1         19.3     19.3      0.0          reducer = umap.UMAP()
   130         1        835.0    835.0      0.0          df = df[[
   131         1          0.2      0.2      0.0              "bill_length_mm",
   132         1          0.2      0.2      0.0              "bill_depth_mm",
   133         1          0.2      0.2      0.0              "flipper_length_mm",
   134         1          0.3      0.3      0.0              "body_mass_g",
   135                                                   ]].values
   136         1        884.7    884.7      0.0          df = StandardScaler().fit_transform(df)
   137         1    8996286.5 8996286.5     42.3          embedding = reducer.fit_transform(df)

Morpheus

sudo docker run --rm -ti -v ${PWD}/test.py:/workspace/test.py --runtime=nvidia --gpus=all --net=host --cap-add ALL nvcr.io/nvidia/morpheus/morpheus:22.11-runtime bash -c "pip install line-profiler umap-learn && python3 -m test morpheus"

=>

Total time: 37.1697 s
File: /workspace/test.py
Function: on_data at line 83

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    83                                               @profile
    84                                               def on_data(self, message: MessageMeta):
    85         1   13377536.4 13377536.4     36.0          import umap
    86         1         12.3     12.3      0.0          df = message.df
    87         1       3213.0   3213.0      0.0          df = df.dropna()
    88         1         20.0     20.0      0.0          reducer = umap.UMAP()
    89         1        899.7    899.7      0.0          df = df[[
    90         1          0.3      0.3      0.0              "bill_length_mm",
    91         1          0.2      0.2      0.0              "bill_depth_mm",
    92         1          0.3      0.3      0.0              "flipper_length_mm",
    93         1          0.2      0.2      0.0              "body_mass_g",
    94                                                   ]].values
    95         1       1248.9   1248.9      0.0          df = StandardScaler().fit_transform(df)
    96         1   23786727.9 23786727.9     64.0          embedding = reducer.fit_transform(df)
    97                                           
    98         1          0.3      0.3      0.0          return message
drobison00 commented 1 year ago

@tanmoyio @lmeyerov @mdemoret-nv

Update on this: it looks like the pip install umap-learn variant is the source of the problem. If you use mamba install -c conda-forge umap-learn instead, everything seems to work. I've attached both runs from a clean container build below, and I see something like a 10x performance drop with the pip-installed variant. The fit_transform call seems to be doing something unexpected in the pip-installed case.

I did have to make one or two small changes to get the Morpheus path working with the 23.03 updates. I'll post them below the repro steps, but they don't affect the underlying issue.

Repro:

1. Download Morpheus 23.03
2. DOCKER_IMAGE_TAG="v23.03.00-runtime" ./docker/build_container_release.sh
3. Grab a coffee
4. DOCKER_IMAGE_TAG="v23.03.00-runtime" ./docker/run_container_release.sh
5. (morpheus) root@drobison:/workspace# mamba install -y -c conda-forge -c anaconda aiohttp requests line_profiler umap-learn

(conda) UMAP - No Morpheus

(morpheus) root@drobison:/workspace# python profile_test.py 
Timer unit: 1e-06 s

Total time: 12.9776 s
File: profile_test.py
Function: normal_umap at line 132

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   132                                               @profile
   133                                               def normal_umap(df):
   134         1    7570600.2 7570600.2     58.3          import umap
   135         1       1820.7   1820.7      0.0          df = df.dropna()
   136         1         11.1     11.1      0.0          reducer = umap.UMAP()
   137         1        542.5    542.5      0.0          df = df[[
   138         1          0.1      0.1      0.0              "bill_length_mm",
   139         1          0.1      0.1      0.0              "bill_depth_mm",
   140         1          0.2      0.2      0.0              "flipper_length_mm",
   141         1          0.1      0.1      0.0              "body_mass_g",
   142                                                   ]].values
   143         1        425.4    425.4      0.0          df = StandardScaler().fit_transform(df)
   144         1    5404178.6 5404178.6     41.6          embedding = reducer.fit_transform(df)

(conda) UMAP - Morpheus

(morpheus) root@drobison:/workspace# python profile_test.py morpheus
====Registering Pipeline====
Building single
====Building Pipeline====
====Building Pipeline Complete!====
Starting! Time: 1681251254.2549536
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-git-0; GitSourceStage(url=https://raw.githubusercontent.com/allisonhorst/palmerpenguins/c19a904462482430170bfe2c718775ddb7dbb885/inst/extdata/penguins.csv)>
  └─> morpheus.MessageMeta
Added stage: <umap-1; UmapStage()>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
====Pipeline Complete====
Timer unit: 1e-06 s

Total time: 9.12142 s
File: profile_test.py
Function: on_data_internal at line 84

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    84                                               @profile
    85                                               def on_data_internal(self, message):
    86         1    3636203.1 3636203.1     39.9          import umap
    87                                           
    88         1       1643.7   1643.7      0.0          with message.mutable_dataframe() as mdf:
    89         1       5018.7   5018.7      0.1              df = mdf.to_pandas()
    90         1       1849.5   1849.5      0.0              df = df.dropna()
    91         1         11.8     11.8      0.0              reducer = umap.UMAP()
    92         1        481.0    481.0      0.0              df = df[[
    93         1          0.1      0.1      0.0                  "bill_length_mm",
    94         1          0.1      0.1      0.0                  "bill_depth_mm",
    95         1          0.1      0.1      0.0                  "flipper_length_mm",
    96         1          0.1      0.1      0.0                  "body_mass_g",
    97                                                       ]].values
    98         1        503.8    503.8      0.0              df = StandardScaler().fit_transform(df)
    99         1    5475684.7 5475684.7     60.0              embedding = reducer.fit_transform(df)
   100                                           
   101         1         19.5     19.5      0.0              return message

We can see both perform roughly the same.

Next, kill the container and start with a clean workspace:

1. DOCKER_IMAGE_TAG="v23.03.00-runtime" ./docker/run_container_release.sh
2. (morpheus) root@drobison:/workspace# mamba install -y -c conda-forge -c anaconda aiohttp requests line_profiler
3. (morpheus) root@drobison:/workspace# pip install umap-learn
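Since the difference appears to track how umap-learn was installed, it may help to check what actually ended up in each environment when comparing containers. This is a minimal sketch using importlib.metadata from the standard library. The package list is an assumption (umap-learn plus numerical dependencies it relies on), and the INSTALLER metadata file is optional: pip writes "pip" there, conda-built packages often record "conda", and "unknown" is reported when the file is absent.

```python
from importlib import metadata

# Assumed list of distributions worth comparing between the pip and conda setups.
PACKAGES = ("umap-learn", "pynndescent", "numba", "llvmlite", "numpy", "scikit-learn")


def describe_install(dist_name):
    """Return (version, installer) for an installed distribution, or None if it is missing."""
    try:
        dist = metadata.distribution(dist_name)
    except metadata.PackageNotFoundError:
        return None
    # INSTALLER is optional metadata; read_text returns None when the file is absent.
    installer = (dist.read_text("INSTALLER") or "unknown").strip()
    return dist.version, installer


if __name__ == "__main__":
    for name in PACKAGES:
        info = describe_install(name)
        if info is None:
            print(f"{name:<14} not installed")
        else:
            version, installer = info
            print(f"{name:<14} {version:<12} installed by {installer}")
```

Running this in both the pip and the mamba containers and diffing the output shows whether the two setups differ only in umap-learn or also in its dependencies.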

(pip) UMAP - No Morpheus

(morpheus) root@drobison-mint:/workspace# python profile_test.py         
Timer unit: 1e-06 s

Total time: 13.2059 s
File: profile_test.py
Function: normal_umap at line 132

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   132                                               @profile
   133                                               def normal_umap(df):
   134         1    7626270.1 7626270.1     57.7          import umap
   135         1       1872.4   1872.4      0.0          df = df.dropna()
   136         1         14.0     14.0      0.0          reducer = umap.UMAP()
   137         1        541.6    541.6      0.0          df = df[[
   138         1          0.2      0.2      0.0              "bill_length_mm",
   139         1          0.1      0.1      0.0              "bill_depth_mm",
   140         1          0.2      0.2      0.0              "flipper_length_mm",
   141         1          0.1      0.1      0.0              "body_mass_g",
   142                                                   ]].values
   143         1        436.6    436.6      0.0          df = StandardScaler().fit_transform(df)
   144         1    5576788.6 5576788.6     42.2          embedding = reducer.fit_transform(df)

(pip) UMAP - With Morpheus

(morpheus) root@drobison:/workspace# python profile_test.py morpheus
====Registering Pipeline====
Building single
====Building Pipeline====
====Building Pipeline Complete!====
Starting! Time: 1681251658.6832945
====Registering Pipeline Complete!====
====Starting Pipeline====
====Building Segment: linear_segment_0====
Added source: <from-git-0; GitSourceStage(url=https://raw.githubusercontent.com/allisonhorst/palmerpenguins/c19a904462482430170bfe2c718775ddb7dbb885/inst/extdata/penguins.csv)>
  └─> morpheus.MessageMeta
====Pipeline Started====
Added stage: <umap-1; UmapStage()>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
Timer unit: 1e-06 s

Total time: 134.247 s
File: profile_test.py
Function: on_data_internal at line 84
====Pipeline Complete====

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    84                                               @profile
    85                                               def on_data_internal(self, message):
    86         1    3642093.9 3642093.9      2.7          import umap
    87                                           
    88         1       1156.7   1156.7      0.0          with message.mutable_dataframe() as mdf:
    89         1       3494.6   3494.6      0.0              df = mdf.to_pandas()
    90         1       1634.0   1634.0      0.0              df = df.dropna()
    91         1         11.0     11.0      0.0              reducer = umap.UMAP()
    92         1       1035.0   1035.0      0.0              df = df[[
    93         1          0.1      0.1      0.0                  "bill_length_mm",
    94         1          0.2      0.2      0.0                  "bill_depth_mm",
    95         1          0.2      0.2      0.0                  "flipper_length_mm",
    96         1          0.1      0.1      0.0                  "body_mass_g",
    97                                                       ]].values
    98         1       2086.7   2086.7      0.0              df = StandardScaler().fit_transform(df)
    99         1  130595169.1 130595169.1     97.3              embedding = reducer.fit_transform(df)
   100                                           
   101         1         21.4     21.4      0.0              return message
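To put numbers on "roughly the same" versus the pip slowdown, the reported fit_transform lines can be divided directly. The timer unit in these runs is 1e-06 s, so all values are microseconds; the values below are copied from the four runs above and nothing else is assumed.

```python
# fit_transform times reported by line_profiler above, in microseconds.
FIT_TRANSFORM_US = {
    "conda": {"no_morpheus": 5404178.6, "morpheus": 5475684.7},
    "pip": {"no_morpheus": 5576788.6, "morpheus": 130595169.1},
}


def slowdown(times):
    """Ratio of the in-pipeline fit_transform time to the standalone time."""
    return times["morpheus"] / times["no_morpheus"]


for installer, times in FIT_TRANSFORM_US.items():
    print(f"{installer}: {slowdown(times):.2f}x")
# prints:
# conda: 1.01x
# pip: 23.42x
```

In these runs the conda build executes fit_transform at essentially the same speed inside and outside the pipeline, while the pip build is over 20x slower inside it.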

Updated profile_test.py script:

import logging
import os
import sys
import typing
from time import time

import pandas as pd
import cudf
import mrc
from line_profiler import LineProfiler
from sklearn.preprocessing import StandardScaler

from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

profiler = LineProfiler()

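def profile(func):
    # Register func with the shared LineProfiler and enable it on each call;
    # stats are printed once at the end via profiler.print_stats().
    def inner(*args, **kwargs):
        profiler.add_function(func)
        profiler.enable_by_count()
        return func(*args, **kwargs)

    return inner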

def timer_func(func):
    # This function shows the execution time of
    # the function object passed
    def wrap_func(*args, **kwargs):
        t1 = time()
        result = func(*args, **kwargs)
        t2 = time()
        print(f'Function {func.__name__!r} executed in {(t2 - t1):.4f}s')
        return result

    return wrap_func

class GitSourceStage(SingleOutputSource):

    def __init__(self, c: Config, url):
        super().__init__(c)
        self.url = url

    @property
    def name(self) -> str:
        return "from-git"

    def supports_cpp_node(self):
        return False

    def _build_source(self, builder: mrc.Builder) -> StreamPair:
        out_stream = builder.make_source(self.unique_name, self._generate_frames)
        out_type = MessageMeta
        return out_stream, out_type

    def _generate_frames(self):
        yield MessageMeta(cudf.read_csv(self.url))

class UmapStage(SinglePortStage):

    @property
    def name(self) -> str:
        return "umap"

    def accepted_types(self) -> typing.Tuple:
        return (typing.Any,)

    def supports_cpp_node(self) -> bool:
        return False

    @profile
    def on_data_internal(self, message):
        import umap

        with message.mutable_dataframe() as mdf:
            df = mdf.to_pandas()
            df = df.dropna()
            reducer = umap.UMAP()
            df = df[[
                "bill_length_mm",
                "bill_depth_mm",
                "flipper_length_mm",
                "body_mass_g",
            ]].values
            df = StandardScaler().fit_transform(df)
            embedding = reducer.fit_transform(df)

            return message

    def on_data(self, message: MessageMeta):
        return self.on_data_internal(message)

    def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
        print("Building single", flush=True)
        node = builder.make_node(self.unique_name, self.on_data)
        builder.make_edge(input_stream[0], node)

        return node, input_stream[1]

def pipeline():
    configure_logging(log_level=logging.DEBUG)

    config = Config()
    config.num_threads = os.cpu_count()
    pipeline = LinearPipeline(config)

    pipeline.set_source(
        GitSourceStage(
            config,
            url=
            "https://raw.githubusercontent.com/allisonhorst/palmerpenguins/c19a904462482430170bfe2c718775ddb7dbb885/inst/extdata/penguins.csv"
        ))
    pipeline.add_stage(UmapStage(config))
    pipeline.run()

def no_morpheus():
    @profile
    def normal_umap(df):
        import umap
        df = df.dropna()
        reducer = umap.UMAP()
        df = df[[
            "bill_length_mm",
            "bill_depth_mm",
            "flipper_length_mm",
            "body_mass_g",
        ]].values
        df = StandardScaler().fit_transform(df)
        embedding = reducer.fit_transform(df)

    normal_umap(
        pd.read_csv(
            "https://raw.githubusercontent.com/allisonhorst/palmerpenguins/c19a904462482430170bfe2c718775ddb7dbb885/inst/extdata/penguins.csv"
        ))

if __name__ == '__main__':

    if (len(sys.argv) > 1 and sys.argv[1] == "morpheus"):
        pipeline()
    else:
        no_morpheus()

    profiler.print_stats(output_unit=1e-6)
mdemoret-nv commented 1 year ago

@tanmoyio @lmeyerov Just checking in to make sure you saw this, and to get your feedback on whether this needs more investigation.

tanmoyio commented 1 year ago

@mdemoret-nv I tried the steps suggested by @drobison00: I used the morpheus-23.03 container and installed umap-learn and the other required dependencies via mamba, but got the same performance as before. It's still slow.

I also tried it on two different GPU systems, with no change inside the Morpheus container.
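Because the conda install did not resolve the slowdown here, one remaining variable is the execution context. The script above configures the pipeline with config.num_threads = os.cpu_count(), and it is an assumption (not confirmed in this thread) that the stage's on_data therefore runs on a pipeline worker thread rather than the main thread. A minimal standard-library sketch to test that hypothesis outside Morpheus times the same callable on the caller's thread and on one worker thread; timed and compare_thread_contexts are hypothetical helper names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def timed(fn, *args, **kwargs):
    """Run fn once and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


def compare_thread_contexts(fn, *args, warmup=True, **kwargs):
    """Time the same call on the calling thread and on a single worker thread.

    With warmup=True, fn is called once untimed first so one-time costs
    (imports, JIT compilation, caches) do not land on whichever run goes first.
    """
    if warmup:
        fn(*args, **kwargs)
    _, caller_s = timed(fn, *args, **kwargs)
    with ThreadPoolExecutor(max_workers=1) as pool:
        _, worker_s = pool.submit(timed, fn, *args, **kwargs).result()
    return {"caller_thread_s": caller_s, "worker_thread_s": worker_s}
```

For example, with X being the scaled penguin features from normal_umap, calling compare_thread_contexts(lambda: umap.UMAP().fit_transform(X)) from a plain script (no Morpheus) would show whether moving fit_transform off the main thread alone reproduces the slowdown. If the two timings are close, the cause is more likely elsewhere in the pipeline setup.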