## entrypoint
* Set `SPARK_HOME` according to the Spark version selected by the `SPARK_VERSION` environment variable
* Load host entries (append them to `/etc/hosts`)
* Load extension jars into `SPARK_DIST_CLASSPATH`
* The PySpark virtual environment can be set from outside via `PYSPARK_PYTHON`
* Initialize Kerberos authentication
```sh
#!/bin/bash
env
# echo commands to the terminal output
set -ex
# Check whether there is a passwd entry for the container UID
myuid=$(id -u)
mygid=$(id -g)
# turn off -e for getent because it will return error code in anonymous uid case
set +e
uidentry=$(getent passwd $myuid)
set -e
# set spark version
case "$SPARK_VERSION" in
2) export SPARK_HOME=$SPARK2_HOME ;;
3) export SPARK_HOME=$SPARK3_HOME ;;
*) echo "please set SPARK_VERSION by export environment variable,options[2,3]" ;;
esac
export PYTHONPATH="${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip"
export PATH="${SPARK_HOME}/bin:$PATH"
# entrypoint.sh support add hosts
if [ -n "$HOSTS" ]; then
cat $HOSTS >>/etc/hosts
fi
# If there is no passwd entry for the container UID, attempt to create one
if [ -z "$uidentry" ]; then
if [ -w /etc/passwd ]; then
echo "$myuid:x:$myuid:$mygid:anonymous uid:$SPARK_HOME:/bin/false" >>/etc/passwd
else
echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
fi
fi
# check cmd
SPARK_K8S_CMD="$1"
case "$SPARK_K8S_CMD" in
driver | driver-py | driver-r | executor)
shift 1
;;
"") ;;
*)
echo "Non-spark-on-k8s command provided, proceeding in pass-through mode..."
exec /usr/bin/tini -s -- "$@"
;;
esac
# spark classpath
SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*"
env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' >/tmp/java_opts.txt
readarray -t SPARK_EXECUTOR_JAVA_OPTS </tmp/java_opts.txt
if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
fi
# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
# It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s.
if [ -n "${HADOOP_HOME}" ] && [ -z "${SPARK_DIST_CLASSPATH}" ]; then
export SPARK_DIST_CLASSPATH="$($HADOOP_HOME/bin/hadoop classpath)"
fi
# append ext jars (e.g. hadoop-aws, spark-elasticsearch) after the fallback above so the emptiness check is not defeated;
# use $SPARK_DIST_CLASSPATH here, $SPARK_HOME/jars already contains the hadoop jars
export SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:${SPARK_HOME}/jars_ext/*"
echo "SPARK_DIST_CLASSPATH: $SPARK_DIST_CLASSPATH"
if [ -n "$PYSPARK_FILES" ]; then
PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
fi
# python
PYSPARK_ARGS=""
if [ -n "$PYSPARK_APP_ARGS" ]; then
PYSPARK_ARGS="$PYSPARK_APP_ARGS"
fi
# r
R_ARGS=""
if [ -n "$R_APP_ARGS" ]; then
R_ARGS="$R_APP_ARGS"
fi
if [ -z "$PYSPARK_PYTHON" ]; then
if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "2" ]; then
pyv="$(python -V 2>&1)"
export PYTHON_VERSION="${pyv:7}"
export PYSPARK_PYTHON="python"
export PYSPARK_DRIVER_PYTHON="python"
elif [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then
pyv3="$(python3 -V 2>&1)"
export PYTHON_VERSION="${pyv3:7}"
export PYSPARK_PYTHON="python3"
export PYSPARK_DRIVER_PYTHON="python3"
fi
fi
if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH"
fi
case "$SPARK_K8S_CMD" in
driver)
CMD=(
"$SPARK_HOME/bin/spark-submit"
--conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
--deploy-mode client
"$@"
)
;;
driver-py)
CMD=(
"$SPARK_HOME/bin/spark-submit"
--conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
--deploy-mode client
"$@" $PYSPARK_PRIMARY $PYSPARK_ARGS
)
;;
driver-r)
CMD=(
"$SPARK_HOME/bin/spark-submit"
--conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
--deploy-mode client
"$@" $R_PRIMARY $R_ARGS
)
;;
executor)
CMD=(
${JAVA_HOME}/bin/java
"${SPARK_EXECUTOR_JAVA_OPTS[@]}"
-Xms$SPARK_EXECUTOR_MEMORY
-Xmx$SPARK_EXECUTOR_MEMORY
-cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
org.apache.spark.executor.CoarseGrainedExecutorBackend
--driver-url $SPARK_DRIVER_URL
--executor-id $SPARK_EXECUTOR_ID
--cores $SPARK_EXECUTOR_CORES
--app-id $SPARK_APPLICATION_ID
--hostname $SPARK_EXECUTOR_POD_IP
)
;;
*)
echo "Unknown command: $SPARK_K8S_CMD" 1>&2
exit 1
;;
esac
# do kerberos auth driven by environment variables (KRB5_CONFIG, KEYTAB, PRINCIPAL)
if [ -n "$KRB5_CONFIG" ]; then
rm -f /etc/krb5.conf
ln -s ${KRB5_CONFIG} /etc/krb5.conf
kinit -kt ${KEYTAB} $PRINCIPAL
# start the cron daemon and schedule a ticket renewal every 10 hours
cron
crontab -l | {
cat
echo "0 */10 * * * kinit -kt ${KEYTAB} $PRINCIPAL"
} | crontab -
fi
# Execute the container CMD under tini for better hygiene
exec /usr/bin/tini -s -- "${CMD[@]}"
```
With only a kinit in the entrypoint, the executors still failed to authenticate against the Kerberized HDFS and the job aborted:
```
Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 5, ip, executor 2):
java.io.IOException:
Failed on local exception:
java.io.IOException:
org.apache.hadoop.security.AccessControlException:
Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details :
local host is: "pysparkapp-1631176344099-exec-2/ip";
destination host is: "ip":8020;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
at org.apache.hadoop.ipc.Client.call(Client.java:1508)
at org.apache.hadoop.ipc.Client.call(Client.java:1441)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231)
at com.sun.proxy.$Proxy20.create(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:313)
at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy21.create(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:2146)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1804)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1728)
at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:438)
at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:434)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:434)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:375)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:926)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:240)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:174)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:718)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:681)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:769)
at org.apache.hadoop.ipc.Client$Connection.access$3000(Client.java:396)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1557)
at org.apache.hadoop.ipc.Client.call(Client.java:1480)
... 39 more
Caused by: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
at org.apache.hadoop.security.SaslRpcClient.selectSaslClient(SaslRpcClient.java:172)
at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:396)
at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:594)
at org.apache.hadoop.ipc.Client$Connection.access$2000(Client.java:396)
at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:761)
at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:757)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:756)
... 42 more
Driver stacktrace:
```
## DelegationTokenFetcher
> Try this approach:
[when-running-spark-on-kubernetes-to-access-kerberized-hadoop-cluster-how-do-you](https://stackoverflow.com/questions/54181560/when-running-spark-on-kubernetes-to-access-kerberized-hadoop-cluster-how-do-you)
First get the delegation token from Hadoop using the commands below.
1. Do a `kinit -kt` with your keytab and principal.
2. Run the following to store the HDFS delegation token in a temp path: `spark-submit --class org.apache.hadoop.hdfs.tools.DelegationTokenFetcher "" --renewer null /tmp/spark.token`
3. Do your actual spark-submit adding this configuration: `--conf spark.executorEnv.HADOOP_TOKEN_FILE_LOCATION=/tmp/spark.token`
The above is how YARN executors authenticate. Do the same for Kubernetes executors too.
```shell
# 1. store the HDFS delegation token in a temp path
spark-submit --class org.apache.hadoop.hdfs.tools.DelegationTokenFetcher "" --renewer null /tmp/spark.token
# 2. then add this to the actual spark-submit
#    --conf spark.executorEnv.HADOOP_TOKEN_FILE_LOCATION=/tmp/spark.token \
```
The container came up and then just kept renewing; it never exited and never went on to execute the code...
Two days of fighting with Spark on k8s: HDFS is fine, but I am stuck on the Kerberos authentication for the Hive data source.
- In Spark 3.0, we upgraded the built-in Hive from 1.2 to 2.3, which brings the following impacts:
* You may need to set `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars` according to the version of the Hive metastore you want to connect to.
* For example: set `spark.sql.hive.metastore.version` to `1.2.1` and `spark.sql.hive.metastore.jars` to `maven` if your Hive metastore version is 1.2.1 (see the sketch after this list).
* You need to migrate your custom SerDes to Hive 2.3 or build your own Spark with `hive-1.2` profile. See [HIVE-15167](https://issues.apache.org/jira/browse/HIVE-15167) for more details.
* The decimal string representation can be different between Hive 1.2 and Hive 2.3 when using `TRANSFORM` operator in SQL for script transformation, which depends on hive's behavior. In Hive 1.2, the string representation omits trailing zeroes. But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary.
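A minimal sketch of the first example above on the spark-submit command line; the conf names and version come from the note, the application name is a placeholder:
```shell
# connect Spark 3 to a Hive 1.2.1 metastore, letting Spark pull the matching Hive jars from Maven
spark-submit \
  --conf spark.sql.hive.metastore.version=1.2.1 \
  --conf spark.sql.hive.metastore.jars=maven \
  your_app.py
```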
## Deployment environment
Deployment requirements / verification environment:
* CDH version: hadoop2.6.0-cdh5.16.1

## Workflow
When we submit a Spark job to the Kubernetes cluster through spark-submit, the following flow is executed.

## Building the Docker image
Dockerfile (fragment):
```dockerfile
ENV CDH_PACKGE cdh5.16.1.p0.3.tar.gz
ENV JAVA_HOME /usr/local/openjdk-8
ENV CDH_HOME /opt/cdh
ENV HADOOP_HOME ${CDH_HOME}/hadoop
ENV HADOOP_CONF_DIR ${HADOOP_HOME}/etc/hadoop
ENV YARN_CONF_DIR ${HADOOP_HOME}/etc/hadoop

# support multiple spark versions
ENV SPARK2_HOME ${CDH_HOME}/lib/spark2
ENV SPARK3_HOME ${CDH_HOME}/lib/spark3

ENV PATH $JAVA_HOME/bin:$PATH
SHELL ["/bin/bash", "-o", "pipefail", "-c"]
ENV DEBIAN_FRONTEND=noninteractive
RUN set -ex && \
    sed -i 's#deb.debian.org#mirrors.aliyun.com#g' /etc/apt/sources.list && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt-get install -y --no-install-recommends bash tini libc6 libpam-modules libnss3 \
        cron krb5-user libpam-krb5 coreutils dos2unix net-tools openrc iputils-ping ttf-dejavu fontconfig telnet && \
    apt-get install -y --no-install-recommends python3 python3-pip && \
    mkdir -p /opt/cdh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    rm -rf /var/cache/apt/*

# download cdh package from ftp
RUN wget http://xxx/_share/installers/$CDH_PACKGE -qO- | \
    tar zxf - --strip-components 1 -C /opt/cdh

COPY entrypoint.sh /opt/
WORKDIR /opt/cdh
ENTRYPOINT [ "/opt/entrypoint.sh" ]
```
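With a base image and the Dockerfile above, building and pushing is the usual docker workflow; registry and tag are placeholders:
```shell
docker build -t registry.example.com/spark-cdh:5.16.1 .
docker push registry.example.com/spark-cdh:5.16.1
```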
## Kerberos authentication
Manual kinit: on Spark 2.4.8 I tried compiling the Spark source myself against hadoop2.6 and hive-1.1.0. It ended in the driver stack trace shown above: the container came up and just kept renewing, never exiting and never running the code, and after two days of Spark on k8s the HDFS side was fine but Kerberos authentication for the Hive data source was still unsolved.
So I gave up on 2.4 and upgraded to 3.1.2: the image is built from the CDH package, with both the spark2 and spark3 distributions placed under lib.
## Upgrading to Spark 3
Spark 3 adds three Kerberos parameters to support Kerberos authentication; the krb5 conf is written into the executor's ConfigMap.
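A minimal sketch of these settings, assuming they are `spark.kerberos.keytab`, `spark.kerberos.principal` and `spark.kubernetes.kerberos.krb5.path`; with the last one, spark-submit mounts the given krb5.conf into the driver and executor pods, which lines up with the ConfigMap note above. Keytab path, principal and krb5.conf path are placeholders:
```shell
# Kerberos settings for Spark 3.x on Kubernetes; all values below are placeholders
spark-submit \
  --conf spark.kerberos.keytab=/path/to/user.keytab \
  --conf spark.kerberos.principal=user@EXAMPLE.COM \
  --conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
  your_app.py
```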
## Spark-Hive compatibility
Previously I spent a lot of effort compiling the Spark source myself to match my own Hadoop and Hive versions.
In the end that turned out to be unnecessary: Spark abstracts the Hive data source nicely. You specify the Hive version and the jar location externally, and Spark's internal HiveClient loads the corresponding HiveConf and executes the supported operations.
I made the following attempts:
* `dev/make-distribution.sh`: directly changing the Hadoop version for the build does not compile. The official note quoted above says you can build the `hive-1.2` profile yourself; in practice it does not build. spark-3.0.2 has an official pre-built download package, but I have not seen one for later versions.
* The `maven` mode can use `spark.sql.maven.additionalRemoteRepositories` to point at a custom Maven repository. According to the official docs, Spark has been able to talk to different Hive metastores since 1.4.0; that is, the Hive version compiled into Spark and the Hive metastore Spark accesses are independent, so you can configure a different Hive version for the metastore you need. The relevant code is in org.apache.spark.sql.hive.HiveUtils and org.apache.spark.sql.hive.client.IsolatedClientLoader.
* Test path: the `with hadoop` distribution must be the official one; it cannot be replaced with another Hadoop build.
* Testing the Hive classpath: I kept asking myself why the jars under `spark.sql.hive.metastore.jars.path` were simply not being loaded... Running the spark-shell snippet below to inspect the classpath was a lifesaver for locating the problem.
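A minimal sketch of such a check: feed a few lines of Scala into spark-shell through a heredoc and print the JVM classpath plus every URL visible from the classloader chain.
```shell
# print the JVM classpath and the URLs seen by each classloader inside spark-shell
spark-shell <<'EOF'
println(System.getProperty("java.class.path"))
var cl: ClassLoader = Thread.currentThread.getContextClassLoader
while (cl != null) {
  cl match {
    case u: java.net.URLClassLoader => u.getURLs.foreach(println)
    case other => println(s"(non-URL classloader: ${other.getClass.getName})")
  }
  cl = cl.getParent
}
EOF
```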
## Differences between spark2 and spark3 Hive integration
Take spark-3.1.2-bin-hadoop2.7.tgz and prepare a directory from 2.6.0-cdh5.16.1; the Hive version is 1.1.0.
Testing passed on this spark-3.1.2-bin-hadoop2.7 setup. When the following log line finally appeared, I could not hold back the tears -_-
`21/09/12 14:53:39 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.1.0 using path:`
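A minimal sketch of the settings that correspond to this setup; the jar directory is a placeholder for the prepared cdh5.16.1 jars:
```shell
# point Spark 3.1.2 at the external Hive 1.1.0 client jars instead of the built-in Hive 2.3
spark-submit \
  --conf spark.sql.hive.metastore.version=1.1.0 \
  --conf spark.sql.hive.metastore.jars=path \
  --conf "spark.sql.hive.metastore.jars.path=/opt/cdh/hive-jars/*" \
  your_app.py
```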
## Injecting hosts into the Spark pods
spark-submit adds an environment variable HOSTS_FILE, where HOSTS_FILE is a path on the spark-config PVC. The Docker image's entrypoint reads the agreed hosts file from the PVC and appends it to /etc/hosts.
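A minimal sketch of passing the hosts-file location into the pods via spark-submit environment settings; note that the entrypoint above reads `$HOSTS`, and the variable name and PVC path here are assumptions:
```shell
# hand the hosts file path to both driver and executors; entrypoint.sh appends it to /etc/hosts
spark-submit \
  --conf spark.kubernetes.driverEnv.HOSTS=/opt/spark-config/hosts \
  --conf spark.executorEnv.HOSTS=/opt/spark-config/hosts \
  your_app.py
```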
## spark-executor log collection
Under Spark on YARN, yarn.log-aggregation-enable can be turned on to collect and aggregate logs into HDFS for later viewing. Spark on Kubernetes has no such log-collection mechanism, so Spark logs can only be inspected through the Kubernetes pod log output.
## Deploying the Spark history-server
View the logs through the Spark history server, which mounts the eventlog PVC directory.
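A minimal sketch of the event-log wiring, assuming the eventlog directory is a path on the mounted PVC; all paths are placeholders:
```shell
# write event logs to the mounted PVC so the history server can read them
spark-submit \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=file:///opt/spark-config/eventlog \
  your_app.py

# history server pointed at the same directory
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=file:///opt/spark-config/eventlog" \
  $SPARK_HOME/sbin/start-history-server.sh
```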
## Spark data storage
* S3A: add a jars_ext directory under $SPARK_HOME and put the extension jars there, using the AWS jar version matching spark-3.1.2-bin-hadoop2.7.tgz (it is not backward compatible, so do not get the version wrong). Then add the following to entrypoint.sh: `export SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:${SPARK_HOME}/jars_ext/*"` (the runtime S3A settings are sketched below).
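The S3A endpoint and credentials can then be supplied at submit time as Hadoop configs; a minimal sketch, all values are placeholders:
```shell
# S3A connection settings passed through as Hadoop confs
spark-submit \
  --conf spark.hadoop.fs.s3a.endpoint=http://s3.example.com \
  --conf spark.hadoop.fs.s3a.access.key=MY_ACCESS_KEY \
  --conf spark.hadoop.fs.s3a.secret.key=MY_SECRET_KEY \
  --conf spark.hadoop.fs.s3a.path.style.access=true \
  your_app.py
```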
## spark-submit parameters
* HDFS: Hive and HDFS integration comes into play here. Spark needs the location of the Hadoop configuration files; if they are not on the classpath, HDFS cannot be accessed: `export HADOOP_CONF_DIR=/opt/spark/conf`
* HDFS test command: `hdfs dfs -copyFromLocal people.json hdfs://ha/user/hive/examples/src/main/resources/people.json`
## Spark PVC configuration
The Spark PVCs are GlusterFS volumes mounted on the DolphinScheduler worker:
* spark-config: `mount -t glusterfs -o backup-volfile-servers=glusterfs_ip k8s_ip:spark-config /var/lib/dolphinscheduler/worker-data/spark-config`
* spark-application: `mount -t glusterfs -o backup-volfile-servers=glusterfs_ip k8s_ip:spark-application /var/lib/dolphinscheduler/worker-data/exec/process`
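To make the same data visible inside the driver and executor pods, one option is the spark-submit volume settings; a minimal sketch, where the volume name, claim name and mount path are assumptions:
```shell
# mount an existing PVC into driver and executor pods
spark-submit \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-config.mount.path=/opt/spark-config \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-config.options.claimName=spark-config \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-config.mount.path=/opt/spark-config \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-config.options.claimName=spark-config \
  your_app.py
```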
## Integrating third-party data sources
The following data sources can be integrated through the relevant Spark jars (dropped into the jars_ext directory); you only need to configure the connection parameters in the program.
## Accessing the Spark driver UI
`kubectl -n spark port-forward podName 4000:4040`. This is fairly crude: every spark-submit needs its own port-forward process, which is fine for testing but not for production.
## spark-submit example
The script that runs Spark on k8s from the DolphinScheduler worker.
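A minimal sketch of such a submit script, pulling together the settings discussed above; every hostname, image, namespace, path and principal is a placeholder:
```shell
#!/bin/bash
# all hosts, paths, image names, namespaces and principals below are placeholders
/opt/cdh/lib/spark3/bin/spark-submit \
  --master k8s://https://k8s-apiserver:6443 \
  --deploy-mode cluster \
  --name pyspark-app \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.container.image=registry.example.com/spark-cdh:5.16.1 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driverEnv.SPARK_VERSION=3 \
  --conf spark.executorEnv.SPARK_VERSION=3 \
  --conf spark.kerberos.keytab=/opt/spark-config/user.keytab \
  --conf spark.kerberos.principal=user@EXAMPLE.COM \
  --conf spark.kubernetes.kerberos.krb5.path=/opt/spark-config/krb5.conf \
  --conf spark.sql.hive.metastore.version=1.1.0 \
  --conf spark.sql.hive.metastore.jars=path \
  --conf "spark.sql.hive.metastore.jars.path=/opt/cdh/hive-jars/*" \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=file:///opt/spark-config/eventlog \
  local:///opt/spark-apps/app.py
```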
## reference