delta-io / delta

Missing _last_checkpoint file in _delta_log folder in cos-bucket #2756

Open sabbasani opened 8 months ago

sabbasani commented 8 months ago

Environment:

Spark version: 3.2
Delta version: 2.2.0

from pyspark.sql import SparkSession

def init_spark():
    spark = (
        SparkSession.builder
        .appName("HiveMetastoreExample")
        # Use the Hive metastore as the session catalog.
        .config("spark.sql.catalogImplementation", "hive")
        # Enable the Delta Lake SQL extensions and catalog.
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.catalog.spark_catalog.type", "hive")
        .config("spark.hadoop.hive.metastore.schema.verification", "false")
        .config("spark.hadoop.hive.metastore.schema.verification.record.version", "false")
        .config("spark.hadoop.datanucleus.schema.autoCreateTables", "false")
        .config("spark.hive.metastore.uris", "thrift://xxxxxxxxxxxxxx")
        # s3a filesystem settings for the COS bucket.
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.bucket.xxxxxxx.endpoint", "xxxxx")
        .config("spark.hadoop.fs.s3a.bucket.xxxxxxx.access.key", "xxxxx")
        .config("spark.hadoop.fs.s3a.bucket.xxxxxxx.secret.key", "xxxxxxxxxxxxxx")
        # LogStore used for the Delta transaction log on S3-compatible storage.
        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
        .enableHiveSupport()
        .getOrCreate()
    )
    return spark

def main():
    spark = init_spark()
    spark.sql("show databases").show()
    spark.sql("create database if not exists spark_catalog.dep LOCATION 's3a://xxxxxxx/'").show()
    spark.sql("create table if not exists spark_catalog.dep.employee1 (id bigint, name string, location string) USING DELTA").show()
    spark.sql("insert into spark_catalog.dep.employee1 VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chicago'), (4, 'Alex','dubai')").show()
    spark.sql("select * from spark_catalog.dep.employee1").show()
    # spark.sql("drop table spark_catalog.dep.employee").show()
    # spark.sql("drop schema spark_catalog.dep CASCADE").show()
    # .config("spark.delta.logStore.cos.impl", "io.delta.storage.IBMCOSLogStore") \
    spark.stop()

if __name__ == '__main__':
    main()

I don't find a _last_checkpoint file created in the _delta_log folder when I run this Python sample against Spark.
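
For context, Delta only writes a checkpoint (and the _last_checkpoint pointer) every delta.checkpointInterval commits, which defaults to 10; the script above produces only two commits (create table plus one insert), so no checkpoint would be expected yet. A rough way to verify this, as a sketch: the s3a path below is a placeholder assumed from the database LOCATION in the script, not confirmed by the thread.

# Sketch: push the table past the default 10-commit checkpoint interval,
# then list _delta_log. The s3a path is an assumed placeholder; adjust it
# to the actual table location under the database LOCATION used above.
for i in range(10):
    spark.sql(f"insert into spark_catalog.dep.employee1 VALUES ({i + 10}, 'user{i}', 'city{i}')")

log_path = spark._jvm.org.apache.hadoop.fs.Path("s3a://xxxxxxx/employee1/_delta_log")
fs = log_path.getFileSystem(spark._jsc.hadoopConfiguration())
for f in fs.listStatus(log_path):
    # After 10+ commits, expect a *.checkpoint.parquet file and _last_checkpoint.
    print(f.getPath().getName())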

ebyhr commented 8 months ago

What happens if you set the delta.checkpointInterval table property?

create table if not exists spark_catalog.dep.employee1 (id bigint, name string, location string) USING DELTA
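
For reference, a minimal sketch of that suggestion applied to the existing table; the interval value of 1 is illustrative (the default is 10), chosen so the very next commit writes a checkpoint:

# Sketch: set delta.checkpointInterval on the existing table (the value
# is illustrative), then commit once more so a checkpoint is written.
spark.sql(
    "alter table spark_catalog.dep.employee1 "
    "set tblproperties ('delta.checkpointInterval' = '1')"
)
spark.sql("insert into spark_catalog.dep.employee1 VALUES (5, 'Eve', 'Pune')")
# _delta_log should now contain a checkpoint parquet file and _last_checkpoint.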