LinkedInAttic / Cubert

Fast and efficient batch computation engine for complex analysis and reporting of massive datasets on Hadoop
http://linkedin.github.io/Cubert/
Apache License 2.0

JOIN Operator failed to parse #7

Closed: OopsOutOfMemory closed this issue 9 years ago

OopsOutOfMemory commented 9 years ago

Hi @mparkhe, could you give an example of the JOIN operator? I followed the documentation at http://linkedin.github.io/Cubert/operators/join.html but it does not seem to work.

JOB "Join count words"
    REDUCERS 5;
    MAP {
        a = LOAD "/cubert/words.txt" USING TEXT("schema": "STRING word");
    }

    MAP {
        b = LOAD "/cubert/words.txt" USING TEXT("schema": "STRING word");
    }

    test_joined = HASH-JOIN a BY word, b BY word;

    STORE test_joined INTO "/cubert/woud_count/join_output" USING TEXT();
END

shengli-mac$ cubert join.cmr
Using HADOOP_CLASSPATH=:/Users/shengli/git_repos/cubert/release/lib/*
line 13:9 mismatched input '=' expecting {'.', ID}
line 13:23 mismatched input 'BY' expecting '{'

Cannot parse cubert script. Exiting.
PROGRAM "Join Word Count";

Cannot compile cubert script. Exiting.
Exception in thread "main" java.text.ParseException
    at com.linkedin.cubert.plan.physical.PhysicalParser.parsingTask(PhysicalParser.java:197)
    at com.linkedin.cubert.plan.physical.PhysicalParser.parseInputStream(PhysicalParser.java:161)
    at com.linkedin.cubert.plan.physical.PhysicalParser.parseProgram(PhysicalParser.java:156)
    at com.linkedin.cubert.ScriptExecutor.compile(ScriptExecutor.java:304)
    at com.linkedin.cubert.ScriptExecutor.main(ScriptExecutor.java:523)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

suvodeep-pyne commented 9 years ago

@shengli

You are using a multimap, which effectively performs an implicit UNION. The final output of each MAP block must have the same name for the script to compile. You are also missing a REDUCE phase even though you declare 5 REDUCERS, and you would need a SHUFFLE phase as well. In this case, though, I think you are trying to do a join, and that can be done in a single MAP block.

I haven't tested the following code; give it a shot and let me know if it works.

JOB "Join count words" REDUCERS 5; MAP { a = LOAD "/cubert/words.txt" USING TEXT("schema": "STRING word");

b = LOAD-CACHED "/cubert/words.txt" USING TEXT("schema": "STRING word");

test_joined = HASH-JOIN a BY word, b BY word;

} STORE test_joined INTO "/cubert/woud_count/join_output" USING TEXT(); END

Regards Suvodeep


OopsOutOfMemory commented 9 years ago

Hi @suvodeep-pyne @mparkhe, thanks for your reply. I am indeed trying to do a join operation here; maybe I was using the wrong pattern (the implicit UNION you described).

Problem 1. I tried your code, and it throws an exception:

2015-06-25 10:11:31,885 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.UnsupportedOperationException
    at com.linkedin.cubert.io.text.TextStorage.getCachedFileReader(TextStorage.java:89)
    at com.linkedin.cubert.block.LocalFileBlock.configure(LocalFileBlock.java:45)
    at com.linkedin.cubert.operator.LoadBlockFromCacheOperator.setInput(LoadBlockFromCacheOperator.java:47)
    at com.linkedin.cubert.plan.physical.PhaseExecutor.prepareOperatorChain(PhaseExecutor.java:259)
    at com.linkedin.cubert.plan.physical.PhaseExecutor.<init>(PhaseExecutor.java:111)
    at com.linkedin.cubert.plan.physical.CubertMapper.run(CubertMapper.java:115)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

Problem 2. My scenario is:

Job 1 -> load a file, BLOCKGEN partitioned on someKeys, and store the BLOCKGEN result into path1
Job 2 -> load another file from a different path (not from the distributed cache), BLOCKGEN partitioned on someKeys, and store the BLOCKGEN result into path2
Job 3 -> join the two datasets on the join key

What confuses me is that the official documentation does not say where this code example should be placed:

// inner join; default
joined_dataset = HASH-JOIN relation1 BY joinKey1, relation2 BY joinKey2;

// outer joins
joined_dataset = HASH-JOIN LEFT OUTER relation1 BY joinKey1, relation2 BY joinKey2;
joined_dataset = HASH-JOIN RIGHT OUTER relation1 BY joinKey1, relation2 BY joinKey2;
joined_dataset = HASH-JOIN FULL OUTER relation1 BY joinKey1, relation2 BY joinKey2;

(I mean: where should the two relations be loaded? In a single MAP block?)

My tests show that if I use the LOAD operator to load two datasets in one MAP section, the job throws an exception.

PROGRAM "Join Word Count";

JOB "join count words"
MAP {
      a = LOAD "/cubert/words.txt" USING TEXT("schema": "STRING word");
      b = LOAD "/cubert/words.txt" USING TEXT("schema": "STRING word");
      test_joined = HASH-JOIN a BY word, b BY word;
}
        STORE test_joined INTO "/cubert/woud_count/join_output" USING TEXT("overwrite":"true");
END

Exceptions:

line 6:15 mismatched input '"/cubert/words.txt"' expecting {'.', ID}
line 6:45 mismatched input '(' expecting {',', '{'}

Cannot compile cubert script. Exiting.
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: LOAD"/cubert/words.txt"USING
    at com.linkedin.cubert.plan.physical.PhysicalParser$PhysicalListener.exitUriOperator(PhysicalParser.java:2095)
    at com.linkedin.cubert.antlr4.CubertPhysicalParser$UriOperatorContext.exitRule(CubertPhysicalParser.java:3917)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:71)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:54)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:52)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:52)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:52)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:52)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:52)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:52)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:52)
    at com.linkedin.cubert.plan.physical.PhysicalParser.parsingTask(PhysicalParser.java:192)
    at com.linkedin.cubert.plan.physical.PhysicalParser.parseInputStream(PhysicalParser.java:161)
    at com.linkedin.cubert.plan.physical.PhysicalParser.parseProgram(PhysicalParser.java:156)
    at com.linkedin.cubert.ScriptExecutor.compile(ScriptExecutor.java:304)
    at com.linkedin.cubert.ScriptExecutor.main(ScriptExecutor.java:523)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: LOAD"/cubert/words.txt"USING
    at com.linkedin.cubert.functions.builtin.FunctionFactory.createFunctionObject(FunctionFactory.java:151)
    at com.linkedin.cubert.plan.physical.PhysicalParser$PhysicalListener.exitUriOperator(PhysicalParser.java:2089)
    ... 20 more
Caused by: java.lang.ClassNotFoundException: LOAD"/cubert/words.txt"USING
    at java.lang.Class.forName0(Native Method)

Hope you can give me some advice :) Thanks!

suvodeep-pyne commented 9 years ago

If I understand this correctly, that should give you the output of the join. It would be a MERGE join in that case.

Other thoughts:

If your 2nd table is small enough to fit entirely in memory, then you can use HASH-JOIN. In that case, the 2nd table needs to be copied into the distributed cache.

Also, you are correct: there can be only one LOAD statement in a MAP block. The other load operators are LOAD-CACHED (for files present in the distributed cache) and LOAD BLOCK, which loads a matching block (one with the same index as the primary block).
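
To recap the working single-MAP pattern above in one annotated block (an untested sketch reusing the paths and schema from the script earlier in this thread; note that, judging from the TextStorage.getCachedFileReader trace in Problem 1, TEXT storage may not support cached reads, so the LOAD-CACHED side may need a different storage format):

JOB "hash join in a single map"
    MAP {
        // the single LOAD allowed per MAP block: the large side
        a = LOAD "/cubert/words.txt" USING TEXT("schema": "STRING word");
        // the small side, read from the distributed cache
        // (TEXT may not support this; see the Problem 1 stack trace)
        b = LOAD-CACHED "/cubert/words.txt" USING TEXT("schema": "STRING word");
        // b is held in memory as a hash table and probed with rows of a
        test_joined = HASH-JOIN a BY word, b BY word;
    }
    STORE test_joined INTO "/cubert/woud_count/join_output" USING TEXT();
END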

Documentation is currently a work in progress and we will be adding more sample cases to illustrate computation patterns.

Regards


OopsOutOfMemory commented 9 years ago

Thanks @suvodeep-pyne, I am able to perform a JOIN operation now.

BTW, how do I perform column pruning after a JOIN operation? The result of a join always outputs every column of both datasets.

e.g. given two relations:

a (col1,col2,joinkey1)
b (col3,col4,joinkey2)

result = JOIN a BY joinkey1, b BY joinkey2

The result lists all the columns of a and b.

What I really want is to output only selected columns, like a.col1, a.col2, b.col4, a.joinkey1.

suvodeep-pyne commented 9 years ago

Just use a GENERATE statement. That should do it.
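
Something along these lines (an untested sketch: the FROM ... GENERATE projection form is an assumption on my part, the column names come from your example, and the output path is a placeholder):

test_joined = HASH-JOIN a BY joinkey1, b BY joinkey2;
// keep only the columns you need; everything else is projected away
pruned = FROM test_joined GENERATE col1, col2, col4, joinkey1;
STORE pruned INTO "/path/to/output" USING TEXT();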


OopsOutOfMemory commented 9 years ago

@suvodeep-pyne Thank you for helping me resolve this issue :) It's much more efficient than Hive for cube and complex aggregation computations.

One more question about JOIN:

Do we support joining multiple datasets in one statement?

like: result = JOIN a BY joinkey1, b BY joinkey2, c BY joinkey3, d BY joinkey4

Or is there a combination of operations that can achieve this?

suvodeep-pyne commented 9 years ago

AFAIK, you won't be able to join multiple datasets at once. However, if the datasets are co-partitioned, i.e. they have the same index, then you can use LOAD BLOCK to load the blocks in one single phase (MAP or REDUCE) and join them one by one using MERGE joins (the inputs need to be sorted on the join key). You can have multiple LOAD BLOCK statements.
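
A rough sketch of that pattern (untested; the LOAD BLOCK FROM ... MATCHING form, the RUBIX storage, and all paths, schemas, and key names here are assumptions for illustration, not verified against the docs):

JOB "join three co-partitioned datasets"
    MAP {
        // primary blocks, e.g. produced by a BLOCKGEN partitioned and sorted on joinkey
        a = LOAD "/path1" USING RUBIX();
        // matching blocks: same index/partitioning as the primary block
        b = LOAD BLOCK FROM "/path2" MATCHING a USING RUBIX();
        c = LOAD BLOCK FROM "/path3" MATCHING a USING RUBIX();
        // pairwise MERGE joins; every input must be sorted on joinkey
        ab = JOIN a BY joinkey, b BY joinkey;
        abc = JOIN ab BY joinkey, c BY joinkey;
    }
    STORE abc INTO "/path/out" USING TEXT();
END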


OopsOutOfMemory commented 9 years ago

Yeah, thanks. I think your answer resolved my issue. Closing this ~~