Jupyter environment setup
!pip install pyspark==3.1.2
!pip install delta-spark
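The pip package should be kept in step with the JVM package; pinning the version avoids a mismatch (an assumption here: delta-spark 1.0.0 on PyPI tracks the io.delta:delta-core_2.12:1.0.0 artifact used below):
# Pinning keeps the Python helpers in step with delta-core_2.12:1.0.0
!pip install delta-spark==1.0.0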
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '\
--packages io.delta:delta-core_2.12:1.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
pyspark-shell'
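PYSPARK_SUBMIT_ARGS is only read when the JVM is launched, so the cell above must run before any SparkSession exists; restart the kernel if one does. A minimal sanity check that the variable is set:
# The JVM reads PYSPARK_SUBMIT_ARGS once at startup; later changes have no effect.
print(os.environ['PYSPARK_SUBMIT_ARGS'])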
Basic test
from pyspark.sql import SparkSession
from delta import *
builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
data = spark.range(0, 5)
data.write.format("delta").save("delta-table")

df = spark.read.format("delta").load("delta-table")
df.show()
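A quick way to confirm the path really holds a Delta table is DeltaTable.isDeltaTable, which checks for a _delta_log directory (a minimal sketch):
from delta.tables import DeltaTable
# True only if "delta-table" contains a _delta_log transaction log
print(DeltaTable.isDeltaTable(spark, "delta-table"))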
- overwrite and read delta table
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("delta-table")

df = spark.read.format("delta").load("delta-table")
df.show()
- update and read delta table
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "delta-table")
deltaTable.update(
    condition = expr("id % 2 == 0"),
    set = { "id": expr("id + 100") })
spark.read.format("delta").load("delta-table").show()
- delete rows from the delta table
deltaTable.delete(condition = expr("id % 2 == 0"))
spark.read.format("delta").load("delta-table").show()
- upsert delta table
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
    .merge(newData.alias("newData"), "oldData.id = newData.id") \
    .whenMatchedUpdate(set = { "id": col("newData.id") }) \
    .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
    .execute()
deltaTable.toDF().show()
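Each write, update, delete, and merge above committed a new version to the transaction log, which is what the time travel reads below rely on; the versions can be listed with the history API (a sketch):
# One row per commit; the version column is what versionAsOf refers to below.
deltaTable.history().select("version", "operation").show()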
Time travel
df = spark.read.format("delta").option("versionAsOf", 0).load("delta-table")
df.show()

df = spark.read.format("delta").option("versionAsOf", 1).load("delta-table")
df.show()
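A timestamp can also select a version (a sketch; the timestamp below is a placeholder and must fall inside the table's commit history or the read fails):
# timestampAsOf resolves to the latest version committed at or before this time.
df = spark.read.format("delta").option("timestampAsOf", "2021-07-01 00:00:00").load("delta-table")
df.show()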
Writing stream data to Delta
- save streaming data to Delta
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta") \
    .option("checkpointLocation", "checkpoint").start("delta-table")
spark.read.format("delta").load("delta-table").show()
- stop the stream and check the result
stream.stop()
spark.read.format("delta").load("delta-table").show()
- read delta table changes as a stream
stream2 = spark.readStream.format("delta").load("delta-table") \
    .writeStream.format("console").start()
stream2.stop()
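As written, stop() right after start() may fire before any micro-batch reaches the console; a variant that lets the query run briefly first (a sketch):
import time
stream2 = spark.readStream.format("delta").load("delta-table") \
    .writeStream.format("console").start()
time.sleep(10)  # give a few micro-batches time to print
stream2.stop()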
References
- [Delta Lake QuickStart](https://docs.delta.io/latest/quick-start.html)
Delta test case in JupyterLab
Minio environment:
Environment setup
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '\
--packages "io.delta:delta-core_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.2.0" \
--conf "spark.hadoop.fs.s3a.endpoint=http://172.17.0.1:9000" \
--conf "spark.hadoop.fs.s3a.access.key=miniouser" \
--conf "spark.hadoop.fs.s3a.secret.key=miniouser" \
--conf "spark.hadoop.fs.s3a.path.style.access=true" \
--conf "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" \
pyspark-shell'
Writing and reading Delta on S3
from pyspark.sql import SparkSession
from delta import *
spark = SparkSession.builder \
.appName("quickstart1") \
.getOrCreate()
# Create a Delta table on S3:
spark.range(5).write.format("delta").save("s3a://miniouser/test01")
# Read a Delta table on S3:
spark.read.format("delta").load("s3a://miniouser/test01").show()
Writing and reading with SQL statements
from pyspark.sql import SparkSession
tableName = "delta.`s3a://miniouser/test02/`"
# Enable SQL/DML commands and Metastore tables for the current spark session.
# We need to set the following configs
spark = SparkSession.builder \
.appName("quickstart_sql1") \
.master("local[*]") \
.getOrCreate()
# Clear any previous runs
spark.sql("DROP TABLE IF EXISTS newData")
try:
    # Create a table
    print("############# Creating a table ###############")
    spark.sql("CREATE TABLE %s(id LONG) USING delta" % tableName)
    spark.sql("INSERT INTO %s VALUES 0, 1, 2, 3, 4" % tableName)

    # Read the table
    print("############ Reading the table ###############")
    spark.sql("SELECT * FROM %s" % tableName).show()

    # Upsert (merge) new data
    print("########### Upsert new data #############")
    spark.sql("CREATE TABLE newData(id LONG) USING parquet")
    spark.sql("INSERT INTO newData VALUES 3, 4, 5, 6")
    spark.sql('''MERGE INTO {0} USING newData
            ON {0}.id = newData.id
            WHEN MATCHED THEN
              UPDATE SET {0}.id = newData.id
            WHEN NOT MATCHED THEN INSERT *
        '''.format(tableName))
    spark.sql("SELECT * FROM %s" % tableName).show()

    # Overwrite the table
    print("########## Overwrite the table ###########")
    spark.sql("INSERT OVERWRITE %s SELECT * FROM (VALUES 5, 6, 7, 8, 9) x (id)" % tableName)
    spark.sql("SELECT * FROM %s" % tableName).show()

    # Update every even value by adding 100 to it
    print("########### Update to the table (add 100 to every even value) ##############")
    spark.sql("UPDATE {0} SET id = (id + 100) WHERE (id % 2 == 0)".format(tableName))
    spark.sql("SELECT * FROM %s" % tableName).show()

    # Delete every even value
    print("######### Delete every even value ##############")
    spark.sql("DELETE FROM {0} WHERE (id % 2 == 0)".format(tableName))
    spark.sql("SELECT * FROM %s" % tableName).show()

    # Read an old version of the data using time travel
    print("######## Read old data using time travel ############")
    df = spark.read.format("delta").option("versionAsOf", 0).table(tableName)
    df.show()
finally:
    # cleanup
    spark.sql("DROP TABLE IF EXISTS newData")
    spark.stop()
spark.sql("DESCRIBE HISTORY %s" % tableName).show()
spark.sql("DROP TABLE IF EXISTS test02")
tableName = "delta.s3a://miniouser/test02/
"
spark.sql("SELECT * FROM %s" % tableName).show()
References
- [delta-storage](https://docs.delta.io/latest/delta-storage.html)
- [Why S3SingleDriverLogStore is needed](https://github.com/delta-io/delta/issues/324)
- [SQL-based INSERTS, DELETES and UPSERTS in S3](https://dev.to/awscommunity-asean/sql-based-inserts-deletes-and-upserts-in-s3-using-aws-glue-3-0-and-delta-lake-42f0)
- [Point in time Delta Lake table restore after S3 object deletion](https://analyticsanvil.wordpress.com/category/databricks/)
- [Delta Table utility commands](https://docs.databricks.com/delta/delta-utility.html#delta-vacuum)
Testing Delta Lake in a private environment