LaiZhou opened this issue 6 years ago
Do you notice a single task if you remove t2 from your query?
@jramnara
select * from t1 where user_id='1'
This job used 56 tasks. I set the lead conf -spark.executor.cores=100, and this server has 56 cores.
I think Snappy's design should first prune the job's partitions using the predicates, get the IDs of the partitions involved, and then schedule tasks.
Can someone explain the internal design here?
If table t1 is partitioned on user_id you should see only a single task being executed. I didn't follow your observation on the core count. Is there a problem?
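A quick way to verify is to inspect the physical plan for the filtered scan; a minimal sketch using the standard Spark SQL EXPLAIN command (which SnappyData's SQL layer also accepts):

```sql
-- If pruning works, the plan (and the Spark UI) should show a scan over a
-- single bucket, i.e. one task, rather than over all buckets.
EXPLAIN SELECT * FROM t1 WHERE user_id = '1';
```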
CREATE TABLE t0 (user_id varchar(100), order_original_id bigint NOT NULL PRIMARY KEY, apply_time string, loan_type int, is_eff int) USING ROW OPTIONS (PARTITION_BY 'user_id')
create table t1 (user_id varchar(100), order_original_id varchar(100), apply_time string, bill_cnt bigint) USING ROW OPTIONS (PARTITION_BY 'user_id', COLOCATE_WITH 't0')
create table t2 (user_id varchar(100), order_original_id varchar(100), apply_time string, bill_cnt bigint) USING ROW OPTIONS (PARTITION_BY 'user_id', COLOCATE_WITH 't0')
insert into t0(user_id,order_original_id) values('1',1)
insert into t0(user_id,order_original_id) values('2',2)
insert into t0(user_id,order_original_id) values('3',3)
insert into t1(user_id,order_original_id,apply_time,bill_cnt) values('1','1','2018-01-01',100)
insert into t1(user_id,order_original_id,apply_time,bill_cnt) values('1','1','2018-01-01',101)
insert into t1(user_id,order_original_id,apply_time,bill_cnt) values('2','2','2018-01-01',101)
select * from t1 where user_id='1'
insert into t2(user_id,order_original_id,apply_time,bill_cnt) values('1','1','2018-01-01',100)
insert into t2(user_id,order_original_id,apply_time,bill_cnt) values('1','1','2018-01-01',101)
insert into t2(user_id,order_original_id,apply_time,bill_cnt) values('2','2','2018-01-01',101)
select a.*, b.* from
(select user_id, order_original_id, apply_time, sum(bill_cnt)
 from t1
 where user_id='1' and order_original_id='1'
 group by user_id, order_original_id, apply_time) a
left outer join
(select user_id, order_original_id, apply_time, sum(bill_cnt)
 from t2
 where user_id='1' and order_original_id='1' and bill_cnt>100
 group by user_id, order_original_id, apply_time) b
on a.order_original_id=b.order_original_id and a.user_id=b.user_id
and a.user_id='1' and a.order_original_id='1'
and b.user_id='1' and b.order_original_id='1'
When executing a join SQL like this, even with user_id given in the predicates, the job schedules 128 tasks; 128 is the default number of buckets.
I think if Snappy knows the number of partitions is 1, only 1 task needs to be scheduled.
Do you not want to use column tables? With those you will see the behavior you expect.
We will look into this behavior with row tables.
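For reference, a minimal sketch of the same t1 schema as a column table; the table name t1_col is hypothetical, and the single-task expectation is per the comment above rather than something verified here:

```sql
-- Same schema as a column table; per the comment above, partition pruning
-- should yield a single task for a query fixed on user_id.
create table t1_col (user_id varchar(100), order_original_id varchar(100),
  apply_time string, bill_cnt bigint)
USING COLUMN OPTIONS (PARTITION_BY 'user_id')
```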
I used column tables at first, but I worry that indexes may not be well supported in release 1.0.1. I ran a test: create an index on a column table -> insert a row -> delete the row. The index entry for that row is not deleted and becomes stale. Will that cause problems later when new rows are inserted into the table? How do I expire the stale rows of the index table, and how do I set a TTL for a column table?
Do I need to maintain the whole lifecycle of the index for a column table myself?
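For clarity, this is roughly the sequence I ran; the table and index names here are hypothetical stand-ins:

```sql
-- Sketch of the test: the index entry for the deleted row was not removed
-- and went stale.
CREATE INDEX idx_user ON t_col (user_id);
INSERT INTO t_col VALUES ('1', '1', '2018-01-01', 100);
DELETE FROM t_col WHERE user_id = '1';
-- expected: the idx_user entry for user_id='1' is removed along with the row
```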
After digging into this case with column tables, when the number of columns grows large, I hit an exception that Snappy reported:
org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "snappyhashaggregate_doAggregateWithKeys$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:965)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1031)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1028)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:916)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:362)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.aggregate.SnappyHashAggregateExec.doExecute(SnappyHashAggregateExec.scala:211)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:272)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:78)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:75)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:74)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:74)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.janino.InternalCompilerException: Code of method "snappyhashaggregate_doAggregateWithKeys$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:990)
at org.codehaus.janino.CodeContext.write(CodeContext.java:882)
at org.codehaus.janino.UnitCompiler.writeShort(UnitCompiler.java:11892)
at org.codehaus.janino.UnitCompiler.load(UnitCompiler.java:11551)
at org.codehaus.janino.UnitCompiler.load(UnitCompiler.java:11536)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4139)
at org.codehaus.janino.UnitCompiler.access$7200(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$12$1.visitLocalVariableAccess(UnitCompiler.java:4082)
at org.codehaus.janino.UnitCompiler$12$1.visitLocalVariableAccess(UnitCompiler.java:4074)
at org.codehaus.janino.Java$LocalVariableAccess.accept(Java.java:4103)
at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4074)
at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4070)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:3977)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4070)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4135)
at org.codehaus.janino.UnitCompiler.access$6700(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(UnitCompiler.java:4077)
at org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(UnitCompiler.java:4074)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4053)
at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4074)
at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4070)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:3977)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4070)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5253)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3477)
at org.codehaus.janino.UnitCompiler.access$5300(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3439)
at org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3419)
at org.codehaus.janino.Java$Assignment.accept(Java.java:4306)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3419)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2339)
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1473)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1466)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2851)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
at org.codehaus.janino.Java$Block.accept(Java.java:2756)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2444)
at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1466)
at org.codehaus.janino.Java$IfStatement.accept(Java.java:2926)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
at org.codehaus.janino.Java$Block.accept(Java.java:2756)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1821)
at org.codehaus.janino.UnitCompiler.access$2200(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1477)
at org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1466)
at org.codehaus.janino.Java$WhileStatement.accept(Java.java:3031)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3075)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385)
at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1285)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:825)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:411)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385)
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1405)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:357)
... 43 more
18/07/27 14:13:30.348 CST broadcast-exchange-10<tid=0x86a> WARN WholeStageCodegenExec: Whole-stage codegen disabled for this plan:
*CachedPlanHelper
+- *SnappyHashAggregate(keys=[USER_ID#7116, ORDER_ORIGINAL_ID#7117, APPLY_TIME#7118], modes=Final
......
18/07/27 14:13:30.358 CST Function Execution Processor1<tid=0x73> INFO CodegenSparkFallback: SnappyData code generation failed. Falling back to Spark plans.
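As the log shows, SnappyData already falls back to Spark plans when the generated method exceeds the JVM's 64 KB limit. If the repeated compile-and-fail is itself a problem, one possible session-level workaround is to disable whole-stage codegen; spark.sql.codegen.wholeStage is a standard Spark SQL conf key, though whether this build honors it on this path is an assumption to verify:

```sql
-- Possible workaround sketch: skip whole-stage code generation so the
-- 64 KB generated-method limit is not hit in the first place.
SET spark.sql.codegen.wholeStage=false;
```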
@jramnara
When executing SQL like:
select t1.* from t1 join t2 on t1.user_id=t2.user_id and t1.user_id=12345
Tables t1 and t2 are partitioned by user_id, on 1 server with 128 buckets. Snappy can't prune the partitions by the partitioning columns and uses 128 tasks for the job, but 127 of those tasks handle empty data, which is wasteful. How can performance be improved in this case?
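One interim mitigation I can think of (not a fix for the pruning itself) is to create the partitioned tables with fewer buckets, so a non-prunable scan schedules fewer empty tasks; BUCKETS is a documented table option, and '16' below is only an illustrative value:

```sql
-- Fewer buckets -> fewer (mostly empty) tasks when pruning does not apply.
-- Note: colocated tables must use the same bucket count as their target,
-- so t0 would need BUCKETS '16' as well.
create table t1 (user_id varchar(100), order_original_id varchar(100),
  apply_time string, bill_cnt bigint)
USING ROW OPTIONS (PARTITION_BY 'user_id', COLOCATE_WITH 't0', BUCKETS '16')
```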
@sumwale @rishitesh