datahub-project / datahub

The Metadata Platform for your Data Stack
https://datahubproject.io
Apache License 2.0

Correctly Identify AWS Glue as the Metastore when running Spark on Amazon EMR #4399

Open · garystafford opened this issue 2 years ago

garystafford commented 2 years ago

Describe the bug

When using Apache Spark on Amazon EMR, you can replace Apache Hive with AWS Glue as the Hive-compatible metastore, with the underlying data stored in S3. When doing so, the lineage metadata sent to DataHub incorrectly identifies the data catalog tables as Apache Hive Datasets rather than AWS Glue Datasets. Additionally, the tables do not contain the schema, nor do they have an upstream link to the underlying data in Amazon S3.
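
Concretely, the dataset URNs emitted by the listener use the hive platform (the first line below is taken from the debug logs posted later in this thread), whereas a table backed by the AWS Glue Data Catalog would be expected to use the glue platform (the second line is an assumed form, shown only for illustration):

    Emitted:  urn:li:dataset:(urn:li:dataPlatform:hive,synthea_patient_big_data.conditions,PROD)
    Expected: urn:li:dataset:(urn:li:dataPlatform:glue,synthea_patient_big_data.conditions,PROD)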

To Reproduce

Steps to reproduce the behavior:

  1. Create a PySpark job that reads data from one or more AWS Glue Data Catalog tables, transforms the data, and writes back to an AWS Glue Data Catalog table (a rough sketch of the job body follows these steps). Create the SparkSession as follows, updating the REST endpoint to point at your own DataHub server:
    spark = SparkSession \
        .builder \
        .appName("Spark Glue DataHub Test App") \
        .config("hive.metastore.client.factory.class",
                "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
        .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.27") \
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") \
        .config("spark.datahub.rest.server", "http://111.222.333.444:55555") \
        .enableHiveSupport() \
        .getOrCreate()
  2. Run the PySpark job on Amazon EMR; I am using EMR v6.5.0 (Spark v3.1.2).
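
For context, here is a rough sketch of the job body, reconstructed from the logical plan in the debug logs further down this thread; the exact column expressions and write options are assumptions, not the original script:

    # Rough reconstruction from the logged query plan (assumptions, not the original code).
    spark.sql("USE synthea_patient_big_data")  # the logs show this database being selected

    conditions = spark.table("conditions")
    patients = spark.table("patients")

    result = (
        conditions
        .where(conditions.description.contains("sinusitis"))
        .join(patients, conditions.patient == patients.id)
        .selectExpr(
            "patient", "code", "description",
            "datediff(to_date(substr(start, 1, 10)), to_date(substr(cast(birthdate as string), 1, 10))) AS condition_age",
            "marital", "race", "ethnicity", "gender")
        .distinct()
    )

    # Repartition 1, false in the plan suggests coalesce(1); written as a Glue Data Catalog table.
    result.coalesce(1).write.mode("overwrite").saveAsTable("sinusitis_2022_03_22_test")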

Expected behavior

When using Apache Spark on Amazon EMR and replacing Apache Hive with AWS Glue as the metastore, with the underlying data stored in S3, I would expect the data catalog tables (Datasets) identified as part of the lineage graph to be of type AWS Glue. Additionally, it would be a bonus if they contained the schema and an upstream link to the underlying Amazon S3 data associated with the Glue table.

Screenshots

Screen Shot 2022-03-13 at 4 33 25 PM


Additional context

None.

maggiehays commented 2 years ago

@MugdhaHardikar-GSLab please take a look at this one! @treff7es, hoping you can help with review.

MugdhaHardikar-GSLab commented 2 years ago

@garystafford Could you please capture debug-level logs with the latest version? To enable debug logging, add the following configuration to the log4j.properties file:

    log4j.logger.datahub.spark=DEBUG
    log4j.logger.datahub.client.rest=DEBUG
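
If editing log4j.properties on the cluster nodes is inconvenient, roughly the same effect can usually be achieved from the PySpark driver through the JVM's log4j (a minimal sketch for the Spark 3.1 / log4j 1.x setup in this thread; the py4j _jvm gateway is private API):

    # Minimal sketch: raise the DataHub loggers named above to DEBUG from the driver.
    log4j = spark.sparkContext._jvm.org.apache.log4j
    log4j.LogManager.getLogger("datahub.spark").setLevel(log4j.Level.DEBUG)
    log4j.LogManager.getLogger("datahub.client.rest").setLevel(log4j.Level.DEBUG)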

garystafford commented 2 years ago

@MugdhaHardikar-GSLab Logs are below, as requested. Just to reiterate, the Spark job executes fine on EMR and metadata is emitted to DataHub. The issue is that DataHub identifies the source and target datasets as Apache Hive rather than AWS Glue (see diagram). Apache Hive is not installed on this EMR cluster. For example, the new target dataset (sinusitis_2022_03_22_test) is created as a table in the AWS Glue Data Catalog as expected.

22/03/22 13:37:18 INFO SparkContext: Running Spark version 3.1.2-amzn-1
22/03/22 13:37:18 INFO ResourceUtils: ==============================================================
22/03/22 13:37:18 INFO ResourceUtils: No custom resources configured for spark.driver.
22/03/22 13:37:18 INFO ResourceUtils: ==============================================================
22/03/22 13:37:18 INFO SparkContext: Submitted application: sinusitis_2022_03_22_test
22/03/22 13:37:18 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 4, script: , vendor: , memory -> name: memory, amount: 18971, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/03/22 13:37:18 INFO ResourceProfile: Limiting resource is cpus at 4 tasks per executor
22/03/22 13:37:18 INFO ResourceProfileManager: Added ResourceProfile id: 0
22/03/22 13:37:18 INFO SecurityManager: Changing view acls to: hadoop
22/03/22 13:37:18 INFO SecurityManager: Changing modify acls to: hadoop
22/03/22 13:37:18 INFO SecurityManager: Changing view acls groups to:
22/03/22 13:37:18 INFO SecurityManager: Changing modify acls groups to:
22/03/22 13:37:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
22/03/22 13:37:18 INFO Utils: Successfully started service 'sparkDriver' on port 45749.
22/03/22 13:37:18 INFO SparkEnv: Registering MapOutputTracker
22/03/22 13:37:18 INFO SparkEnv: Registering BlockManagerMaster
22/03/22 13:37:18 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/03/22 13:37:18 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/03/22 13:37:18 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
22/03/22 13:37:18 INFO DiskBlockManager: Created local directory at /mnt/tmp/blockmgr-d9a55e1d-d6f5-4feb-afd0-a0770dacf997
22/03/22 13:37:19 INFO MemoryStore: MemoryStore started with capacity 912.3 MiB
22/03/22 13:37:19 INFO SparkEnv: Registering OutputCommitCoordinator
22/03/22 13:37:19 INFO log: Logging initialized @3211ms to org.sparkproject.jetty.util.log.Slf4jLog
22/03/22 13:37:19 INFO Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_322-b06
22/03/22 13:37:19 INFO Server: Started @3335ms
22/03/22 13:37:19 INFO AbstractConnector: Started ServerConnector@1c5f6e66{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
22/03/22 13:37:19 INFO Utils: Successfully started service 'SparkUI' on port 4040.
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6f5b4afe{/jobs,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7119f392{/jobs/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@28363662{/jobs/job,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7b7e1f04{/jobs/job/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3692a1b2{/stages,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@8ed20a2{/stages/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5c5da024{/stages/stage,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7c0d06d2{/stages/stage/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5334bd58{/stages/pool,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@c2e6423{/stages/pool/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5df612bb{/storage,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@34c2c3b9{/storage/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7eb14f5{/storage/rdd,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7b361c2a{/storage/rdd/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6381773f{/environment,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2fcc7159{/environment/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6f21663a{/executors,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@402d2ec1{/executors/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2d2a92f6{/executors/threadDump,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@36c8b31e{/executors/threadDump/json,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7780ba3f{/static,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@14e6c763{/,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@247b9c2{/api,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@17501472{/jobs/job/kill,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7b0b62cc{/stages/stage/kill,null,AVAILABLE,@Spark}
22/03/22 13:37:19 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ip-192-168-23-58.ec2.internal:4040
22/03/22 13:37:19 INFO Utils: Using 50 preallocated executors (minExecutors: 0). Set spark.dynamicAllocation.preallocateExecutors to `false` disable executor preallocation.
22/03/22 13:37:19 INFO RMProxy: Connecting to ResourceManager at ip-192-168-23-58.ec2.internal/192.168.23.58:8032
22/03/22 13:37:19 INFO Client: Requesting a new application from cluster with 2 NodeManagers
22/03/22 13:37:20 INFO Configuration: resource-types.xml not found
22/03/22 13:37:20 INFO ResourceUtils: Unable to find 'resource-types.xml'.
22/03/22 13:37:20 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (24576 MB per container)
22/03/22 13:37:20 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
22/03/22 13:37:20 INFO Client: Setting up container launch context for our AM
22/03/22 13:37:20 INFO Client: Setting up the launch environment for our AM container
22/03/22 13:37:20 INFO Client: Preparing resources for our AM container
22/03/22 13:37:20 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
22/03/22 13:37:25 INFO Client: Uploading resource file:/mnt/tmp/spark-2800aac5-999f-49f3-b344-c0aa55bb368b/__spark_libs__1664476559231582996.zip -> hdfs://ip-192-168-23-58.ec2.internal:8020/user/hadoop/.sparkStaging/application_1647953757276_0006/__spark_libs__1664476559231582996.zip
22/03/22 13:37:27 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/pyspark.zip -> hdfs://ip-192-168-23-58.ec2.internal:8020/user/hadoop/.sparkStaging/application_1647953757276_0006/pyspark.zip
22/03/22 13:37:27 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/py4j-0.10.9-src.zip -> hdfs://ip-192-168-23-58.ec2.internal:8020/user/hadoop/.sparkStaging/application_1647953757276_0006/py4j-0.10.9-src.zip
22/03/22 13:37:27 INFO Client: Uploading resource file:/mnt/tmp/spark-2800aac5-999f-49f3-b344-c0aa55bb368b/__spark_conf__5545734210753318074.zip -> hdfs://ip-192-168-23-58.ec2.internal:8020/user/hadoop/.sparkStaging/application_1647953757276_0006/__spark_conf__.zip
22/03/22 13:37:27 INFO SecurityManager: Changing view acls to: hadoop
22/03/22 13:37:27 INFO SecurityManager: Changing modify acls to: hadoop
22/03/22 13:37:27 INFO SecurityManager: Changing view acls groups to:
22/03/22 13:37:27 INFO SecurityManager: Changing modify acls groups to:
22/03/22 13:37:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
22/03/22 13:37:27 INFO Client: Submitting application application_1647953757276_0006 to ResourceManager
22/03/22 13:37:27 INFO YarnClientImpl: Submitted application application_1647953757276_0006
22/03/22 13:37:28 INFO Client: Application report for application_1647953757276_0006 (state: ACCEPTED)
22/03/22 13:37:28 INFO Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1647956247482
     final status: UNDEFINED
     tracking URL: http://ip-192-168-23-58.ec2.internal:20888/proxy/application_1647953757276_0006/
     user: hadoop
22/03/22 13:37:29 INFO Client: Application report for application_1647953757276_0006 (state: ACCEPTED)
22/03/22 13:37:30 INFO Client: Application report for application_1647953757276_0006 (state: ACCEPTED)
22/03/22 13:37:31 INFO Client: Application report for application_1647953757276_0006 (state: ACCEPTED)
22/03/22 13:37:32 INFO Client: Application report for application_1647953757276_0006 (state: ACCEPTED)
22/03/22 13:37:33 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> ip-192-168-23-58.ec2.internal, PROXY_URI_BASES -> http://ip-192-168-23-58.ec2.internal:20888/proxy/application_1647953757276_0006), /proxy/application_1647953757276_0006
22/03/22 13:37:33 INFO Client: Application report for application_1647953757276_0006 (state: RUNNING)
22/03/22 13:37:33 INFO Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: 192.168.12.253
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1647956247482
     final status: UNDEFINED
     tracking URL: http://ip-192-168-23-58.ec2.internal:20888/proxy/application_1647953757276_0006/
     user: hadoop
22/03/22 13:37:33 INFO YarnClientSchedulerBackend: Application application_1647953757276_0006 has started running.
22/03/22 13:37:33 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42803.
22/03/22 13:37:33 INFO NettyBlockTransferService: Server created on ip-192-168-23-58.ec2.internal:42803
22/03/22 13:37:33 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/03/22 13:37:33 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ip-192-168-23-58.ec2.internal, 42803, None)
22/03/22 13:37:33 INFO BlockManagerMasterEndpoint: Registering block manager ip-192-168-23-58.ec2.internal:42803 with 912.3 MiB RAM, BlockManagerId(driver, ip-192-168-23-58.ec2.internal, 42803, None)
22/03/22 13:37:33 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ip-192-168-23-58.ec2.internal, 42803, None)
22/03/22 13:37:33 INFO BlockManager: external shuffle service port = 7337
22/03/22 13:37:33 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-192-168-23-58.ec2.internal, 42803, None)
22/03/22 13:37:33 INFO ServerInfo: Adding filter to /metrics/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/03/22 13:37:33 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@77f91454{/metrics/json,null,AVAILABLE,@Spark}
22/03/22 13:37:33 INFO SingleEventLogFileWriter: Logging events to hdfs:/var/log/spark/apps/application_1647953757276_0006.inprogress
22/03/22 13:37:34 INFO Utils: Using 50 preallocated executors (minExecutors: 0). Set spark.dynamicAllocation.preallocateExecutors to `false` disable executor preallocation.
22/03/22 13:37:34 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
22/03/22 13:37:34 INFO DatahubSparkListener: DatahubSparkListener initialised.
22/03/22 13:37:34 INFO SparkContext: Registered listener datahub.spark.DatahubSparkListener
22/03/22 13:37:34 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
22/03/22 13:37:34 INFO DatahubSparkListener: Application started: SparkListenerApplicationStart(sinusitis_2022_03_22_test,Some(application_1647953757276_0006),1647956238324,hadoop,None,None,None)
22/03/22 13:37:34 INFO McpEmitter: REST Emitter Configuration: GMS url http://192.168.18.96:31000
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
22/03/22 13:37:34 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
22/03/22 13:37:34 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('hdfs:///user/spark/warehouse').
22/03/22 13:37:34 INFO SharedState: Warehouse path is 'hdfs:///user/spark/warehouse'.
22/03/22 13:37:34 INFO ServerInfo: Adding filter to /SQL: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/03/22 13:37:34 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@ea50364{/SQL,null,AVAILABLE,@Spark}
22/03/22 13:37:34 INFO ServerInfo: Adding filter to /SQL/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/03/22 13:37:34 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7495557e{/SQL/json,null,AVAILABLE,@Spark}
22/03/22 13:37:34 INFO ServerInfo: Adding filter to /SQL/execution: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/03/22 13:37:34 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@72ec685e{/SQL/execution,null,AVAILABLE,@Spark}
22/03/22 13:37:34 INFO ServerInfo: Adding filter to /SQL/execution/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/03/22 13:37:34 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@892a893{/SQL/execution/json,null,AVAILABLE,@Spark}
22/03/22 13:37:34 INFO ServerInfo: Adding filter to /static/sql: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/03/22 13:37:34 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@62181a81{/static/sql,null,AVAILABLE,@Spark}
22/03/22 13:37:34 DEBUG McpEmitter: emitting mcpw: MetadataChangeProposalWrapper(entityType=dataFlow, entityUrn=urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn), changeType=UPSERT, aspect={name=sinusitis_2022_03_22_test, customProperties={appName=sinusitis_2022_03_22_test, appId=application_1647953757276_0006, startedAt=2022-03-22T13:37:18.324Z, sparkUser=hadoop}}, aspectName=dataFlowInfo)
22/03/22 13:37:34 INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn)"}, underlyingResponse=HTTP/1.1 200 OK [Date: Tue, 22 Mar 2022 13:37:34 GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 66, Server: Jetty(9.4.20.v20190813)] [Content-Length: 66,Chunked: false])
22/03/22 13:37:37 DEBUG DatahubSparkListener: SQL Exec start event with id 0
22/03/22 13:37:37 DEBUG DatahubSparkListener: PLAN for execution id: sinusitis_2022_03_22_test:0
22/03/22 13:37:37 WARN SQLConf: SQL configurations from Hive module is not loaded
22/03/22 13:37:37 DEBUG DatahubSparkListener: SetCatalogAndNamespace org.apache.spark.sql.connector.catalog.CatalogManager@9ed326d, spark_catalog, ArrayBuffer(synthea_patient_big_data)
22/03/22 13:37:37 DEBUG DatahubSparkListener: Skipping execution as no output dataset present for execution id: sinusitis_2022_03_22_test:0
22/03/22 13:37:37 INFO HiveConf: Found configuration file file:/etc/spark/conf.dist/hive-site.xml
22/03/22 13:37:37 INFO HiveUtils: Initializing HiveMetastoreConnection version 2.3.7-amzn-4 using Spark classes.
22/03/22 13:37:37 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.12.253:34682) with ID 2,  ResourceProfileId 0
22/03/22 13:37:37 INFO ExecutorMonitor: New executor 2 has registered (new total is 1)
22/03/22 13:37:37 INFO HiveConf: Found configuration file file:/etc/spark/conf.dist/hive-site.xml
22/03/22 13:37:37 INFO BlockManagerMasterEndpoint: Registering block manager ip-192-168-12-253.ec2.internal:43095 with 9.7 GiB RAM, BlockManagerId(2, ip-192-168-12-253.ec2.internal, 43095, None)
22/03/22 13:37:37 INFO SessionState: Created HDFS directory: /tmp/hive/hadoop/fed3380f-cf63-41fe-ab91-99ef9f02796f
22/03/22 13:37:37 INFO SessionState: Created local directory: /tmp/hadoop/fed3380f-cf63-41fe-ab91-99ef9f02796f
22/03/22 13:37:37 INFO SessionState: Created HDFS directory: /tmp/hive/hadoop/fed3380f-cf63-41fe-ab91-99ef9f02796f/_tmp_space.db
22/03/22 13:37:37 INFO HiveClientImpl: Warehouse location for Hive client (version 2.3.7) is hdfs:///user/spark/warehouse
22/03/22 13:37:38 INFO AWSGlueClientFactory: Using region from ec2 metadata : us-east-1
22/03/22 13:37:38 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
22/03/22 13:37:39 DEBUG DatahubSparkListener: SQL Exec end event with id 0
22/03/22 13:37:40 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.20.114:60404) with ID 1,  ResourceProfileId 0
22/03/22 13:37:40 INFO ExecutorMonitor: New executor 1 has registered (new total is 2)
22/03/22 13:37:41 INFO BlockManagerMasterEndpoint: Registering block manager ip-192-168-20-114.ec2.internal:38173 with 9.7 GiB RAM, BlockManagerId(1, ip-192-168-20-114.ec2.internal, 38173, None)
22/03/22 13:37:41 DEBUG DatahubSparkListener: SQL Exec start event with id 1
22/03/22 13:37:41 DEBUG DatahubSparkListener: PLAN for execution id: sinusitis_2022_03_22_test:1

22/03/22 13:37:41 DEBUG DatahubSparkListener: CreateDataSourceTableAsSelectCommand `sinusitis_2022_03_22_test`, Overwrite, [patient, code, description, condition_age, marital, race, ethnicity, gender]
+- Repartition 1, false
   +- Aggregate [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21], [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
      +- Project [patient#3, code#5L, description#6, datediff(cast(substr(start#1, 1, 10) as date), cast(substr(cast(birthdate#8 as string), 1, 10) as date)) AS condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
         +- Join Inner, (patient#3 = id#7)
            :- Project [start#1, patient#3, code#5L, description#6]
            :  +- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
            :     +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]
            +- Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
               +- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
                  +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
CreateDataSourceTableAsSelectCommand `sinusitis_2022_03_22_test`, Overwrite, [patient, code, description, condition_age, marital, race, ethnicity, gender]
+- Repartition 1, false
   +- Aggregate [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21], [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
      +- Project [patient#3, code#5L, description#6, datediff(cast(substr(start#1, 1, 10) as date), cast(substr(cast(birthdate#8 as string), 1, 10) as date)) AS condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
         +- Join Inner, (patient#3 = id#7)
            :- Project [start#1, patient#3, code#5L, description#6]
            :  +- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
            :     +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]
            +- Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
               +- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
                  +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.plans.logical.Repartition
Repartition 1, false
+- Aggregate [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21], [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
   +- Project [patient#3, code#5L, description#6, datediff(cast(substr(start#1, 1, 10) as date), cast(substr(cast(birthdate#8 as string), 1, 10) as date)) AS condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
      +- Join Inner, (patient#3 = id#7)
         :- Project [start#1, patient#3, code#5L, description#6]
         :  +- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
         :     +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]
         +- Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
            +- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
               +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.plans.logical.Aggregate
Aggregate [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21], [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
+- Project [patient#3, code#5L, description#6, datediff(cast(substr(start#1, 1, 10) as date), cast(substr(cast(birthdate#8 as string), 1, 10) as date)) AS condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
   +- Join Inner, (patient#3 = id#7)
      :- Project [start#1, patient#3, code#5L, description#6]
      :  +- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
      :     +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]
      +- Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
         +- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
            +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.plans.logical.Project
Project [patient#3, code#5L, description#6, datediff(cast(substr(start#1, 1, 10) as date), cast(substr(cast(birthdate#8 as string), 1, 10) as date)) AS condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
+- Join Inner, (patient#3 = id#7)
   :- Project [start#1, patient#3, code#5L, description#6]
   :  +- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
   :     +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]
   +- Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
      +- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
         +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.plans.logical.Join
Join Inner, (patient#3 = id#7)
:- Project [start#1, patient#3, code#5L, description#6]
:  +- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
:     +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]
+- Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
   +- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
      +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.plans.logical.Project
Project [start#1, patient#3, code#5L, description#6]
+- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
   +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.plans.logical.Filter
Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
+- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.catalog.HiveTableRelation
HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.plans.logical.Project
Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
+- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
   +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.plans.logical.Filter
Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
+- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG DatahubSparkListener: CHILD class org.apache.spark.sql.catalyst.catalog.HiveTableRelation
HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]

-------------

22/03/22 13:37:41 DEBUG McpEmitter: emitting mcpw: MetadataChangeProposalWrapper(entityType=dataJob, entityUrn=urn:li:dataJob:(urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn),QueryExecId_1), changeType=UPSERT, aspect={inputDatasets=[urn:li:dataset:(urn:li:dataPlatform:hive,synthea_patient_big_data.conditions,PROD), urn:li:dataset:(urn:li:dataPlatform:hive,synthea_patient_big_data.patients,PROD)], outputDatasets=[urn:li:dataset:(urn:li:dataPlatform:hive,sinusitis_2022_03_22_test,PROD)]}, aspectName=dataJobInputOutput)
22/03/22 13:37:41 DEBUG McpEmitter: emitting mcpw: MetadataChangeProposalWrapper(entityType=dataJob, entityUrn=urn:li:dataJob:(urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn),QueryExecId_1), changeType=UPSERT, aspect={customProperties={SQLQueryId=1, appName=sinusitis_2022_03_22_test, appId=application_1647953757276_0006, startedAt=2022-03-22T13:37:41.911Z, description=saveAsTable at NativeMethodAccessorImpl.java:0, queryPlan=CreateDataSourceTableAsSelectCommand `sinusitis_2022_03_22_test`, Overwrite, [patient, code, description, condition_age, marital, race, ethnicity, gender]
+- Repartition 1, false
   +- Aggregate [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21], [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
      +- Project [patient#3, code#5L, description#6, datediff(cast(substr(start#1, 1, 10) as date), cast(substr(cast(birthdate#8 as string), 1, 10) as date)) AS condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
         +- Join Inner, (patient#3 = id#7)
            :- Project [start#1, patient#3, code#5L, description#6]
            :  +- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
            :     +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]
            +- Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
               +- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
                  +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]
}, name=saveAsTable at NativeMethodAccessorImpl.java:0, type={string=sparkJob}, status=IN_PROGRESS}, aspectName=dataJobInfo)
22/03/22 13:37:42 INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"urn:li:dataJob:(urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn),QueryExecId_1)"}, underlyingResponse=HTTP/1.1 200 OK [Date: Tue, 22 Mar 2022 13:37:41 GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 97, Server: Jetty(9.4.20.v20190813)] [Content-Length: 97,Chunked: false])
22/03/22 13:37:42 INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"urn:li:dataJob:(urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn),QueryExecId_1)"}, underlyingResponse=HTTP/1.1 200 OK [Date: Tue, 22 Mar 2022 13:37:41 GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 97, Server: Jetty(9.4.20.v20190813)] [Content-Length: 97,Chunked: false])
22/03/22 13:37:42 DEBUG DatahubSparkListener: LINEAGE
DatasetLineage(sources=[CatalogTableDataset(urn=urn:li:dataset:(urn:li:dataPlatform:hive,synthea_patient_big_data.conditions,PROD)), CatalogTableDataset(urn=urn:li:dataset:(urn:li:dataPlatform:hive,synthea_patient_big_data.patients,PROD))], callSiteShort=saveAsTable at NativeMethodAccessorImpl.java:0, plan=CreateDataSourceTableAsSelectCommand `sinusitis_2022_03_22_test`, Overwrite, [patient, code, description, condition_age, marital, race, ethnicity, gender]
+- Repartition 1, false
   +- Aggregate [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21], [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
      +- Project [patient#3, code#5L, description#6, datediff(cast(substr(start#1, 1, 10) as date), cast(substr(cast(birthdate#8 as string), 1, 10) as date)) AS condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
         +- Join Inner, (patient#3 = id#7)
            :- Project [start#1, patient#3, code#5L, description#6]
            :  +- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
            :     +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]
            +- Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
               +- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
                  +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]
, sink=CatalogTableDataset(urn=urn:li:dataset:(urn:li:dataPlatform:hive,sinusitis_2022_03_22_test,PROD)))

22/03/22 13:37:42 DEBUG DatahubSparkListener: Parsed execution id sinusitis_2022_03_22_test:1
22/03/22 13:37:43 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
22/03/22 13:37:43 INFO ParquetFileFormat: Using user defined output committer for Parquet: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
22/03/22 13:37:43 INFO SQLConfCommitterProvider: Getting user defined output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
22/03/22 13:37:43 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized Committer: ENABLED
22/03/22 13:37:43 INFO EmrOptimizedParquetOutputCommitter: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter
22/03/22 13:37:43 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
22/03/22 13:37:43 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
22/03/22 13:37:43 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
22/03/22 13:37:43 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
22/03/22 13:37:43 INFO SQLConfCommitterProvider: Using output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
22/03/22 13:37:43 INFO FileSystemOptimizedCommitter: Nothing to setup as successful task attempt outputs are written directly
22/03/22 13:37:44 INFO CodeGenerator: Code generated in 212.464667 ms
22/03/22 13:37:44 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 378.3 KiB, free 911.9 MiB)
22/03/22 13:37:44 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 36.2 KiB, free 911.9 MiB)
22/03/22 13:37:44 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-192-168-23-58.ec2.internal:42803 (size: 36.2 KiB, free: 912.3 MiB)
22/03/22 13:37:44 INFO SparkContext: Created broadcast 0 from
22/03/22 13:37:44 INFO GPLNativeCodeLoader: Loaded native gpl library
22/03/22 13:37:44 INFO LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 049362b7cf53ff5f739d6b1532457f2c6cd495e8]
22/03/22 13:37:45 INFO FileInputFormat: Total input files to process : 1
22/03/22 13:37:45 INFO DAGScheduler: Registering RDD 5 (saveAsTable at NativeMethodAccessorImpl.java:0) as input to shuffle 0
22/03/22 13:37:45 INFO DAGScheduler: Got map stage job 0 (saveAsTable at NativeMethodAccessorImpl.java:0) with 8 output partitions
22/03/22 13:37:45 INFO DAGScheduler: Final stage: ShuffleMapStage 0 (saveAsTable at NativeMethodAccessorImpl.java:0)
22/03/22 13:37:45 INFO DAGScheduler: Parents of final stage: List()
22/03/22 13:37:45 INFO DAGScheduler: Missing parents: List()
22/03/22 13:37:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[5] at saveAsTable at NativeMethodAccessorImpl.java:0), which has no missing parents
22/03/22 13:37:45 INFO CodeGenerator: Code generated in 53.696282 ms
22/03/22 13:37:45 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 378.6 KiB, free 911.5 MiB)
22/03/22 13:37:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 36.3 KiB, free 911.5 MiB)
22/03/22 13:37:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-192-168-23-58.ec2.internal:42803 (size: 36.3 KiB, free: 912.2 MiB)
22/03/22 13:37:45 INFO SparkContext: Created broadcast 1 from
22/03/22 13:37:45 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 16.8 KiB, free 911.5 MiB)
22/03/22 13:37:45 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 8.2 KiB, free 911.5 MiB)
22/03/22 13:37:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ip-192-168-23-58.ec2.internal:42803 (size: 8.2 KiB, free: 912.2 MiB)
22/03/22 13:37:45 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1484
22/03/22 13:37:45 INFO DAGScheduler: Submitting 8 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[5] at saveAsTable at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
22/03/22 13:37:45 INFO YarnScheduler: Adding task set 0.0 with 8 tasks resource profile 0
22/03/22 13:37:45 INFO FileInputFormat: Total input files to process : 1
22/03/22 13:37:45 INFO DAGScheduler: Registering RDD 11 (saveAsTable at NativeMethodAccessorImpl.java:0) as input to shuffle 1
22/03/22 13:37:45 INFO DAGScheduler: Got map stage job 1 (saveAsTable at NativeMethodAccessorImpl.java:0) with 8 output partitions
22/03/22 13:37:45 INFO DAGScheduler: Final stage: ShuffleMapStage 1 (saveAsTable at NativeMethodAccessorImpl.java:0)
22/03/22 13:37:45 INFO DAGScheduler: Parents of final stage: List()
22/03/22 13:37:45 INFO DAGScheduler: Missing parents: List()
22/03/22 13:37:45 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[11] at saveAsTable at NativeMethodAccessorImpl.java:0), which has no missing parents
22/03/22 13:37:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (ip-192-168-12-253.ec2.internal, executor 2, partition 0, RACK_LOCAL, 4545 bytes) taskResourceAssignments Map()
22/03/22 13:37:45 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 19.9 KiB, free 911.4 MiB)
22/03/22 13:37:45 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 9.3 KiB, free 911.4 MiB)
22/03/22 13:37:45 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-192-168-23-58.ec2.internal:42803 (size: 9.3 KiB, free: 912.2 MiB)
22/03/22 13:37:45 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1484
22/03/22 13:37:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (ip-192-168-20-114.ec2.internal, executor 1, partition 1, RACK_LOCAL, 4545 bytes) taskResourceAssignments Map()
22/03/22 13:37:45 INFO DAGScheduler: Submitting 8 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[11] at saveAsTable at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
22/03/22 13:37:45 INFO YarnScheduler: Adding task set 1.0 with 8 tasks resource profile 0
22/03/22 13:37:45 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (ip-192-168-12-253.ec2.internal, executor 2, partition 2, RACK_LOCAL, 4545 bytes) taskResourceAssignments Map()
22/03/22 13:37:45 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (ip-192-168-20-114.ec2.internal, executor 1, partition 3, RACK_LOCAL, 4545 bytes) taskResourceAssignments Map()
22/03/22 13:37:45 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (ip-192-168-12-253.ec2.internal, executor 2, partition 4, RACK_LOCAL, 4545 bytes) taskResourceAssignments Map()
22/03/22 13:37:45 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (ip-192-168-20-114.ec2.internal, executor 1, partition 5, RACK_LOCAL, 4545 bytes) taskResourceAssignments Map()
22/03/22 13:37:45 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (ip-192-168-12-253.ec2.internal, executor 2, partition 6, RACK_LOCAL, 4545 bytes) taskResourceAssignments Map()
22/03/22 13:37:45 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (ip-192-168-20-114.ec2.internal, executor 1, partition 7, RACK_LOCAL, 4545 bytes) taskResourceAssignments Map()
22/03/22 13:37:45 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 3 for resource profile id: 0)
22/03/22 13:37:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ip-192-168-12-253.ec2.internal:43095 (size: 8.2 KiB, free: 9.7 GiB)
22/03/22 13:37:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ip-192-168-20-114.ec2.internal:38173 (size: 8.2 KiB, free: 9.7 GiB)
22/03/22 13:37:46 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-192-168-20-114.ec2.internal:38173 (size: 36.2 KiB, free: 9.7 GiB)
22/03/22 13:37:46 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-192-168-12-253.ec2.internal:43095 (size: 36.2 KiB, free: 9.7 GiB)
22/03/22 13:37:46 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 4 for resource profile id: 0)
22/03/22 13:37:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 8) (ip-192-168-12-253.ec2.internal, executor 2, partition 0, RACK_LOCAL, 4541 bytes) taskResourceAssignments Map()
22/03/22 13:37:49 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 9) (ip-192-168-12-253.ec2.internal, executor 2, partition 1, RACK_LOCAL, 4541 bytes) taskResourceAssignments Map()
22/03/22 13:37:49 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 10) (ip-192-168-12-253.ec2.internal, executor 2, partition 2, RACK_LOCAL, 4541 bytes) taskResourceAssignments Map()
22/03/22 13:37:49 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 11) (ip-192-168-12-253.ec2.internal, executor 2, partition 3, RACK_LOCAL, 4541 bytes) taskResourceAssignments Map()
22/03/22 13:37:50 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 4578 ms on ip-192-168-12-253.ec2.internal (executor 2) (1/8)
22/03/22 13:37:50 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 4584 ms on ip-192-168-12-253.ec2.internal (executor 2) (2/8)
22/03/22 13:37:50 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4635 ms on ip-192-168-12-253.ec2.internal (executor 2) (3/8)
22/03/22 13:37:50 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 4582 ms on ip-192-168-12-253.ec2.internal (executor 2) (4/8)
22/03/22 13:37:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-192-168-12-253.ec2.internal:43095 (size: 9.3 KiB, free: 9.7 GiB)
22/03/22 13:37:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-192-168-12-253.ec2.internal:43095 (size: 36.3 KiB, free: 9.7 GiB)
22/03/22 13:37:50 INFO TaskSetManager: Starting task 4.0 in stage 1.0 (TID 12) (ip-192-168-20-114.ec2.internal, executor 1, partition 4, RACK_LOCAL, 4541 bytes) taskResourceAssignments Map()
22/03/22 13:37:50 INFO TaskSetManager: Starting task 5.0 in stage 1.0 (TID 13) (ip-192-168-20-114.ec2.internal, executor 1, partition 5, RACK_LOCAL, 4541 bytes) taskResourceAssignments Map()
22/03/22 13:37:50 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 4623 ms on ip-192-168-20-114.ec2.internal (executor 1) (5/8)
22/03/22 13:37:50 INFO TaskSetManager: Starting task 6.0 in stage 1.0 (TID 14) (ip-192-168-20-114.ec2.internal, executor 1, partition 6, RACK_LOCAL, 4541 bytes) taskResourceAssignments Map()
22/03/22 13:37:50 INFO TaskSetManager: Starting task 7.0 in stage 1.0 (TID 15) (ip-192-168-20-114.ec2.internal, executor 1, partition 7, RACK_LOCAL, 4541 bytes) taskResourceAssignments Map()
22/03/22 13:37:50 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 4624 ms on ip-192-168-20-114.ec2.internal (executor 1) (6/8)
22/03/22 13:37:50 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 4628 ms on ip-192-168-20-114.ec2.internal (executor 1) (7/8)
22/03/22 13:37:50 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 4625 ms on ip-192-168-20-114.ec2.internal (executor 1) (8/8)
22/03/22 13:37:50 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
22/03/22 13:37:50 INFO DAGScheduler: ShuffleMapStage 0 (saveAsTable at NativeMethodAccessorImpl.java:0) finished in 4.838 s
22/03/22 13:37:50 INFO DAGScheduler: looking for newly runnable stages
22/03/22 13:37:50 INFO DAGScheduler: running: Set(ShuffleMapStage 1)
22/03/22 13:37:50 INFO DAGScheduler: waiting: Set()
22/03/22 13:37:50 INFO DAGScheduler: failed: Set()
22/03/22 13:37:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-192-168-20-114.ec2.internal:38173 (size: 9.3 KiB, free: 9.7 GiB)
22/03/22 13:37:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-192-168-20-114.ec2.internal:38173 (size: 36.3 KiB, free: 9.7 GiB)
22/03/22 13:37:51 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 11) in 1032 ms on ip-192-168-12-253.ec2.internal (executor 2) (1/8)
22/03/22 13:37:51 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 9) in 1040 ms on ip-192-168-12-253.ec2.internal (executor 2) (2/8)
22/03/22 13:37:51 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 10) in 1055 ms on ip-192-168-12-253.ec2.internal (executor 2) (3/8)
22/03/22 13:37:51 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 8) in 1079 ms on ip-192-168-12-253.ec2.internal (executor 2) (4/8)
22/03/22 13:37:51 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 13) in 1047 ms on ip-192-168-20-114.ec2.internal (executor 1) (5/8)
22/03/22 13:37:51 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 12) in 1093 ms on ip-192-168-20-114.ec2.internal (executor 1) (6/8)
22/03/22 13:37:51 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 15) in 1105 ms on ip-192-168-20-114.ec2.internal (executor 1) (7/8)
22/03/22 13:37:51 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 14) in 1112 ms on ip-192-168-20-114.ec2.internal (executor 1) (8/8)
22/03/22 13:37:51 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
22/03/22 13:37:51 INFO DAGScheduler: ShuffleMapStage 1 (saveAsTable at NativeMethodAccessorImpl.java:0) finished in 5.764 s
22/03/22 13:37:51 INFO DAGScheduler: looking for newly runnable stages
22/03/22 13:37:51 INFO DAGScheduler: running: Set()
22/03/22 13:37:51 INFO DAGScheduler: waiting: Set()
22/03/22 13:37:51 INFO DAGScheduler: failed: Set()
22/03/22 13:37:51 INFO ShufflePartitionsUtil: For shuffle(0, 1), advisory target size: 67108864, actual target size 1133122.
22/03/22 13:37:51 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
22/03/22 13:37:51 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
22/03/22 13:37:51 INFO CodeGenerator: Code generated in 100.29097 ms
22/03/22 13:37:51 INFO CodeGenerator: Code generated in 27.793073 ms
22/03/22 13:37:51 INFO CodeGenerator: Code generated in 17.096593 ms
22/03/22 13:37:51 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
22/03/22 13:37:51 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
22/03/22 13:37:51 INFO SparkContext: Starting job: saveAsTable at NativeMethodAccessorImpl.java:0
22/03/22 13:37:51 INFO DAGScheduler: Got job 2 (saveAsTable at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/03/22 13:37:51 INFO DAGScheduler: Final stage: ResultStage 4 (saveAsTable at NativeMethodAccessorImpl.java:0)
22/03/22 13:37:51 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 2, ShuffleMapStage 3)
22/03/22 13:37:51 INFO DAGScheduler: Missing parents: List()
22/03/22 13:37:51 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[23] at saveAsTable at NativeMethodAccessorImpl.java:0), which has no missing parents
22/03/22 13:37:51 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 262.3 KiB, free 911.2 MiB)
22/03/22 13:37:51 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 98.8 KiB, free 911.1 MiB)
22/03/22 13:37:51 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-192-168-23-58.ec2.internal:42803 (size: 98.8 KiB, free: 912.1 MiB)
22/03/22 13:37:51 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1484
22/03/22 13:37:51 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (MapPartitionsRDD[23] at saveAsTable at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
22/03/22 13:37:51 INFO YarnScheduler: Adding task set 4.0 with 1 tasks resource profile 0
22/03/22 13:37:51 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 16) (ip-192-168-12-253.ec2.internal, executor 2, partition 0, PROCESS_LOCAL, 6767 bytes) taskResourceAssignments Map()
22/03/22 13:37:51 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-192-168-12-253.ec2.internal:43095 (size: 98.8 KiB, free: 9.7 GiB)
22/03/22 13:37:52 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 192.168.12.253:34682
22/03/22 13:37:52 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 192.168.12.253:34682
22/03/22 13:37:55 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 16) in 3530 ms on ip-192-168-12-253.ec2.internal (executor 2) (1/1)
22/03/22 13:37:55 INFO DAGScheduler: ResultStage 4 (saveAsTable at NativeMethodAccessorImpl.java:0) finished in 3.592 s
22/03/22 13:37:55 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
22/03/22 13:37:55 INFO YarnScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
22/03/22 13:37:55 INFO YarnScheduler: Killing all running tasks in stage 4: Stage finished
22/03/22 13:37:55 INFO DAGScheduler: Job 2 finished: saveAsTable at NativeMethodAccessorImpl.java:0, took 3.614238 s
22/03/22 13:37:55 INFO MultipartUploadOutputStream: close closed:false s3://databrew-demo-111222333444-us-east-1/sinusitis_2022_03_22_test/_SUCCESS
22/03/22 13:37:55 INFO FileFormatWriter: Write Job 9d632b05-e2a4-4097-bfc2-eba7eb22bd6a committed.
22/03/22 13:37:55 INFO FileFormatWriter: Finished processing stats for write job 9d632b05-e2a4-4097-bfc2-eba7eb22bd6a.
22/03/22 13:37:55 INFO InMemoryFileIndex: It took 60 ms to list leaf files for 1 paths.
22/03/22 13:37:56 INFO HiveExternalCatalog: Persisting bucketed data source table `synthea_patient_big_data`.`sinusitis_2022_03_22_test` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive bucketed table. But Hive can read this table as a non-bucketed table.
22/03/22 13:37:56 INFO SQLStdHiveAccessController: Created SQLStdHiveAccessController for session context : HiveAuthzSessionContext [sessionString=fed3380f-cf63-41fe-ab91-99ef9f02796f, clientType=HIVECLI]
22/03/22 13:37:56 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
22/03/22 13:37:56 INFO AWSCatalogMetastoreClient: Mestastore configuration hive.metastore.filter.hook changed from org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl to org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook
22/03/22 13:37:56 INFO AWSGlueClientFactory: Using region from ec2 metadata : us-east-1
22/03/22 13:37:56 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
22/03/22 13:37:56 INFO AWSGlueClientFactory: Using region from ec2 metadata : us-east-1
22/03/22 13:37:56 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
22/03/22 13:37:57 DEBUG DatahubSparkListener: SQL Exec end event with id 1
22/03/22 13:37:57 DEBUG McpEmitter: emitting mcpw: MetadataChangeProposalWrapper(entityType=dataJob, entityUrn=urn:li:dataJob:(urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn),QueryExecId_1), changeType=UPSERT, aspect={customProperties={SQLQueryId=1, completedAt=2022-03-22T13:37:57.341Z, appName=sinusitis_2022_03_22_test, appId=application_1647953757276_0006, startedAt=2022-03-22T13:37:41.911Z, description=saveAsTable at NativeMethodAccessorImpl.java:0, queryPlan=CreateDataSourceTableAsSelectCommand `sinusitis_2022_03_22_test`, Overwrite, [patient, code, description, condition_age, marital, race, ethnicity, gender]
+- Repartition 1, false
   +- Aggregate [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21], [patient#3, code#5L, description#6, condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
      +- Project [patient#3, code#5L, description#6, datediff(cast(substr(start#1, 1, 10) as date), cast(substr(cast(birthdate#8 as string), 1, 10) as date)) AS condition_age#0, marital#18, race#19, ethnicity#20, gender#21]
         +- Join Inner, (patient#3 = id#7)
            :- Project [start#1, patient#3, code#5L, description#6]
            :  +- Filter ((isnotnull(description#6) AND Contains(description#6, sinusitis)) AND isnotnull(patient#3))
            :     +- HiveTableRelation [`synthea_patient_big_data`.`conditions`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [start#1, stop#2, patient#3, encounter#4, code#5L, description#6], Partition Cols: []]
            +- Project [id#7, birthdate#8, marital#18, race#19, ethnicity#20, gender#21]
               +- Filter ((((((((isnotnull(gender#21) AND isnotnull(ethnicity#20)) AND isnotnull(race#19)) AND isnotnull(marital#18)) AND NOT (gender#21 = )) AND NOT (ethnicity#20 = )) AND NOT (race#19 = )) AND NOT (marital#18 = )) AND isnotnull(id#7))
                  +- HiveTableRelation [`synthea_patient_big_data`.`patients`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, birthdate#8, deathdate#9, ssn#10, drivers#11, passport#12, prefix#13, first#14, last#15, s..., Partition Cols: []]
}, name=saveAsTable at NativeMethodAccessorImpl.java:0, type={string=sparkJob}}, aspectName=dataJobInfo)
22/03/22 13:37:57 INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"urn:li:dataJob:(urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn),QueryExecId_1)"}, underlyingResponse=HTTP/1.1 200 OK [Date: Tue, 22 Mar 2022 13:37:57 GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 97, Server: Jetty(9.4.20.v20190813)] [Content-Length: 97,Chunked: false])
22/03/22 13:37:57 DEBUG DatahubSparkListener: SQL Exec start event with id 2
22/03/22 13:37:57 DEBUG DatahubSparkListener: PLAN for execution id: sinusitis_2022_03_22_test:2
22/03/22 13:37:57 DEBUG DatahubSparkListener: AlterTableSetPropertiesCommand `synthea_patient_big_data`.`sinusitis_2022_03_22_test`, [classification=parquet], false
22/03/22 13:37:57 DEBUG DatahubSparkListener: Skipping execution as no output dataset present for execution id: sinusitis_2022_03_22_test:2
22/03/22 13:37:59 INFO log: Updating table stats fast for sinusitis_2022_03_22_test
22/03/22 13:37:59 INFO log: Updated size of table sinusitis_2022_03_22_test to 2210971
22/03/22 13:37:59 DEBUG DatahubSparkListener: SQL Exec end event with id 2
22/03/22 13:37:59 INFO DatahubSparkListener: Application ended : sinusitis_2022_03_22_test application_1647953757276_0006
22/03/22 13:37:59 DEBUG McpEmitter: emitting mcpw: MetadataChangeProposalWrapper(entityType=dataFlow, entityUrn=urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn), changeType=UPSERT, aspect={name=sinusitis_2022_03_22_test, customProperties={completedAt=2022-03-22T13:37:59.545Z, appName=sinusitis_2022_03_22_test, appId=application_1647953757276_0006, startedAt=2022-03-22T13:37:18.324Z, sparkUser=hadoop}}, aspectName=dataFlowInfo)
22/03/22 13:37:59 INFO AbstractConnector: Stopped Spark@1c5f6e66{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
22/03/22 13:37:59 INFO SparkUI: Stopped Spark web UI at http://ip-192-168-23-58.ec2.internal:4040
22/03/22 13:37:59 INFO YarnClientSchedulerBackend: Interrupting monitor thread
22/03/22 13:37:59 INFO YarnClientSchedulerBackend: Shutting down all executors
22/03/22 13:37:59 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
22/03/22 13:37:59 INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"urn:li:dataFlow:(spark,sinusitis_2022_03_22_test,yarn)"}, underlyingResponse=HTTP/1.1 200 OK [Date: Tue, 22 Mar 2022 13:37:59 GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 66, Server: Jetty(9.4.20.v20190813)] [Content-Length: 66,Chunked: false])
22/03/22 13:37:59 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
22/03/22 13:38:00 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/03/22 13:38:00 INFO MemoryStore: MemoryStore cleared
22/03/22 13:38:00 INFO BlockManager: BlockManager stopped
22/03/22 13:38:00 INFO BlockManagerMaster: BlockManagerMaster stopped
22/03/22 13:38:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/03/22 13:38:00 INFO SparkContext: Successfully stopped SparkContext
22/03/22 13:38:00 INFO ShutdownHookManager: Shutdown hook called
22/03/22 13:38:00 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-2800aac5-999f-49f3-b344-c0aa55bb368b
22/03/22 13:38:00 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-a4efc7e7-28b0-4cda-ab29-88f6dbc9c758
22/03/22 13:38:00 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-2800aac5-999f-49f3-b344-c0aa55bb368b/pyspark-50b460fd-21a5-4a8e-bf3d-18278b4baa4c
github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release, please leave a comment with the version that you tested it with. If this is a question/discussion, please head to https://slack.datahubproject.io. For feature requests, please use https://feature-requests.datahubproject.io.

treff7es commented 1 year ago

I think the Spark execution plan doesn't know whether the metastore is Glue or Hive. We should add an option to identify Hive tables as Glue if the user wants that.

WiseLin1125 commented 11 months ago

@treff7es @garystafford I encountered the same issue, specifically when following the AWS blog post https://aws.amazon.com/blogs/big-data/part-2-deploy-datahub-using-aws-managed-services-and-ingest-metadata-from-aws-glue-and-amazon-redshift/

But in my case, DataHub only captures the Spark task, without any Hive or Glue task/dataset. Is there any new solution for this?

drewhayward commented 9 months ago

I'm facing similar issues and have made some progress. I think the root of the issue can now be addressed with a Spark config.

To get Spark job -> Glue table lineage to work, I had to:

  1. Configure the Spark extractor with the spark.datahub.metadata.table.hive_platform_alias=glue connector config (see the sketch after this list). This made the Spark job emit the datasets under the glue platform. However, since 0.10.1 (see the breaking changes), these newly created tables won't be displayed because they don't have any "aspects".
  2. Re-ingest the related Glue catalog, which then populates the glue table node created by Spark so that it appears in the UI.
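
For reference, a minimal PySpark sketch of step 1, assuming the standard DatahubSparkListener setup; the app name and REST endpoint below are placeholders, not values from this thread:

```python
from pyspark.sql import SparkSession

# Minimal sketch of step 1: report Hive-metastore tables under the "glue" platform.
# App name and DataHub REST endpoint are placeholders; replace with your own values.
spark = (
    SparkSession.builder
    .appName("glue-lineage-example")
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://<your-datahub-gms>:8080")
    # Key setting from the comment above: alias Hive tables to the Glue platform.
    .config("spark.datahub.metadata.table.hive_platform_alias", "glue")
    .enableHiveSupport()
    .getOrCreate()
)
```

Step 2 (re-ingesting the Glue catalog with a normal DataHub ingestion recipe) is still needed on 0.10.1+ so the aliased datasets have aspects and show up in the UI.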
denystyshetskyy commented 2 months ago

Same issue for me. Any update on this one? It looks like it has been a while since this was raised. I have some more details here.

treff7es commented 1 month ago

@denystyshetskyy can you try the new Spark plugin? https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta

If you set spark.datahub.metadata.dataset.hivePlatformAlias to glue, then it should properly capture Glue datasets.
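
A rough sketch of what that could look like with the beta plugin; the artifact coordinate and listener class are assumptions based on the linked docs, so verify them there before use:

```python
from pyspark.sql import SparkSession

# Sketch for the spark-lineage-beta plugin referenced above. The package coordinate,
# listener class, and endpoint are assumptions/placeholders; check the linked docs.
spark = (
    SparkSession.builder
    .appName("glue-lineage-beta-example")
    .config("spark.jars.packages", "io.acryl:acryl-spark-lineage:<version>")  # assumed coordinate; pin a released version
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://<your-datahub-gms>:8080")
    # Setting suggested above: emit datasets as Glue instead of Hive.
    .config("spark.datahub.metadata.dataset.hivePlatformAlias", "glue")
    .enableHiveSupport()
    .getOrCreate()
)
```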