Knight-Wu / articles

博客
3 stars 1 forks source link

spark #6

Closed Knight-Wu closed 6 years ago

Knight-Wu commented 6 years ago

spark执行的大致流程

To summarize, the following phases occur during Spark execution:

  1. User code defines a DAG (directed acyclic graph) of RDDs Operations on RDDs create new RDDs that refer back to their parents, thereby creating a graph.
  2. Actions force translation of the DAG to an execution plan When you call an action on an RDD it must be computed. This requires computing its parent RDDs as well. Spark’s scheduler submits a job to compute all needed RDDs. That job will have one or more stages, which are parallel waves of computation composed of tasks. Each stage will correspond to one or more RDDs in the DAG. A single stage can correspond to multiple RDDs due to pipelining. Tasks are scheduled and executed on a cluster
  3. Stages are processed in order, with individual tasks launching to compute segments of the RDD. Once the final stage is finished in a job, the action is complete.

image

task、partition关系

  1. stage里面的task = 当前RDD其依赖或上一次的RDD partition,若是从file生成的RDD依赖指定的partition数量
  2. task由scheduler 指定partition所在的node去执行, 等于说哪个节点保存这个partition, 由这个节点去计算task.

The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian(笛卡尔) creates an RDD with their product.

RDDs produced by textFile or hadoopFile have their partitions determined by the underlying MapReduce InputFormat that's used. Typically there will be a partition for each HDFS block being read. Partitions for RDDs produced by parallelize come from the parameter given by the user, or spark.default.parallelism if none is given.

lineage(血统)

each RDD has a set of partitions, which are atomic pieces of the dataset; a set of dependencies on parent RDDs, which capture its lineage; a function for computing the RDD based on its parents; and metadata about its partitioning scheme and data placement.

Each stage contains as many pipelined transformations with narrow dependencies as possible. The boundaries of the stages are the shuffle operations required for wide dependencies wide dependency or any cached partitions that can short-circuit the computation of a parent RDD.

Above figure depicts an RDD graph, which is the result of the following series of transformations:

val r00 = sc.parallelize(0 to 9)

val r01 = sc.parallelize(0 to 90 by 10)

val r10 = r00 cartesian df01

val r11 = r00.map(n => (n, n))

val r12 = r00 zip df01

val r13 = r01.keyBy(_ / 20)

val r20 = Seq(r11, r12, r13).foldLeft(r10)(_ union _)

图1 图2

RDD (Resilient Distributed Dataset)

  1. 在内存中计算, 内存中放不下遵循LRU(最近最少使用算法)将其余置换到disk

  2. 懒计算, 直到执行action 操作, 才会去计算RDD

  3. 容错性

    使用lineage (血统) 可以在其他节点并行计算failed partition of RDD, 如果有备份则可以直接计算,更快; 否则要根据上次计算的结果重新计算. 若driver故障, 则所有executor的计算结果都会丢失 设置replication, 参考 RDD Persistence , 使用这个配置: MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

  4. 不可变性(Immutability)

    一旦创建了rdd, 就是不能修改的, 除非生成新的 rdd, 避免了并发计算的问题, 而且每次 rdd transformation是确定的

rdd.toDebugString() 可以打印出rdd 的生成链

spark persist

参考自 https://github.com/JerryLead/SparkInternals/blob/master/markdown/6-CacheAndCheckpoint.md

  1. reliable

SparkContext.setCheckpointDir(directory: String) to set the checkpoint directory, 目录必须是hdfs路径, 因为 checkPoint file实际上是保存在executor 机器上的.

用法: RDD.checkpoint() , 当前rdd被保存, 对 parent rdd的引用都会被移除; 每个job执行完之后, 会往前回溯所有的RDD, 若需要checkpoint, 则标记为 CheckpointingInProgress, 最后启动一个新的job 完成Checkpoint. job完成 checkpoint之后, 会将rdd的所有 dependency释放掉, 设置该rdd的状态为 checkpoint, 并为该rdd 设置一个强依赖, 设置该rdd的 parent rdd为 CheckpointRDD, 该 CheckpointRDD 负责读取文件系统的 Checkpoint文件, 生成对应rdd 的 partition.

checkpoint 也是lazy的, 触发了action之后, 才会往前回溯到需要checkpoint的RDD 进行checkpoint. checkpoint的时候, 会重新起一个新的job 去计算需要checkpoint 的rdd, 所以一般在checkpoint 之前先执行 cache, 则后续的checkpoint 的过程就直接把内存中的数据持久化到硬盘中了, 省去了重复计算.

spark shuffle

参考自 SparkInternals-shuffleDetails

shuffle会产生两个stage, 分别对应 shuffle write和shuffle read shuffle write: 可以当做mapper阶段, 第一个stage 中每个task中的记录, 通过 partitioner.partition(record.getKey())) (默认是HashPartitioner), 会被分散到 bucket上, 每个task 对应的bucket的数量 == reducer的数量 == 下一个stage的task的数量 shuffle read: 可以当做reducer阶段, 第二个stage 中的task, 根据task的id 和mapper的id从远端或者本地的 bucket上面 fetch 记录进行reduce.

spark 资源分配

spark 性能调优

美团点评spark基础篇

  1. 避免创建重复RDD 2.尽量重复使用RDD 3.对多次使用的RDD持久化 4.尽量避免shuffle类算子 5.使用map端预聚合的算子, 类似于MR的combiner 6.使用高性能算子

数据倾斜

某个parttion的大小远大于其他parttion,stage执行的时间取决于task(parttion)中最慢的那个,导致某个stage执行过慢

driver的作用

  1. 生成lineage, 将用户提交的程序划分成多个task去执行,
  2. 作为调度的角色, 提交请求到executor

spark sql和presto的区别

hive on spark, hive on MR 和spark sql的区别

hive on spark, sql的优化都是通过hive的, 这方便hive的经验比较丰富, 最后启动的还是一个spark job

spark sql 默认使用hive metastore去管理metadata, 使用spark 自身的sql 优化器: catalyst.

spark-submit

!/bin/sh

set -o nounset

第一个错误, shell终止执行

set -o errexit export SPARK_HOME= lib_path= tempView=mblTempView outputDir=/user/wutong/mblOutput mblFile=mbl.txt hdfs dfs -rm -r -f ${outputDir} ${SPARK_HOME}"/bin/spark-submit" \ --class com.mucfc.cms.spark.job.mainClass.PushMblToMerchantShellMain \ --master yarn \ --deloy-mode client \ --queue root \ --diver-class-path ${lib_path}"" \ --jars ${lib_path}"" \ --conf spark.default.parallelism=200 \ --conf spark.sql.shuffle.partitions=400 \ --conf spark.executor.cores=3 \ --conf spark.executor.memory=450m \ --conf spark.executor.instances=4 \ ${lib_path}"cms-sparkintegration.jar" "$@" \ -file_name -insertSql "insert overwrite table crm_appl.mbl_filter select ${tempView}" \ -tempView ${tempView} \ -outputSql "select from crm_app.mbl_filter" \ -showSql "select from ${tempView}" \ -outputDir ${outputDir} \ -inputPartitionNum 200 \ -outputPartitionNum 200 \ -inputFlag true \ -appNme PushMblToMerchantShellMain \ -local_file_path ${lib_path}${mblFile} -remote_file_path /user/wutong/mblFile \ -countForShortMbl 3000 -encrypt_type md5

#### spark sql 执行的大致流程
![image](https://user-images.githubusercontent.com/20329409/42792823-b5ed02f6-89a9-11e8-995b-a68e53a4f1c8.png)

> 参考自
> 1. [sparksql-catalyst](http://hbasefly.com/2017/03/01/sparksql-catalyst/)
> 2. mit.edu-sigmod_spark_sql.pdf , 百度云网盘/book

>  首先将sql 语句通过parser模块解析成语法树, 称作 unresoloved logical plan, 这个plan再通过 Analyzer借助元数据解析为 logical plan, 再通过基于规则的优化, 得到 optimized logical plan, 此时执行计划依然是逻辑的, 并不能被spark 理解, 还需要转化为 physical plan.

1. parser
> 将一长串sql 解析为一个语法树, 相当于是划分成一个个token, 并指定了token执行的先后顺序, 该模块基本都使用第三方类库 antlr实现. 
2. analyzer
> 遍历第一步提到的语法树, 将一个个token替换为具体的函数, 例如token为sum(), 则替换为具体的聚合函数; 并做数据类型的绑定, 定位到那个表的哪个列.
3. optimzer
> 分为基于规则优化和基于代价优化(目前支持还不好, 具体说来是比较多种规则优化下哪个的时间代价越小, 就采用哪些规则进行优化), 规则优化例如谓词下推(Predicate Pushdown), 例如是filter下推到join之前, 先进行过滤再join, 减少大量数据; 常量累加(Constant Folding), 将中间值事先计算, 得出一个中间结果; 和列值裁剪(Column Pruning), 例如只要一列数据的, 就只传递该列数据, 减少大量的io和带宽. 

4. 将逻辑执行计划转化为物理执行计划, 转化为特定的算子进行计算.

* 查看spark sql执行计划
> 1. 使用queryExecution方法查看逻辑执行计划,使用explain方法查看物理执行计划, 在spark-sql 命令行, 执行

spark.sql("xxxsql").queryExecution() spark.sql("xxxsql").explain()



> 2. 使用Spark WebUI进行查看

#### spark join
> 分为 shuffle hash join、broadcast hash join以及sort merge join

* hash join
> 将小表作为Build Table,大表作为Probe Table; 先将小表的join key, hash到bucket中, 构建hashtable, hashtable如果太大, 会放到磁盘上; 再讲大表的join key进行hash到同一个bucket中, 再判断两者的key是否相同. > 时间复杂度: O(a+b), 传统的笛卡尔积是 O(a*b)

> 选择小表作为build table, 生成的hashTable比较小, 能够完全放到内存中.

* hash join的分布式改造
1. broadcast hash join

> 将小表广播到大表的所有节点上, 适用于小表很小的情况

> SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M。

2. Shuffle Hash Join

> 将两个表按照join key进行重分区(HashPartition) , 再在各个节点上进行hash join, 适用于一个大表,一个小表的情况. 

3. Sort-Merge Join
可以看下 [spark-join-pull过程](https://github.com/apache/spark/pull/3173)
> 将两个表先进行shuffle, 再在各个分区节点的数据进行sort, 最后再根据join key 进行merge. 适用两个大表的情况, 因为spark的shuffle是 sort-based shuffle,shuffle之前就排序好了. 

#### spark log 
> 分为三部分: driver log, executor log, spark history server log

* driver log
> 使用 yarn logs -applicationId xxxId > tmp.log
* executor log
> 在下图目录下, 需要去hdfs 找applicationId下的目录
![yarn-log](79898B9FF10345A0892B26B69A8BDD7D)

* spark history server log
> 配置的路径

![history-server-log](F380A9E97D484A87ACAD4CEE6C34838F)
> 也可以去spark history server直接看, 但是可能会找不到, 需要看配置. 包含了spark application执行的整个流程和rdd的细节.

---
#### 疑问
* 还是要把本地windows环境起起来, spark如何指定 scala, hadoop
* spark 序列化
* 基准测试 benchmark
* spark集群容错, 如果没有保留中间结果, 则如何重新计算, 从哪里开始 ? 

---
### 工作踩坑指南
#### spark 任务提交
* spark和scala的包都指定为provided,在客户端上指定SPARK_HOME和SCALA_HOME就可依赖到,避免版本问题
    * 如果SPARK_HOME有问题则,会出现 
    > ClassNotFound: sparkSession

    * 出现 xx.scala.xx error, 一般是scala 包有冲突
* 环境变量配置好后,用source生效,然后打开一个新的终端去重启应用才可以取到新的环境变量
* 需要配置分离,不用打包就可以改配置
* hive元数据库(mysql),hive存储文件的位置(hdfs)在切换集群时都需要更换
* 指定hadoop版本
> 参考 [Stack Overflow](https://stackoverflow.com/questions/30906412/noclassdeffounderror-com-apache-hadoop-fs-fsdatainputstream-when-execute-spark-s)
> 参考 [hadoop-provided](https://spark.apache.org/docs/latest/hadoop-provided.html)

* 启动脚本
> 在spark-class2.cmd 有一个 LAUNCHER_OUTPUT, 默认启动完会删除, 可以保留看下启动的脚本, 帮助排查问题

* windows环境下启动spark, 有可能脚本报错, 然后报错信息是乱码, 可以把spark的输出指定 GBK解码, 可能是因为中文环境下windows cmd的编码是中文编码, 所以用utf-8解码是乱码, 看到报错信息之后就很好排查问题了.

**待看**
* [http://www.cnblogs.com/jcchoiling/p/6440102.html](http://www.cnblogs.com/jcchoiling/p/6440102.html)
* book: sigmod_spark_sql.pdf , 百度云网盘

**经典资料**
1. [https://jaceklaskowski.gitbooks.io/mastering-apache-spark/](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/)
2. [lhttps://github.com/JerryLead/SparkInternals](https://github.com/JerryLead/SparkInternals) 
jiangzz commented 5 years ago

您好,最近在研究spark第三方jar包依赖问题,不知为何使用spark-submit --jars 或者--packages 参数不起作用,系统任务提交后,就给我抛出classnotfound异常。找不到MySQL驱动类,我是用的不是CDH安装的Spark。环境是Spark on yarn,测试版本spark-2.4.1 +hadoop-2.9.2

 val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .getOrCreate()
    import spark.implicits._
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://CentOS:3306/test")
      .option("dbtable", "t_user")
      .option("user", "root")
      .option("password", "root")
      .load()
    jdbcDF.select('id,'salary)
      .groupBy('id)
      .sum("salary")
      .map(row => row.getInt(0)+"\t"+row.getDouble(1))
      .write.text("hdfs:///aa")

任务提交如下

./bin/spark-submit --master yarn --num-executors 2 --executor-cores 2 --class 入口类 --packages mysql:mysql-connector-java:5.1.47  /root/spark-rdd-1.0-SNAPSHOT.jar

其中spark-rdd-1.0-SNAPSHOT.jar并没有将MySQL依赖打包进去,主要就是测试--packages,貌似失败了。于是乎我有使用 ---jars参数 指定jar路径 结果都是一样的,系统会显示当前mysql驱动jar会上传到spark中,但是貌似程序在执行到DriverWrap的时候,就会抛出com.mysql.jdbc.Driver加载不到驱动,ClassNotFound问题。

同样我在chd5 上测试 spark-1.6.0 同样的脚本,确实执行ok的,还请大神指点,谢谢!