prdn / pigato

PIGATO - a high-performance Node.js microservices framework
http://prdn.github.io/pigato/
MIT License

way to map/query across all workers #2

Open bmeck opened 10 years ago

bmeck commented 10 years ago

design / bikeshed

this is a common problem but being able to run a reduce across workers is important.

right now we can use a separate registry from the broker to generate a list of workers and run a reduce across them.

being able to wait on / run a reduce across all workers of a service would be a big win. technically all that is needed is the ability to:

  • get a list of workers for a service
  • send targeted messages to workers

since pigato is more than just a simple omdp we should discuss if this should be a service or built into the broker.

prdn commented 10 years ago

Do you mean a map-reduce implementation in pigato? I like it!

For me the best approach is to create a worker that handles this kind of request.

I think there should be no need to know how many workers are available. When a client sends a REDUCE request, it could also specify how many workers to use. All the generated requests would then be queued, and any available worker for the given service would be used automatically.

Please tell me if I misunderstood your thoughts.

Paolo

bmeck commented 10 years ago

yes this is a map reduce across workers, where the workers are the source of state. In our usage we have workers sitting on boxes monitoring processes. Since the processes are tied to physical machines we cannot run out to a shared DB etc. to get a single source of data.

so we want to query all workers, somehow. We can do the reduce client side or server side. I would prefer not to send a reduce method over the wire like many map/reduce impls do.

Renaming issue to just be a global map()

bmeck commented 9 years ago

@prdn starting impl, will need a new set of queues on the controller.

controller.wrq[workerId]: a worker request queue. If a queue matches [workerId], the worker will pick up work from it rather than calling dispatch. This means that mapping could starve srq, but you might have a better idea.
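
A minimal sketch of the two-queue idea, assuming `wrq` is keyed by worker id and `srq` is a plain array. Only the names `wrq` and `srq` come from the comment above; the `Controller` class and its methods are hypothetical and are not pigato's actual controller code.

```javascript
// Sketch only: `wrq` and `srq` are named in the thread; everything else
// (Controller, enqueue, enqueueFor, next) is a hypothetical illustration.
class Controller {
  constructor() {
    this.srq = [];        // service request queue, shared by all workers
    this.wrq = new Map(); // workerId -> requests targeted at that worker
  }

  // Normal request: any worker of the service may take it.
  enqueue(req) {
    this.srq.push(req);
  }

  // Targeted request (one per worker when mapping).
  enqueueFor(workerId, req) {
    if (!this.wrq.has(workerId)) this.wrq.set(workerId, []);
    this.wrq.get(workerId).push(req);
  }

  // Called when a worker is ready. Its own queue wins over srq,
  // which is why heavy mapping could starve srq.
  next(workerId) {
    const targeted = this.wrq.get(workerId);
    if (targeted && targeted.length > 0) return targeted.shift();
    return this.srq.length > 0 ? this.srq.shift() : null;
  }
}

const controller = new Controller();
controller.enqueue({ id: 'r1' });
controller.enqueueFor('w1', { id: 'm1' });

// w1 drains its targeted queue first, then falls back to srq.
const order = [
  controller.next('w1'),
  controller.next('w1'),
  controller.next('w2'),
].map((req) => req && req.id);
console.log(order);
```

Giving the targeted queue priority keeps map requests from waiting behind unrelated work, at the cost of the starvation risk mentioned above.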

bmeck commented 9 years ago

just going to have map send normal looking requests to all workers, they should be indistinguishable for the worker. unless you can think of a reason why workers need to know that this specific request is a map request.

prdn commented 9 years ago

I think that a worker shouldn't know that a request is a map request.

I have a couple of questions so I can better suggest a solution.

How do you know that all required workers are connected to the broker? Will the client specify how many workers are required? Who will perform the reduce operation? It is also important to remember that the client could receive incomplete replies because not all required workers were connected.

We might consider a slightly different pattern: for example, a broadcast request from the client to a specific service, to which all the workers would answer.

Let me know what you think.

bmeck commented 9 years ago

How do we know that all required workers are connected?

You don't, just query what you have and see if the results have the needed info

How many workers required?

I was just going to queue the request to all currently connected workers for a service

Who performs the reduce?

Reduce will be left to the client. Since the request is not seen as a map request, the broker needs to tag responses for multiplexing purposes.

Incomplete replies

Unsure what you mean, if we are pulling multiple streams of data the concept of a complete reply would be difficult. We may not know when a stream ends (until it does).
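
To illustrate the tagging described above, here is a rough client-side sketch that splits one multiplexed stream back into per-worker streams. The frame shape `{ workerId, data, end }` and the `Demultiplexer` class are assumptions made for this example, not pigato's wire format.

```javascript
// Hypothetical frame shape: { workerId, data, end }.
// This is an illustration, not pigato's actual protocol.
class Demultiplexer {
  constructor() {
    this.streams = new Map(); // workerId -> { chunks: [], ended: false }
  }

  // Route one tagged frame to the stream of the worker that produced it.
  push(frame) {
    if (!this.streams.has(frame.workerId)) {
      this.streams.set(frame.workerId, { chunks: [], ended: false });
    }
    const stream = this.streams.get(frame.workerId);
    if (frame.data !== undefined) stream.chunks.push(frame.data);
    if (frame.end) stream.ended = true;
  }

  // True once every worker seen so far has ended its stream.
  allEnded() {
    return [...this.streams.values()].every((s) => s.ended);
  }
}

const demux = new Demultiplexer();
demux.push({ workerId: 'w1', data: 'a' });
demux.push({ workerId: 'w2', data: 'x', end: true });
const pending = demux.allEnded(); // w1 has not ended yet
demux.push({ workerId: 'w1', data: 'b', end: true });

const perWorker = Object.fromEntries(
  [...demux.streams].map(([id, s]) => [id, s.chunks])
);
console.log(perWorker, demux.allEnded());
```

Note that `allEnded()` only knows about workers that have already replied, which matches the point above: completeness can only be judged against the workers that were actually queried.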

prdn commented 9 years ago

OK, now it's clearer. Thank you.

If I've understood correctly, the client receives a single multiplexed stream from the broker that is the result of all the workers' streams. Is that correct?

bmeck commented 9 years ago

yes

prdn commented 9 years ago

OK, so the client does not need any built-in reducing logic; it just receives a stream as normal. I like it.

prdn commented 9 years ago

I have to think about why we need a new queue. Starting from the request id, the broker could multiplex the streams from multiple workers into one single stream.

bmeck commented 9 years ago

@prdn let me know if you have any thoughts to avoid the new queue.

right now we need a new queue because we can only queue based upon service name and not target requests to workers. if we wait for a dispatch on a service and see the queued request is not for our service id we would need to skip that request (but keep it at the front of the queue to avoid starvation). I did not see a sanctioned way to do this.
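
For comparison, the single-queue alternative described above (skip requests meant for another worker but leave them in place) might look roughly like this. The request shape `{ id, workerId }` and the `takeNextFor` helper are hypothetical; this is not a sanctioned pigato API.

```javascript
// Hypothetical request shape: { id, workerId? }.
// workerId is set only on targeted (map) requests.
function takeNextFor(queue, workerId) {
  // First request that is untargeted or targeted at this worker.
  const i = queue.findIndex(
    (req) => req.workerId === undefined || req.workerId === workerId
  );
  if (i === -1) return null;    // nothing this worker may take
  return queue.splice(i, 1)[0]; // skipped entries keep their position
}

const queue = [
  { id: 'm1', workerId: 'w1' }, // targeted at w1, must stay at the front
  { id: 'r1' },                 // normal request
  { id: 'm2', workerId: 'w2' },
];

const taken = takeNextFor(queue, 'w2'); // skips m1, takes r1
const remaining = queue.map((req) => req.id);
console.log(taken.id, remaining);
```

The scan is linear in the queue length on every dispatch, which is one reason a separate per-worker queue can be simpler.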

prdn commented 9 years ago

Ok. Let's do this. In the future we may merge the two queues in a more generic one. Can't say before seeing what you have in mind.

Thank you

alexeygolev commented 9 years ago

Maybe I'm wrong, but isn't the ventilator/sink pattern a better fit for this?

bmeck commented 9 years ago

@alexeygolev it would be a better fit in the sense of doing parallel tasks, but we start to get into territory where zmq is not exactly a good fit in my experience. We would need a multiplexed sink and ventilator per service on the broker, preferably without opening a port per sink/ventilator. If you know a good way to do this I would be game and it would make things easier.

alexeygolev commented 9 years ago

@bmeck sounds like a mission... will research/play around

bmeck commented 9 years ago

@alexeygolev one thing to note is service workers may contain state so we need to be sure that the query hits all the workers

prdn commented 9 years ago

Guys, what about this solution?

  • a client sends a map request to a particular service (e.g. '$mapreduce'). The request contains in its payload the real service for the request (e.g. 'echo') and, for example, the minimum number of workers needed
  • a 'special' worker handles that type of request
  • when the worker receives a $mapreduce request, it asks for the list of available workers in the 'echo' service
  • the $mapreduce worker generates a request for each of the 'echo' workers (including in each request the workerId of the worker that must handle it)
  • the $mapreduce worker collects partial and final replies and replies back to the client

To implement this scenario we only have to:

  • create a specialised worker that handles $mapreduce requests
  • add to the broker the ability to be queried for the available workers of a given service (we can require authentication for this kind of query)
  • allow the broker to serve a request to a particular workerId

I find this approach interesting because it requires only small modifications to the broker and we can move the complexity to specialised workers. In the future we may implement other patterns without increasing broker complexity.

What do you think about this?

bmeck commented 9 years ago

as long as there is a way to query all of the workers reliably that's fine, but it seems to complicate things since you have to manage mapreduce separately and cannot just grab a normal query from all.
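
The '$mapreduce' worker flow proposed in this exchange can be sketched in memory as follows. `makeBroker`, `listWorkers`, `sendTo` and `handleMapReduce` are hypothetical names standing in for the broker additions listed in the proposal; workers are plain synchronous functions here, whereas real ones would reply over ZeroMQ.

```javascript
// In-memory stand-in for the broker. listWorkers and sendTo are hypothetical
// and mirror the two broker additions from the proposal.
function makeBroker(workersByService) {
  return {
    listWorkers(service) {
      return Object.keys(workersByService[service] || {});
    },
    sendTo(service, workerId, payload) {
      return workersByService[service][workerId](payload);
    },
  };
}

// Handler for the hypothetical '$mapreduce' service.
function handleMapReduce(broker, { service, payload, minWorkers = 1 }) {
  const workerIds = broker.listWorkers(service);
  if (workerIds.length < minWorkers) {
    throw new Error(
      `need ${minWorkers} workers for '${service}', found ${workerIds.length}`
    );
  }
  // One targeted request per worker; replies stay tagged with their workerId.
  return workerIds.map((workerId) => ({
    workerId,
    reply: broker.sendTo(service, workerId, payload),
  }));
}

const broker = makeBroker({
  echo: {
    w1: (msg) => `w1:${msg}`,
    w2: (msg) => `w2:${msg}`,
  },
});

const replies = handleMapReduce(broker, {
  service: 'echo',
  payload: 'hi',
  minWorkers: 2,
});
console.log(replies);
```

This keeps the fan-out logic out of the broker, but, as noted above, it also means map-reduce has to be managed as a separate service.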

prdn commented 9 years ago

For the Client this is totally transparent. We could, for example, let the Client send requests to the 'echo' service and specify the option $mapreduce=true in the options object. Basically the multiplexing logic is moved to a Worker instead of being kept in the Broker.


bmeck commented 9 years ago

@prdn are we tied to the special worker approach? Would a PR for allowing normal workers to act as part of a reduce be fine?

prdn commented 9 years ago

We are not tied to that approach. I'm truly interested in seeing your PR. I've started working on broadcasting and related things as a step towards map-reduce functionality, but I'm open to anything.

prdn commented 9 years ago

@bmeck could you explain a little what the proposed modifications to the worker are and how they would impact the current behaviour? I only wish to keep the base classes as generic as possible.

bmeck commented 9 years ago

This would be completely transparent to the worker. It would require that the broker be able to multiplex receiving multiple streams from different workers. To the worker there would be no need to know that it is being asked as a batch request. The map function is a bit more complicated though, we can perform it client side or via a well known service. In a pseudocode sort of summary it looks like:

// client sends a batch request
request = client.queryBatch(time_service, get_time);
// ... on the broker
broker.startMultipler(get_all_services(time_service))
//
// dispatches go out when appropriate
//
// ... on any of the workers requested
worker.sendToBroker(some_data)
// ... on the broker
broker.sendToClient(multiplex(worker.id, some_data))
// ... on the client
// for each worker that replies
request.on('reply', function (rep) {
  // handle our reply like normal
});
//
// continues until all workers end
//
broker.sendBatchDoneToClient()

In this case the broker and the client need to be altered, but not the Worker. This means that you can send batch requests without any issues.
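
As a runnable approximation of the pseudocode above, the following in-memory sketch fans one batch request out to every registered worker and emits tagged replies on a single request object. The `Broker` class, `register`, and the explicit `dispatch()` call are assumptions for illustration; `queryBatch` and the `'reply'` event are borrowed from the pseudocode, and workers remain unaware that they are part of a batch.

```javascript
const { EventEmitter } = require('events');

// Hypothetical in-memory broker. Workers are plain functions that return an
// array of chunks; real workers would stream their replies over ZeroMQ.
class Broker {
  constructor() {
    this.services = new Map(); // service name -> Map(workerId -> handler)
  }

  register(service, workerId, handler) {
    if (!this.services.has(service)) this.services.set(service, new Map());
    this.services.get(service).set(workerId, handler);
  }

  // Prepare one request that targets every currently registered worker.
  queryBatch(service, payload) {
    const request = new EventEmitter();
    const workers = this.services.get(service) || new Map();
    // dispatch() stands in for "dispatches go out when appropriate".
    request.dispatch = () => {
      for (const [workerId, handler] of workers) {
        for (const data of handler(payload)) {
          request.emit('reply', { workerId, data }); // tagged for demultiplexing
        }
      }
      request.emit('end'); // every worker has finished
    };
    return request;
  }
}

const broker = new Broker();
broker.register('time_service', 'w1', () => ['10:00']);
broker.register('time_service', 'w2', () => ['10:01', '10:02']);

const seen = [];
let done = false;
const request = broker.queryBatch('time_service', 'get_time');
request.on('reply', (rep) => seen.push(`${rep.workerId}=${rep.data}`));
request.on('end', () => { done = true; });
request.dispatch();
console.log(seen, done);
```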

Now onto mapping...

Mapping is a bit interesting because I don't think it should be in the broker at all. For now I think it should live on the client and the client should just deal w/ the multiple replies as it comes in. We could do the map on the broker or make a well known function signature for how to accept a map request; but I think that should be put off until the ups/downs of this are more visible.

bmeck commented 9 years ago

as a side note we are faking this in a similar way to do multiplexed responses from a worker and sending back Server Sent Events; just would be using this on the broker for multiplexing worker responses rather than parts of the worker's reply stream.

prdn commented 9 years ago

If I understand correctly, the Broker should offer a way to retrieve all workers available for a given service. The client is the real multiplexer, right?

For example, let's say that the Broker answers a Client request called $directory by returning all the workerIds belonging to a given service. Then the client sends N requests (N = number of workers). Each request specifies the workerId (possible in the pigato master). Then the Client receives all the replies and multiplexes them into a single stream.

Does this correspond to your scenario?

bmeck commented 9 years ago

in my scenario the client uses 1 connection and the broker makes multiple connections. The client is demultiplexing a single stream into multiple replies sent from that single connection to the broker.

We could make the client do a request for all the current workers of a service, then request data from each one; however, this would be multiple connections for the client which also means many connections through the broker. I am not sure if there would be an advantage to doing it this way.


prdn commented 9 years ago

In my suggested scenario the client uses a single connection but sends multiple requests on that connection. The broker keeps its simplicity and lets requests flow as normal. You are probably right that your solution performs a bit better, but on the other hand it implies adding complexity to the broker.

If it is not too much effort for you, would you mind if we start with a client demultiplexer? But if you prefer and feel more comfortable with your scenario, and we are able to keep the current broker's complexity low, please go ahead with that and we can discuss it further later.

bmeck commented 9 years ago

we can do it client multiplexed first, but then we will need to expose what the worker ids are, which is fine.
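
A rough sketch of the client-multiplexed variant: once the worker ids are exposed, the client builds one targeted request per worker and matches replies back by request id. The request and reply shapes (`rid`, `workerId`, `data`) and both helper functions are assumptions for this example, not pigato's client API.

```javascript
let nextRid = 0;

// Build one targeted request per worker id returned by a directory lookup.
function buildTargetedRequests(service, workerIds, payload) {
  return workerIds.map((workerId) => ({
    rid: `req-${++nextRid}`, // hypothetical request id
    service,
    workerId,
    payload,
  }));
}

// Merge replies (keyed by request id) into one stream tagged by worker.
function mergeReplies(requests, replies) {
  const workerByRid = new Map(requests.map((r) => [r.rid, r.workerId]));
  return replies
    .filter((rep) => workerByRid.has(rep.rid)) // drop replies we did not ask for
    .map((rep) => ({ workerId: workerByRid.get(rep.rid), data: rep.data }));
}

const requests = buildTargetedRequests('echo', ['w1', 'w2'], 'ping');

// Replies may arrive in any order on the single client connection.
const merged = mergeReplies(requests, [
  { rid: requests[1].rid, data: 'pong-b' },
  { rid: requests[0].rid, data: 'pong-a' },
  { rid: 'unknown', data: 'ignored' },
]);
console.log(merged);
```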

prdn commented 9 years ago

@bmeck please take a look at this commit f7127f21dfbbd71dc133983a394063c28046f8a0, this file https://github.com/prdn/pigato/blob/master/services/Directory.js and this test https://github.com/prdn/pigato/blob/master/test/directory.js

Basically, I have now added a new socket to the Broker (inproc) that the Broker uses to publish its internal status to subscribed core services. The first core service is Directory, which offers a '$dir' service that sends a client an array of the workerIds belonging to a given service.

Now we should have all the pieces for map-reduce. This approach also lets us create other services very easily.

Let me know what you think about this.
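
To make the idea concrete, here is a simplified sketch of a directory that tracks workers from published status events and answers lookups with an array of worker ids. It is not the code in services/Directory.js; the event shape (`type`, `service`, `workerId`) and the `Directory` methods below are assumptions for illustration only.

```javascript
// Hypothetical status events:
//   { type: 'worker-add' | 'worker-remove', service, workerId }
class Directory {
  constructor() {
    this.workers = new Map(); // service -> Set of workerIds
  }

  // Apply one status event published by the broker.
  onStatus(event) {
    if (!this.workers.has(event.service)) {
      this.workers.set(event.service, new Set());
    }
    const ids = this.workers.get(event.service);
    if (event.type === 'worker-add') ids.add(event.workerId);
    if (event.type === 'worker-remove') ids.delete(event.workerId);
  }

  // What a '$dir'-style lookup could return for a given service.
  lookup(service) {
    return [...(this.workers.get(service) || [])];
  }
}

const dir = new Directory();
dir.onStatus({ type: 'worker-add', service: 'echo', workerId: 'w1' });
dir.onStatus({ type: 'worker-add', service: 'echo', workerId: 'w2' });
dir.onStatus({ type: 'worker-remove', service: 'echo', workerId: 'w1' });

const echoWorkers = dir.lookup('echo');
const unknownWorkers = dir.lookup('missing');
console.log(echoWorkers, unknownWorkers);
```

A client could then send one targeted request per returned worker id and combine the replies itself.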