GunSik2 / k8s-ai

ai/bigdata/gpu examples with k8s
0 stars 0 forks source link

spark programming #21

Open GunSik2 opened 2 years ago

GunSik2 commented 2 years ago

Spark 3.1

Delta

참고자료

GunSik2 commented 2 years ago

Apache Spark and Getting Started

Expend

sum 1 ~ 100 numbers ``` from pyspark.sql import SparkSession # Spark session & context spark = SparkSession.builder.master('local').getOrCreate() sc = spark.sparkContext # Sum of the first 100 whole numbers rdd = sc.parallelize(range(100 + 1)) rdd.sum() ``` count words from text file ``` from pyspark.sql import SparkSession # Spark session & context spark = SparkSession.builder.master('local').getOrCreate() # Read file and count strings = spark.read.text("test.txt").cache() strings.show(5, truncate=False) strings.count() ``` count filtered words from text file ``` from pyspark.sql import SparkSession # Spark session & context spark = SparkSession.builder.master('local').getOrCreate() # Read file and count strings = spark.read.text("test.txt").cache() numAs = logData.filter(strings.value.contains('a')).count() numBs = logData.filter(strings.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop() ``` [MnMCount](https://github.com/databricks/LearningSparkV2/blob/master/chapter2/py/src/mnmcount.py) ``` git clone https://github.com/databricks/LearningSparkV2 cd LearningSparkV2/chapter2/py/src park-submit mnmcount.py data/mnm_dataset.csv ```

GunSik2 commented 2 years ago

Apache Spark’s Structured APIs

Expend

### RDD (Resilient Distributed Datasets) - Fault-Tolerant Abstraction for In-Memory Cluster Computing - 스파크의 가장 기본적인 추상화 - 세 가지 중요한 특성 - 종속성 : 일련의 종속성을 이용해 입력값을 이용 RDD 재구성이 가능 (Spark는 모든 변환의 계보를 유지) - 파티션 : 작업을 분할하여 연산을 병렬 처리 (Locality 이용한 데이터 근접에 Executor 실행하여 데이터 전송을 줄임) - 연산 기능 (파티션) : RDD에 저장될 데이터로 Iterator[T] 를 생성하는 연산 함수 - 문제점 - 연산함수는 Spark에게 불투명하여 조인, 필터, 선택 과 같은 수행 작업을 구분 못하며, 최적화 할 수 없음 - Spark는 데이터 개체에 대한 정보 없이 개체를 바이트로 직렬화하는 역할만 수행 가능 - 예시 ``` # Create an RDD of tuples (name, age) dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]) # Use map and reduceByKey transformations with their lambda # expressions to aggregate and then compute average agesRDD = (dataRDD .map(lambda x: (x[0], (x[1], 1))) .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) .map(lambda x: (x[0], x[1][0]/x[1][1]))) ``` ### Structuring Spark - Spark 2.x는 Spark를 구조화하기 위한 몇 가지 핵심 체계를 도입 - 데이터 분석에서 발견되는 공통 패턴을 사용하여 표현 : 필터링, 선택, 계산, 집계, 평균화 및 그룹화 - 실행을 위한 효율적인 쿼리 계획을 구성 - 구조화는 Spark 구성 요소 전반에 걸쳐 더 나은 성능과 공간 효율성 이점 제공 - 예시 ``` from pyspark.sql import SparkSession from pyspark.sql.functions import avg # Create a DataFrame using SparkSession spark = (SparkSession .builder .appName("AuthorsAges") .getOrCreate()) # Create a DataFrame data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"]) # Group the same names together, aggregate their ages, and compute an average avg_df = data_df.groupBy("name").agg(avg("age")) # Show the results of the final execution avg_df.show() +------+--------+ | name|avg(age)| +------+--------+ |Brooke| 22.5| | Jules| 30.0| | TD| 35.0| | Denny| 31.0| +------+--------+ ``` ### DataFrame API - Spark DataFrames는 명명된 열 및 스키마가 있는 분산 메모리 테이블과 유사 - 각 열에는 정수, 문자열, 배열, 지도, 실수, 날짜와 같은 특정 데이터 유형 유지 - Spark 의 스키마 는 DataFrame에 대한 열 이름 및 관련 데이터 유형을 정의 - 스키마 정의 예시 : 프로그래밍 방식 ``` # In Python from pyspark.sql.types import * schema = StructType([StructField("author", StringType(), False), StructField("title", StringType(), False), StructField("pages", IntegerType(), False)]) ``` - 스키마 정의 예시 : DDL 방식 ``` # In Python schema = "author STRING, title STRING, pages INT" ``` - DataFrame 예시 ``` # In Python from pyspark.sql import SparkSession # Define schema for our data using DDL schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY" # Create our static data data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]], [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]], [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]], [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]], [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]], [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]] ] # Main program if __name__ == "__main__": # Create a SparkSession spark = (SparkSession.builder.appName("Example-3_6").getOrCreate()) # Create a DataFrame using the schema defined above blogs_df = spark.createDataFrame(data, schema) # Show the DataFrame; it should reflect our table above blogs_df.show() # Print the schema used by Spark to process the DataFrame print(blogs_df.printSchema()) ``` ![image](https://user-images.githubusercontent.com/11453229/136155884-57c7caaf-f362-4ce7-8af7-9b4e7d708a91.png) ### DataFrames vs Datasets ![image](https://user-images.githubusercontent.com/11453229/136157069-924e83d8-978b-451b-ba2a-949bc5ee6d02.png) ### Spark SQL and the Underlying Engine - ANSI SQL:2003 호환 쿼리를 실행 - Spark 구성 요소를 통합하고 Java, Scala, Python 및 R의 DataFrames/Datasets에 대한 추상화를 허용 - Apache Hive 메타스토어 및 테이블에 연결 - 구조화된 파일 형식(JSON, CSV, Text, Avro, Parquet, ORC 등)에서 특정 스키마로 구조화된 데이터를 읽고 쓰고 데이터를 임시 테이블로 변환 - 표준 데이터베이스 JDBC/ODBC 커넥터를 통해 외부 도구에 대한 브리지를 제공 - 최종 실행을 위해 JVM에 최적화된 쿼리 계획 및 압축 코드를 생성 ![image](https://user-images.githubusercontent.com/11453229/136159867-222796ad-ab9c-4dcf-8f7b-a089729d9cfc.png) - 쿼리 플랜 확인 ``` count_mnm_df.explain(True) ``` ![image](https://user-images.githubusercontent.com/11453229/136159982-9d20591a-7635-4cd1-bb0a-ecf1c0de0e5b.png)

GunSik2 commented 2 years ago

Spark SQL and DataFrames: Introduction to Built-in Data Sources

Expend

### Basic Query Examples In Jupyter terminal ``` git clone https://github.com/databricks/LearningSparkV2 ``` In Jupyter notebook ``` from pyspark.sql import SparkSession # Create a SparkSession spark = (SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate()) # Path to data set csv_file = "LearningSparkV2/databricks-datasets/learning-spark-v2/flights/departuredelays.csv" # Read and create a temporary view # Infer schema (note that for larger files you # may want to specify the schema) df = (spark.read.format("csv") .option("inferSchema", "true") .option("header", "true") .load(csv_file)) df.createOrReplaceTempView("us_delay_flights_tbl") ``` 거리가 1,000마일보다 큰 모든 항공편 (Spark.SQL 쿼리 이용) ``` spark.sql("""SELECT distance, origin, destination FROM us_delay_flights_tbl WHERE distance > 1000 ORDER BY distance DESC""").show(10) ``` 거리가 1,000마일보다 큰 모든 항공편 ( DataFrame API 쿼리 이용) ``` from pyspark.sql.functions import col, desc (df.select("distance", "origin", "destination") .where(col("distance") > 1000) .orderBy(desc("distance"))).show(10) ``` 결과 ![image](https://user-images.githubusercontent.com/11453229/136175616-b204a73d-3508-40f1-9332-6aa4e432121e.png) ### SQL 테이블 및 뷰 Spark 테이블은 데이터와 메타데이터를 보유 - 메타 정보: 스키마, 설명, 테이블 이름, 데이터베이스 이름, 열 이름, 파티션, 실제 데이터가 상주하는 물리적 위치 등 - 기본적으로 /user/hive/warehouse에 있는 Apache Hive 메타스토어를 사용 - 구성 변수 spark.sql.warehouse.dir 로 외부 분산 저장소로 설정 가능 - 관리형 테이블과 관리되지 않는 테이블 두 종류 테이블 생성 가능 - 관리형 테이블은 Spark 이 로컬, HDFS, S3 와 같은 파일 저장소에 메타데이터와 데이터 모두를 저장하고 관리 - 비 관리형 테이블은 Cassandra와 같은 외부 데이터는 사용자가 관리하고 Spark은 메타데이터만 관리 관리 - 비 관리형 테이블에서 DROP TABLE 명령은 메타 데이터만을 삭제하며, 실 데이터는 유지됨 - Spark는 기본 default 데이터베이스 내에 테이블을 생성 환경 변수 확인 ``` import os from pyspark.conf import SparkConf sparkConf = SparkConf() sparkConf.getAll() # spark.sql.warehouse.dir os.environ ``` 데이터베이스 및 테이블 생성 ``` from pyspark.sql import SparkSession spark = (SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate()) spark.sql("CREATE DATABASE learn_spark_db") spark.sql("USE learn_spark_db") spark.catalog.listDatabases() spark.catalog.listTables() spark.catalog.listColumns("managed_us_delay_flights_tbl") ``` 관리 테이블 생성 - SQL 쿼리 이용 ``` spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)") -- AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT); ' CreateTable `learn_spark_db`.`managed_us_delay_flights_tbl`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists -- // 메타 확인 spark.catalog.listColumns("managed_us_delay_flights_tbl") ``` - DataFrame API 이용 ``` # Path to data set csv_file = "LearningSparkV2/databricks-datasets/learning-spark-v2/flights/departuredelays.csv" schema="date STRING, delay INT, distance INT, origin STRING, destination STRING" flights_df = spark.read.csv(csv_file, schema=schema) flights_df.write.saveAsTable("managed_us_delay_flights_tbl") ``` 비 관리형 테이블 생성 - SQL 이용 ``` spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT, distance INT, origin STRING, destination STRING) USING csv OPTIONS (PATH 'LearningSparkV2/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""") ``` - DataFrame API 이용 ``` (flights_df .write .option("path", "/tmp/data/us_flights_delay") .saveAsTable("us_delay_flights_tbl")) ``` 테이블을 DataFrame으로 읽기 ``` us_flights_df = spark.sql("SELECT * FROM managed_us_delay_flights_tbl") ``` 테스트 예시 ![image](https://user-images.githubusercontent.com/11453229/136311137-272ec017-f3bc-42f6-9377-3cd447837eba.png) ### 데이터 읽기 및 저장 - DataFrameWriter및 DataFrameReader - format: "parquet", "csv", "txt", "json", "jdbc", "orc", "avro" - no schema is needed when reading from a static Parquet data source - Parquet 읽고 쓰기 ``` df = (spark.read.format("parquet") .load("LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/") ) // 파일로 쓰기 (df.write.format("parquet") .mode("overwrite") .option("compression", "snappy") .save("/tmp/df_parquet")) ``` - CSV 읽고 쓰기 ``` // cvs write df.write.format("csv").mode("overwrite").save("/tmp/df_csv") // csv load as dataframe df = (spark.read.format("csv") .option("inferSchema", "true") .option("header", "true") .option("mode", "PERMISSIVE") .load("/tmp/df_csv/*")) df.show() ``` - Json 읽고 쓰기 ``` // json write df.write.format("json").mode("overwrite").save("df_json") // json load df = spark.read.format("json").load("df_json/*") df.show() ``` - SQL 읽고 쓰기 ``` // csv load as SQL table spark.sql("""CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_cvs_tbl USING csv OPTIONS ( path "/tmp/df_csv/*", header "true", inferSchema "true", mode "FAILFAST") """) spark.sql("select * from us_delay_flights_cvs_tbl").show() // parquet load as SQL table spark.sql("""CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_sql_tbl USING parquet OPTIONS ( path "/tmp/df_parquet") """) spark.sql("select * from us_delay_flights_sql_tbl").show() // SQL 테이블로 쓰기 df.write.mode("overwrite").saveAsTable("us_delay_flights_tbl") spark.sql("select * from us_delay_flights_tbl").show() ``` - 이미지 읽기 ``` from pyspark.ml import image image_dir = "LearningSparkV2/databricks-datasets/learning-spark-v2/cctvVideos/train_images/" images_df = spark.read.format("image").load(image_dir) images_df.printSchema() root |-- image: struct (nullable = true) | |-- origin: string (nullable = true) | |-- height: integer (nullable = true) | |-- width: integer (nullable = true) | |-- nChannels: integer (nullable = true) | |-- mode: integer (nullable = true) | |-- data: binary (nullable = true) |-- label: integer (nullable = true) images_df.select("image.height", "image.width", "image.nChannels", "image.mode", "label").show(5, truncate=False) +------+-----+---------+----+-----+ |height|width|nChannels|mode|label| +------+-----+---------+----+-----+ |288 |384 |3 |16 |0 | |288 |384 |3 |16 |1 | |288 |384 |3 |16 |0 | |288 |384 |3 |16 |0 | |288 |384 |3 |16 |0 | +------+-----+---------+----+-----+ only showing top 5 rows ```

GunSik2 commented 2 years ago

Structured Straming

df = (spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "my-cluster-kafka-bootstrap:9092") .option("subscribe", "my-topic") .option("startingOffsets", "latest")
.load())

ds = df.selectExpr("CAST(value AS STRING)")

df.printSchema() ds.printSchema()

![image](https://user-images.githubusercontent.com/11453229/136893181-a292db4d-51dc-4b5e-ab1b-6de97fa5d313.png)

- 실행2

kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.25.0-kafka-2.8.0 \ --rm=true --restart=Never -- bin/kafka-console-producer.sh \ --broker-list my-cluster-kafka-bootstrap:9092 \ --topic my-topic

![image](https://user-images.githubusercontent.com/11453229/136893371-cd62bf3c-a83a-4642-8ce1-97a0af2073a2.png)

- 실행3

rawQuery = df.writeStream.queryName("qraw").format("memory").start()

raw = spark.sql("select * from qraw") raw.show()

![image](https://user-images.githubusercontent.com/11453229/136893211-e4684e68-df0d-4969-9c4b-51073e38efcb.png)

- 실행4

alertQuery = ds.writeStream.queryName("qalerts").format("memory").start()

alerts = spark.sql("select * from qalerts") alerts.show()

![image](https://user-images.githubusercontent.com/11453229/136893234-e442188d-a9d2-4df2-8a87-5fbe844cf74e.png)

### Kafka 환경구성
방법1) Jupyter 노트북에서 kafak 모듈 자동 로딩 환경 구성

// Jupyter Pod 이동 후 설정 변경

cat /opt/conda/share/jupyter/kernels/python3/kernel.json

"language": "python", "env" : { "PYSPARK_SUBMIT_ARGS": "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell" }

방법2) Jupyter 노트북에서 OS 환경 설정 

import os os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'



참고자료
- https://mtpatter.github.io/bilao/notebooks/html/01-spark-struct-stream-kafka.html
- https://stackoverflow.com/questions/49861973/run-pyspark-and-kafka-in-jupyter-notebook
- https://docs.databricks.com/spark/latest/structured-streaming/demo-notebooks.html#structured-streaming-demo-python-notebook