lithops-cloud / lithops

A multi-cloud framework for big data analytics and embarrassingly parallel jobs that provides a universal API for building parallel applications in the cloud ☁️🚀
http://lithops.cloud
Apache License 2.0

Added k8s rabbitmq backend #1170

Closed macarronesc closed 10 months ago

macarronesc commented 1 year ago

In this version, a RabbitMQ mode is added to the Kubernetes backend, enabled via a flag within the k8s configuration. Much clearer and more precise documentation of the changes is also added, along with plots that show the differences between the versions.

Developer's Certificate of Origin 1.1

   By making a contribution to this project, I certify that:

   (a) The contribution was created in whole or in part by me and I
       have the right to submit it under the Apache License 2.0; or

   (b) The contribution is based upon previous work that, to the best
       of my knowledge, is covered under an appropriate open source
       license and I have the right under that license to submit that
       work with modifications, whether created in whole or in part
       by me, under the same open source license (unless I am
       permitted to submit under a different license), as indicated
       in the file; or

   (c) The contribution was provided directly to me by some other
       person who certified (a), (b) or (c) and I have not modified
       it.

   (d) I understand and agree that this project and the contribution
       are public and that a record of the contribution (including all
       personal information I submit with it, including my sign-off) is
       maintained indefinitely and may be redistributed consistent with
       this project or the open source license(s) involved.
JosepSampe commented 1 year ago

I think you didn't update your lithops version to the latest one before including your additions into the k8s backend. Version 3.0.1 includes a few new things in the k8s backend. So first update your code to 3.0.1, and then include your additions.

JosepSampe commented 1 year ago

Thanks for updating it.

To clarify, I have some questions:

  1. For how long do you keep the containers in warm status?

  2. What is the scope of this new feature? On-prem k8s clusters? Fixed-size k8s cloud clusters? I'm asking this because when you use a k8s cloud service you can set up a node autoscaler that automatically scales the k8s cluster up and down as needed. This means that, at the moment of invocation, the cluster can have only 1 node while the actual maximum of the cluster is 10 nodes, so you will miss most of the available resources, because this functionality won't be able to detect nodes that don't exist yet.

macarronesc commented 1 year ago
  1. The initial idea is that they stay warm until they are manually deactivated. This could be changed in the future with a timeout or something similar.

  2. In terms of scope, this version is designed for fixed-size Kubernetes clusters such as supercomputers. We could implement some function that checks all nodes when launching Lithops and creates the necessary pods (see the sketch below), but right now this is not done.
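
For illustration, such a node check could use the official kubernetes Python client. A minimal sketch, not part of this PR:

from kubernetes import client, config

# Assumes a reachable kubeconfig; inside a pod, config.load_incluster_config() applies instead
config.load_kube_config()
v1 = client.CoreV1Api()

# List every node with its allocatable CPUs, e.g. to create one worker pod per node
for node in v1.list_node().items:
    print(node.metadata.name, node.status.allocatable['cpu'])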

JosepSampe commented 1 year ago

Based on your response, my main question now is: at the end, does this approach make sense? If you have a fixed-size cluster (like on-prem supercomputers), I assume there will be a lot of users using it. If you reserve all the available resources for an indeterminate amount of time only for yourself, no one else will be able to submit anything until you manually deactivate the pods, right? Let's say there are 2 lithops clients using the same cluster: if you submit a job first, you will take all the resources, so if I try to submit a job, mine will fail, right? Or am I missing something?

macarronesc commented 1 year ago

It’s true that currently only single tenancy is allowed, but this alternative backend represents a change, as it allows the execution of a pipeline with multiple functions in parallel in a much faster and more efficient way than the original backend, thanks to the proposed changes (group invocation, RabbitMQ communication, warm start, etc.). This solution is focused on that group of researchers.

On the other hand, in the supercomputer use case, these clusters have certain nodes assigned to each Lithops client, so this is not a problem.

Therefore, depending on the use you want to make of this Kubernetes backend, one solution or the other is proposed. Each solution is optimized for a specific use case.

I think it is a great addition, as it gives the user more possibilities.

JosepSampe commented 11 months ago

My main comment in your previous PR was that you could (probably) achieve a similar behavior using the already available config parameters. So, why did you compare your approach vs. the original approach at granularity 1 and granularity 6? It is important to make the distinction between FaaS and CaaS backends, and not try to use a CaaS backend as if it were a FaaS (https://lithops-cloud.github.io/docs/source/worker_granularity.html).

So, in the scenario you are posing, you know in advance the number of nodes you will have access to and the amount of resources, so I see it as a fairer comparison if you compare your approach with, for example, this config in the original approach:

k8s:
    max_workers: 2
    worker_processes: 115
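
(For reference: in Lithops, max_workers caps the number of worker pods and worker_processes sets the parallel processes inside each pod, so this config runs 2 pods with 115 processes each, matching the 2-node cluster used in these tests.)
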
macarronesc commented 11 months ago

Fine, I included your proposed comparison in the plots. Please accept my PR, as I consider that my Lithops Kubernetes backend is very useful when you deploy on an on-premise cluster.

JosepSampe commented 11 months ago

I forgot to add runtime_cpu and runtime_memory, since the defaults are very low:

k8s:
    max_workers: 2
    runtime_cpu: 115
    runtime_memory: <XXXX MB>
    worker_processes: 115

And you can run it both cold and warm, since the master pod (responsible for handing out IDs) is kept warm for a while.

I just ran an experiment with the current k8s approach, and these are my results:

Cold: [image]

Warm: [image]

macarronesc commented 11 months ago

Thanks for the parameters. As I see in your plots, even with the most optimal configuration for this use case, there is still a substantial improvement from using the alternative backend.

On the other hand, I have made the plots with the same cluster as the previous tests, to be really fair in the comparison, and once again there is a substantial improvement in performance, going from 16.23 seconds to 8 in the cold case.

Cold start: [plots_225_histogram_cold]

Warm start: [plots_225_histogram_warm]

It must be taken into account that this comparison is being made with only 2 nodes. If it is done with a cluster of several dozen nodes, the improvement will increase substantially due to how these two architectures are designed.

Finally, we must also mention that your backend only takes homogeneous clusters into account. If there is a cluster with nodes of different sizes, your backend does not adapt to it, whereas our alternative backend does.

For these reasons, we ask you again to accept our contribution to the Lithops code, so that everyone, including users and researchers, will benefit from this implementation.

JosepSampe commented 11 months ago

It must be taken into account that this comparison is being made with only 2 nodes. If it is done with a cluster of several dozen nodes, the improvement will increase substantially due to how these two architectures are designed.

Why? If you have 20 nodes, you only have to set max_workers: 20 in the config and you will obtain the same result. In the end, after a month and a half asking for the correct analysis, what I see is that the "current version warm start" is "your version cold start", which makes sense, and it is what I suggested in the first message I wrote in your first PR.

Finally, we must also mention that your backend only takes homogeneous clusters into account. If there is a cluster with nodes of different sizes, your backend does not adapt to it, whereas our alternative backend does.

I agree with this; I think this is one of the main contributions, along with the cold-start improvement. But after a month and a half I still don't see this in the docs. I still don't see in the docs what the scope of this PR is and who should use it. You should include all the responses you gave in this thread in the docs to let people understand in what circumstances this is better or not. Moreover, you should differentiate between invocation time and execution time, since execution time, using exactly the same resources, is not improved at all; what you improve is invocation time. I still see the docs full of nonsense and confusing sentences.

For these reasons, we ask you again to accept our contribution to the Lithops code, so that everyone, including users and researchers, will benefit from this implementation.

The code quality must be improved a lot. I see a lot of spaghetti code, and code that is almost impossible to understand, with comments that explain nothing. If someone needs to use and test it, it does not need to be in the master branch. Anyone can install this version using python3.X -m pip install -U git+https://github.com/macarronesc/lithops.git@master

macarronesc commented 11 months ago

I hope that now all the changes meet your expectations. All requested changes have been made, along with other restructurings and optimizations to make the code more readable and understandable. I have also changed the documentation to better explain what the proposed changes are.

I have performed multiple tests in different scenarios and configurations to make sure that all the code works correctly and as desired in both versions of k8s.

If you want to make any changes to the code, I will be willing to make them immediately.

macarronesc commented 11 months ago

Thanks for the comments; some of them were really necessary to launch a product into production.

I have made all the changes and improved other parts of the code additionally.

Do not load the config here again. The k8s config is already loaded in self.k8s_config.

I see too much repeated code here. Make it simple. As far as I understood, this RabbitMQ implementation uses 1 pod per node; based on this code it seems it can start more than 1 pod per node. Why? Is this correct?

I think this is the most stable and performant version since I started this PR. I await your answer. For any suggestions or additional changes, I will be happy to release new updates.

JosepSampe commented 11 months ago

In this case, I have improved and optimized the existing code, but I still need to read the full configuration, since k8s_config only returns the Kubernetes configuration. It does not return the amqp_url of the monitoring config.

You can edit the config.py file and add something like this in the load_config() function:

if config_data['k8s'].get('rabbitmq_executor', False):
    config_data['k8s']['amqp_url'] = config_data['rabbitmq']['amqp_url']

This way you won't need to read the config multiple times, since the amqp_url will be in self.config in k8s.py.
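
For reference, the rabbitmq section of the Lithops config that this reads from looks something like this (placeholder values):

rabbitmq:
    amqp_url: amqp://<user>:<password>@<host>:5672/<vhost>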

macarronesc commented 11 months ago

All requested changes have been implemented to make the code more efficient and faster (removing unused imports, loading the Lithops configuration only once, etc.).

What is the reason for this change? Is it necessary for you? Can you revert it if it is not necessary?

I changed it with the idea that this would be a better way to generate the meta runtime with the runtime_memory requested by the user, but it has already been reverted to the original version.

Can you explain what range_start, range_end, total_executions, and bases_executions are?

range_start and range_end define the range of IDs assigned to each pod. For example, in a cluster of 5 nodes with 500 CPUs, pod 1 would have IDs 0-99, pod 2 would have IDs 100-199, and so on. total_executions represents the total number of executions that this pod alone needs to perform, and bases_executions indicates how many times it needs to execute at its full capacity (if it has 100 CPUs and total_executions=205, then bases_executions=2, meaning it has to execute 100*2 + 5 functions).
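
As a quick sketch of the arithmetic in that example (values taken from the description above):

pod_cpus = 100           # concurrent executions this pod can run at once
total_executions = 205   # executions assigned to this pod for the job

bases_executions = total_executions // pod_cpus  # 2 full rounds at capacity
remainder = total_executions % pod_cpus          # 5 leftover executions

assert pod_cpus * bases_executions + remainder == total_executions  # 100*2 + 5 = 205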

As always, I hope these changes meet your expectations and are good enough for the PR to be accepted. I'll be happy to make any additional changes if needed.

JosepSampe commented 11 months ago

range_start and range_end define the range of IDs assigned to each pod. For example, in a cluster of 5 nodes with 500 CPUs, pod 1 would have IDs 0-99, pod 2 would have IDs 100-199, and so on. total_executions represents the total number of executions that this pod alone needs to perform, and bases_executions indicates how many times it needs to execute at its full capacity (if it has 100 CPUs and total_executions=205, then bases_executions=2, meaning it has to execute 100*2 + 5 functions).

So when the pod is started and receives a job, it sends a message to RabbitMQ_utils to get the range (through _assign_id()). This message is sent once per pod per job, right? Then you need to recalculate the total executions in calculate_executions().

I'm not sure if I'm missing something, but I see a lot of complexity here for doing this specific task. If I'm not wrong, some code and much of this complexity can be substantially reduced by doing it in a different way.

As I can see in the code, you can know in advance, before creating any pod, the total number of pods you are going to start and the total number of CPUs per pod you are going to use. So what if you send a pod_id and a dict containing the cluster_info in the payload of the _create_pod method in k8s.py when you create the pod, and then in callback_run_job() you simply calculate the correct range based on this pod_id, cluster_info, and payload['total_calls']? This way you can substantially reduce the code, because you won't need the RabbitMQ_utils file, you can reduce the total queues needed in RabbitMQ, you can also reduce code in the entry point, and everything will run faster.

In entry_point.py you could replace the function calculate_executions() with calculate_range(), and do something like this:

import math

def calculate_range(this_pod_id, cluster_info, total_calls):
    # Split total_calls across pods proportionally to each pod's CPU count
    total_cpus = sum(cluster_info.values())
    tasks_per_pod = {pod_id: math.ceil(cluster_info[pod_id] / total_cpus * total_calls) for pod_id in cluster_info}

    ranges = {}
    last_range_end = {}

    # Give each pod a contiguous slice of the call ids, clamped to total_calls
    for pod_id in cluster_info:
        range_start = last_range_end.get(pod_id - 1, 0)
        range_end = range_start + tasks_per_pod[pod_id]
        last_range_end[pod_id] = range_end
        ranges[pod_id] = (range_start, min(total_calls, range_end))

    return ranges[this_pod_id]

def callback_run_jobs(ch, method, properties, body):
    ....
    this_pod_id = 2  # received from _create_pod() payload
    cluster_info = {0: 70, 1: 30, 2: 20}  # {pod_id: cpu_count} received from _create_pod() payload
    total_calls = 132  # from payload['total_calls']

    this_range = calculate_range(this_pod_id, cluster_info, total_calls)
    print(this_range)  # this is the correct range for this pod

This calculate_range function assigns calls proportionally to each pod, which I think is good if total_calls >= total_cpus. However, all the pods will receive tasks when total_calls < total_cpus. In any case, you can change the logic and do it in a different way if you prefer; I only put this as an example.

macarronesc commented 11 months ago

I have made several changes implementing the new way of assigning the IDs to each of the pods. This way is faster and more efficient, making the code simpler and more readable.

I have tried to leave the code in the best possible shape with all the requested changes. I hope it meets Lithops standards and is good enough to go into production.

K8s tests have been carried out with and without the alternative configuration in multiple scenarios, to try to make this the most robust and efficient version to date.

As for calculate_executions(), it is a process that is still necessary due to how the system is designed. The entire ID assignment process has been eliminated (avoiding the use of 2 queues per worker) by sending the necessary information in the payload itself.

JosepSampe commented 11 months ago

Looks better, thanks. However, the idea I had in mind was to make it even simpler: if you can calculate the correct range_start and range_end in the entry point, then you can replace the entire code in callback_run_job() with something like this (since lithops already implements multiprocessing inside function_handler()):

def callback_run_job():
    range_start, range_end = calculate_range(this_pod_id, cluster_info, total_calls)
    dbr = [data_byte_ranges[int(call_id)] for call_id in call_ids[range_start:range_end]]
    payload['call_ids'] = call_ids[range_start:range_end]
    payload['data_byte_ranges'] = dbr
    payload['worker_processes'] = pod_cpus
    function_handler(payload)

In any case, I can see in your recent experiments that the overhead of having an extra queue is practically null or negligible. So, wouldn't it make more sense to follow a 'work queue' approach for implementing this, instead of statically calculating the executions for each worker-pod? The static approach is prone to failure, since if one worker-pod fails, the entire job will fail. With a work queue, if one worker-pod fails, the other worker-pods will process the tasks anyway, and the job will always finish.

Have you considered this approach?

macarronesc commented 11 months ago

I understand your idea, and I could try to implement it to make the code even simpler. If it is necessary for the approval of the code, I will get to it immediately. Even so, I believe the code is fast and efficient enough in the execution of the functions, and it could be approved in this version, with these changes included in new commits.

As for the work queue, it is an idea that we have had in the group for quite some time, and we want to experiment with it once this code is in production. We want to investigate which type of architecture is faster: one with the IDs already defined in advance, or another where there are no IDs and jobs are taken from a queue. But this is something we still have to investigate, and it is on our TODO list.

JosepSampe commented 11 months ago

I think that, as you are already using a RabbitMQ instance, the work queue approach is the correct way to go, as it offers a lot of benefits, such as fault tolerance and resilience: if one worker-pod dies, the remaining ones still drain the queue and the job finishes.

And I think the work queue approach is easy to implement and quick to experiment with. What comes to mind right now is that you only need a queue to store individual call_ids, with something like this in k8s.py:

def invoke():
    ....

    job_queue = job_payload['job_key']
    self.channel.queue_declare(queue=job_queue)
    for call_id in job_payload['call_ids']:
        self.channel.basic_publish(exchange='', routing_key=job_queue, body=call_id)

    # Send the payload once the call_ids are already in the job_queue
    self.channel.basic_publish(exchange='lithops', routing_key='', body=json.dumps(job_payload))

Then, in callback_run_jobs(), you can start a pool of processes matching pod_cpus:

def callback_run_jobs(ch, method, properties, body):
    payload = json.loads(body)

    job_queue = payload['job_key']

    processes = []
    for _ in range(pod_cpus):
        process = multiprocessing.Process(target=call_id_consumer, args=(job_queue,))
        process.start()
        processes.append(process)

    # Wait for all processes to finish
    for process in processes:
        process.join()

And consume call_ids with something like:

def call_id_consumer(job_queue):
    while True:

        method_frame, header_frame, body = channel.basic_get(queue=job_queue)

        if method_frame:
            run_job_k8s_rabbitmq(body)  # body is the call_id
            channel.basic_ack(method_frame.delivery_tag)
        else:
            break
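
(Note: basic_get polls one message at a time and returns None when the queue is empty, which is why the consumer breaks out of the loop there; a basic_consume-based callback would be an alternative if the processes should stay alive across jobs.)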

Then you have to adapt the run_job_k8s_rabbitmq() function to only read the call_id.
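
For example, a minimal sketch of that adaptation could look like this, reusing the ambient names from the snippets above (payload, data_byte_ranges and function_handler are assumed to be in scope, as before):

def run_job_k8s_rabbitmq(body):
    call_id = body.decode('utf-8')  # the queue message now carries only the call_id
    payload['call_ids'] = [call_id]
    payload['data_byte_ranges'] = [data_byte_ranges[int(call_id)]]
    payload['worker_processes'] = 1  # each consumer process handles a single call at a time
    function_handler(payload)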

If this approach performs correctly, we can go straight away with this and accept it, since I have numerous use cases that can benefit from it. What happens with the current approach is that it "works", but it lacks essential characteristics such as minimum fault tolerance and resilience, making it hard for my (and others') use cases to adopt, as these are minimum requirements for any computing platform.

macarronesc commented 10 months ago

After a few days of implementation and testing, the work queue is definitely the ideal version for the RabbitMQ architecture.

It is much faster, simpler and easier to understand throughout the process.

Thank you very much for your suggestion; it has made the code much better. After 2 months of changes, the code has improved a lot, and it is thanks to your comments.

I think this is a good version. I hope this architecture is ideal and that you think it is ready to be launched into production. For any additional changes or improvements, I'll be happy to work on them.

As always, it has been tested in multiple scenarios and use cases, and according to the results everything works as expected.

JosepSampe commented 10 months ago

It looks much better now, thanks! Next week I will finish the review and test it.

JosepSampe commented 10 months ago

@macarronesc Can you update from the master branch and resolve the conflict?

macarronesc commented 10 months ago

Done

JosepSampe commented 10 months ago

How did you sync your fork? The commit "Updated to last commit" is re-overwriting all the files of the previous commits in the master branch, breaking the history of the changes. Please delete these latest commits and use git rebase to sync your fork with upstream/master.

macarronesc commented 10 months ago

Done. Sorry, I don't have much experience with pull requests and version control across different branches and forks.

I think it's already solved.

macarronesc commented 10 months ago

I reset to commit 406830b0 and then rebased on that commit.

I think it should be correct right now.

Sorry for my inexperience with git commands.

JosepSampe commented 10 months ago

I think you can now remove the last 5 commits in your branch (first create a backup of your branch/code):

Run git rebase -i HEAD~5. This will open the nano editor with something like:

pick c895f6f0 Updated to last commit
pick 407b5c0e Merge branch 'master' into master
pick 88a92cd0 Merge remote-tracking branch 'upstream/master'
pick edf4e119 git reset to 40630b0
pick 2f605b93 Merge branch 'master' of https://github.com/macarronesc/lithops

Replace all pick with drop:

drop c895f6f0 Updated to last commit
drop 407b5c0e Merge branch 'master' into master
drop 88a92cd0 Merge remote-tracking branch 'upstream/master'
drop edf4e119 git reset to 40630b0
drop 2f605b93 Merge branch 'master' of https://github.com/macarronesc/lithops

Save and exit the nano editor, and finally force-push your branch with git push origin master --force.

Finally, review your k8s.py and make sure everything is correct, and create a new commit if necessary to fix it.

macarronesc commented 10 months ago

Thank you very much for the help.

I think it should be solved right now. Tell me if everything is correct.

macarronesc commented 10 months ago

I think everything should be perfect now. I have run some tests to verify that all the code remained correct after so many version changes, and all the tests have been successful.

I hope this version is ready to be released into production. I will be happy to make any modifications if you wish.

GEizaguirre commented 10 months ago

🎉 🎉 🎉