apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Trino Hudi Minio return Empty Data #11183

Closed. soumilshah1995 closed this issue 6 months ago.

soumilshah1995 commented 6 months ago

Writer

try:
    import os
    import sys
    import uuid
    import pyspark
    import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime
    from pyspark.sql.types import StructType, StructField, StringType

    import random 
    import pandas as pd  # Import Pandas library for pretty printing

    print("Imports loaded ")

except Exception as e:
    print("error", e)

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hadoop:hadoop-aws:3.3.2,org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.executor.memory', '4g') \
    .config('spark.driver.memory', '4g') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://127.0.0.1:9000/")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "admin")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "password")
spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
                                     "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

faker = Faker()

def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "created_at": datetime.now().isoformat().__str__(),
            "address": faker.address(),
            "state": str(faker.state_abbr()),  # Adding state information
            "salary": faker.random_int(min=30000, max=100000) 
        }
        customers_array.append(customer_data)
    return customers_array

total_customers = 10000
customer_data = get_customer_data(total_customers=total_customers)

# spark_df_customers was never defined in the snippet; build the DataFrame
# from the generated records (via pandas, which is already imported)
spark_df = spark.createDataFrame(pd.DataFrame(customer_data))

database = "default"
table_name = "customers_t1"

path = f"s3a://warehouse/{database}/{table_name}"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.table.name': 'customers',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'customer_id',
    'hoodie.datasource.write.precombine.field': 'created_at',
    "hoodie.datasource.write.partitionpath.field": "state",

    "hoodie.enable.data.skipping": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true",

    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://localhost:9083",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": table_name,

}

print("\n")
print(path)
print("\n")

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
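
Before going through Trino, a quick read-back of the same path with Spark can confirm that the upsert actually landed in MinIO. A minimal sketch, reusing the `spark` session and `path` defined above:

# Sanity check: read the Hudi table back from MinIO via the same S3A-configured session
df = spark.read.format("hudi").load(path)

print("row count:", df.count())
df.select("_hoodie_commit_time", "customer_id", "state", "salary").show(5, truncate=False)

# The distinct partition values should match the Faker-generated state abbreviations
df.select("state").distinct().show()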

Docker compose

version: "3"

services:
#  mysql:
#    image: quay.io/debezium/example-mysql:2.1
#    container_name: mysql
#    ports:
#      - "3306:3306"
#    environment:
#      MYSQL_ROOT_PASSWORD: debezium
#      MYSQL_USER: mysqluser
#      MYSQL_PASSWORD: mysqlpw
#    restart: always
#
#  fast-data-dev:
#    image: dougdonohoe/fast-data-dev
#    ports:
#      - "3181:3181"
#      - "3040:3040"
#      - "7081:7081"
#      - "7082:7082"
#      - "7083:7083"
#      - "7092:7092"
#      - "8081:8081"
#    environment:
#      - ZK_PORT=3181
#      - WEB_PORT=3040
#      - REGISTRY_PORT=8081
#      - REST_PORT=7082
#      - CONNECT_PORT=7083
#      - BROKER_PORT=7092
#      - ADV_HOST=127.0.0.1

  trino-coordinator:
    image: 'trinodb/trino:428'
    hostname: trino-coordinator
    ports:
      - '8080:8080'
    volumes:
      - ./trino/etc:/etc/trino

  presto:
    container_name: presto
    ports:
      - '8082:8082'
    image: 'prestodb/presto:0.285'
    volumes:
      - ./presto/catalog:/opt/presto-server/etc/catalog
      - ./presto/config.properties:/opt/presto-server/etc/config.properties
      - ./presto/jvm.config:/opt/presto-server/etc/jvm.config
      - ./presto/node.properties:/opt/presto-server/etc/node.properties
      - ./presto/minio.properties:/usr/lib/presto/etc/catalog/minio.properties

  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore
    command: ["postgres", "-c", "wal_level=logical"]
    healthcheck:
      test: ["CMD", "psql", "-U", "hive", "-c", "SELECT 1"]
      interval: 10s
      timeout: 5s
      retries: 5
    volumes:
      - ./postgresscripts:/docker-entrypoint-initdb.d

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://datalake/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"

  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "

volumes:
  hive-metastore-postgresql:

networks:
  default:
    name: hudi
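
If Spark reports rows but Trino does not, it also helps to confirm that the parquet files really exist in the MinIO bucket. A small sketch using boto3 against the compose setup above (boto3 here is an assumption; the mc client inside the mc container works just as well):

import boto3

# Point boto3 at the MinIO endpoint and credentials from docker compose
s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:9000",
    aws_access_key_id="admin",
    aws_secret_access_key="password",
)

# List the objects the Hudi writer should have produced under the table path
resp = s3.list_objects_v2(Bucket="warehouse", Prefix="default/customers_t1/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])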

catalog/hive.properties

connector.name=hive
hive.metastore.uri=thrift://hive-metastore:9083

hudi.properties

connector.name=hudi
hive.metastore.uri=thrift://hive-metastore:9083
hive.s3.aws-access-key=admin
hive.s3.aws-secret-key=password
hive.s3.endpoint=http://minio:9000/
hive.s3.path-style-access=true
hive.s3.ssl.enabled=false
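
Once the catalogs are mounted, the table can be queried through the coordinator on port 8080, either from the Trino CLI or from Python. A minimal sketch using the trino Python client (the trino package is an assumption; the catalog name "hudi" comes from the hudi.properties filename):

import trino

# Connect to the coordinator exposed by docker compose on localhost:8080
conn = trino.dbapi.connect(
    host="localhost",
    port=8080,
    user="admin",
    catalog="hudi",
    schema="default",
)
cur = conn.cursor()

# Confirm the Hive metastore sync created the table
cur.execute("SHOW TABLES FROM hudi.default")
print(cur.fetchall())

# Then check whether any rows come back
cur.execute("SELECT customer_id, state, salary FROM hudi.default.customers_t1 LIMIT 10")
print(cur.fetchall())

If SHOW TABLES lists customers_t1 but the SELECT returns nothing, the metastore sync worked and the problem is on the Trino side.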

config.properties

#single node install config
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery-server.enabled=true
discovery.uri=http://localhost:8080

jvm.config

-server
-Xmx1G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+UseGCOverheadLimit
-XX:+ExitOnOutOfMemoryError
-XX:ReservedCodeCacheSize=256M
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000

node.properties

node.environment=docker
node.data-dir=/data/trino
plugin.dir=/usr/lib/trino/plugin

[Screenshot: 2024-05-09 at 9:57:52 AM]

Query output:

[Screenshot: Trino query returns empty data]

soumilshah1995 commented 6 months ago

Tried image: 'trinodb/trino:latest' as well; same issue. It looks like they have moved to Java 22:

sh-5.1$ java --version
openjdk 22.0.1 2024-04-16
OpenJDK Runtime Environment Temurin-22.0.1+8 (build 22.0.1+8)
OpenJDK 64-Bit Server VM Temurin-22.0.1+8 (build 22.0.1+8, mixed mode, sharing)
sh-5.1$ 

Any updates on how to fix this issue?

soumilshah1995 commented 6 months ago

An older version solved the issue: image: 'trinodb/trino:400'.