nv-morpheus / Morpheus

[BUG]: Default Parameter handling for input_topic in KafkaSourceStage constructor #1571

Open nuxwin opened 3 months ago

nuxwin commented 3 months ago

Version

v24.03.00a-runtime

Which installation method(s) does this occur on?

Manual build of release container.

Describe the bug.

The KafkaSourceStage constructor declares a default of None for input_topic, so the parameter is expected to be None when the caller does not pass it explicitly. However, due to a bug, the parameter is initialized as an empty structure instead of None. This breaks conditional logic later in the code, specifically the checks that set a default value when the parameter is None.

All complex parameters with default None are affected.
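
The failure mode can be sketched in isolation. This is a minimal illustration, not the Morpheus code: `resolve_input_topic` is a hypothetical stand-in for the constructor's default handling, and the empty tuple stands for the value the stage receives when it is launched from the CLI without a topic.

```python
import typing

# Default taken from the snippet in this report; the helper name is hypothetical.
DEFAULT_TOPICS = ["test_pcap"]


def resolve_input_topic(input_topic: typing.Optional[typing.Sequence[str]] = None):
    # Same style of check as in the stage: only None triggers the default.
    if input_topic is None:
        input_topic = list(DEFAULT_TOPICS)
    return input_topic


# Called without the argument: the default is applied.
direct_call = resolve_input_topic()

# Called with an empty tuple: the `is None` check is skipped, so the
# empty value is passed through unchanged.
cli_like_call = resolve_input_topic(())

print(direct_call)
print(cli_like_call)
```

An empty tuple is falsy but is not None, which is why the identity check never fires in the second call.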

Minimum reproducible example

Code modified for logging purposes:

...

import logging
import sys

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stdout)

...
    def __init__(self,
                 config: Config,
                 bootstrap_servers: str,
                 input_topic: typing.List[str] = None,
                 group_id: str = "morpheus",
                 client_id: str = None,
                 poll_interval: str = "10millis",
                 disable_commit: bool = False,
                 disable_pre_filtering: bool = False,
                 auto_offset_reset: AutoOffsetReset = AutoOffsetReset.LATEST,
                 stop_after: int = 0,
                 async_commits: bool = True):
        super().__init__(config)
...
        if input_topic is None:
            input_topic = ["test_pcap"]

        logging.debug(f"input_topic value: {input_topic}")
        exit(1)
...

Pipeline run. Relevant part is the from-kafka command.

exec morpheus \
  --log_level=DEBUG \
  run \
    --num_threads 1 \
    --model_max_batch_size 1 \
    --use_cpp false \
  pipeline-other \
    --model_fea_length=90 \
  from-kafka \
    --bootstrap_servers "${KAFKA_SERVER_HOST}:${KAFKA_SERVER_PORT}" \
    --group_id ai-pf-m-001 \
    --client_id ai-pf-m-001-consumer \
  deserialize \
  num-producer \
  inf-triton \
    --model_name=model.tf \
    --server_url=ai-pf-m-001-triton-server:8001 \
    --force_convert_inputs=True \
    --use_shared_memory \
    --inout_mapping dense_input props \
  serialize \
  to-kafka \
    --bootstrap_servers "${KAFKA_SERVER_HOST}:${KAFKA_SERVER_PORT}" \
   "${OUTPUT_TOPIC_OPTS[@]}" \
    --client_id ai-pf-m-001-producer

### Relevant log output

ai-pf-001-morpheus-pipeline | 2024-03-19 13:24:18,112 - root - DEBUG - input_topic value: ()


### Full env printout

ai-pf-001-morpheus-pipeline | Module 'FileBatcher' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FileToDF' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FilterCmFailed' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FilterControlMessage' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FilterDetections' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FromControlMessage' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'MLFlowModelWriter' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'PayloadBatcher' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'Serialize' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'ToControlMessage' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'WriteToElasticsearch' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'WriteToFile' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'deserialize' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | 2024-03-19 13:24:18,112 - root - DEBUG - input_topics before validation: ()



### Other/Misc.

_No response_

### Code of Conduct

- [X] I agree to follow Morpheus' Code of Conduct
- [X] I have searched the [open bugs](https://github.com/nv-morpheus/Morpheus/issues?q=is%3Aopen+is%3Aissue+label%3Abug) and have found no duplicates for this bug report

jarmak-nv commented 3 months ago

Hi @nuxwin!

Thanks for submitting this issue - our team has been notified and we'll get back to you as soon as we can! In the meantime, feel free to add any relevant information to this issue.

efajardo-nv commented 3 months ago

@nuxwin Thank you for reporting this. This appears to be a problem with how the stage is registered to the Morpheus CLI rather than with the stage itself. The CLI infers the input_topic parameter as a tuple, so the stage receives an empty tuple when input_topic is not provided. The issue is not seen when the stage is used in a Python script like this:

pipeline = LinearPipeline(config)
pipeline.set_source(KafkaSourceStage(config, bootstrap_servers="auto"))

Here input_topic will default to test_pcap as expected.
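
Until the CLI registration is corrected, one possible stage-side guard is to treat any empty value as "not provided". The sketch below is an assumption for illustration only, not the fix adopted in Morpheus: `normalize_input_topic` is a hypothetical helper, and the `test_pcap` default is taken from the snippet in the report.

```python
import typing


def normalize_input_topic(input_topic: typing.Union[str, typing.Sequence[str], None] = None,
                          default: typing.Sequence[str] = ("test_pcap", )) -> typing.List[str]:
    # A single topic passed as a plain string is wrapped in a list.
    if isinstance(input_topic, str):
        return [input_topic]

    # None, an empty tuple, and an empty list are all falsy, so they all
    # fall back to the default here.
    if not input_topic:
        return list(default)

    # Anything else is assumed to be a non-empty sequence of topic names.
    return list(input_topic)


from_cli = normalize_input_topic(())                  # empty tuple, as seen from the CLI
from_script = normalize_input_topic()                 # argument omitted in a Python script
explicit = normalize_input_topic(("topic_a", "topic_b"))

print(from_cli, from_script, explicit)
```

The trade-off of a truthiness check is that an explicitly passed empty list can no longer be told apart from an omitted argument, which may or may not be acceptable for this stage.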