apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi partition values not getting reflected in Athena #4267

Closed: Arun-kc closed this issue 2 years ago

Arun-kc commented 2 years ago

Describe the problem you faced

Partitioned data is not getting reflected in the AWS Glue Catalog (Athena table).

To Reproduce

Steps to reproduce the behavior:

  1. Create a Glue job in AWS
  2. Copy-paste the code from the referenced article
  3. Run the Glue job

Expected behavior

Partition values should be reflected in the Glue Catalog and be queryable from Athena.

Environment Description

Additional context

Trying to update partition values as mentioned in this article by @dacort

The Athena table DDL is as follows:

CREATE EXTERNAL TABLE `my_hudi_table`(
  `_hoodie_commit_time` string, 
  `_hoodie_commit_seqno` string, 
  `_hoodie_record_key` string, 
  `_hoodie_partition_path` string, 
  `_hoodie_file_name` string, 
  `id` string, 
  `last_update_time` string)
PARTITIONED BY ( 
  `creation_date` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<BUCKET>/tmp/myhudidataset_001'

Stacktrace

2021-12-09 05:20:44,030 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
  File "/tmp/test_job", line 56, in <module>
    .save("s3://<BUCKET>/tmp/myhudidataset_001/")
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 734, in save
    self._jwrite.save(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o116.save.
: org.apache.hudi.hive.HoodieHiveSyncException: Cannot create hive connection jdbc:hive2://localhost:10000/
    at org.apache.hudi.hive.HoodieHiveClient.createHiveConnection(HoodieHiveClient.java:553)
    at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:109)
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:65)
    at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:234)
    at org.apache.hudi.HoodieSparkSqlWriter$.checkWriteStatus(HoodieSparkSqlWriter.scala:285)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:188)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:108)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:10000: java.net.ConnectException: Connection refused (Connection refused)
    at org.apache.hive.jdbc.HiveConnection.openTransport(HiveConnection.java:232)
    at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:176)
    at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:247)
    at org.apache.hudi.hive.HoodieHiveClient.createHiveConnection(HoodieHiveClient.java:550)
    ... 38 more
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:226)
    at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:266)
    at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
    at org.apache.hive.jdbc.HiveConnection.openTransport(HiveConnection.java:204)
    ... 43 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:221)
    ... 46 more
Carl-Zhou-CN commented 2 years ago

@Arun-kc It looks like a connection problem. Please check hoodie.datasource.hive_sync.jdbcurl; it seems to be left at its default value.
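
For reference, a hedged sketch of what an explicitly configured sync URL could look like among the write options; the host below is only a placeholder for a HiveServer2 endpoint the job can actually reach, not a value taken from this issue:

# Sketch only: override the default jdbc:hive2://localhost:10000 with a
# HiveServer2 endpoint that is reachable from the Glue/Spark job.
hive_sync_options = {
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://<HIVE_SERVER2_HOST>:10000",  # placeholder host
}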

Arun-kc commented 2 years ago

@Carl-Zhou-CN These are the Hudi options I'm using at the moment:

hudiOptions = {
    "hoodie.table.name": "my_hudi_table",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": "creation_date",
    "hoodie.datasource.write.precombine.field": "last_update_time",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": "my_hudi_table",
    "hoodie.datasource.hive_sync.partition_fields": "creation_date",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.index.type": "GLOBAL_BLOOM",                 # This is required if we want to ensure we upsert a record, even if the partition changes
    "hoodie.bloom.index.update.partition.path": "true",  # This is required to write the data into the new partition (defaults to false in 0.8.0, true in 0.9.0)
}
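
For reference, a minimal sketch of how options like these are typically passed to a Hudi write from a PySpark/Glue job; the DataFrame df, the format string, and the S3 path are placeholders inferred from the stack trace above, not the exact job code:

# Sketch only: assumes `df` is a Spark DataFrame with id, creation_date and
# last_update_time columns, written through the Hudi Spark datasource.
(
    df.write.format("org.apache.hudi")                 # Hudi datasource
      .options(**hudiOptions)                          # the options shown above
      .mode("append")                                  # default write operation is upsert
      .save("s3://<BUCKET>/tmp/myhudidataset_001/")    # placeholder path from the stack trace
)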

As for hoodie.datasource.hive_sync.jdbcurl, I'm not using Hive at the moment, so what URL should I specify?

I'm doing this in AWS Glue, using the Hudi connector.

Carl-Zhou-CN commented 2 years ago
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.table": "my_hudi_table",
"hoodie.datasource.hive_sync.partition_fields": "creation_date",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",

@Arun-kc If you do not register your Hudi dataset as a table in the Hive metastore, these options are not required.

Carl-Zhou-CN commented 2 years ago

Because of your Hudi version, you may need to manually update the partitions after writing: ALTER TABLE table_name RECOVER PARTITIONS;

nikita-sheremet-clearscale commented 2 years ago

I do not know how to fix this for Glue because it hides all nodes from management, but I know how to fix this error for EMR. The source article is https://aws.amazon.com/ru/blogs/big-data/apply-record-level-changes-from-relational-databases-to-amazon-s3-data-lake-using-apache-hudi-on-amazon-emr-and-aws-database-migration-service/

See the error:

py4j.protocol.Py4JJavaError: An error occurred while calling o116.save.
: org.apache.hudi.hive.HoodieHiveSyncException: Cannot create hive connection jdbc:hive2://localhost:10000/
    at org.apache.hudi.hive.HoodieHiveClient.createHiveConnection(HoodieHiveClient.java:553)

It means that all nodes inside the cluster try to connect to localhost, i.e. themselves, and fail.

The solution for EMR:

Call ListInstances with the EMR ClusterId and InstanceGroupTypes MASTER, then grab the PrivateIpAddress (JSON path $.Instances[0].PrivateIpAddress) and pass it as a Hudi config parameter:

--hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://111.111.111.111:10000

With this, all cluster nodes will connect to the master node and sync the table.
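
A rough boto3 sketch of that lookup (the cluster ID is a placeholder; this only illustrates the ListInstances call described above):

import boto3

# Sketch: find the EMR master node's private IP and build the Hive JDBC URL.
emr = boto3.client("emr")
response = emr.list_instances(
    ClusterId="j-XXXXXXXXXXXXX",      # placeholder EMR cluster id
    InstanceGroupTypes=["MASTER"],
)
master_ip = response["Instances"][0]["PrivateIpAddress"]
jdbc_url = "jdbc:hive2://{}:10000".format(master_ip)
# then pass it, e.g.: --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=<jdbc_url>
print(jdbc_url)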

A couple of notes:

1. I used Hudi version 0.7 from Amazon, and Hive/Glue Catalog sync worked without any problems. But when I moved to 0.9.0 I saw no new partitions; I only changed the version, nothing else. Another application on the new 0.9.0 version also needed the IP-address manipulation.
2. I cannot say how my fixes can be applied to a Glue job, sorry. Try contacting AWS support and tell them you need the master node IP address before submitting a job. Something tells me to run some code to get the IP address and add the Hudi config programmatically, but is it even possible to access the Glue job's master node IP? I do not know. :-(

Carl-Zhou-CN commented 2 years ago

Try setting hoodie.datasource.hive_sync.use_jdbc -> false, so that the sync does not go through a JDBC connection to the metastore? This might help.
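
In the Glue options dict above that would look roughly like this (sketch only; the exact sync behaviour depends on the Hudi/connector version):

# Sketch: skip the HiveServer2 JDBC connection; with use_jdbc disabled, hive
# sync goes through the metastore client instead of jdbc:hive2://...
hudiOptions["hoodie.datasource.hive_sync.use_jdbc"] = "false"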

Arun-kc commented 2 years ago

@Carl-Zhou-CN I tried ALTER TABLE table_name RECOVER PARTITIONS;, but it's not working.

[screenshot]

I tried the hoodie.datasource.hive_sync.use_jdbc -> false approach too, but to no avail.

@nikita-sheremet-clearscale Yes, I'm using Glue in this scenario. I'm using a Hudi connector that was subscribed to when the version was 0.5.1. Now in the Marketplace the version is shown as 0.9.0; I'm not sure whether the subscribed version gets updated automatically.

I will check on the IP part and will let you know.

Just to let you know, I'm creating the Hudi table manually in Athena using the following DDL:

CREATE EXTERNAL TABLE `my_hudi_table`(
  `_hoodie_commit_time` string, 
  `_hoodie_commit_seqno` string, 
  `_hoodie_record_key` string, 
  `_hoodie_partition_path` string, 
  `_hoodie_file_name` string, 
  `id` string, 
  `last_update_time` string)
PARTITIONED BY ( 
  `creation_date` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<BUCKET>/tmp/myhudidataset_001'

Carl-Zhou-CN commented 2 years ago

@Arun-kc Sorry, it seems I misunderstood. What needs to be done is ALTER TABLE ADD PARTITION. [screenshots]

https://docs.aws.amazon.com/athena/latest/ug/querying-hudi.html#querying-hudi-in-athena-creating-hudi-tables

Arun-kc commented 2 years ago

@Carl-Zhou-CN It's ok.

I have tried ALTER TABLE ADD PARTITION before and it does work, but we have to specify the partitions manually. When there are a lot of partitions this is not a viable solution unless we can automate it. I would have to create a script to do this using boto3, which is doable (a rough sketch follows).
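
A hedged sketch of what such a boto3 script might look like; the database, output location, and partition folder layout are placeholders, and it simply submits an ALTER TABLE ADD PARTITION statement through Athena for each partition value:

import boto3

# Sketch: register Hudi partition folders as Athena partitions via boto3.
athena = boto3.client("athena")

def add_partition(creation_date):
    """Submit ALTER TABLE ADD PARTITION for one partition value."""
    query = (
        "ALTER TABLE my_hudi_table ADD IF NOT EXISTS "
        "PARTITION (creation_date = '{0}') "
        "LOCATION 's3://<BUCKET>/tmp/myhudidataset_001/{0}/'".format(creation_date)  # adjust to the actual partition folder layout
    )
    return athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": "default"},                            # placeholder database
        ResultConfiguration={"OutputLocation": "s3://<BUCKET>/athena-results/"},  # placeholder output location
    )["QueryExecutionId"]

# Example: add a single partition; in practice the values could be listed from S3.
add_partition("2021-12-09")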

What I was trying to do is let Hudi handle this on its own, so that in Athena we can query the partitions directly without running any other queries. Is that possible?

Carl-Zhou-CN commented 2 years ago

I think it is possible, but I am not familiar with Athena. As long as Hudi can interact with the Glue Catalog, your problem should be solved. You may need to ask others for help. @nsivabalan Do you have time to help?

Arun-kc commented 2 years ago

Hi @Carl-Zhou-CN and @nikita-sheremet-clearscale

I tried the same with Hudi connector version 0.9.0 and it's working fine now; the partitions are getting reflected in Athena. It seems the problem was with Hudi connector version 0.5.1.

Thanks for the help, both of you 🙌 I'm closing this issue.

nsivabalan commented 2 years ago

thanks for the update.

joshhamann commented 2 years ago

@Arun-kc Can you share which config settings ended up working with Glue and Hudi version 0.9.0, please?

nsivabalan commented 2 years ago

CC @bhasudha @rajkalluri for doc updates, if any, w.r.t. version compatibilities.