apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Seeking Assistance with Hudi Integration Issue in Spark Thrift Server and DBT #10287

Closed · soumilshah1995 closed this 10 months ago

soumilshah1995 commented 10 months ago

Hey community,

I hope you're doing well. I recently launched a Thrift server using Spark, incorporating the Hudi library. The server runs smoothly, and I can interact with it using Beeline to query data successfully.

spark-submit \
  --master 'local[*]' \
  --conf spark.executor.extraJavaOptions=-Duser.timezone=Etc/UTC \
  --conf spark.eventLog.enabled=false \
  --conf spark.sql.warehouse.dir=file:///Users/soumilshah/Desktop/soumil/sparkwarehouse \
  --packages 'org.apache.hudi:hudi-spark3-bundle_2.12:0.14.0,org.apache.spark:spark-sql_2.12:3.4.0,org.apache.spark:spark-hive_2.12:3.4.0' \
  --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 \
  --name "Thrift JDBC/ODBC Server" \
  --executor-memory 512m \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
  --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar

BEELINE

beeline -u jdbc:hive2://localhost:10000/default

[Screenshot: 2023-12-08 at 3:48:08 PM]

CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING HUDI
PARTITIONED BY (city);

Works fine

Inserted data

INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'    ),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'    ),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'      ),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

[Screenshot: 2023-12-08 at 3:49:11 PM]

DBT debug

 dbt debug
20:49:42  Running with dbt=1.7.3
20:49:42  dbt version: 1.7.3
20:49:42  python version: 3.9.6
20:49:42  python path: /Users/soumilshah/IdeaProjects/mdbt/project/dbt-env/bin/python
20:49:42  os info: macOS-13.6.1-x86_64-i386-64bit
20:49:42  Using profiles dir at /Users/soumilshah/.dbt
20:49:42  Using profiles.yml file at /Users/soumilshah/.dbt/profiles.yml
20:49:42  Using dbt_project.yml file at /Users/soumilshah/IdeaProjects/mdbt/project/hudidbt/dbt_project.yml
20:49:42  adapter type: spark
20:49:42  adapter version: 1.7.1
20:49:42  Configuration:
20:49:42    profiles.yml file [OK found and valid]
20:49:42    dbt_project.yml file [OK found and valid]
20:49:42  Required dependencies:
20:49:42   - git [OK found]

20:49:42  Connection:
20:49:42    host: localhost
20:49:42    port: 10000
20:49:42    cluster: None
20:49:42    endpoint: None
20:49:42    schema: default
20:49:42    organization: 0
20:49:42  Registered adapter: spark=1.7.1
20:49:42    Connection test: [OK connection ok]

20:49:42  All checks passed!
(dbt-env) soumilshah@Soumils-MBP hudidbt % 
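
For reference, the profiles.yml itself is not shown here; a minimal sketch consistent with the connection details reported by dbt debug (profile name taken from dbt_project.yml; the thrift method and single thread are assumptions based on the output above) would look like:

hudidbt:
  target: dev
  outputs:
    dev:
      type: spark
      method: thrift      # connect to the Spark Thrift Server
      host: localhost
      port: 10000
      schema: default
      threads: 1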

Directory

[Screenshot: 2023-12-08 at 3:51:21 PM]

schema.yml


version: 2

models:
  - name: hudi_insert_table
    description: "Hudi insert non-partitioned table with incremental materialization"
    columns:
      - name: id
        description: "The primary key for this table"

hudi_insert_overwrite_table.sql

{{
    config(
        materialized='incremental',
        file_format='hudi',
        unique_key='id'
    )
}}

with source_data as (

    select format_number(rand()*1000, 0) as id
    union all
    select null as id

    )

select *
from source_data
where id is not null

dbt_project.yml

name: 'hudidbt'
version: '1.0.0'
config-version: 2
profile: 'hudidbt'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

models:
  +file_format: hudi
  hudidbt:
    core:
      materialized: table

DBT run

 dbt run
20:52:40  Running with dbt=1.7.3
20:52:40  Registered adapter: spark=1.7.1
20:52:40  Found 1 model, 0 sources, 0 exposures, 0 metrics, 439 macros, 0 groups, 0 semantic models
20:52:40  
20:52:41  Concurrency: 1 threads (target='dev')
20:52:41  
20:52:41  1 of 1 START sql incremental model default.hudi_insert_overwrite_table ......... [RUN]
20:52:41  1 of 1 ERROR creating sql incremental model default.hudi_insert_overwrite_table  [ERROR in 0.14s]
20:52:41  
20:52:41  Finished running 1 incremental model in 0 hours 0 minutes and 0.60 seconds (0.60s).
20:52:41  
20:52:41  Completed with 1 error and 0 warnings:
20:52:41  
20:52:41    Runtime Error in model hudi_insert_overwrite_table (models/core/hudi_insert_overwrite_table.sql)
  Database Error
    org.apache.hive.service.cli.HiveSQLException: Error running query: [DATA_SOURCE_NOT_FOUND] org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: hudi. Please find packages at `https://spark.apache.org/third-party-projects.html`.
        at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:43)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:261)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:165)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:40)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:165)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:160)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:174)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: hudi. Please find packages at `https://spark.apache.org/third-party-projects.html`.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:738)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
        at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.org$apache$spark$sql$catalyst$analysis$ResolveSessionCatalog$$isV2Provider(ResolveSessionCatalog.scala:604)
        at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:180)
        at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:52)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:52)
        at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:46)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:226)
        ... 16 more
    Caused by: java.lang.ClassNotFoundException: hudi.DefaultSource
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
        at scala.util.Failure.orElse(Try.scala:224)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
        ... 70 more

20:52:41  
20:52:41  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
(dbt-env) soumilshah@Soumils-MBP hudidbt % 

Any insights or guidance on resolving this issue would be greatly appreciated! If you have any experience with integrating Hudi into Spark Thrift Server and overcoming similar challenges, your expertise would be invaluable.

Thanks in advance for your help!

Regards, Soumil

soumilshah1995 commented 10 months ago

In a previous test I used plain Spark with Parquet and launched the Thrift Server using a similar technique, and it worked fine.

Thrift server

spark-submit \
  --master 'local[*]' \
  --conf spark.executor.extraJavaOptions=-Duser.timezone=Etc/UTC \
  --conf spark.eventLog.enabled=false \
  --conf spark.sql.warehouse.dir=file:///Users/soumilshah/Desktop/soumil/sparkwarehouse \
  --packages 'org.apache.spark:spark-sql_2.12:3.4.0,org.apache.spark:spark-hive_2.12:3.4.0' \
  --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 \
  --name "Thrift JDBC/ODBC Server" \
  --executor-memory 512m

This was just using Spark and Parquet:

{{ config(
    materialized='incremental',
    file_format='parquet',
    incremental_strategy='insert_overwrite',
) }}

SELECT
    date(d) AS id,
    d AS full_date,
    EXTRACT (YEAR FROM d)  AS YEAR,
    EXTRACT (WEEK FROM d)  AS year_week,
    EXTRACT (DAY FROM d)   AS year_day,
    EXTRACT (YEAR FROM  d)  AS fiscal_year,
    EXTRACT (QUARTER FROM d) AS fiscal_qtr,
    EXTRACT (MONTH FROM d) AS MONTH,
    date_format(d, 'MMMM')  AS month_name,
    EXTRACT (DOW FROM d)  AS week_day,
    date_format(d, 'EEEE')  AS day_name,
    (CASE WHEN date_format(d, 'EEEE') NOT IN ('Sunday', 'Saturday') THEN 0 ELSE 1 END) AS day_is_weekday
FROM (SELECT EXPLODE(months) AS d FROM (SELECT SEQUENCE (TO_DATE('2000-01-01'), TO_DATE('2023-01-01'), INTERVAL 1 DAY) AS months))

dbt_project.yml

name: 'sparkdbt'
version: '1.0.0'
config-version: 2

profile: 'sparkdbt'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

models:
  sparkdbt:
    core:
      +enabled: true
      +materialized: table
    dim_date:
      +materialized: table

This works fine: I was able to see the table and query it using Beeline and DBeaver as well. I am only seeing the issue with Hudi tables, and I am not sure what conf I am missing here.

I have also read the example provided at https://github.com/apache/hudi/blob/master/hudi-examples/hudi-examples-dbt/README.md but could not get it to work for some reason.

ad1happy2go commented 10 months ago

@soumilshah1995 I have seen this error before. Can you put the Hudi bundle jar in Spark's jars folder and set the Spark configurations in spark-defaults.conf? At runtime, dbt doesn't pick up the dependencies/configs that you passed on the command used to start the Thrift Server.
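
For example, a minimal sketch (the jar name and paths are illustrative; adjust to your Spark and Hudi versions):

# Copy the Hudi Spark bundle into Spark's jars directory (example jar name)
cp hudi-spark3.4-bundle_2.12-0.14.0.jar $SPARK_HOME/jars/

# $SPARK_HOME/conf/spark-defaults.conf -- same settings passed via --conf above
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.sql.extensions             org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.kryo.registrator           org.apache.spark.HoodieSparkKryoRegistrar
spark.sql.catalog.spark_catalog  org.apache.spark.sql.hudi.catalog.HoodieCatalog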

soumilshah1995 commented 10 months ago

I explored two pathways to address this challenge. The first route involved starting the Thrift Server with plain Hive and Spark SQL; however, when attempting to run dbt, I ran into the error described above.

The second route, a more intricate and time-consuming approach, required installing Apache Derby and Spark. Despite my efforts, an odd complication arose: executing "dbt run" crashed both the Thrift Server and Apache Derby.

Step 1: Create DBT Environment

# Create virtual environment for DBT
python -m venv dbt-env
source dbt-env/bin/activate

# Install required packages
pip install dbt-core
pip install dbt-spark
pip install 'dbt-spark[PyHive]'

# Navigate to DBT directory
cd ~/.dbt/

# Set Java environment variable

export JAVA_HOME=/opt/homebrew/Cellar/openjdk@11/11.0.21/libexec/openjdk.jdk/Contents/Home


Step 2: Download and Run Apache Derby

export DERBY_VERSION=10.14.2.0
curl -O https://archive.apache.org/dist/db/derby/db-derby-$DERBY_VERSION/db-derby-$DERBY_VERSION-bin.tar.gz
tar -xf db-derby-$DERBY_VERSION-bin.tar.gz
export DERBY_HOME=/Users/soumilshah/Desktop/soumil/dbt/db-derby-10.14.2.0-bin
echo $DERBY_HOME
rm -r db-derby-10.14.2.0-bin.tar.gz
$DERBY_HOME/bin/startNetworkServer -h localhost

Step 3: Install Apache Spark

# Specify Spark version
export SPARK_VERSION=3.2.3

# Download and extract Spark
curl -O https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop2.7.tgz
tar -xf spark-$SPARK_VERSION-bin-hadoop2.7.tgz

# Set Spark home
export SPARK_HOME=/Users/soumilshah/Desktop/soumil/dbt/spark-3.2.3-bin-hadoop2.7
echo $SPARK_HOME

# Clean up downloaded files
rm spark-3.2.3-bin-hadoop2.7.tgz

Step 4: Copy JAR files

# Copy JAR files to Spark JARS directory
cp /Users/soumilshah/Desktop/myjar/*.jar $SPARK_HOME/jars/

Step 5: Spark Submit Configuration

# Submit Spark job
spark-submit \
  --master 'local[*]' \
  --conf spark.executor.extraJavaOptions=-Duser.timezone=Etc/UTC \
  --conf spark.eventLog.enabled=false \
  --conf spark.sql.warehouse.dir=file:///Users/soumilshah/Desktop/soumil/dbt \
  --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 \
  --packages 'org.apache.spark:spark-sql_2.12:3.2.3,org.apache.spark:spark-hive_2.12:3.2.3,org.apache.hudi:hudi-spark3.2-bundle_2.12:0.14.0' \
  --name "Thrift JDBC/ODBC Server" \
  --executor-memory 5g \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
  --conf hive.metastore.warehouse.dir=/Users/soumilshah/Desktop/soumil/dbt \
  --conf hive.metastore.schema.verification=false \
  --conf datanucleus.schema.autoCreateAll=true \
  --conf javax.jdo.option.ConnectionDriverName=org.apache.derby.jdbc.ClientDriver \
  --conf 'javax.jdo.option.ConnectionURL=jdbc:derby://localhost:1527/MyDatabase;create=true'

Question: why do I have to use Apache Derby? When I simply use Spark SQL and the Hive Thrift Server, I am able to create Hudi tables through Beeline, so why does it fail on dbt run?

soumilshah1995 commented 10 months ago

[Screenshot: 2023-12-23 at 11:13:29 AM]

I will be creating YouTube videos on this, which should help everyone.