CERT-Polska / karton

Distributed malware processing framework based on Python, Redis and S3.
https://karton-core.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Scaling issues in karton - thread #141

Closed: alex-ilgayev closed this issue 2 years ago

alex-ilgayev commented 2 years ago

Hey guys, we have been using the karton system for a while now, and we have reached a point where scaling the analysis throughput became a real issue. Each time the total number of tasks in Redis goes above roughly 500k (the specific number isn't important, it depends on the environment), the entire system eventually collapses: it starts with karton-system constantly crashing and karton-dashboard not responding, and from there the road to chaos is pretty short.

Just to clarify our situation - the entire infrastructure is hosted as containers on AWS ECS (all kartons, including karton-system) and we use a pretty strong managed redis server (I don't think its computation power/storage size/connection capacity would be the bottleneck).

I investigated the causes of these crashes and issues, and learned a few things I would like to share and discuss:

  1. The karton-system component is a crucial part of the karton framework and is currently also a single point of failure for the entire system. This is an extremely important point which I will come back to later.
  2. Currently, the karton-system functionality can only be scaled out partially - several instances of it would consume karton.tasks and karton.operations properly, but the garbage collection wouldn't be scaled efficiently. This is because each garbage collection process will run tasks = self.backend.get_all_tasks() and perform collection over this data, which is shared with the other karton-system instances. I think this component must be made properly scalable.
  3. The root cause of the system crashing under high workload is tasks = self.backend.get_all_tasks() during garbage collection, or more precisely for task_data in self.redis.mget(tasks) inside get_all_tasks(). As the task queue grows, each redis.mget(<all_tasks>) becomes more computation-intensive and slows down the handling of karton operations. This causes the queues to grow even more, and eventually karton-system just goes from one GC call to the next. In the end, in our case, karton-system gets killed by the OS due to out-of-memory, but even if it survives, it isn't operational.
  4. In the short term, I would reimplement get_all_tasks() to return a generator instead of a list and query a chunk of tasks at a time (10k tasks, for example). This would solve some of the stability issues (including the out-of-memory issues) - it is quite trivial, but I can explain if needed (see the sketch right after this list).
  5. In the long term, we can still run into situations where we starve karton operations because of constant garbage collection. I think we need to find ways to prioritize them somehow or make the GC a more "lightweight" operation (maybe a Redis list of GC actions? or part of the karton.operations list?)
  6. karton-dashboard - we find this tool extremely valuable for monitoring the workloads. Unfortunately, it becomes non-responsive with high queues due to the same get_all_tasks() call on each GET request. Our general usage of karton-dashboard is understanding the size of the queues and cancelling/reproducing crashed tasks, so we rarely look at task contents. I would be glad to propose improvements to its responsiveness through a PR if you don't have any plans to do so.
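Regarding point 4, a minimal sketch of the chunked approach I have in mind (the key pattern and chunk size here are my assumptions, not necessarily how the backend stores tasks):

from typing import Iterator

from karton.core import Task

CHUNK_SIZE = 10000  # illustrative chunk size

def iter_all_tasks(redis_client) -> Iterator[Task]:
    # Chunked variant of get_all_tasks(): instead of one MGET over every task
    # key, fetch CHUNK_SIZE keys at a time so only one chunk of serialized
    # tasks is held in memory at once.
    task_keys = redis_client.keys("karton.task:*")  # key pattern assumed
    for start in range(0, len(task_keys), CHUNK_SIZE):
        chunk = task_keys[start:start + CHUNK_SIZE]
        for task_data in redis_client.mget(chunk):
            if task_data is None:
                continue  # task removed between KEYS and MGET
            yield Task.unserialize(task_data)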

Thanks again for all the effort you put into this system, we have managed to achieve great things with it so far. @psrok1 @nazywam

msm-code commented 2 years ago

First off, congratulations on the volume of your samples/tasks! Wish we had these problems at CERT.PL when I was there.

I think you raise two separate problems here:

Dashboard first. It's probably a good idea to rewrite it to be more lightweight (I remember it also used to crash sometimes, when other kartons stopped and too many tasks piled up). It's probably a good issue to raise with CERT.PL guys. Or, if you provide a patch, I think they'll be happy to merge.

By the way, have you seen the /varz endpoint? If you mostly use the dashboard for checking sizes of queues you may like the metrics (/varz exports metrics in a format that Grafana can understand and draw some pretty cool graphs). (Maybe someone from CERT.PL can paste a screenshot here to show what I mean).

Karton-system is the harder one. To be fair, we knew about the GC bottleneck from the start. It was implemented as is because 1) it was "good enough" for that time and 2) it was still 2x improvement over what we used to have (tasks with hardcoded timeout).

Thanks for digging into the issues and debugging them yourself! I have a question about:

In the short term, I would reimplement get_all_tasks() to return a generator instead of a list and query a chunk of tasks at a time (10k tasks, for example). This would solve some of the stability issues (including the out-of-memory issues) - it is quite trivial, but I can explain if needed.

Are you sure it will work? I might be wrong, but as far as I remember, the problem is that the GC we have right now is similar to mark & sweep. I know you saw this, but here's the pseudocode for reference:

resources_to_remove = set(db.all_objects())  # by default remove everything
for task in db.get_all_tasks():
    for resource in task.resources:
        # don't remove this resource - it's referenced by a task
        resources_to_remove.discard(resource)
db.remove(resources_to_remove)  # whatever is left must be unreferenced

In other words the GC removes everything by default, and only keeps resources that have at least one reference. If you only processed, let's say, half of the DB tasks, you would remove all resources referenced only by the other half.

I'm afraid the proper solution will be much harder. But I'm happy to be proven wrong!

In the long term, we can still run into situations where we starve karton operations because of constant garbage collection. I think we need to find ways to prioritize them somehow or make the GC a more "lightweight" operation (maybe a Redis list of GC actions? or part of the karton.operations list?)

Definitely. This sounds like an interesting problem, and one I didn't have an excuse to work on in CERT. I'm also quite proficient with the karton-system codebase. If there's interest from maintainers, I think I can help with this improvement.

@psrok1 @nazywam your turn.

psrok1 commented 2 years ago

Well yes, the scalability problems are caused by the current GC design, which was simple enough to work at CERT.pl scale.

I remember I have drafts somewhere of a more scalable design that uses resource reference counting instead of mark & sweep (which usually requires a stop-the-world phase that must be signalled to the other message router, a.k.a. karton-system, replicas). After Christmas, I will try to find that draft and post it here.
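Just to illustrate the general direction (this is not the actual draft, and the key names are made up): each task registration would increment a per-resource counter and each task removal would decrement it, so cleanup becomes a cheap per-resource decision instead of a full scan. Something like:

import redis

r = redis.Redis()

def acquire_resource(resource_uid: str) -> None:
    # Called when a task referencing this resource is registered.
    r.incr(f"karton.refcount:{resource_uid}")  # key name is made up

def release_resource(resource_uid: str) -> None:
    # Called when a task referencing this resource is removed; once the
    # counter drops to zero, nothing references the resource anymore.
    if r.decr(f"karton.refcount:{resource_uid}") <= 0:
        r.delete(f"karton.refcount:{resource_uid}")
        # ...delete the underlying object from S3/MinIO here...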

In addition, the current karton-system implementation itself can be optimized, because we almost never use pipelining (https://redis.io/topics/pipelining). But I'm not sure if it gives much better results than allowing karton-system to be horizontally scaled.
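For reference, a rough sketch of what pipelining changes - multiple commands batched into a single round-trip instead of one network call each (key names here are just illustrative):

import redis

def flush_routed_tasks(r: redis.Redis, routed_tasks) -> None:
    # All SET/RPUSH commands are queued client-side and sent in a single
    # round-trip, instead of paying network latency for each one.
    with r.pipeline(transaction=False) as pipe:
        for task in routed_tasks:
            pipe.set(f"karton.task:{task.uid}", task.serialize())
            pipe.rpush("karton.queue.example-consumer", task.uid)
        pipe.execute()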

As part of the new design, we need to find a way to smoothly upgrade a whole production deployment to the new, scalable version, e.g. by running a v4 karton-system to route the old messages alongside a scalable v5 that provides some bridge for old tasks.

I also propose moving the Karton API to some HTTP/WebSocket interface so that clients do not operate directly on Redis/MinIO. We could provide something like a karton-gateway that exposes an API for producers and consumers, or integrate that interface with the new karton-system. It would give us more flexibility, as we could treat the Redis representation as internal and break things at that level without worrying about backwards compatibility.

Summarizing:

I would love to cooperate on this :+1:

msm-code commented 2 years ago

I remember I have drafts somewhere of a more scalable design that uses resource reference counting instead of mark & sweep (which usually requires a stop-the-world phase that must be signalled to the other message router, a.k.a. karton-system, replicas).

+1 for refcounting. This is the hard way that I've mentioned ;).

In addition, the current karton-system implementation itself can be optimized, because we almost never use pipelining (https://redis.io/topics/pipelining). But I'm not sure if it gives much better results than allowing karton-system to be horizontally scaled.

Right, forgot about it. This sounds like a reasonably easy win, and nothing says we can't do both. It probably won't speed up the GC much though, just about everything else.

I also propose moving the Karton API to some HTTP/WebSocket interface so that clients do not operate directly on Redis/MinIO. We could provide something like a karton-gateway that exposes an API for producers and consumers, or integrate that interface with the new karton-system. It would give us more flexibility, as we could treat the Redis representation as internal and break things at that level without worrying about backwards compatibility.

That's a good idea, but it solves another problem (exposing karton for external users, locking down permissions, and being able to handle change more easily). Let's stay focused here :P. Or is it also directly helpful with improving performance?

alex-ilgayev commented 2 years ago

Thanks for the response guys, glad this topic is on your minds.

@psrok1 I totally agree with all the points you mentioned. A few notes:

@msm-code, you raised some good points:

Dashboard first. It's probably a good idea to rewrite it to be more lightweight (I remember it also used to crash sometimes, when other kartons stopped and too many tasks piled up). It's probably a good issue to raise with CERT.PL guys. Or, if you provide a patch, I think they'll be happy to merge.

I think the crash you mentioned is the same issue I described - with a high number of tasks, each call to get_all_tasks() becomes heavier. I think most of the functionality karton-dashboard provides can be achieved without this call. I haven't gone deep into it, but I can try.

By the way, have you seen the /varz endpoint? If you mostly use the dashboard for checking sizes of queues you may like the metrics (/varz exports metrics in a format that Grafana can understand and draw some pretty cool graphs). (Maybe someone from CERT.PL can paste a screenshot here to show what I mean).

Yeah, I'm familiar with that. At the moment we don't use it, but integrating it with Grafana is on our roadmap.

In other words the GC removes everything by default, and only keeps resources that have at least one reference. If you only processed, let's say, half of the DB tasks, you would remove all resources referenced only by the other half.

Yeah, you're right - I misunderstood the algorithm.

ITAYC0HEN commented 2 years ago

So maybe with a few adjustments - like controlling and increasing the GC interval, or separating the karton-system functionality into two - a task router and a GC, which could be run as two separate containers - it could be solved for now (until we REALLY need a scalable karton-system). So before jumping into designing and implementing the long-term overhaul of the GC, maybe we can patch it.

I second this. At least this will allow us to continue and stream more feeds into karton. Right now we are blocked. This will give the users some breathing room.

msm-code commented 2 years ago

Now is a very slow time in Poland (because of Christmas, and soon New Year's Eve, most people are out of the office), so things are moving at a glacial pace.

controlling and increasing the GC interval

Sounds fine as a workaround, but I'm not sure it's a good long-term solution. Correct me if I'm wrong, but if GC can't keep up with the tasks, this will only buy you hours before everything blows up?

It can be made a command-line parameter, but I'm trying to understand how it fixes the GC throughput problem. Or, if the default GC delay is too short, maybe it should be increased for everyone?

Also, maybe you can test this locally and confirm that it helps? I can send you a patch if you prefer.

separating the karton-system functionality into two - a task router and a GC, which could be run as two separate containers - it could be solved for now (until we REALLY need a scalable karton-system

Off the top of my head, I'm not sure it's possible and easy. AFAIR the GC is not thread-safe, so the GC will have to stop the router thread anyway. At least that's true for task collection; resource GC should be safe to do concurrently. Task collection can probably also be made thread-safe with some careful (re)design.


So what I understand is:

I'm not convinced this will help, but I'm happy to help. How would you approach reproducing your problem? Should I run a karton-system in one container, and add a dummy karton consumer (with sleep(1)) and producer to simulate a high load?

Also, I think it's a good idea to consider improving the overall communication performance - there are a lot of unnecessary small Redis calls that could be batched or avoided altogether. If this improves performance by more than 2x, it may give us even more time than splitting the router and GC.

Finally, I have no control over this project, so I'd like one of the maintainers to confirm that these ideas are OK to work on.

alex-ilgayev commented 2 years ago

Thanks for the commitment. Breaks, vacation, and family are more important, so take your time :)

Correct me if I'm wrong, but if GC can't keep up with the tasks, this will only buy you hours before everything blows up?

Now I may be wrong here, but delaying the GC means you'll "lose" potential storage in both minio and redis (tasks/resources). We can live with "losing" this storage for the sake of keeping the karton routing flowing. But it is a very small change and I cannot confirm yet that it will have substantial results.

Or, if the default GC delay is too short, maybe it should be increased for everyone?

Could be, we need to test it though.

I'm not convinced this will help, but I'm happy to help. How would you approach reproducing your problem? Should I run a karton-system in one container, and add a dummy karton consumer (with sleep(1)) and producer to simulate a high load?

This is a pretty complex thing to reproduce, I think. I am planning to build one for myself to test whether the GC interval can affect and improve the throughput. I would go for 3 kartons here - one "classifier" which will receive the produced tasks and will produce into two kartons, one with sleep(1) and one which finishes almost instantly (sleep(0.1) or so). Maybe more is needed. IMO two kartons won't be enough, because we need to simulate more operations in the entire system and more need for garbage collection.
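Something like this rough sketch is what I have in mind (assuming the standard karton.core Karton API; identities, headers and sleep times are arbitrary):

import time
from karton.core import Config, Karton, Task

class StressClassifier(Karton):
    # Receives the produced tasks and fans each one out to a slow and a fast worker.
    identity = "stress.classifier"
    filters = [{"type": "stress-sample"}]

    def process(self, task: Task) -> None:
        for kind in ("slow", "fast"):
            self.send_task(Task({"type": "stress-worker", "kind": kind},
                                payload=task.payload))

class SlowWorker(Karton):
    identity = "stress.slow-worker"
    filters = [{"type": "stress-worker", "kind": "slow"}]

    def process(self, task: Task) -> None:
        time.sleep(1)  # simulate a heavy karton

class FastWorker(Karton):
    identity = "stress.fast-worker"
    filters = [{"type": "stress-worker", "kind": "fast"}]

    def process(self, task: Task) -> None:
        time.sleep(0.1)  # simulate a light karton

if __name__ == "__main__":
    # In the real test each class would run in its own container, e.g.:
    StressClassifier(config=Config("karton.ini")).loop()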

Also, I think it's a good idea to consider improving the overall communication performance - there are a lot of unnecessary small Redis calls that could be batched or avoided altogether. If this improves performance by more than 2x, it may give us even more time than splitting the router and GC.

Yeah, I agree, that would be awesome. But as long as we have a single call that can return a few GB of data in one query, I don't think it will solve the issue.

EDIT: After thinking more about it, I think your setup for reproducing the stress test (one consumer and one producer) will work. Measuring the stability would be more of a challenge. I'll try to dig a bit deeper into this during the week.

alex-ilgayev commented 2 years ago

Hey guys

I ran a stress test on our staging environment over the last few hours and have a few insights regarding the scaling issue.

The setup:

My criterion for deciding that the system is not handling the load is the size of two queues: karton.tasks and karton.operations. Let me explain: even if the individual kartons accumulate enormous queues, I still expect karton.tasks and karton.operations to stay small or empty and remain stable. Of course, these enormous queues will eventually backfire through long GC times or OOM issues for Redis.
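For reference, the two queue sizes are easy to watch with plain LLEN calls, e.g.:

import time
import redis

r = redis.Redis()
while True:
    print("karton.tasks:", r.llen("karton.tasks"),
          "karton.operations:", r.llen("karton.operations"))
    time.sleep(5)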

Observations:

Numbers:

No. tasks    GC operation length    Size of karton.operations    Redis size (after GC)
100k         63s                    124 MB                       270 MB
140k         68s                    141 MB                       327 MB
196k         85s                    160 MB                       432 MB
236k         103s                   223 MB                       542 MB
290k         111s                   276 MB                       630 MB

Thoughts:

I had several thoughts from this experiment:

Hope this was clear - I would love comments from you guys @msm-code @psrok1 @nazywam

msm-code commented 2 years ago

Wow, thanks for your detailed writeup. You inspired me to set up my own benchmark.

Notes about my setup: https://gist.github.com/msm-code/34701149fb4b567bd11b0e867ba0047b Notes about my benchmark: https://gist.github.com/msm-code/fef1088f59a0391689e510141b460a39 (Both docs are references for future me and maybe other people, but you can safely skip them if you're not interested).

Now I may be wrong here, but delaying the GC means you'll "lose" potential storage in both minio and redis (tasks/resources). We can live with "losing" this storage for the sake of keeping the karton routing flowing. But it is a very small change and I cannot confirm yet that it will have substantial results.

Minio storage is cheap, so it's not a problem. But Redis is memory-only, so delaying GC too long means that we risk running OOM.

It's actually worth stressing: if, at any point in time, the Redis database grows too large to fit in RAM, everything will die a fiery death. Redis explicitly doesn't support DBs larger than memory ("(...) we want to simply provide the best in-memory database (but persistent on disk as usual) ever, without considering at least for now the support for databases bigger than RAM").

So if you produce more tasks than you can handle, stopping the GC will probably only delay the inevitable. But I'm not sure, maybe in some workloads running GC a bit less often will improve performance.

Could be, we need to test it though.

SGTM. I've added a --gc-interval flag to my PR (more about that later).

even if the individual kartons accumulate enormous queues, I still expect karton.tasks and karton.operations to stay small or empty and remain stable

Sounds reasonable (I think). I've confirmed that with a high enough task creation rate it's possible to overwhelm the karton router (I haven't addressed this in my PR yet).

On the one hand, I confirmed (through the numbers below) that with a high number of tasks GC processing becomes problematic; on the other hand, I discovered that scaling the processing of tasks (by consuming karton.operations) is more crucial than I thought, and even a pretty strong server running only karton-system can't withstand the load we aim for in our production environment.

Yeah, looks like you're right. I didn't address the routing performance yet, but we definitely should. Just one note: improving karton.tasks routing performance will help with karton.operations too, since the system will have more time for it. I say that because it may be an easier target to start with.

I guess that you had reasons for prioritizing karton.tasks over karton.operations. In my reasoning, karton.tasks holds our new, unrouted tasks, and I would prefer the pending operations to be finished before introducing any new tasks into the system. Deferring karton.tasks holds back future processing, reducing potential queues and potential GC work.

We did! I didn't actually remember them, but I did remember that we left a comment about that:

# Order does matter! task dispatching must be before
# karton.operations to avoid races.
# Timeout must be shorter than GC_INTERVAL, but not too long,
# allowing graceful shutdown

Ok, that's not as helpful as I had hoped. But I now remember that this was a very delicate problem, and there is a subtle reason why the order has to be as it is (operations for a given task must be processed after the task; one easy way to ensure that is to process operations only when there are no pending tasks). In fact, I've confirmed that switching the branches will cause "hanging" tasks - tasks stuck in the started state that are not being worked on.

It would be nice to remove this restriction, but it may need some careful coding.
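For context, the routing loop consumes both queues with a single blocking pop, and BLPOP checks the keys in the order given - that's where the ordering guarantee comes from. A simplified sketch (handler callables here are placeholders, not the real method names):

import json
from karton.core import Task

def consume_routing_queues(redis_client, route_task, handle_operation) -> None:
    # BLPOP checks "karton.tasks" first, so an operation is only popped when
    # there are no unrouted tasks waiting - the ordering discussed above.
    item = redis_client.blpop(["karton.tasks", "karton.operations"], timeout=5)
    if item is None:
        return  # timeout - a good moment to run GC
    queue, body = item
    if queue == b"karton.tasks":
        route_task(Task.unserialize(body))
    elif queue == b"karton.operations":
        handle_operation(json.loads(body))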

If we keep the GC and routing as the same piece of code, we definitely need to be able to raise the GC interval above 3 minutes.

This is actually easy to split. I've added flags --disable-router and --disable-gc in my PR.

Optional - I wondered if it is possible to create a more minimalistic karton.operations list that doesn't carry the whole task. In the end, we are only updating the status and timestamp, but storing the complete task to do it. This could save some Redis memory.

Interesting idea! Sounds doable; on the other hand, it would make the operation a bit slower (because we would need to fetch the current task state from Redis to update it, right?)
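A minimal sketch of what such a slimmed-down operation entry could look like (field names are hypothetical, this is not the current format):

import json
import time

def push_lightweight_operation(redis_client, task_uid: str, new_status: str) -> None:
    # Only the fields needed to apply the status change, instead of the fully
    # serialized task that karton.operations carries today.
    operation = json.dumps({
        "uid": task_uid,
        "status": new_status,
        "last_update": time.time(),
    })
    redis_client.rpush("karton.operations", operation)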

But there's another thing I don't understand:

            elif queue == "karton.operations":
                operation_body = json.loads(body)
                task = Task.unserialize(operation_body["task"])
                new_status = TaskState(operation_body["status"])
                if task.status != new_status:
                    # actual code

Let me get this straight: we produce "operations" with task.status == new_status, and then we do nothing with them. How does that make sense :thinking:? Unless I'm totally missing something, we can just not send these tasks:

    def set_task_status(
        self, task: Task, status: TaskState, consumer: Optional[str] = None
    ) -> None:
        if task.status == status:
            return
        # ...

Maybe someone can confirm.

Added this idea to my PR. This slashed the number of karton.operations entries in my test from 700 to 400 - almost a 50% improvement.

This experiment has shown that we must have some method for scalable routing. The separation of karton-system into GC and routing is just one way to both scale it and stop the starvation caused by GC, but I'm sure there are more options.

For now I've implemented the separation. But it's still not possible to run more than one router. (It should be safe to run two GCs, but it's also pointless and they will just waste resources).

It's getting a bit late, so that's all from me today. Maybe @nazywam will take a look in his timezone when he wakes up. Sorry if that was a bit chaotic.

My PR is https://github.com/CERT-Polska/karton/pull/144. It is in dire need of a refactor, but it should work. If you could retry your benchmarks with it, that would be great (it can take some time before CERT.PL merges it, or even decides if they want all these flags).

alex-ilgayev commented 2 years ago

Great work @msm-code !

So far I have benchmarked the router performance, and it yielded a significant improvement! I haven't diffed them, but are there performance changes between #144 and #145?

From my tests so far: For 70 tasks/s

for 130 tasks/s

I need to get a stronger machine for more tasks/s 😓 Am I wrong here with the stats?

Thanks for the GC features. I'll test it soon as well.

msm-code commented 2 years ago

I haven't diffed them, but are there performance changes between #144 and #145?

Yes, #144 is focused on GC improvements (it indirectly speeds up routing too, but it's not the main feature), and #145 speeds up routing (not everything that could be done has been done, but the improvements should be significant)

FWIW, with my setup the current implementation struggled at 60 tasks/second, and I was able to get as much as 250 tasks/second with #145. But my setup is a bit different, and Redis is on localhost (so latency is probably lower than in your case).

You might also want to test two instances, one with --disable-router --gc-interval=xxx and one with --disable-gc

Am I wrong here with the stats?

Your results are in the correct ballpark, unfortunately.

Overall the biggest problem and bottleneck is DB latency. The network and Redis should handle the load easily, but given that we have a single router, even if we do only a few Redis requests per routed task we hit the wall soon. The bigger the ping to the Redis server, the worse that bottleneck is.
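Back-of-envelope, with made-up numbers: if a single router issues N sequential Redis calls per routed task, it can never exceed 1 / (N * RTT) tasks per second:

ROUNDTRIPS_PER_TASK = 5   # assumed number of sequential Redis calls per routed task
RTT_SECONDS = 0.001       # assumed 1 ms round-trip time to the Redis server

max_throughput = 1 / (ROUNDTRIPS_PER_TASK * RTT_SECONDS)
print(f"single-router upper bound: ~{max_throughput:.0f} tasks/s")  # ~200 tasks/s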

Ideally we would run multiple routers in parallel. Unfortunately it's not possible yet, but @psrok1 is thinking about removing karton.operations queue to solve this (this is a breaking change and would require karton 5.0 though).


Alternatively I just realised that it's possible to run more "karton.task routers", and this could solve our problem (karton.operations handling is faster). If this works, this is a very simple solution that could work great. I'll investigate and update this ticket.

So instead of one karton-system you would start 2+n kartons:

alex-ilgayev commented 2 years ago

FWIW, with my setup the current implementation struggled at 60 tasks/second, and I was able to get as much as 250 tasks/second with #145. But my setup is a bit different, and Redis is on localhost (so latency is probably lower than in your case).

Yeah, you're right - I created a setup closer to production, but still, >130 tasks/s is great throughput. From our perspective, the throughput we managed to achieve (together with the GC separation) is more than enough for us to expand our limits until the design refactor that comes later. Of course, this applies only if CERT-PL approves these changes.

psrok1 commented 2 years ago

Unfortunately it's not possible yet, but @psrok1 is thinking about removing karton.operations queue to solve this

After some analysis, I think that karton.operations is not needed. Task ownership is already guaranteed, and the deferred task status change only complicates things. But we can still handle it in karton-system to keep things backwards compatible.

I have tried to make routing as lightweight as possible and introduce pipelined operations in routing in https://github.com/CERT-Polska/karton/pull/146. It hasn't been merged with @msm-code's optimizations yet, though.

msm-code commented 2 years ago

Nice, I see we're running all hands here.

I've looked at your PR and left some comments there - looks good! And the changes are mostly orthogonal to mine. I especially like the extraction of queue unwinding from the tight routing loop into the GC.

To save you and @nazywam some trouble with merging my changes, I'll list the meaningful improvements and how important they are. Warning, this post got a bit long:

  1. A lot of debug prints, ad-hoc timers, etc

These can all safely be deleted. Sorry for leaving them in, I was in a hurry.

  2. New command-line flags to split router and GC

The feature request was to run a "router-only" and a "gc-only" karton-system. I think this makes sense. I've used the --disable-gc and --disable-router flag names. The idea was that if I called the flags "--router-only" and "--gc-only" and karton-system gained another responsibility in the future, the semantics would become unclear. But I'm not particularly attached to these names.

  3. New command-line flag to control the GC interval

Pretty easy change. It's probably not critical though. Actually, in my test I just ran GC with --gc-interval=1 - a bit pointless, but the point is that it doesn't affect performance since it's not a bottleneck anymore.

You might want to consider if you want this or not (it's only a few lines of code, so the cost is low).

  4. Batch delete resources in gc_collect_resources

Huge performance win, strongly recommend doing that: https://github.com/msm-code/karton/blob/7857c2d7de11352797e14c9e8a2f17715e9328da/karton/system/system.py#L45

At this point the only slow part of gc_collect_resources is get_all_tasks, and 70% of the time is spent on... task deserialisation. Also, if RAM becomes a problem, you might want to consider changing this into an iterator (so not all deserialised tasks are in memory at once). But it's probably a small win compared to other memory hogs.

  5. Batch delete and metrics in gc_collect_tasks

Another huge performance win, definitely worth doing: https://github.com/msm-code/karton/blob/7857c2d7de11352797e14c9e8a2f17715e9328da/karton/system/system.py#L76

  6. Batch handling of karton.operations

You're planning to deprecate karton.operations, but this is a relatively easy change that brings large benefits. The idea is to pop up to 1000 operations (an arbitrary number I conjured, it may be increased or decreased I guess) every time and handle them all at once. Then it deserialises them all at once and registers them in batch:

        self.backend.register_tasks(tasks)

The speed-up is immense, so it's probably worth doing for legacy users (but the code is a bit more kludgy, so YMMV). See the sketch at the end of this post.

  7. Batch handling of karton.tasks

Similar idea to yours, but taken a step further. Actually, I didn't implement nice pipelining like you did (I thought about it, but chose the easier route). So again, we pop up to 100 tasks, and do some operations in batch (like get_tasks) or only once (like get_binds).

Actually with your nice API you can take this one step further, and do all process_tasks with one redis pipeline which you then execute. This should speed everything up nicely, and still look clean (unlike some of my changes).

  8. Force order change when tasks are running low

This one is tricky and easy to miss. I've introduced a variable called self.forceops. Why?

Since all operations are batched, handling 100 tasks at once is much faster than handling 10 tasks 10 times. So ideally, when there are only 10 tasks left but many operations pending, we'd like to work on the karton.operations queue instead. But the way the current code works, it will only pop an operation if there are zero tasks in the queue. And since @alex-ilgayev plans to produce 60 tasks per second, this may never happen, and the code will never pop from the operations queue.

So the fix is to force the operations queue when the previous batch consumed the whole tasks queue. I thought this was safe, since at that point there are no tasks left in the queue == all operations can be processed safely. OK, this is not really safe, I see a race here. Well, there goes my idea - maybe there is another solution to this problem.

Now this only happens at pretty high task volumes, so you can also ignore this fix. Under lower loads this is not a problem, especially since you plan to deprecate karton.operations anyway.
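To make point 6 a bit more concrete, here is a simplified sketch of the batched karton.operations handling (not the exact PR code; the batch size is arbitrary and it assumes the backend exposes its Redis client and a register_tasks helper, as in my PR):

import json
from karton.core import Task
from karton.core.task import TaskState

OPERATIONS_BATCH = 1000  # arbitrary batch size

def handle_operations_batch(backend) -> None:
    # Pop up to OPERATIONS_BATCH entries in one pipelined round-trip,
    # deserialize them, and register all changed tasks at once.
    with backend.redis.pipeline(transaction=False) as pipe:
        for _ in range(OPERATIONS_BATCH):
            pipe.lpop("karton.operations")
        bodies = [body for body in pipe.execute() if body is not None]

    tasks_to_register = []
    for body in bodies:
        operation = json.loads(body)
        task = Task.unserialize(operation["task"])
        new_status = TaskState(operation["status"])
        if task.status != new_status:
            task.status = new_status
            tasks_to_register.append(task)

    if tasks_to_register:
        backend.register_tasks(tasks_to_register)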

nazywam commented 2 years ago

:tada: