datasalt / pangool

Tuple MapReduce for Hadoop: Hadoop API made easy
http://datasalt.github.io/pangool/
Apache License 2.0

"Streams" or different group-by's #39

Open pereferrera opened 10 years ago

pereferrera commented 10 years ago

As pointed out by Alexei: https://groups.google.com/forum/#!topic/pangool-user/iWf3BODBI9o

It would be nice to have the possibility of specifying different groupBys for the same Pangool Job. Pig seems to have this possibility built into its optimizer. Even though I'm not sure this is always the most efficient way of executing several groupBys over the same input, it makes sense for Pangool to support such advanced usages, because the philosophy of Pangool is to make "low-level MapReduce easier". If there are low-level use cases that we don't natively support, I think it is worth taking a look and seeing how reasonable they are to implement.

Alexei proposes an API based on declaring "Streams", which would be quite different from the current API, where you have to specify intermediate schemas. At first sight I wouldn't recommend such an API revamp, because this is still a particular use case and it doesn't make sense to me to make it so "first-class".

Here we can follow up with a discussion on this. I have been thinking about it and I have a few ideas on the matter. The basic form would be:

addGroupBy(groupBy)
  .withSchema(schema1);

So the user can either do as now (use a common groupBy) or add K particular groupBys to the same Job, each associated with one or more Schemas:

addGroupBy(groupBy)
  .withSchema(schema1)
  .withSchema(schema2);

The Pangool Mapper would check this and, if one schema is associated with more than one groupBy, internally emit as many Tuples as needed, assigning a different stream id to each.
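As a rough illustration of that internal fan-out (a sketch only; MultiGroupByEmitter, TaggedCollector and the schema-to-stream-ids map are hypothetical names, not Pangool API):

import java.util.List;
import java.util.Map;

// Hypothetical sketch: a tuple whose schema is registered under several
// groupBys is written once per groupBy, each copy tagged with that
// groupBy's stream id.
class MultiGroupByEmitter {

  interface TaggedCollector {
    void write(int streamId, Object tuple);
  }

  private final Map<String, List<Integer>> streamIdsBySchema;

  MultiGroupByEmitter(Map<String, List<Integer>> streamIdsBySchema) {
    this.streamIdsBySchema = streamIdsBySchema;
  }

  void emit(String schemaName, Object tuple, TaggedCollector collector) {
    for (int streamId : streamIdsBySchema.get(schemaName)) {
      collector.write(streamId, tuple); // same tuple, different stream id
    }
  }
}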

addGroupBy(groupBy)
  .withSchema(schema1)
  .withSchema(schema2)
  .withReducer(reducer);

So the Pangool Reducer would instantiate several reducers in this case and delegate to one or another depending on the "stream id". If these methods are used, the TupleMRBuilder won't complain if setTupleReducer() hasn't been called. If both are used, the most specific configuration wins: for schemas that have a particular reducer associated, that one would be called; for the rest, the "general" reducer would be called.
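A minimal sketch of that dispatch rule (hypothetical names throughout; GroupHandler stands in for Pangool's TupleReducer):

import java.util.Map;

// Hypothetical dispatch: each group goes to the reducer registered for its
// stream id, falling back to the "general" reducer set via setTupleReducer().
class DispatchingReducer {

  interface GroupHandler {
    void onGroup(Iterable<Object> tuples);
  }

  private final Map<Integer, GroupHandler> reducersByStreamId;
  private final GroupHandler generalReducer; // null if every stream has its own

  DispatchingReducer(Map<Integer, GroupHandler> reducersByStreamId,
                     GroupHandler generalReducer) {
    this.reducersByStreamId = reducersByStreamId;
    this.generalReducer = generalReducer;
  }

  void reduce(int streamId, Iterable<Object> tuples) {
    GroupHandler handler = reducersByStreamId.getOrDefault(streamId, generalReducer);
    if (handler == null) {
      throw new IllegalStateException("no reducer for stream id " + streamId);
    }
    handler.onGroup(tuples); // the most specific configuration wins
  }
}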

addGroupBy(groupBy)
  .withSchema(schema1, orderBy1)
  .withSchema(schema2, orderBy2)
  .withReducer(reducer);

This all seems a bit complex, but it is genuinely challenging to stay as backwards-compatible as possible while remaining coherent with the whole idea of the API and Pangool.
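To make the backwards-compatibility point concrete, a hypothetical end-to-end configuration could look like this (same proposed methods as above; the schema, orderBy and reducer names are made up):

// Existing behaviour keeps working: a common groupBy plus a general reducer.
setTupleReducer(generalReducer);

// Proposed additions: two extra groupBys over the same intermediate schemas.
addGroupBy(groupByUser)
  .withSchema(clicksSchema, orderByTimestamp)
  .withSchema(viewsSchema, orderByTimestamp)
  .withReducer(perUserReducer); // wins over generalReducer for these schemas

addGroupBy(groupByUrl)
  .withSchema(clicksSchema);    // no particular reducer: generalReducer is used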

That would be my two cents as to how this could be implemented.

alexeipab commented 10 years ago

A couple of questions:

1) groupBy: is it a comma-separated String of column names, or a structure?

2) What happens in cases where multiple schemas need to be streamed to a specific groupBy and reducer? E.g. (restated in the proposed builder notation below):

"g1,g2" with Schema1, Schema2 in reducer1
"g3,g4" with Schema2, Schema5 in reducer2

3) How to deal with different sorting criteria for different groupBys? This could be very useful for implementing unique counts without consuming RAM, which can be a problem with skewed data sets.

4) How to define the partitioning logic based on the specific groupBy?
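Restating question 2's example with the hypothetical methods from the issue description (note that Schema2 feeds both groupBys, so the Mapper would emit it twice with different stream ids):

addGroupBy(groupBy("g1", "g2"))
  .withSchema(schema1)
  .withSchema(schema2)
  .withReducer(reducer1);

addGroupBy(groupBy("g3", "g4"))
  .withSchema(schema2) // same schema streamed to a second groupBy
  .withSchema(schema5)
  .withReducer(reducer2);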

pereferrera commented 10 years ago

Hello Alexei,

You are right, there were missing cases with respect to your original idea. I have updated the description with the new additions... take a look and see what you think.

In the end your MRStream proposal covers all the cases well, and I'm not sure whether my proposal is better or not. I tried to fit the functionality into the current API rather than defining a new entity (MRStream). But maybe the best way to implement this is something like your original idea; I'm not really sure now.

alexeipab commented 10 years ago

Hi Pere,

I think it would be possible to add streams as a separate Builder, and it would not require any changes to the existing Pangool API. It looks like the secret ingredients in Pangool are the SortComparator, GroupComparator, TupleSerialization, HadoopSerialization and Schema; these are quite sophisticated and I would not want to touch them, as I think they will work as they are. Over the next couple of days I will try to outline another proposal.

pereferrera commented 10 years ago

Hi Alexei,

Yes, we can discuss your proposal. A separate Builder doesn't sound too bad from the user's point of view.

Based on what you proposed on the list I have some feedback:

And regarding serialization: we should find a name for the integer that now represents the schemaId, which I called "stream_id". I think what I wrote is still valid: there should be no need to add another integer to the current serialization mechanism.
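A sketch of that serialization point, assuming (as today) each serialized tuple starts with a single small integer prefix; the idea is to reuse that slot as the stream id rather than appending a second integer (class and method names are illustrative):

import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;

// Illustrative only: the one varint that currently identifies the
// intermediate schema would identify the (groupBy, schema) pair instead,
// so the on-disk tuple layout gains no extra bytes.
class StreamIdHeader {

  static void writeHeader(DataOutput out, int streamId) throws IOException {
    WritableUtils.writeVInt(out, streamId); // previously: the schemaId
    // ...tuple fields follow, serialized with the schema bound to streamId
  }
}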

alexeipab commented 10 years ago

Hi Pere,

Yes, I agree, streams might not be the best name. Before encountering Hadoop I used to play with CUDA on NVidia cards (not for math, but for ad-hoc queries on data), and since then I see Hadoop as very similar to a big graphics card, with more flexibility in writing kernels. When I build MR code I see the Mapper and Reducer stages as slots to run specific kernels, with spills and merge sort managing the flow of data between the mapper stage and the reducer stage. With the current Pangool API it is possible to specify multiple Mapper implementations (mapper kernels) for different input paths, but not possible to do the same for the Reducer implementation. Maybe we could introduce an MRKernel API with MKernel and RKernel?
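Roughly, the asymmetry today looks like this (from memory of the Pangool builder, so treat the signatures as approximate):

// One mapper "kernel" per input path is already possible...
builder.addInput(pathA, inputFormat, mapperForA);
builder.addInput(pathB, inputFormat, mapperForB);

// ...but there is a single reducer "kernel" for the whole job:
builder.setTupleReducer(theOnlyReducer);

// An RKernel-style API would lift that restriction on the reduce side.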

I think TupleMRConfigBuilder has most of the things required to define a single RKernel.
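One purely speculative way to read that, with a made-up StreamJobBuilder wrapping one TupleMRConfigBuilder-like configuration per RKernel (addStream is invented; the chained calls mirror existing TupleMRBuilder methods):

StreamJobBuilder job = new StreamJobBuilder(conf);

job.addStream("byUser")              // one RKernel
   .addIntermediateSchema(clicksSchema)
   .setGroupByFields("user")
   .setTupleReducer(perUserReducer);

job.addStream("byUrl")               // another RKernel
   .addIntermediateSchema(clicksSchema)
   .setGroupByFields("url")
   .setTupleReducer(perUrlReducer);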

In the future I want to build a GUI ETL tool similar to Ab Initio or Informatica, one that does not hide MapReduce and Java but relies on them. At this moment Pangool is the best execution platform for it. If I could add the Kernels in, then the rest would be easy.