hortonworks-spark / spark-atlas-connector

A Spark Atlas connector to track data lineage in Apache Atlas
Apache License 2.0

atlas spark-sql bugs #286

Open zhengxle opened 4 years ago

zhengxle commented 4 years ago

I want to generate spark-sql data lineage for tables. Environment: Atlas version 2.0.0, spark-atlas-connector built from master-branch code.

But I hit a bug.
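For context, the connector is registered through Spark's listener configs. A minimal sketch of the launch command, following the settings in this repo's README (the assembly-jar path and version are placeholders, and atlas-application.properties must be readable by the driver):

```
# Sketch: launching spark-sql with the Spark Atlas Connector registered.
# The jar path/version below are placeholders, not an exact setup.
./bin/spark-sql \
  --jars /path/to/spark-atlas-connector-assembly-<version>.jar \
  --conf spark.extraListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker \
  --conf spark.sql.queryExecutionListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker \
  --conf spark.sql.streaming.streamingQueryListeners=com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker \
  -e '<your sql>'
```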

The SQL command:

```
./bin/spark-sql -e 'drop table if exists tmp_dj.xr_sb_monthly_amount_spark_xs3; create table tmp_dj.xr_sb_monthly_amount_spark_xs3 as select * from tmp_dj.xr_sb_monthly_amount;'
```

and the Kafka message it generated:

```
19/11/28 11:53:57 DEBUG KafkaNotification: Sending message for topic ATLAS_HOOK: {"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"10.1.173.4","msgCreatedBy":"appweb","msgCreationTime":1574913237421,"message":{"type":"ENTITY_CREATE_V2","user":"appweb","entities":{"entities":[{"typeName":"spark_process","attributes":{"outputs":[{"typeName":"hive_table","uniqueAttributes":{"qualifiedName":"tmp_dj.xr_sb_monthly_amount_spark_xs3@primary # clusterName to use in qualifiedName of entities. Default: primary"}}],"remoteUser":"appweb","executionId":"1","qualifiedName":"application_1573631365527_17478","inputs":[{"typeName":"hive_table","uniqueAttributes":{"qualifiedName":"tmp_dj.xr_sb_monthly_amount@primary # clusterName to use in qualifiedName of entities. Default: primary"}}],"name":"SparkSQL::10.1.173.4 application_1573631365527_17478","currUser":"appweb","details":"== Parsed Logical Plan ==\n'CreateTabletmp_dj.xr_sb_monthly_amount_spark_xs3, ErrorIfExists\n+- 'Project [*]\n +- 'UnresolvedRelationtmp_dj.xr_sb_monthly_amount\n\n== Analyzed Logical Plan ==\nCreateDataSourceTableAsSelectCommandtmp_dj.xr_sb_monthly_amount_spark_xs3, ErrorIfExists, [month, repository_num, repository_amount]\n+- Project [month#6, repository_num#7, repository_amount#8]\n +- SubqueryAliastmp_dj.xr_sb_monthly_amount\n +- Relation[month#6,repository_num#7,repository_amount#8] parquet\n\n== Optimized Logical Plan ==\nCreateDataSourceTableAsSelectCommandtmp_dj.xr_sb_monthly_amount_spark_xs3, ErrorIfExists, [month, repository_num, repository_amount]\n+- Relation[month#6,repository_num#7,repository_amount#8] parquet\n\n== Physical Plan ==\nExecute CreateDataSourceTableAsSelectCommand CreateDataSourceTableAsSelectCommandtmp_dj.xr_sb_monthly_amount_spark_xs3, ErrorIfExists, [month, repository_num, repository_amount]\n+- *(1) FileScan parquet tmp_dj.xr_sb_monthly_amount[month#6,repository_num#7,repository_amount#8] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://dijin-hadoop/warehouse/data/tmp_dj.db/xr_sb_monthly_amount], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<month:string,repository_num:string,repository_amount:string>","sparkPlanDescription":"Execute CreateDataSourceTableAsSelectCommand CreateDataSourceTableAsSelectCommandtmp_dj.xr_sb_monthly_amount_spark_xs3, ErrorIfExists, [month, repository_num, repository_amount]\n+- FileScan parquet tmp_dj.xr_sb_monthly_amount[month#6,repository_num#7,repository_amount#8] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://dijin-hadoop/warehouse/data/tmp_dj.db/xr_sb_monthly_amount], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<month:string,repository_num:string,repository_amount:string>\n"},"guid":"-3534681326608803","provenanceType":0,"version":0,"proxy":false}]}}}
```

The Atlas server received the message:

```
2019-11-28 11:53:57,884 DEBUG - [NotificationHookConsumer thread-0:] ~ [Consumer clientId=consumer-1, groupId=atlas] Fetch READ_UNCOMMITTED at offset 687 for partition ATLAS_HOOK-0 returned fetch data (error=NONE, highWaterMark=688, lastStableOffset = 688, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=2894) (Fetcher$1:227)
2019-11-28 11:53:57,885 DEBUG - [NotificationHookConsumer thread-0:] ~ Received Message topic =ATLAS_HOOK, partition =0, offset = 687, key = null, value = {...same payload as above...} (AtlasKafkaConsumer:74)
```

but then the Atlas server threw an exception:

```
2019-11-28 11:53:57,893 DEBUG - [NotificationHookConsumer thread-0:] ~ PERF|createOrUpdate()|7 (AtlasPerfTracer:77)
2019-11-28 11:53:57,893 ERROR - [NotificationHookConsumer thread-0:] ~ graph rollback due to exception AtlasBaseException:Referenced entity AtlasObjectId{guid='null', typeName='hive_table', uniqueAttributes={qualifiedName:tmp_dj.xr_sb_monthly_amount_spark_xs3@primary # clusterName to use in qualifiedName of entities. Default: primary}} is not found (GraphTransactionInterceptor:166)
2019-11-28 11:53:57,893 DEBUG - [NotificationHookConsumer thread-0:] ~ Closing outer txn (GraphTransactionInterceptor:112)
2019-11-28 11:53:57,893 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> releaseLockedObjects(): lockedGuids.size: 0 (GraphTransactionInterceptor$ObjectUpdateSynchronizer:279)
2019-11-28 11:53:57,893 DEBUG - [NotificationHookConsumer thread-0:] ~ <== releaseLockedObjects(): lockedGuids.size: 0 (GraphTransactionInterceptor$ObjectUpdateSynchronizer:289)
2019-11-28 11:53:57,894 WARN - [NotificationHookConsumer thread-0:] ~ Error handling message (NotificationHookConsumer$HookConsumer:683)
org.apache.atlas.exception.AtlasBaseException: Referenced entity AtlasObjectId{guid='null', typeName='hive_table', uniqueAttributes={qualifiedName:tmp_dj.xr_sb_monthly_amount_spark_xs3@primary # clusterName to use in qualifiedName of entities. Default: primary}} is not found
    at org.apache.atlas.repository.store.graph.v2.UniqAttrBasedEntityResolver.resolveEntityReferences(UniqAttrBasedEntityResolver.java:68)
    at org.apache.atlas.repository.store.graph.v2.AtlasEntityGraphDiscoveryV2.resolveReferences(AtlasEntityGraphDiscoveryV2.java:182)
    at org.apache.atlas.repository.store.graph.v2.AtlasEntityGraphDiscoveryV2.discoverEntities(AtlasEntityGraphDiscoveryV2.java:75)
    at org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2.preCreateOrUpdate(AtlasEntityStoreV2.java:879)
    at org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2.createOrUpdate(AtlasEntityStoreV2.java:744)
    at org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2.createOrUpdate(AtlasEntityStoreV2.java:275)
    at org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2$$FastClassBySpringCGLIB$$6861dca9.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:736)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.apache.atlas.GraphTransactionInterceptor.invoke(GraphTransactionInterceptor.java:80)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:671)
    at org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2$$EnhancerBySpringCGLIB$$de625803.createOrUpdate(<generated>)
    at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.createOrUpdate(NotificationHookConsumer.java:735)
    at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.handleMessage(NotificationHookConsumer.java:602)
    at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.doWork(NotificationHookConsumer.java:455)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```

The Atlas backing stores (Solr, JanusGraph, HBase) received nothing.
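One detail in the payload above looks wrong and may explain the lookup failure: every qualifiedName carries the literal text `@primary # clusterName to use in qualifiedName of entities. Default: primary`. Java `.properties` parsing treats `#` as a comment only at the start of a line, so an inline comment becomes part of the value. If the cluster name was configured like this (assuming it comes from `atlas.cluster.name` in atlas-application.properties):

```
# Suspected misconfiguration: the inline comment becomes part of the value.
atlas.cluster.name=primary # clusterName to use in qualifiedName of entities. Default: primary
```

then the connector builds qualifiedNames that can never match the hive_table entity the Hive hook registers as `tmp_dj.xr_sb_monthly_amount_spark_xs3@primary`, which would produce exactly this "Referenced entity ... is not found" error. Moving the comment onto its own line should yield clean qualifiedNames:

```
# clusterName to use in qualifiedName of entities. Default: primary
atlas.cluster.name=primary
```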

zhengxle commented 4 years ago

Using the newest Atlas master-branch source code, I got it working: the spark-sql task can build the lineage graph.

ooocici commented 4 years ago

Hi, I hit this bug too. How can it be fixed?