nv-morpheus / Morpheus

Morpheus SDK
Apache License 2.0
384 stars 145 forks source link

[BUG]: Executing more than one LLMEngine causes Morpheus to hang #1339

Closed dagardner-nv closed 1 year ago

dagardner-nv commented 1 year ago

Version

23.11

Which installation method(s) does this occur on?

Source

Describe the bug.

Executing more than one LLMEngine causes morpheus to hang. This was first noticed while executing unittests as multiple tests will execute an LLMEngine.

Minimum reproducible example

import logging

import cudf

from morpheus.config import Config
from morpheus.config import CppConfig
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.concat_df import concat_dataframes
from morpheus.utils.logger import configure_logging

def _build_engine() -> LLMEngine:
    engine = LLMEngine()
    engine.add_node("extracter", node=ExtracterNode())
    engine.add_task_handler(inputs=["/extracter"], handler=SimpleTaskHandler())

    return engine

def run_pipeline():
    config = Config()
    CppConfig.set_should_use_cpp(False)
    insects = ["ant", "bee", "butterfly", "mosquito", "grasshopper"]
    mammals = ["lion", "dolphin", "gorilla", "wolf", "tiger"]
    reptiles = ['lizards', 'snakes', 'turtles', 'frogs', 'toads']
    df = cudf.DataFrame({"insects": insects, "mammals": mammals, "reptiles": reptiles})

    task_payload = {"task_type": "llm_engine", "task_dict": {"input_keys": ['mammals']}}

    pipe = LinearPipeline(config)
    pipe.set_source(InMemorySourceStage(config, dataframes=[df]))
    pipe.add_stage(
        DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=task_payload))
    pipe.add_stage(LLMEngineStage(config, engine=_build_engine()))
    sink = pipe.add_stage(InMemorySinkStage(config))

    pipe.run()

    messages = sink.get_messages()
    responses = concat_dataframes(messages)
    print(f"Pipeline complete. Received {len(messages)} responses\n{responses['response']}")

if __name__ == "__main__":
    configure_logging(logging.DEBUG)

    iteration_num = 0
    while True:
        iteration_num += 1
        print(f"\n***********\nStarting iteration: {iteration_num}\n***********\n")
        run_pipeline()

Relevant log output

Click here to see error details

 [Paste the error here, it will be hidden by default]

Full env printout

Click here to see environment details

 [Paste the results of print_env.sh here, it will be hidden by default]

Other/Misc.

No response

Code of Conduct

dagardner-nv commented 1 year ago

Executing the engine directly outside of a pipeline via asyncio.run does not hang.