[Proposal] New Scheduling algorithm #3621

style95 commented 6 years ago

This issue is meant to get more people involved in the discussion of this proposal: https://lists.apache.org/list.html?dev@openwhisk.apache.org:lte=1M:New%20scheduling%20algorithm%20proposal.

The proposal addresses a few performance issues and proposes a new scheduling algorithm. Below are the contents of the email.

I have already done some prototyping, and it seems to work well. Once the design has been refined through discussion, I will open a pull request as well.

The main issue is Docker's slow performance. I observed about 130 ~ 425ms to pause/resume a container and 700 ~ 1300ms to remove/create one. Here is how Docker aggravates the situation.

1. Interference between actions.

Currently, the HomeInvoker that handles invocations of a given action is chosen by a hash. Assume there are 2 actions (A, B) with the same HomeInvoker. Even if other invokers have more free resources, requests for both actions are sent to that same invoker.

When the invoker's remaining slots are not enough and requests for the two actions arrive alternately, each action tries to remove the other's warm container. For example, if an execution of action A finishes, a request for action B arrives, and busyPool + freePool = maxPoolSize, the invoker removes the warm container for action A. Then, when the execution of action B finishes and a request for action A arrives, it removes the warm container for action B. As mentioned above, container deletion/creation takes about 700 ~ 1300ms, so performance becomes poor, and if no PreWarm container happens to be available at that moment, performance degrades even more.

If the two actions were scheduled to different invokers, container reuse could be maximized. But because the HomeInvoker is decided only by a hash function, performance can degrade even when other invokers have enough resources.
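The thrashing described above can be reproduced with a toy simulation of a single invoker. This is a sketch under simplifying assumptions: the invoker has exactly one free slot left for A and B, and it evicts the least recently used warm container when full. The function name and the LRU policy are illustrative, not OpenWhisk's actual ContainerPool logic.

```python
from collections import OrderedDict


def count_cold_starts(requests, max_pool_size):
    """Simulate one invoker whose warm containers are keyed by action name.

    A request reuses the warm container for its action if one exists.
    Otherwise it is a cold start, and if the pool is full the least
    recently used warm container is evicted first (assumed policy).
    """
    pool = OrderedDict()  # action name -> warm container, in LRU order
    cold_starts = 0
    for action in requests:
        if action in pool:
            pool.move_to_end(action)  # warm reuse, mark as most recently used
            continue
        cold_starts += 1
        if len(pool) >= max_pool_size:
            pool.popitem(last=False)  # evict the least recently used container
        pool[action] = True  # placeholder for the newly created container
    return cold_starts
```

With one slot, alternating requests `["A", "B"] * 5` cause a cold start on every one of the 10 requests, whereas sending A and B to two different invokers causes only 2 cold starts in total.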

2. Not waiting for the previous run.

Let's assume the HomeInvoker of action A is already saturated (many containers are already running), so a new request is sent to an invoker other than the HomeInvoker. If no PreWarm container is available at that moment, a ColdStart begins. This can easily happen because we cannot set the number of PreWarm containers to a huge value. For example, assume we have 2 PreWarm containers and they are taken by 2 consecutive requests. If a new request arrives at that point, it cannot take a PreWarm container until container creation is done. Container creation takes up to 1300ms, and during that time every new request triggers a ColdStart. A ColdStart clearly increases execution time (by at least 700ms). This describes the worst case, but as mentioned above, ColdStarts happen quite frequently under heavy load.

If an action's execution time is shorter than 700ms, it would be better to wait for the previous run to complete. For example, if the execution time is 20ms, a request can currently take up to 720ms in the worst case. If it waited for the previous run instead, it would take about 40ms (20ms + 20ms). The worst case is therefore 18 times slower (720ms vs 40ms).

An action can run for at most 5 minutes, and for actions whose execution time is longer than 700ms ~ 1300ms this does not matter much. However, every action whose execution time is shorter than 700ms can suffer from it.
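The arithmetic above can be written as a small decision rule. This sketch assumes a fixed 700ms cold-start cost (the lower bound quoted above) and a single warm container that runs queued requests one after another; `should_wait` and its parameters are hypothetical names, not part of OpenWhisk.

```python
def cold_start_latency_ms(exec_ms, cold_start_ms=700.0):
    # Cold start path: create a container first, then run the action.
    return cold_start_ms + exec_ms


def wait_latency_ms(exec_ms, queued_runs_ahead=1):
    # Wait path: let the runs already queued on the warm container finish, then run.
    return exec_ms * (queued_runs_ahead + 1)


def should_wait(exec_ms, queued_runs_ahead=1, cold_start_ms=700.0):
    # Prefer the warm container whenever waiting is expected to be faster.
    return wait_latency_ms(exec_ms, queued_runs_ahead) < cold_start_latency_ms(exec_ms, cold_start_ms)
```

For a 20ms action with one run ahead, this gives 720ms for the cold path versus 40ms for waiting, matching the example. The rule also shows where waiting stops paying off: with a long enough queue, or a long-running action, a new container becomes the better choice.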

3. Invoker coordinates all requests.

Currently, invokers receive all messages via the invokerN topic, so if any request fails or is rescheduled for some reason, it affects the processing of subsequent requests.

Invokers do process requests concurrently, and there is no blocking logic. But because one invoker coordinates all messages, we cannot take advantage of parallel processing. The invokerN topic has only 1 partition, so only 1 consumer can read messages at a time, and the invoker processes messages in serial order. Also, even when only one kind of action request arrives, the invoker always runs a lot of logic, such as checking the free pool, taking PreWarm containers, triggering ColdStarts, and so on. As an invoker manages more and more containers, its performance becomes poorer.

4. Not able to accurately control concurrent invocation.

Currently, the ConcurrentInvocations value for a namespace is increased when an Activation message is sent and decreased when the Completion message is received. Until the Completion message is received, controllers consider the execution unfinished even if the actual invocation has already completed. So there is a gap between the real number of concurrent invocations and the limit value. When I ran a benchmark with ConcurrentInvocations set to 5, I observed only 3 concurrent containers. Although only 3 containers were actually running, new requests were rejected with 429 Too Many Requests.
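The gap can be illustrated with a toy counter that follows the rule just described: increment on send, decrement only when the Completion message arrives. `ConcurrencyCounter` is a hypothetical name used only for this sketch.

```python
class ConcurrencyCounter:
    """Toy model of a per-namespace concurrent-invocation counter.

    The controller increments on send and decrements only when the
    Completion message arrives, so runs that have finished but whose
    Completion is still in transit keep counting against the limit.
    """

    def __init__(self, limit):
        self.limit = limit
        self.in_flight = 0  # concurrency as seen by the controller

    def try_send(self):
        if self.in_flight >= self.limit:
            return False  # the request would be rejected with 429 Too Many Requests
        self.in_flight += 1
        return True

    def on_completion(self):
        self.in_flight -= 1
```

With a limit of 5, five sends succeed. If two of those runs have already finished in their containers but their Completion messages have not arrived, only 3 containers are really busy, yet the next `try_send()` still returns `False`.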

5. TPS is not deterministic.

For the reasons above, system performance becomes non-deterministic. If actions interfere with each other because they share a HomeInvoker, TPS (throughput) drops; if there is no interference, TPS increases.

TPS is also highly dependent on container creation/deletion. Once a ColdStart is triggered, execution time increases, and since this can affect the processing of subsequent requests, TPS drops.

As a result, TPS is not proportional to the number of invokers.

When I ran a benchmark with 1 action and 100 virtual users, I got 20,000 TPS. But when I ran the same benchmark with 10 different actions, I got only 6,000 TPS, and with 100 actions, only 30 TPS.

(I used the noopThroughput action from the performance repo to create all actions.)

Even with the number of actions fixed at 10, TPS dropped again when I increased the number of virtual users.

So we cannot calculate or estimate an official TPS for the system; it is highly dependent on the number of actions and the number of users.

In this situation, it's not easy to say what the official TPS of our system is.

Ideally, total TPS would grow in proportion to the number of invokers.

If TPS could be calculated arithmetically like this, we could meet a target TPS just by adding more invokers, and resource planning would be easy.
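Under the linear model that this ideal implies, capacity planning reduces to one multiplication and one rounded-up division. This is a sketch assuming throughput scales exactly linearly; `per_invoker_tps` stands for a hypothetical measured per-invoker figure, not a number from the benchmarks above.

```python
import math


def total_tps(per_invoker_tps, num_invokers):
    # Idealized model: throughput grows linearly with the number of invokers.
    return per_invoker_tps * num_invokers


def invokers_needed(target_tps, per_invoker_tps):
    # Smallest invoker count whose idealized throughput meets the target.
    return math.ceil(target_tps / per_invoker_tps)
```

For example, if one invoker sustained 2,000 TPS, a 25,000 TPS target would need 13 invokers. The benchmarks above show that the current scheduler does not behave this way.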

So let me recap the issues.

  1. Actions interfere with each other because of hash-based scheduling.
  2. Requests do not wait for the previous run to complete, so in the worst case they have to wait for a ColdStart.
  3. The invoker coordinates all requests, so one problematic request affects subsequent requests, and no parallelism is applied.
  4. We cannot get fine-grained, accurate control over concurrent execution.
  5. TPS is highly dependent on Docker commands (container creation/deletion).
  6. TPS is not deterministic.

The main issues here are that the controller has to do location-aware scheduling to maximize TPS, and that the slowness of Docker commands is not taken into account. Currently, container reuse is a critical performance factor, so requests for the same action should be sent to the same invoker to maximize reuse of warm containers. However, resources (the size of the container pool) are finite, and many actions can share the same HomeInvoker. Container creation/deletion then happens frequently and degrades performance. If the controller could send messages to any invoker regardless of HomeInvoker, load would be evenly distributed and container creation/deletion could be minimized.

To resolve these issues, I designed a new scheduling algorithm and built a prototype. With 100 actions, I got about 160 times higher TPS and 150 times shorter execution time.

My proposal is as follows:

  1. Each action has its own Kafka topic (e.g. ${namespace}-${name}).
  2. Invokers only receive container creation/deletion messages via the invokerN topic.
  3. Each ContainerProxy actor reads activation messages directly from Kafka, from its own action topic.
  4. The controller just sends activation messages to action topics and, at the same time, sends a ContainerCreation message to invokerN if required.
  5. The controller checks the limit for the given action, the number of active consumers, and the ConsumerLag of the given topic to decide whether to send a ContainerCreation message to invokerN. This procedure is independent of (processed asynchronously from) sending the activation message.
  6. Once an invoker receives a ContainerCreation message, it checks its busy pool size, the limit, and the number of consumers for the given topic, and creates a new container. Since the invoker server does not coordinate requests, this procedure is also independent of activation processing, so container creation/deletion does not affect activation processing.
  7. The ConcurrentInvocation limit is now configured per action; the default is 1.
  8. So by default there is only 1 running container per action.
  9. Once a container is created, it is not paused or deleted for some time (10s ~ 30s; the optimal value still needs to be found).
  10. Parallelism for executing actions is now managed by the limit and the number of Kafka partitions. If more concurrent containers are required, the limit is increased. Since a partition is the unit of parallelism in Kafka, the number of partitions is increased as well so that consumers can read messages in parallel. All containers (ContainerProxy) that share the same topic form one ConsumerGroup. So if the limit is 3, there are 3 concurrent containers (Kafka consumers) for the given action, and the action topic has 3 partitions. Because they form a single ConsumerGroup, they can read messages no matter how they are distributed among invokers.
  11. The throttler now checks the consumer lag of the given topic to decide whether to respond with 429 Too Many Requests.
  12. If the number of containers (consumers) < limit, the controller checks the consumer lag to decide whether to create more containers. So if one container is enough to handle the requests, no more containers are created.
  13. If there is no request for some time (10s ~ 30s), the container is terminated (or paused).

The most important part of my algorithm is that it enables container location-free scheduling.
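The controller-side decisions in the steps above (per-action topic naming, the ContainerCreation check, and throttling on consumer lag) can be sketched as pure functions. This is a minimal sketch under stated assumptions: `ActionTopicState`, `lag_threshold`, and `max_lag` are hypothetical names, and how the consumer count and lag are fetched from Kafka is left out.

```python
from dataclasses import dataclass


@dataclass
class ActionTopicState:
    # Hypothetical snapshot the controller reads for one action topic.
    limit: int             # per-action concurrency limit (default 1)
    active_consumers: int  # running containers consuming the topic
    consumer_lag: int      # activation messages produced but not yet consumed


def topic_name(namespace, name):
    # One topic per action, following the ${namespace}-${name} naming example.
    return f"{namespace}-{name}"


def needs_container_creation(state, lag_threshold=0):
    # Ask an invoker for another container only while below the limit
    # and only if messages are piling up in the topic.
    return state.active_consumers < state.limit and state.consumer_lag > lag_threshold


def should_throttle(state, max_lag):
    # Throttler: respond with 429 when the backlog exceeds an assumed bound.
    return state.consumer_lag > max_lag
```

Because the activation message is always published to the action topic, the ContainerCreation check only decides whether capacity is added; it never blocks or redirects the activation itself.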

It has many merits.

1. Controller doesn't need to send requests to the same invoker.

No matter where the containers for a given topic are located or how many of them are running, and even if no container for the topic is running at all, the controller can simply send the activation message to the action topic.

2. The invoker server is not involved in activation processing.

Since each ContainerProxy actor reads messages directly from Kafka, the invoker server is not involved while actions are executed.

3. Container creation/deletion does not affect activation processing.

Now the invoker server only takes care of container creation/deletion. While containers are being created or deleted, each ContainerProxy can still read and process activation messages, so container creation/deletion does not affect the performance of existing containers.

4. Base TPS for 1 container is increased.

Once a container is created, it just reads activation messages, processes them, and sends the responses back. No other logic is involved, so the base TPS for 1 container increases. I got about 1,500 TPS with 1 action container.

5. Easy to do resource/TPS planning.

Since container performance is affected neither by container coordination (creation/deletion) nor by other containers, TPS becomes more deterministic, and it becomes easy to meet a target TPS just by adding more containers for the action.

6. Loads are evenly distributed.

The controller can send a ContainerCreation message to any invoker, no matter where the existing containers for the action are located. So it can simply send the ContainerCreation message to the invoker with the least load, and load (the number of running containers) is evenly distributed.

7. The number of action containers is guaranteed to match the limit.
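Choosing the target of a ContainerCreation message could look like the following. It is a sketch assuming load is measured as the number of running containers per invoker, as described above; `pick_invoker` and the name-based tie-break are illustrative choices, not OpenWhisk APIs.

```python
from __future__ import annotations


def pick_invoker(running_containers: dict[str, int], max_pool_size: int) -> str | None:
    """Return the invoker with the fewest running containers that still has room.

    Ties are broken by invoker name so the choice is deterministic.
    """
    candidates = {inv: n for inv, n in running_containers.items() if n < max_pool_size}
    if not candidates:
        return None  # every invoker is full; no ContainerCreation can be placed
    return min(candidates, key=lambda inv: (candidates[inv], inv))
```

Because activations reach containers through the action topic rather than through a particular invoker, the controller is free to place new containers purely by load.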

It also has disadvantages.

  1. The controller has to check consumer lag for every request, which may increase execution time.
  2. As many topics as there are actions are required.
  3. Action containers can be reserved for 10s ~ 30s.
  4. The number of partitions has to change whenever the limit for an action changes.

There might be more side effects, but let me address the issues above first.

1. Checking ConsumerLag may increase execution time.

I ran a benchmark to measure how long it takes to check ConsumerLag. I got about 10,000 TPS with 10 users, and the time taken was just 1.07 ms. Since I used only 10 users, TPS is not what matters here; the execution time is. It could increase under heavy load, but I don't think it would be a huge burden.
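For reference, consumer lag is conventionally computed per partition as the log-end offset minus the consumer group's committed offset, summed over the topic's partitions. The sketch below takes those offsets as plain dictionaries; in practice they would come from a Kafka client or admin API, which is not shown. Treating a partition with no committed offset as starting from 0 is an assumption of this sketch.

```python
from __future__ import annotations


def consumer_lag(end_offsets: dict[int, int], committed_offsets: dict[int, int]) -> int:
    """Total lag of one consumer group on one topic.

    end_offsets: partition -> log-end offset (the next offset to be written)
    committed_offsets: partition -> offset the group has committed
    A partition without a committed offset is counted from 0 (assumption).
    """
    return sum(end - committed_offsets.get(p, 0) for p, end in end_offsets.items())
```

Since an action topic has as many partitions as its limit (usually 1), each check only touches a handful of offsets, which is consistent with the low per-check cost measured above.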

2. Many topics are required.

If there are about 10K actions, we need 10K Kafka topics. But the number of topics that are active at any given time is bounded.

For example, with 10 invokers and a MaxPoolSize of 20, the maximum number of concurrent containers is 200, which means at most 200 topics would be active as well.

I also ran benchmarks against 3 Kafka nodes with different numbers of topics.

Even with 3 Kafka nodes in the default configuration, I got about 30K TPS with 1K topics. So if the number of active topics grows as the number of invokers grows, I think we can easily handle it by adding more Kafka nodes.

3. Containers can be reserved.

This could be a problem when the size of the container pool almost reaches MaxPoolSize. Under maximum load, existing containers keep handling requests for existing actions, so when requests for a new action arrive, there is no space in the pool and those requests are rejected.

We could configure the reservation time to be small enough to prevent this situation; the idle container would then be removed and a container for the new action would start. But container deletion/creation takes about 700ms ~ 1300ms, so if the value is too small, action execution time can increase. I think this is a trade-off between resource utilization and guaranteed performance, and I am inclined toward the latter. This only happens when resources (concurrent containers) are almost exhausted and there is no existing container to execute the new request. I think that is the right time to scale out the cluster to guarantee performance, rather than maximize resource utilization with a small reservation time.
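The trade-off can be made concrete with a small admission rule for a request whose action has no container yet. This is a sketch under simplifying assumptions: each existing container is represented only by how long it has been idle, and `admit_new_action` and `reserve_seconds` are hypothetical names.

```python
from __future__ import annotations


def admit_new_action(idle_seconds: list[float], max_pool_size: int, reserve_seconds: float) -> str:
    """Decide what happens to a request for an action that has no container yet.

    idle_seconds: idle time of each existing container (0.0 = busy or just used).
    """
    if len(idle_seconds) < max_pool_size:
        return "create"  # free slot: a cold start, but no eviction needed
    if any(idle >= reserve_seconds for idle in idle_seconds):
        return "evict-and-create"  # reclaim a container whose reservation expired
    return "reject"  # every container is still reserved
```

With a full pool of 20 containers that have each been idle for 5s, a 10s reservation rejects the new request, while a 3s reservation evicts one container and pays the 700ms ~ 1300ms deletion/creation cost instead, which is exactly the trade-off described above.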

4. The number of partitions has to change whenever the limit changes.

This requires data rebalancing in Kafka, which normally takes a long time. But consider the characteristics of activation messages. Once an invocation has finished, its activation message in Kafka is meaningless: we never read it more than once, and we do not need to keep that data in any other store. That means we can limit the retention bytes and hours of each action topic, and then rebalancing would not take much time.

When I tried it, it took less than 1s. Since changes to the limit or the number of partitions do not happen frequently, I think this cost is acceptable.
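Per-action topic settings that follow this reasoning might be built as below. `retention.ms` and `retention.bytes` are standard Kafka topic-level configs (`retention.bytes` applies per partition); the default values, the dictionary layout, and `action_topic_spec` are hypothetical choices for this sketch, not a recommended configuration.

```python
def action_topic_spec(namespace, name, limit,
                      retention_ms=60_000,
                      retention_bytes=10 * 1024 * 1024):
    """Build a topic spec for one action.

    Partitions follow the action's limit, and retention is kept short
    because each activation message is consumed only once.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")
    return {
        "topic": f"{namespace}-{name}",
        "num_partitions": limit,  # one partition per concurrent consumer
        "config": {
            "retention.ms": str(retention_ms),  # keep messages for a short time only
            "retention.bytes": str(retention_bytes),  # per-partition size cap
        },
    }
```

Note that Kafka can increase a topic's partition count but not decrease it, so lowering an action's limit would need separate handling.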

markusthoemmes commented 6 years ago

As this is a major proposal, let's please keep this on the dev-list and/or decide on the dev-list to pull it into an issue here. Following two tracks will be chaotic.

I'll reopen if the dev-list decides to move this here.

chen116 commented 5 years ago

Hi, is there any follow-up on this proposal? Thanks!

markusthoemmes commented 5 years ago

@chen116 Please refer to the dev-list for a follow up and discussion on this proposal.

style95 commented 5 years ago


I have been working on the next version of this proposal for the last few months, and I am almost done with the implementation. I will share what I have done and start a discussion thread about contributing it.

chen116 commented 5 years ago

cool! looking forward to it

rabbah commented 5 years ago

@style95 that’s great. I’d suggest small incremental PRs where possible to make it feasible and practical to review effectively.

style95 commented 5 years ago

@rabbah Thanks for the suggestion. Ok I will.