certtools / intelmq

IntelMQ is a solution for IT security teams for collecting and processing security feeds using a message queuing protocol.
https://docs.intelmq.org/latest/
GNU Affero General Public License v3.0
963 stars 295 forks source link

Multiprocessing per queue is not supported #186

Open adulau opened 9 years ago

adulau commented 9 years ago

Multiprocessing per queue is not supported. If you have to process a huge dataset and only a single process is handling a queue, you are time bound with DNS resolution or alike. It would be nice to ensure that you can handle multiple processes for handling a single queue.

Rafiot commented 9 years ago

FYI, it is supported on AIL by this module: https://github.com/CIRCL/AIL-framework/blob/master/bin/Helper.py

I will see if it is possible to implement it.

aaronkaplan commented 9 years ago

On May 28, 2015, at 3:52 PM, Alexandre Dulaunoy notifications@github.com wrote:

Multiprocessing per queue is not supported. If you have to process a huge dataset and only a single process is handling a queue, you are time bound with DNS resolution or alike. It would be nice to ensure that you can handle multiple processes for handling a single queue.

— Reply to this email directly or view it on GitHub.

So concerning DNS in particular, we had a discussion on this already. Tomas tried to implement threading here, but it turned out to be a bit awkward and ugly (a.k.a. complex).

So, we decided against this and reverted to processes and/or asynchronous DNS. In fact, I'd like to see something like adns-tools for DNS lookups.

However, right now (at least for us) speed was not the main concern.

Any experience with adns in python?

Rafiot commented 9 years ago

Protip: never use threading in python. Been here, done that, bad idea.

My approach on AIL is to run processes poping values from a set and pushing the output on the next one.

I will dig into the code of intelmq to see if I can use the same library as in AIL.

SYNchroACK commented 9 years ago

The new version that is under testing (https://github.com/certtools/intelmq-beta) is possible to enable 'load_balance' which is in the reality a splitte.

https://github.com/certtools/intelmq-beta/blob/master/intelmq/lib/pipeline.py#L82

What do you think about the idea? Is not multi-thread, is "muti-process".

aaronkaplan commented 9 years ago

On May 28, 2015, at 4:35 PM, Raphaël Vinot notifications@github.com wrote:

Protip: never use threading in python. Been here, done that, bad idea.

ACK :)

My approach on AIL is to run processes poping values from a set and pushing the output on the next one.

I will dig into the code of intelmq to see if I can use the same library as in AIL.

okay. Cool! Thx a lot.

— Reply to this email directly or view it on GitHub.

SYNchroACK commented 9 years ago

Parsers will have the load_balance option equal True, so they will send one event to cymru-1 and second one to cymru-2, and so on...

PS: manager graph will need the have a hierarchy structured to be more understandable.

Visualization of idea:

pic

aaronkaplan commented 9 years ago

I strongly dislike that we create loops in the graph. Even if it is just loops in the underlying undirected graph (in your example, there are no cycles in the DAG - but the underlying undirected one has cycles).

adulau commented 9 years ago

The best is to simplify such approach. In AIL for example, multiple processes can read from the same queue or publisher. That's it. So depending of the resource of the machine you can start one or more processes per queue/publisher.

SYNchroACK commented 9 years ago

@aaronkaplan check the code and propose. Its a little bit hard to get a simple solution like the current one.

aaronkaplan commented 9 years ago

On May 28, 2015, at 5:17 PM, Alexandre Dulaunoy notifications@github.com wrote:

The best is to simplify such approach. In AIL for example, multiple processes can read from the same queue or publisher. That's it. So depending of the resource of the machine you can start one or more processes per queue/publisher.

Agreed. That makes sense.

— Reply to this email directly or view it on GitHub.

Rafiot commented 9 years ago

Here is my model for one queue in AIL module

You can have multiple Queues subscribing to the same publisher.

aaronkaplan commented 9 years ago

+1 pub / sub seems a good way . Are pushes and pops atomic in redis?

adulau commented 9 years ago

Yes.

aaronkaplan commented 9 years ago

Let me maybe elaborate why I think the solution by @Rafiot seems better for me: because you can't make loops. Loops can bite us a lot if we are not careful to mark all the data which has already been processed. So what I do not yet understand in @Rafiot 's picture: if the data goes into the Set , is the pop atomic as well even for a whole message/event (seems like yes, because a list is a certain type of set (note: I am not a Redis guru))? Secondly, have we thought about using more of the pub/sub model of Redis in pushing and popping ? Is that what's actually implied in the picture? Third, how would you re-arrange the base bot class so that it follows this principle? What features does an abstract bot class need?

--> maybe some of these questions are answered by looking at the AIL source :)

Rafiot commented 9 years ago

With my solution, you can make loops, which is a feature and not a bug because I fetch onions and process them again :) Obviously, it only work if you use redis, and not ZMQ because you can't have multiple publisher on one one context.

In AIL, the 'message' is the path to the paste on the disk. In intelmq, it should be the json representation of one event (serialized).

tl;dr: QueueIn and QueueOut will serialize and deserialize an event properly so any of them can be processed independently by the multiple Module scripts. This part is hidden and nobody creating a module should have to deal with that complexity (that's the case in AIL).

SYNchroACK commented 9 years ago

@aaronkaplan I think we miss something in the picture. Look carefully to arrows and you will see: collector -> parser -> experts -> output

To be clear, new image: newpic

aaronkaplan commented 9 years ago

yes, I saw that. :) The DAG (https://en.wikipedia.org/wiki/Directed_acyclic_graph) is a DAG :) Well, acyclical. But the underlying undirected graph has cycles. It hurts my eyes. But I know right now it does not harm the processing :)

But yes, I agree your approach would work as well of course.

To be maybe clearer: I prefer one cymru bot which can do things in parallel (internally! for example with asynchronous DNS) and show up as one single box in the display. Or you hide the internal multiple parallel processes in the GUI.

Rafiot commented 9 years ago

The schema I did yesterday is for one queue.

in my terminology, Alienvault-collector is one queue, alienvault-parser is one queue, cymru-to-asn is one queue.

it doesn't make any sense to have multiple alienvault parser so we would not have multiple processes but the parser could be started multiple times, on multiple files and obviously cymru-to-asn will be started multiple times

Rafiot commented 9 years ago

Here is the very initial version of the queuing code I'm porting to a standalone module from AIL: https://github.com/Rafiot/MultiProcQueue

It is quite far from being ready, but most of the logic in there.

Rafiot commented 9 years ago

The code is almost ready, I use the same concepts as IntelMQ to make it an (easy enough) drop-in replacement. I will create a branch in IntelMQ for testing ASAP.

In theory the following should be enough:

We will see in practice... :)

Rafiot commented 9 years ago

Short update: I'm going to update my code to use disque[1] instead of my custom queuing system: it seems much more reliable, is properly supported, and will much more easily give the possibility to use multiprocessing.

[1] https://github.com/antirez/disque

adulau commented 9 years ago

From the disque repo:

WARNING: This is alpha code NOT suitable for production. The implementation and API will likely
change in significant ways during the next months. The code and algorithms are not tested enough. A
lot more work is needed.

What do you think? Which one is more reliable?

Rafiot commented 9 years ago

mine is, I think, more reliable for now. But also not used in production either so I cannot really tell. I really want to give a try to disque, because it would take care of the whole subscriber to set and set to publisher code, which is something I would rather enjoy.

Rafiot commented 9 years ago

Here is the testing code I wrote: https://github.com/Rafiot/MultiProcQueue/tree/master/example_disque

+:

-:

ondj commented 8 years ago

Hi,

because we need to process thousands of messages and use remote calls in experts, multiprocessing is necessary. But I think multiprocessing thought load_balance is broken by design because you must define new bot, copy configuration in runtime config file and define new pipelines. And if you find out, that for example 2 processes are not enough, you must change the whole configuration again.

My proposal is really simple: use only one source and destination queue, but every process uses only own internal queue. This is possible because Redis command BRPOPLPUSH used in receive method is atomic, so there is not a problem with concurrency.

With a different internal queue, it is possible to run bot multiple times or use Python threading.

What do you think about this solution?

aaronkaplan commented 4 years ago

@wagner-certat is this the case by now? each bot an internal queue?

ghost commented 3 years ago

internal queues are only necessary for the redis MQ, we do not use & need them for the AMQP MQ. This is also the reason why multithreading is only implemented for AMQP and not Redis - to implement it for redis would mean high efforts as we would need to implement more MQ functionality ourselves.

I'd be fine if - in case we implement multiprocessing - the feature is not available for all message queues (as with mutlithreading).