kubeedge / ianvs

Distributed Synergy AI Benchmarking
https://ianvs.readthedocs.io
Apache License 2.0
115 stars 46 forks source link

add: Cloud-edge collaborative inference for LLM based on KubeEdge-Ianvs #122

Closed FuryMartin closed 3 months ago

FuryMartin commented 4 months ago

Cloud-edge collaborative inference for LLM based on KubeEdge-Ianvs proposal

What type of PR is this? /kind design

What this PR does / why we need it: The PR is a proposal to add cloud-edge collaborative inference algorithm for LLM to combine the advantages of high inference accuracy of cloud LLM with strong privacy and fast inference of edge LLM through the strategy of cloud edge collaboration, so as to better meet the needs of edge users.

Which issue(s) this PR fixes:

96

hsj576 commented 4 months ago

The PR needs to add the design of how to implement this feature in Ianvs, especially to ensure that the new interface is consistent with the interface in Sedna.

FuryMartin commented 3 months ago

Thanks to @hsj576 and @MooreZheng for the review.

While trying to improve my proposal, I found that the integration of the new algorithm still needs to be discussed.

Basicly, there are two possible plans to integrate the cloud-edge collaborative strategy for LLMs.

In our last two meetings, we had some discussions about implementation details. From my personal understanding, @MooreZheng appears to prefer using Plan A to integrate Sedna's JointInference interface for alignment, while @hsj576’s initial idea aligns more with Plan B, which leaves the collaborative strategy to be implemented in examples/.

However, these two approaches differ significantly, and I would like to seek advice on which integration method should be adopted. If my understanding is incorrect, please feel free to point it out directly.

In the following part, I will present detailes for plan A and plan B, highlighting their respective advantages and disadvantages.

However, since we are designing an new algorithm, it's essential to have a thorough understanding of the code for both Sedna and Ianvs. I will first introduce the implementation logic and existing issues of JointInference in Sedna, followed by a review of the reasoning behind Ianvs introducing the new algorithm paradigm.

1 Sedna's JointInference

1.1 Implemetation Detail

The core class of the Joint Inference feature in Sedna is JointInference:

class JointInference(JobBase):

which mainly has an __init__ function and an inference function.

1.1.1 __init__(self, estimator=None, hard_example_mining: dict = None)

The constructor __init__ receives an estimator as the edge model instance and hard_example_mining as the configuration for the hard example mining algorithm. The specific logic is as follows.

First, get the local host, remote ip, and port by reading environment variables in below: Source Code

    def __init__(self, estimator=None, hard_example_mining: dict = None):
        super(JointInference, self).__init__(estimator=estimator)
        self.job_kind = K8sResourceKind.JOINT_INFERENCE_SERVICE.value
        self.local_ip = get_host_ip()
        self.remote_ip = self.get_parameters(
            "BIG_MODEL_IP", self.local_ip)
        self.port = int(self.get_parameters("BIG_MODEL_PORT", "5000"))

Next, declare the local reporter to report inference progress: Source Code

        report_msg = {
            "name": self.worker_name,
            "namespace": self.config.namespace,
            "ownerName": self.job_name,
            "ownerKind": self.job_kind,
            "kind": "inference",
            "results": []
        }
        period_interval = int(self.get_parameters("LC_PERIOD", "30"))
        self.lc_reporter = LCReporter(lc_server=self.config.lc_server,
                                      message=report_msg,
                                      period_interval=period_interval)
        self.lc_reporter.setDaemon(True)
        self.lc_reporter.start()

Then, load the edge model: Source Code

        if callable(self.estimator):
            self.estimator = self.estimator()
        if not os.path.exists(self.model_path):
            raise FileExistsError(f"{self.model_path} miss")
        else:
            self.estimator.load(self.model_path)

Subsequently, Create a client for calling cloud model. Source Code

        self.cloud = ModelClient(service_name=self.job_name,
                                 host=self.remote_ip, port=self.port)

Finally, instantiate the hard example mining algorithm: Source Code

        self.hard_example_mining_algorithm = None
        if not hard_example_mining:
            hard_example_mining = self.get_hem_algorithm_from_config()
        if hard_example_mining:
            hem = hard_example_mining.get("method", "IBT")
            hem_parameters = hard_example_mining.get("param", {})
            self.hard_example_mining_algorithm = ClassFactory.get_cls(
                ClassType.HEM, hem
            )(**hem_parameters)

1.1.2 inference(self, data=None, post_process=None, **kwargs)

The inference function takes the data parameter as input and the post_process parameter as a post-processing function. The specific logic is as follows:

First, register post_process as callback_func, serving as a general post-processing function: Source Code

    def inference(self, data=None, post_process=None, **kwargs):
        callback_func = None
        if callable(post_process):
            callback_func = post_process
        elif post_process is not None:
            callback_func = ClassFactory.get_cls(
                ClassType.CALLBACK, post_process)

Next, perform edge model inference and report: Source Code

        res = self.estimator.predict(data, **kwargs)
        edge_result = deepcopy(res)

        if callback_func:
            res = callback_func(res)

        self.lc_reporter.update_for_edge_inference()

Then, determine whether the example is hard or not: Source Code

        if self.hard_example_mining_algorithm:
            is_hard_example = self.hard_example_mining_algorithm(res)

If it is a hard example, call the cloud model for inference and report: Source Code

            if is_hard_example:
                try:
                    cloud_data = self.cloud.inference(
                        data.tolist(), post_process=post_process, **kwargs)
                    cloud_result = cloud_data["result"]
                except Exception as err:
                    self.log.error(f"get cloud result error: {err}")
                else:
                    res = cloud_result
                self.lc_reporter.update_for_collaboration_inference()

Finally, return the inference result: Source Code

        return [is_hard_example, res, edge_result, cloud_result]

1.2 Problems

1.2.1 Current Collaborative Strategy is hard-coded and not suitable for LLM.

As shown in the code above, this JointInference class essentially defines a certain collaborative method where edge inference is performed first, followed by hard example mining, and then a decision is made on whether cloud inference is needed.

This approach is suitable for edge-cloud collaboration for computer vision's Object Detection tasks. The example for JointInference provided by Sedna is Helmet Detection Inference , which uses IBT (image-box-thresholds) as the hard example mining algorithm. The general process includes:

However, due to the auto-regressive generation pattern of LLMs, we cannot allow a hard example to be inferenced for several seconds on edge and then again for several seconds in the cloud, which will cause extremely bad latency performance.

The LLM collaborative algorithm we currently designed is query-routing, which requires first invoking a hard example mining algorithm for query difficulty assessment before inferring either in the cloud or at the edge.

Therefore, Sedna JointInference currently just has a hard-coded collaborative strategy suitable for the CV tasks, which cannot be directly used for collaborative inference for LLMs without modifications.

1.2.2 Cloud model's ModelClient has compatible issues with the current LLM API paradigm.

There is also an issue with the ModelClient used in Sedna's JointInference. As shown in the code above, Sedna's JointInference initializes a ModelClient defined by Sedna, as detailed in the following code: Source Code

class ModelClient:
    """Remote model service"""

    def __init__(self, service_name, version="",
                 host="127.0.0.1", port="8080", protocol="http"):
        self.server_name = f"{service_name}{version}"
        self.endpoint = f"{protocol}://{host}:{port}/{service_name}"

    def check_server_status(self):
        return http_request(url=self.endpoint, method="GET")

    def inference(self, x, **kwargs):
        """Use the remote big model server to inference."""
        json_data = deepcopy(kwargs)
        json_data.update({"data": x})
        _url = f"{self.endpoint}/predict"
        return http_request(url=_url, method="POST", json=json_data)

The code shows that the API url and data format of the interface are hard-coded to align with the InferenceServer interface defined by Sedna, which means that the cloud model must be deployed using InferenceServer to complete JointInference. Source Code


class InferenceServer(BaseServer):  # pylint: disable=too-many-arguments
    """
    rest api server for inference
    """

    def __init__(
            self,
            model,
            servername,
            host: str = '127.0.0.1',
            http_port: int = 8080,
            max_buffer_size: int = 104857600,
            workers: int = 1):
        super(
            InferenceServer,
            self).__init__(
            servername=servername,
            host=host,
            http_port=http_port,
            workers=workers)
        self.model = model
        self.max_buffer_size = max_buffer_size
        self.app = FastAPI(
            routes=[
                APIRoute(
                    f"/{servername}",
                    self.model_info,
                    response_model=ServeModelInfoResult,
                    response_class=JSONResponse,
                    methods=["GET"],
                ),
                APIRoute(
                    f"/{servername}/predict",
                    self.predict,
                    response_model=ServePredictResult,
                    response_class=JSONResponse,
                    methods=["POST"],
                ),
            ],
            log_level="trace",
            timeout=600,
        )

    def start(self):
        return self.run(self.app)

    def model_info(self):
        return ServeModelInfoResult(infos=self.get_all_urls())

    def predict(self, data: InferenceItem):
        inference_res = self.model.inference(
            data.data, post_process=data.callback)
        return ServePredictResult(result=inference_res)

However, this deployment method poses certain issues for LLMs:

If users want to test cloud models like GPT-4o, Claude, Gemini, Moonshot-AI, they must rewrite a forwarding service using InferenceServer defined by Sedna, which is complicated (especially when considering features like streaming output) and unnecessary.

2 Ianvs algorithm's implemetation

In order to introduce a new algorithm for Ianvs, I need to briefly outline the implementation logic of the various algorithms currently in Ianvs.

In Ianvs, ianvs/benchmarking.py serves as the entry point of the program and constructs a BenchmarkJob:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/benchmarking.py#L36-L37

The specific logic of BenchmarkJob is defined in ianvs/cmd/obj/benchmarkingjob.py. We can see that the testcase is created by the build_testcases() method of testcasecontroller:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/cmd/obj/benchmarkingjob.py#L91-L92

build_testcases() constructs the algorithm through the _parse_algorithms_config() function:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/testcasecontroller.py#L34-L44

The _parse_algorithms_config() function has a line indicating the instantiation of the Algorithm class:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/testcasecontroller.py#L74

In ianvs/core/testcasecontroller/algorithm/algorithm.py, the Algorithm class is declared, which will return various algorithm instances in paradigm function:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/algorithm/algorithm.py#L95-L105

These algorithm instances are defined in the files located in various folders under ianvs/core/testcasecontroller/algorithm/paradigm. These algorithm classes inherit from the ParadigmBase class and are required to have a run() function, which serves as the main entry point for training and inference.

Taking IncrementalLearning as an example, it is defined by the IncrementalLearning class in ianvs/core/testcasecontroller/algorithm/paradigm/incremental_learning/incremental_learning.py.

For inference tasks, IncrementalLearning first uses the build_paradigm_job() function to construct a job, and then calls the job's inference() interface to complete the inference.

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/algorithm/paradigm/incremental_learning/incremental_learning.py#L132-L139

build_paradigm_job() is defined in ianvs/core/testcasecontroller/algorithm/paradigm/base.py by ParadigmBase:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/algorithm/paradigm/base.py#L93-L129

It can be seen that the IncrementalLearning class calls the IncrementalLearning interface from sedna.core.incremental_learning when constructing tasks, and inference() is provided by this algorithm in sedna.

During the instantiation of the sedna IncrementalLearning class, two parameters, estimator and hard_example_mining, are passed in. These two parameters are registered and instantiated by the BaseModel class from basemodel.py and the class in hard_example_mining.py implemented in examples/pcb-aoi/incremental_learning_bench/fault_detection/testalgorithms/fpn.

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/examples/pcb-aoi/incremental_learning_bench/fault%20detection/testalgorithms/fpn/basemodel.py#L61-L62

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/examples/pcb-aoi/incremental_learning_bench/fault%20detection/testalgorithms/fpn/hard_example_mining.py#L49-L50

In the process of the IncrementalLearning class in Sedna, the inference of the estimator and hard example mining will be completed sequentially, just like in Sedna JointInference.

3 Two Plans to integrate JointInference to Ianvs

Due to the current implementation flaws of JointInference in Sedna, considering Ianvs's own logic for integrating new algorithms, there are two possible ways to introduce JointInference.

3.1 Plan A

In this approach, similar to the integration of Incremental Learning and LifeLongLearning, we will build jobs based on Sedna's JointInference class. Demo Code

        if paradigm_type == ParadigmType.JOINT_INFERENCE.value:
            return JointInference(
                estimator=self.module_instances.get(
                    ModuleType.BASEMODEL.value),
                cloud=self.module_instances.get(
                    ModuleType.APIMODEL.value),
                hard_example_mining=self.module_instances.get(
                    ModuleType.HARD_EXAMPLE_MINING.value)
            )

This method requires some modifications to Sedna JointInference to address the two problems we previously mentioned:

    def __init__(self, estimator=None, cloud=None, hard_example_mining: dict = None):
        super(JointInference, self).__init__(estimator=estimator)
        mining_mode = kwargs.get("mining_mode", "inference-then-mining")

        if mining_mode == "inference-then-mining":
            res, edge_result = self._get_edge_result(data, callback_func, **kwargs)

            if self.hard_example_mining_algorithm is None:
                raise ValueError("Hard example mining algorithm is not set.")

            is_hard_example = self.hard_example_mining_algorithm(res)
            if is_hard_example:
                res, cloud_result = self._get_cloud_result(data, post_process=post_process, **kwargs)

        elif mining_mode == "mining-then-inference":
            # First conduct hard example mining, and then decide whether to execute on the edge or in the cloud.
            if self.hard_example_mining_algorithm is None:
                raise ValueError("Hard example mining algorithm is not set.")

            is_hard_example = self.hard_example_mining_algorithm(res)
            if is_hard_example:
                if not sepeculative_decoding:
                    res, cloud_result = self._get_cloud_result(data, post_process=post_process, **kwargs)
                else:
                    # do speculative_decoding
                    pass
            else:
                res, edge_result = self._get_edge_result(data, callback_func, **kwargs)

        else:
            raise ValueError(
                "Mining Mode must be in ['mining-then-inference', 'inference-then-mining']"
            )

        return [is_hard_example, res, edge_result, cloud_result]

3.1.1 Advantages

3.1.2 Drawbacks

3.2 Plan B

In addition to paradigms like Incremental Learning and Lifelong Learning that directly call Sedna's interfaces, Ianvs also includes paradigms such as Single Task Learning, which do not include any details about training and inference; those details are entirely handled by the BasedModel class found in the examples/ directory, which is as follows:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/algorithm/paradigm/base.py#L93-L94

In Plan B, we can introduce JointInference in a manner similar to SingleTaskLearning. It does not include any specific strategy for collaborative algorithms. The newly added JointInference paradigm serves merely as an entry point for calling a collaborative algorithm, and its code is almost identical to that of SingleTaskLearning but without training step. The specific collaborative strategy will be written in the predict() function of the BaseModel found in examples/.

3.2.1 Advantages

3.2.2 Drawbacks

Which plan should we adopt? If there are any other options, please feel free to suggest them.

hsj576 commented 3 months ago

Thanks to @hsj576 and @MooreZheng for the review.

While trying to improve my proposal, I found that the integration of the new algorithm still needs to be discussed.

Basicly, there are two possible plans to integrate the cloud-edge collaborative strategy for LLMs.

  • Plan A will introduce Sedna's JointInference to ianvs/core, and modify it to support collaboration in LLM scenarios. The modified class can be contributed back to Sedna.
  • Plan B will leave the implementation of the collaborative strategy entirely up to users (i.e., implemented in examples/), with no specific details of Joint Inference included in Ianvs/core, making introducing Sedna's JointInference unnecessary.

In our last two meetings, we had some discussions about implementation details. From my personal understanding, @MooreZheng appears to prefer using Plan A to integrate Sedna's JointInference interface for alignment, while @hsj576’s initial idea aligns more with Plan B, which leaves the collaborative strategy to be implemented in examples/.

However, these two approaches differ significantly, and I would like to seek advice on which integration method should be adopted. If my understanding is incorrect, please feel free to point it out directly.

In the following part, I will present detailes for plan A and plan B, highlighting their respective advantages and disadvantages.

However, since we are designing an new algorithm, it's essential to have a thorough understanding of the code for both Sedna and Ianvs. I will first introduce the implementation logic and existing issues of JointInference in Sedna, followed by a review of the reasoning behind Ianvs introducing the new algorithm paradigm.

1 Sedna's JointInference

1.1 Implemetation Detail

The core class of the Joint Inference feature in Sedna is JointInference:

class JointInference(JobBase):

which mainly has an __init__ function and an inference function.

1.1.1 __init__(self, estimator=None, hard_example_mining: dict = None)

The constructor __init__ receives an estimator as the edge model instance and hard_example_mining as the configuration for the hard example mining algorithm. The specific logic is as follows.

First, get the local host, remote ip, and port by reading environment variables in below: Source Code

    def __init__(self, estimator=None, hard_example_mining: dict = None):
        super(JointInference, self).__init__(estimator=estimator)
        self.job_kind = K8sResourceKind.JOINT_INFERENCE_SERVICE.value
        self.local_ip = get_host_ip()
        self.remote_ip = self.get_parameters(
            "BIG_MODEL_IP", self.local_ip)
        self.port = int(self.get_parameters("BIG_MODEL_PORT", "5000"))

Next, declare the local reporter to report inference progress: Source Code

        report_msg = {
            "name": self.worker_name,
            "namespace": self.config.namespace,
            "ownerName": self.job_name,
            "ownerKind": self.job_kind,
            "kind": "inference",
            "results": []
        }
        period_interval = int(self.get_parameters("LC_PERIOD", "30"))
        self.lc_reporter = LCReporter(lc_server=self.config.lc_server,
                                      message=report_msg,
                                      period_interval=period_interval)
        self.lc_reporter.setDaemon(True)
        self.lc_reporter.start()

Then, load the edge model: Source Code

        if callable(self.estimator):
            self.estimator = self.estimator()
        if not os.path.exists(self.model_path):
            raise FileExistsError(f"{self.model_path} miss")
        else:
            self.estimator.load(self.model_path)

Subsequently, Create a client for calling cloud model. Source Code

        self.cloud = ModelClient(service_name=self.job_name,
                                 host=self.remote_ip, port=self.port)

Finally, instantiate the hard example mining algorithm: Source Code

        self.hard_example_mining_algorithm = None
        if not hard_example_mining:
            hard_example_mining = self.get_hem_algorithm_from_config()
        if hard_example_mining:
            hem = hard_example_mining.get("method", "IBT")
            hem_parameters = hard_example_mining.get("param", {})
            self.hard_example_mining_algorithm = ClassFactory.get_cls(
                ClassType.HEM, hem
            )(**hem_parameters)

1.1.2 inference(self, data=None, post_process=None, **kwargs)

The inference function takes the data parameter as input and the post_process parameter as a post-processing function. The specific logic is as follows:

First, register post_process as callback_func, serving as a general post-processing function: Source Code

    def inference(self, data=None, post_process=None, **kwargs):
      callback_func = None
        if callable(post_process):
            callback_func = post_process
        elif post_process is not None:
            callback_func = ClassFactory.get_cls(
                ClassType.CALLBACK, post_process)

Next, perform edge model inference and report: Source Code

        res = self.estimator.predict(data, **kwargs)
        edge_result = deepcopy(res)

        if callback_func:
            res = callback_func(res)

        self.lc_reporter.update_for_edge_inference()

Then, determine whether the example is hard or not: Source Code

        if self.hard_example_mining_algorithm:
            is_hard_example = self.hard_example_mining_algorithm(res)

If it is a hard example, call the cloud model for inference and report: Source Code

            if is_hard_example:
                try:
                    cloud_data = self.cloud.inference(
                        data.tolist(), post_process=post_process, **kwargs)
                    cloud_result = cloud_data["result"]
                except Exception as err:
                    self.log.error(f"get cloud result error: {err}")
                else:
                    res = cloud_result
                self.lc_reporter.update_for_collaboration_inference()

Finally, return the inference result: Source Code

        return [is_hard_example, res, edge_result, cloud_result]

1.2 Problems

1.2.1 Current Collaborative Strategy is hard-coded and not suitable for LLM.

As shown in the code above, this JointInference class essentially defines a certain collaborative method where edge inference is performed first, followed by hard example mining, and then a decision is made on whether cloud inference is needed.

This approach is suitable for edge-cloud collaboration for computer vision's Object Detection tasks. The example for JointInference provided by Sedna is Helmet Detection Inference , which uses IBT (image-box-thresholds) as the hard example mining algorithm. The general process includes:

  • Edge model inference to obtain object detection boxes;
  • Calculating hard coefficients based on the confidence of the detection boxes and a preset threshold threshold_box;
  • If the hard coefficient exceeds the threshold threshold_img, the example is considered hard and should be offloaded to the cloud for inference.

However, due to the auto-regressive generation pattern of LLMs, we cannot allow a hard example to be inferenced for several seconds on edge and then again for several seconds in the cloud, which will cause extremely bad latency performance.

The LLM collaborative algorithm we currently designed is query-routing, which requires first invoking a hard example mining algorithm for query difficulty assessment before inferring either in the cloud or at the edge.

Therefore, Sedna JointInference currently just has a hard-coded collaborative strategy suitable for the CV tasks, which cannot be directly used for collaborative inference for LLMs without modifications.

1.2.2 Cloud model's ModelClient has compatible issues with the current LLM API paradigm.

There is also an issue with the ModelClient used in Sedna's JointInference. As shown in the code above, Sedna's JointInference initializes a ModelClient defined by Sedna, as detailed in the following code: Source Code

class ModelClient:
    """Remote model service"""

    def __init__(self, service_name, version="",
                 host="127.0.0.1", port="8080", protocol="http"):
        self.server_name = f"{service_name}{version}"
        self.endpoint = f"{protocol}://{host}:{port}/{service_name}"

    def check_server_status(self):
        return http_request(url=self.endpoint, method="GET")

    def inference(self, x, **kwargs):
        """Use the remote big model server to inference."""
        json_data = deepcopy(kwargs)
        json_data.update({"data": x})
        _url = f"{self.endpoint}/predict"
        return http_request(url=_url, method="POST", json=json_data)

The code shows that the API url and data format of the interface are hard-coded to align with the InferenceServer interface defined by Sedna, which means that the cloud model must be deployed using InferenceServer to complete JointInference. Source Code

class InferenceServer(BaseServer):  # pylint: disable=too-many-arguments
    """
    rest api server for inference
    """

    def __init__(
            self,
            model,
            servername,
            host: str = '127.0.0.1',
            http_port: int = 8080,
            max_buffer_size: int = 104857600,
            workers: int = 1):
        super(
            InferenceServer,
            self).__init__(
            servername=servername,
            host=host,
            http_port=http_port,
            workers=workers)
        self.model = model
        self.max_buffer_size = max_buffer_size
        self.app = FastAPI(
            routes=[
                APIRoute(
                    f"/{servername}",
                    self.model_info,
                    response_model=ServeModelInfoResult,
                    response_class=JSONResponse,
                    methods=["GET"],
                ),
                APIRoute(
                    f"/{servername}/predict",
                    self.predict,
                    response_model=ServePredictResult,
                    response_class=JSONResponse,
                    methods=["POST"],
                ),
            ],
            log_level="trace",
            timeout=600,
        )

    def start(self):
        return self.run(self.app)

    def model_info(self):
        return ServeModelInfoResult(infos=self.get_all_urls())

    def predict(self, data: InferenceItem):
        inference_res = self.model.inference(
            data.data, post_process=data.callback)
        return ServePredictResult(result=inference_res)

However, this deployment method poses certain issues for LLMs:

  • On one hand, existing cloud-based LLMs are often too large to be deployed by users themselves;
  • On the other hand, the API defined by InferenceServer differs significantly from the OpenAI API, which has almost become a de facto standard for LLM APIs.

If users want to test cloud models like GPT-4o, Claude, Gemini, Moonshot-AI, they must rewrite a forwarding service using InferenceServer defined by Sedna, which is complicated (especially when considering features like streaming output) and unnecessary.

2 Ianvs algorithm's implemetation

In order to introduce a new algorithm for Ianvs, I need to briefly outline the implementation logic of the various algorithms currently in Ianvs.

In Ianvs, ianvs/benchmarking.py serves as the entry point of the program and constructs a BenchmarkJob:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/benchmarking.py#L36-L37

The specific logic of BenchmarkJob is defined in ianvs/cmd/obj/benchmarkingjob.py. We can see that the testcase is created by the build_testcases() method of testcasecontroller:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/cmd/obj/benchmarkingjob.py#L91-L92

build_testcases() constructs the algorithm through the _parse_algorithms_config() function:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/testcasecontroller.py#L34-L44

The _parse_algorithms_config() function has a line indicating the instantiation of the Algorithm class:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/testcasecontroller.py#L74

In ianvs/core/testcasecontroller/algorithm/algorithm.py, the Algorithm class is declared, which will return various algorithm instances in paradigm function:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/algorithm/algorithm.py#L95-L105

These algorithm instances are defined in the files located in various folders under ianvs/core/testcasecontroller/algorithm/paradigm. These algorithm classes inherit from the ParadigmBase class and are required to have a run() function, which serves as the main entry point for training and inference.

Taking IncrementalLearning as an example, it is defined by the IncrementalLearning class in ianvs/core/testcasecontroller/algorithm/paradigm/incremental_learning/incremental_learning.py.

For inference tasks, IncrementalLearning first uses the build_paradigm_job() function to construct a job, and then calls the job's inference() interface to complete the inference.

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/algorithm/paradigm/incremental_learning/incremental_learning.py#L132-L139

build_paradigm_job() is defined in ianvs/core/testcasecontroller/algorithm/paradigm/base.py by ParadigmBase:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/algorithm/paradigm/base.py#L93-L129

It can be seen that the IncrementalLearning class calls the IncrementalLearning interface from sedna.core.incremental_learning when constructing tasks, and inference() is provided by this algorithm in sedna.

During the instantiation of the sedna IncrementalLearning class, two parameters, estimator and hard_example_mining, are passed in. These two parameters are registered and instantiated by the BaseModel class from basemodel.py and the class in hard_example_mining.py implemented in examples/pcb-aoi/incremental_learning_bench/fault_detection/testalgorithms/fpn.

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/examples/pcb-aoi/incremental_learning_bench/fault%20detection/testalgorithms/fpn/basemodel.py#L61-L62

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/examples/pcb-aoi/incremental_learning_bench/fault%20detection/testalgorithms/fpn/hard_example_mining.py#L49-L50

In the process of the IncrementalLearning class in Sedna, the inference of the estimator and hard example mining will be completed sequentially, just like in Sedna JointInference.

3 Two Plans to integrate JointInference to Ianvs

Due to the current implementation flaws of JointInference in Sedna, considering Ianvs's own logic for integrating new algorithms, there are two possible ways to introduce JointInference.

3.1 Plan A

In this approach, similar to the integration of Incremental Learning and LifeLongLearning, we will build jobs based on Sedna's JointInference class. Demo Code

        if paradigm_type == ParadigmType.JOINT_INFERENCE.value:
            return JointInference(
                estimator=self.module_instances.get(
                    ModuleType.BASEMODEL.value),
                cloud=self.module_instances.get(
                    ModuleType.APIMODEL.value),
                hard_example_mining=self.module_instances.get(
                    ModuleType.HARD_EXAMPLE_MINING.value)
            )

This method requires some modifications to Sedna JointInference to address the two problems we previously mentioned:

  • Introduce a new parameter cloud to the constructor of the JointInference class, allowing users to pass a self-designed model client instance. This resolves the issue where LLM currently needs to build unnecessary forwarding service. Since cloud is an optional parameter, it will not affect existed joint inference examples in Sedna. Demo Code
    def __init__(self, estimator=None, cloud=None, hard_example_mining: dict = None):
        super(JointInference, self).__init__(estimator=estimator)
  • Introduce a parameter mining_mode for the inference() function of the JointInference class. Based on different mining modes, construct corresponding processes to address the issue of current collaborative processes not being compatible with LLMs. I have implemented a simple demo that builds two types, "inference-then-mining" for Object Detection collaborative strategy and "mining-then-inference" for LLM query routing strategy. Demo Code
        mining_mode = kwargs.get("mining_mode", "inference-then-mining")

        if mining_mode == "inference-then-mining":
            res, edge_result = self._get_edge_result(data, callback_func, **kwargs)

            if self.hard_example_mining_algorithm is None:
                raise ValueError("Hard example mining algorithm is not set.")

            is_hard_example = self.hard_example_mining_algorithm(res)
            if is_hard_example:
                res, cloud_result = self._get_cloud_result(data, post_process=post_process, **kwargs)

        elif mining_mode == "mining-then-inference":
            # First conduct hard example mining, and then decide whether to execute on the edge or in the cloud.
            if self.hard_example_mining_algorithm is None:
                raise ValueError("Hard example mining algorithm is not set.")

            is_hard_example = self.hard_example_mining_algorithm(res)
            if is_hard_example:
                if not sepeculative_decoding:
                    res, cloud_result = self._get_cloud_result(data, post_process=post_process, **kwargs)
                else:
                    # do speculative_decoding
                    pass
            else:
                res, edge_result = self._get_edge_result(data, callback_func, **kwargs)

        else:
            raise ValueError(
                "Mining Mode must be in ['mining-then-inference', 'inference-then-mining']"
            )

        return [is_hard_example, res, edge_result, cloud_result]

3.1.1 Advantages

  • Introduce Sedna's JointInference feature to Ianvs core, and may also be contributed to Sedna to support LLM joint inference.

3.1.2 Drawbacks

  • The collaborative strategy will be hard-coded into the core. If users want to implement a new collaborative themselves, they will need to modify ianvs kernel (however, we can consider adding a custom module, similar to BASEMODEL and UNSEEN_TASK_ALLOCATION, allowing users to define it in examples).

3.2 Plan B

In addition to paradigms like Incremental Learning and Lifelong Learning that directly call Sedna's interfaces, Ianvs also includes paradigms such as Single Task Learning, which do not include any details about training and inference; those details are entirely handled by the BasedModel class found in the examples/ directory, which is as follows:

https://github.com/kubeedge/ianvs/blob/4de73b25fd41859dca8fb502a6bfa0c0b348743b/core/testcasecontroller/algorithm/paradigm/base.py#L93-L94

In Plan B, we can introduce JointInference in a manner similar to SingleTaskLearning. It does not include any specific strategy for collaborative algorithms. The newly added JointInference paradigm serves merely as an entry point for calling a collaborative algorithm, and its code is almost identical to that of SingleTaskLearning but without training step. The specific collaborative strategy will be written in the predict() function of the BaseModel found in examples/.

3.2.1 Advantages

  • User-defined collaborative algorithms will be more convenient, allowing for direct customization of the collaboration process within the example.

3.2.2 Drawbacks

  • Since the newly added JointInference class in the paradigm does not contain any details of collaborative algorithms, whether it can be considered a new feature of Ianvs needs to be discussed;
  • Because the details of collaborative algorithms are entirely implemented by users, the introduction of Sedna's JointInference has become unnecessary.

Which plan should we adopt? If there are any other options, please feel free to suggest them.

I prefer plan A. The overall looks good to me.

FuryMartin commented 3 months ago

Considering that Plan A aligns better with the Sedna interface and existing algorithms, I have updated the proposal based on this plan and created some draft code.

You can find more details by clicking these links: proposal and draft code.

@MooreZheng @hsj576

hsj576 commented 3 months ago

/lgtm

MooreZheng commented 3 months ago

/lgtm

kubeedge-bot commented 3 months ago

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: hsj576, MooreZheng

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files: - ~~[OWNERS](https://github.com/kubeedge/ianvs/blob/main/OWNERS)~~ [MooreZheng] Approvers can indicate their approval by writing `/approve` in a comment Approvers can cancel approval by writing `/approve cancel` in a comment