Closed dongyoungy closed 5 years ago
It's quite possible, but I'm not sure what the cause is. Were the answers to those individual queries returned from Presto? The log does not show that part yet.
I believe our log does not show intermediate query results. I just ran one of these individual queries separately to confirm that it ran quickly, and it returned the following results:
presto> select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_333927_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 9) group by vt1."verdictdbtier";
agg0 | agg1 | verdictdb_tier_alias_333927_0
---------------+-------+-------------------------------
1431840.60000 | 19875 | 0
(1 row)
Query 20181111_185248_00165_t9qej, FINISHED, 2 nodes
Splits: 82 total, 82 done (100.00%)
0:01 [1.32M rows, 10.8MB] [2.35M rows/s, 19.2MB/s]
Can you insert a log statement at this line to see if the query answers are returned?
If so, the problem is due to the error estimation logic. Otherwise, the problem is probably due to the in-memory aggregation logic.
I added the logs as follows:
log.info("query start executing.");
VerdictResultStream stream = streamsql(query);
log.info("query stream received.");
if (stream == null) {
    return null;
}
QueryResultAccuracyEstimator accEst = new QueryResultAccuracyEstimatorFromDifference(runningCoordinator);
try {
    log.info("query stream has next check.");
    while (stream.hasNext()) {
        log.info("query stream has next.");
        VerdictSingleResult rs = stream.next();
        log.info("query returned.");
        //return rs;
        accEst.add(rs);
        if (accEst.isLastResultAccurate()) {
            return rs;
        }
    }
    // return the last result otherwise
    return accEst.getAnswers().get(accEst.getAnswerCount()-1);
and the log shows:
19:58:42.401 [Thread-2] DEBUG org.verdictdb.VerdictContext - com.facebook.presto.jdbc.PrestoDriver has been loaded into the classpath.
19:58:42.407 [Thread-2] DEBUG o.v.c.ConcurrentJdbcConnection - Creating 10 JDBC connections with this url: jdbc:presto://localhost:8080/hive?user=test;loglevel=trace;file_loglevel=trace
19:58:42.783 [Thread-2] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: SHOW SCHEMAS
19:58:43.327 [Thread-2] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: SHOW TABLES IN "verdictdbmeta"
19:58:43.432 [Thread-2] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select t."original_schema", t."original_table", t."scramble_schema", t."scramble_table", t."added_at", t."data" from "verdictdbmeta"."verdictdbmeta" as t order by "added_at" desc
19:58:43.729 [Thread-2] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: create schema if not exists verdictdbtemp
19:58:50.480 [Thread-2] INFO o.v.coordinator.ExecutionContext - query start executing.
19:58:50.482 [Thread-2] DEBUG o.v.coordinator.ExecutionContext - Query type: select
19:58:50.486 [Thread-2] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: SHOW SCHEMAS
19:58:50.609 [Thread-2] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: DESCRIBE "default"."raw"
19:58:50.752 [Thread-2] INFO o.v.sqlreader.ScrambleTableReplacer - Automatic table replacement: default.raw -> dy.raw_scramble
19:58:50.819 [Thread-2] DEBUG o.v.c.SelectQueryCoordinator - Async plan created.
19:58:50.831 [Thread-2] DEBUG o.v.c.SelectQueryCoordinator - Plan simplification done.
19:58:50.948 [Thread-4] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@54133ee2[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@3a643d63[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.949 [Thread-5] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@551e616b[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@1837a104[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.950 [Thread-6] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@240e84e0[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@2bdbc33[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.951 [Thread-3] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@1f55ed79[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@71de3f0b[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.952 [Thread-7] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@3c5ddf69[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@6ca47586[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.954 [Thread-10] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@1680a42b[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@1739f563[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.954 [Thread-12] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@747d0fda[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@739f3074[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.956 [Thread-8] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@1cb8aa8c[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@4ecc98e5[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.957 [Thread-2] INFO o.v.coordinator.ExecutionContext - query stream received.
19:58:50.958 [Thread-9] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@3a2763e3[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@631252e1[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.958 [Thread-2] INFO o.v.coordinator.ExecutionContext - query stream has next check.
19:58:50.961 [Thread-11] DEBUG o.v.c.execplan.ExecutableNodeRunner - No dependency exists. Simply run org.verdictdb.core.querying.SelectAggExecutionNode@3086fc75[subscriberCount=1,sourceCount=0,aggmeta=org.verdictdb.core.querying.ola.AggMeta@23b4b7b2[aggAliasPairs={(sum,BaseColumn[schemaName=default,tableSourceAlias=vt1,tableName=raw,columnName=horizontal_accuracy])=agg0, (count,AsteriskColumn)=agg1},tierColumns=[verdictdb_tier_alias_508740_0]]]
19:58:50.967 [Thread-5] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 2) group by vt1."verdictdbtier"
19:58:50.969 [Thread-8] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 5) group by vt1."verdictdbtier"
19:58:50.970 [Thread-10] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 7) group by vt1."verdictdbtier"
19:58:50.970 [Thread-9] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 6) group by vt1."verdictdbtier"
19:58:50.971 [Thread-4] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 1) group by vt1."verdictdbtier"
19:58:50.973 [Thread-3] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 0) group by vt1."verdictdbtier"
19:58:50.974 [Thread-6] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 3) group by vt1."verdictdbtier"
19:58:50.975 [Thread-7] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 4) group by vt1."verdictdbtier"
19:58:50.992 [Thread-12] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 9) group by vt1."verdictdbtier"
19:58:51.007 [Thread-11] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_508740_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 8) group by vt1."verdictdbtier"
It looks like it never gets into the while loop: while (stream.hasNext())
Previously, I also tried returning rs before the error estimation logic (which is why 'return rs' is commented out in the middle) to see whether that was the problem, but that does not seem to be the case here.
I see. Can you then check the execution at this line?
That is the step where the results to individual queries are inserted into the H2 database for future aggregation.
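As a rough illustration of what "future aggregation" over the per-block answers means, the sketch below folds rows shaped like the per-block result above (tier, agg0, agg1) into running totals per tier. It is only a sketch under assumptions: it uses a plain in-memory map rather than H2, the class and method names are hypothetical, and it leaves out any scaling or error estimation that VerdictDB applies on top of the partial sums.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper; not part of VerdictDB. */
final class BlockAggregateSketch {
    /** Running totals for one tier: sum of the column and row count. */
    static final class Partial {
        double sum;
        long count;
    }

    // Keyed by the tier value returned alongside agg0 and agg1.
    private final Map<Integer, Partial> byTier = new TreeMap<>();

    /** Folds one row of a per-block result (tier, agg0, agg1) into the totals. */
    void add(int tier, double agg0, long agg1) {
        Partial p = byTier.computeIfAbsent(tier, t -> new Partial());
        p.sum += agg0;
        p.count += agg1;
    }

    /** Sum accumulated so far for the tier, or 0.0 if no row has been seen. */
    double sum(int tier) {
        Partial p = byTier.get(tier);
        return p == null ? 0.0 : p.sum;
    }

    /** Row count accumulated so far for the tier, or 0 if no row has been seen. */
    long count(int tier) {
        Partial p = byTier.get(tier);
        return p == null ? 0L : p.count;
    }
}
```

Each of the ten block queries would contribute one call to add() per returned row; the totals can be read at any point, which is what allows a partial answer before all blocks finish.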
It looks like it gets stuck in InMemoryAggregate.createTable():
@Override
public ExecutionInfoToken createToken(DbmsQueryResult result) {
    ExecutionInfoToken token = new ExecutionInfoToken();
    token.setKeyValue("aggMeta", aggMeta);
    token.setKeyValue("dependentQuery", this.selectQuery);
    // insert value to in memory database.
    VerdictDBLogger log = VerdictDBLogger.getLogger(this.getClass());
    String tableName = "";
    synchronized (SelectAggExecutionNode.class) {
        tableName = inMemoryTableName + selectAggID;
        selectAggID++;
    }
    log.info(String.format("insert result into in-memory DB: %s", tableName));
    try {
        log.info("before createTable");
        InMemoryAggregate.createTable(result, tableName);
        log.info("after createTable");
    } catch (SQLException e) {
        e.printStackTrace();
    }
    token.setKeyValue("schemaName", "PUBLIC");
    token.setKeyValue("tableName", tableName);
    return token;
}
log:
...
20:42:57.818 [Thread-5] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_510640_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 2) group by vt1."verdictdbtier"
20:42:57.820 [Thread-4] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_510640_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 1) group by vt1."verdictdbtier"
20:42:57.823 [Thread-12] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_510640_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 9) group by vt1."verdictdbtier"
20:42:57.835 [Thread-10] DEBUG o.v.connection.JdbcConnection - Issuing the following query to DBMS: select sum(vt1."horizontal_accuracy") as "agg0", count(*) as "agg1", vt1."verdictdbtier" as "verdictdb_tier_alias_510640_0" from "dy"."raw_scramble" as vt1 where (st_contains((st_polygon('polygon ((40 -85, 45 -85, 45 -83, 41 -83, 40 -85))')), (st_point(vt1."lat", vt1."lon")))) and (vt1."verdictdbblock" = 7) group by vt1."verdictdbtier"
20:43:00.253 [Thread-6] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG0
20:43:00.253 [Thread-6] INFO o.v.c.q.SelectAggExecutionNode - before createTable
20:43:00.350 [Thread-12] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG1
20:43:00.350 [Thread-12] INFO o.v.c.q.SelectAggExecutionNode - before createTable
20:43:00.383 [Thread-10] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG2
20:43:00.389 [Thread-10] INFO o.v.c.q.SelectAggExecutionNode - before createTable
20:43:00.393 [Thread-5] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG3
20:43:00.393 [Thread-5] INFO o.v.c.q.SelectAggExecutionNode - before createTable
20:43:00.449 [Thread-4] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG4
20:43:00.449 [Thread-4] INFO o.v.c.q.SelectAggExecutionNode - before createTable
20:43:00.467 [Thread-11] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG5
20:43:00.467 [Thread-11] INFO o.v.c.q.SelectAggExecutionNode - before createTable
20:43:00.617 [Thread-7] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG6
20:43:00.618 [Thread-7] INFO o.v.c.q.SelectAggExecutionNode - before createTable
20:43:00.618 [Thread-3] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG7
20:43:00.619 [Thread-3] INFO o.v.c.q.SelectAggExecutionNode - before createTable
20:43:00.776 [Thread-9] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG8
20:43:00.777 [Thread-9] INFO o.v.c.q.SelectAggExecutionNode - before createTable
20:43:00.787 [Thread-8] INFO o.v.c.q.SelectAggExecutionNode - insert result into in-memory DB: VERDICTDB_SELECTAGG9
20:43:00.788 [Thread-8] INFO o.v.c.q.SelectAggExecutionNode - before createTable
I suspect that the H2 database could be deadlocked. Can you try the following (unless you have other ideas)?
Lastly, in pom.xml, please change the H2 dependency to a regular scope. It seems to be in the test scope currently.
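One way to test the deadlock suspicion directly is to ask the JVM which threads it considers deadlocked. The sketch below uses the JDK's ThreadMXBean; the class name DeadlockProbe is hypothetical, and the check only finds cycles on monitors and java.util.concurrent locks, so an empty report does not rule out threads that are simply waiting on a resource that is never released.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical diagnostic helper; not part of VerdictDB. */
final class DeadlockProbe {
    /** Returns a report of deadlocked threads, or an empty string if none are detected. */
    static String report() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = mx.findDeadlockedThreads(); // null when no deadlock is detected
        if (ids == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : mx.getThreadInfo(ids, Integer.MAX_VALUE)) {
            if (info == null) {
                continue; // the thread ended between the two calls
            }
            sb.append(info.getThreadName())
              .append(" is ").append(info.getThreadState())
              .append(" on ").append(info.getLockName())
              .append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }
}
```

Calling DeadlockProbe.report() from a watchdog thread a few seconds after the last "before createTable" line would show whether the ten worker threads are in a lock cycle. Running jstack against the process ID gives a full thread dump without any code change.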
I feel like I am experiencing something similar on my local machine. Let me also work on this for count(*) queries.
Can you pull from my branch 'yongjoo' and see if it resolves the issue? I added close() lines, and the issue is gone for me.
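The thread does not show which close() calls were added, so the following is only a generic sketch of why unclosed handles can look like a hang. It assumes a bounded pool modeled with a Semaphore; Handle, acquire, and run are hypothetical names, not VerdictDB or H2 APIs. When handles are closed, every task is served; when they leak, later tasks wait for a handle that never comes back (here cut short by a timeout).

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; not the actual fix from the branch. */
final class CloseSketch {
    /** Stand-in for a pooled resource such as a JDBC connection or statement. */
    static final class Handle implements AutoCloseable {
        private final Semaphore pool;
        private boolean closed = false;

        Handle(Semaphore pool) {
            this.pool = pool;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                pool.release(); // hand the slot back to the pool
            }
        }
    }

    /** Takes a handle, or returns null if none frees up within the timeout. */
    static Handle acquire(Semaphore pool, long timeoutMs) {
        try {
            return pool.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS) ? new Handle(pool) : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Runs the given number of tasks and returns how many obtained a handle. */
    static int run(int poolSize, int tasks, boolean closeHandles) {
        Semaphore pool = new Semaphore(poolSize);
        int served = 0;
        for (int i = 0; i < tasks; i++) {
            if (closeHandles) {
                // try-with-resources closes the handle even if the body throws
                try (Handle h = acquire(pool, 50)) {
                    if (h != null) {
                        served++;
                    }
                }
            } else {
                Handle leaked = acquire(pool, 50); // never closed
                if (leaked != null) {
                    served++;
                }
            }
        }
        return served;
    }

    public static void main(String[] args) {
        System.out.println("closed: " + run(2, 5, true));
        System.out.println("leaked: " + run(2, 5, false));
    }
}
```

With a pool of two and five tasks, the closing variant serves all five, while the leaking variant serves only the first two. Without the timeout, the third task would block indefinitely, which matches the symptom of threads logging "before createTable" and never "after createTable".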
It looks like the latest change in your branch fixed the issue for me as well.
I have been trying to run a simple spatial aggregation query on the dyoon-feature-290 branch (which is up to date with the current master) via pyverdict, like the following:
VerdictDB seems to correctly identify its scramble and run individual queries on each verdictdb block, but it just sits there and does not proceed further to summarize the final result:
I do not think it is stalling on any of these small queries, since each of them executes in a few seconds.
Could any of the recent changes in the execution logic have caused this?