Spark
- tags: Bigdata
Spark 编程语言选择⌗
毋庸置疑,Python 应该是最简单也是大部分的选择,但是如果有依赖那么将要付出额外的心智负担(Spark 管理 Python 依赖)。 JVM 语言的依赖组织方式则具有天然的优势,可以将依赖(排除 Spark 生态之后)都 bundle 进 Jar 包里。 其中 Scala 兼具简单和 JVM 的优势,但是它「不流行」。
Spark Driver & Executor⌗
- Driver 执行 spark-commit 客户端,创建
SparkContext
执行main
函数。 - Executor Spark Worker 上的线程
See also:
Spark 代码执行⌗
我在配置 Spark 的时候就在好奇,从观察上看部分代码应该是执行在 Driver 上部分代码会执行在 Executer,这让我很好奇。 但是我通过学习 Spark RDD 学习到了一些知识。
以下代码是在 Executor 上执行的:
- Transformations 和 Actions 是执行在 Spark 集群的。
- 传递给 Transformations 和 Actions 的闭包函数也是执行在 Spark 集群上的。
其他额外的代码都是执行在 Driver 上的,所以想要在 Driver 打印日志需要上使用 collect
:
rdd.collect().foreach(println)
collect
可能会导致 Driver 内存爆掉,可以使用 take
:
rd.take(100).foreach(println)
所以在这就带来在闭包中共享变量的问题,参见 Spark 共享变量。
Spark 编程抽象⌗
Spark RDD⌗
集合并行化⌗
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
外部数据集⌗
- 输入源支持支持 Hadoop 支持的任何存储源,包括:本地文件系统、HDFS、Cassandra、HBase、Amazaon S3 等
- 输入格式支持:文本文件、SequenceFiles 和任何其他 Hadoop InputFormat
如果是本地文件系统,则文件需要存在与所有 Worker 节点上。
Spark Transformations vs Actions⌗
Spark 支持两种操作类型:
- transformations:从现有数据集创建新的数据集,比如
map
。 - actions:在数据集上进行运算然后返回值给 driver,比如
reduce
。
Spark Transformations 懒执行⌗
所有的 Spark transformations 会记住应用的基础数据集,只要在需要将结果返回给 driver 的时候才进行计算。
比如,我们可以感知到一个数据集(dataset)通过 map
创建,将会被 reduce
使用并返回 reduce
的结果给 driver 而不是一个映射过(mapped)的大数据集。
Spark transformations 重复计算⌗
默认情况下,每一次在一个 RDD 上运行 action Spark 都可能会进行重新计算,这时候可以使用 persist 缓存一个 RDD 到内存中。 下一次查询将会被加速,同时 Spark 支持存储到磁盘或者跨多节点复制(replicated)。
Spark 共享变量⌗
Spark 支持两种共享变量的方式:
- Broadcast Variables
- Accumulators
设置 Spark Python 版本⌗
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python # Executor
上面 environment 是提交的时候需要在 --archives
缀上的:
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py
Note that
PYSPARK_DRIVER_PYTHON
above should not be set for cluster modes in YARN or Kubernetes.
Spark 管理 Python 依赖⌗
YARN⌗
支持 --archives
参数上传打包好的环境信息,主要三种方式:
- PySpark 原生特性,
--py-files
支持 zip 和 egg 格式,但是不支持 whl - Python vendor package
See alos: Python Package Management
Standalone cluster⌗
可以借助上面的 Python 包管理机制,将打包好的环境在各个节点进行同步。假设将 conda-pack 解压到 /opt/conda-envs/test
,可以通过在 Spark 任务脚本最上方通过 PYSPARK_PYTHON
指定解释器:
import os
os.environ['PYSPARK_PYTHON'] = '/opt/conda-envs/test'
conf = {}
sc = SparkContext(conf=conf)
Spark Hive 表问题汇总⌗
Spark 2.3 之后读取 Hive Orc 字段全是 null 或者无法过滤⌗
主要是因为 Orc 文件在 Hive 中存储的时候是大小写敏感的 Schema。 通过如下配置关闭 2.3 之后启用的选项:
spark.sql.hive.convertMetastoreOrc=false
但是启用这个会导致写 Hive Orc 表的时候报错:
[2021-11-20 08:22:26,500] {spark_submit.py:523} INFO - : java.lang.NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(org.apache.hadoop.fs.Path, java.lang.String, java.util.Map, boolean, boolean, boolean, boolean, boolean, boolean)
只能在读指定表的时候动态设置:
spark.conf.set("spark.sql.hive.convertMetastoreOrc", False)
更多坑可以看 Upgrading Guide⌗
Spark 写入的 Hive Orc 表但是旧版 Hive 无法读取⌗
# 解决写入 Orc 表但是 Hive 无法读取的问题
spark.sql.orc.impl=hive