apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] How to use Flink with External Hive MetaStore (HiveSync) For Community Video #11105

Closed: soumilshah1995 closed this issue 4 months ago

soumilshah1995 commented 4 months ago

Hello,

I'm currently working on creating comprehensive content for our community, focusing on integrating Apache Flink (1.17.1, via PyFlink on Python 3.8.19) with Apache Hudi and an external Hive Metastore. While building the setup, I've encountered an issue that requires assistance from the community.

Setup:

Apache Flink version: 1.17.1
Python version: 3.8.19

Steps to Reproduce:

I've provided a Docker Compose file along with the necessary configurations to replicate the setup. Below are the essential components included in the Docker Compose file:

version: "3"

services:
  mysql:
    image: quay.io/debezium/example-mysql:2.1
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: debezium
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    restart: always

  fast-data-dev:
    image: dougdonohoe/fast-data-dev
    ports:
      - "3181:3181"
      - "3040:3040"
      - "7081:7081"
      - "7082:7082"
      - "7083:7083"
      - "7092:7092"
      - "8081:8081"
    environment:
      - ZK_PORT=3181
      - WEB_PORT=3040
      - REGISTRY_PORT=8081
      - REST_PORT=7082
      - CONNECT_PORT=7083
      - BROKER_PORT=7092
      - ADV_HOST=127.0.0.1

  trino-coordinator:
    image: 'trinodb/trino:latest'
    hostname: trino-coordinator
    ports:
      - '8080:8080'
    volumes:
      - ./trino/etc:/etc/trino

  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore
    command: ["postgres", "-c", "wal_level=logical"]
    healthcheck:
      test: ["CMD", "psql", "-U", "hive", "-c", "SELECT 1"]
      interval: 10s
      timeout: 5s
      retries: 5
    volumes:
      - ./postgresscripts:/docker-entrypoint-initdb.d

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://warehouse/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"

  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "

volumes:
  hive-metastore-postgresql:

networks:
  default:
    name: hudi

Issue:

The problem arises when I include the following three parameters related to Hive sync in my Flink setup:

'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://localhost:9083'

With these parameters included, the insert fails. However, when I remove these parameters, everything works as expected and records are inserted into the Hudi table.

Additional Context:

This setup has been previously tested for Delta Streamer, as documented here. The goal is to achieve similar functionality with Apache Flink instead of Spark.

Code Snippet:

I've provided a full code snippet demonstrating the setup and execution process. This includes defining the Hudi table, executing SQL queries, and attempting to insert records into the Hudi table.

import os

# Point PyFlink at a compatible JDK (this path is specific to my machine)
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@11'

from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "jar/flink-s3-fs-hadoop-1.17.1.jar",
    "jar/hudi-flink1.17-bundle-0.14.0.jar"
]

jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)

hudi_output_path = 'file:///Users/soumilshah/Desktop/my-flink-environment/hudi/'

hudi_sink = f"""
CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}',
    'table.type' = 'COPY_ON_WRITE',
    'hive_sync.enable' = 'true',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'thrift://localhost:9083'
);
"""

# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink).wait()

# Insert sample records into the Hudi table
query = """
INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40,'sao_paulo'),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06,'chennai'),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
"""

# Block until the insert job finishes before querying the table
table_env.execute_sql(query).wait()

# Query the Hudi table to verify the insert
query = """
SELECT * FROM hudi_table WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';
"""

table_env.execute_sql(query).print()

Request for Assistance:

I'm seeking assistance from the community to understand how to properly configure Hive sync parameters in my Flink setup. Any guidance or insights would be greatly appreciated.

danny0405 commented 4 months ago

By default the Flink Hudi bundle does not include any Hive-related jars in its dependencies; you need to package the jar manually by following this doc: https://hudi.apache.org/docs/syncing_metastore#flink-setup
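
For anyone following along, here is a minimal sketch of that build, assuming the Hudi 0.14.0 source tree and a Hive 3 metastore (the starburstdata/hive:3.1.2 image in the compose file above is Hive 3); the shade profile names come from the linked doc:

# clone the Hudi source and check out the release matching your bundle version
git clone https://github.com/apache/hudi.git
cd hudi && git checkout release-0.14.0

# shade Hive 3 classes into the Flink bundle
# (use -Pflink-bundle-shade-hive2 for a Hive 2 metastore)
mvn clean install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3

# the hive-shaded bundle lands under packaging/hudi-flink-bundle/target/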

soumilshah1995 commented 4 months ago

Is there any jar available on mvn?

soumilshah1995 commented 4 months ago

(screenshot attached)

What jar do I need, and can I find it on mvn?

I'm running the Hive Metastore locally and it's accessible on port 9083.

danny0405 commented 4 months ago

Is there any jar available on mvn?

No, you need to compile it manually.

soumilshah1995 commented 4 months ago

Can you help me with the steps, if you don't mind?

soumilshah1995 commented 4 months ago

Is there a Docker setup with Hive Metastore, Flink, and Hudi? I really want to learn how to set it up.

ad1happy2go commented 4 months ago

@soumilshah1995 We don't have a Docker setup for that. You can build it like this: https://hudi.apache.org/docs/syncing_metastore#install
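
Once the hive-shaded bundle is built, the PyFlink script above only needs its pipeline.jars entry swapped out; a sketch (the jar filename below is illustrative and depends on the profile and version you built):

import os
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# swap in the hive-shaded bundle produced by the Maven build; the name is
# illustrative, use whatever lands in packaging/hudi-flink-bundle/target/
bundle_jar = os.path.abspath("jar/hudi-flink1.17-bundle-0.14.0.jar")
s3_jar = os.path.abspath("jar/flink-s3-fs-hadoop-1.17.1.jar")
table_env.get_config().get_configuration().set_string(
    "pipeline.jars", f"file://{bundle_jar};file://{s3_jar}"
)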

soumilshah1995 commented 4 months ago

Thank you.

For future reference, would it be helpful to provide links for users to download pre-built JAR files? It would greatly simplify the process, akin to how we offer JAR files for Spark. Having similar resources available for Flink and Hive would be immensely beneficial.

danny0405 commented 4 months ago

Thanks for the feedback. The reason we do not include Hive in the Flink Hudi bundle jar is that Flink classloading and jar conflicts are more complex; there is a high probability that the Hive jars would conflict with others.

soumilshah1995 commented 4 months ago

Got it, makes sense :D