spark job相关
groupby reduceby区别
- reduceByKey 会进行分区内聚合,函数内部调用了combineByKey,然后再进行网络传输
- groupByKey 不会进行局部聚合
依赖
- NarrowDependency分为OneToOneDependency和RangeDependency两种
- ShuffleDependency:子依赖多个父RDD
stage & task
DAGScheduler将Stage划分->把Stage转换为TaskSet->TaskScheduler将计算任务最终提交到集群
最后的Stage包含了一组ResultTask
task
- task和partition是一对一。一组Task就组成了一个Stage
- Task
- ShuffleMapTask
- 根据Task的partitioner将计算结果放到不同的bucket
- ResultTask
- 计算结果发送回Driver
- ShuffleMapTask
如何将stage划分为taskset
Spark Standalone-cluster
- 流程
- 集群启动后worker向Master汇报资源,Master掌握集群资源
- client提交sparkApplication,向Master申请启动Driver
- Master收到请求后随机找一台节点启动Driver,Driver向Master申请executor进程资源
- driver 里scheduler将DAG划分为stage,taskscheduler划分stage为taskset
- Master找到满足资源的worker节点,启动Excutor,反向注册给Driver
- Driver的context taskscheduler再把task发给executor,监控task,回收结果(collect方法)
- 流程
缓存
- persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中
- 触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用
- StorageLevel
- shuffle会涉及到网络传输,可能会丢失数据,shuffle之前做persist,框架默认将数据持久化到磁盘
checkpoint
- 在checkpoint的时候强烈建议先进行cache
- 当你checkpoint执行成功了,那么前面所有的RDD依赖都会被销毁
- 区别persist
- persist可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理, blockManager stop,文件夹被删除
- checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,是一直存在的,也就是说可以被下一个 driver program 使用
rdd
- 本质上是多个Partition组成的List。一个RDD可以包含多个分区,每个分区就是一个dataset片段。分区数决定了并行计算的数量
默认情况下,HDFS上面一个Block就是一个Partition。 - RDD如何保障数据处理效率
- 通过persist与patitionBy函数来控制RDD的分区与持久化
- 底层接口则是基于迭代器的
- 本质上是多个Partition组成的List。一个RDD可以包含多个分区,每个分区就是一个dataset片段。分区数决定了并行计算的数量
容错
rdd记住构建它的操作图(Graph of Operation),Worker失败时重新计算,无需replication
需要恢复执行过程的中间状态:通过Spark提供的checkpoint
lineAge:每次更新都会记录下来,比较复杂且比较耗费性能。适用于DAG中重算太耗费时间的
- 数据倾斜
- 先局部聚合,再全局聚合。
- 并行度太少了,导致个别Task的压力太大
- 自定义partition,分散key的分布,使其更加均匀
累加器和广播变量
累加器
- 累计计数等场景
- Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。
广播变量
- 每个executor一个副本,普通变量每个task一个副本
- driver定义和修改
- 节点间高效分发大对象
序列化
java序列化
- static和transient修饰的变量不会被序列化
- readObject()方法和writeObject()自定义实现序列化
spark序列反序列化过程
- 代码中对象在driver本地序列化;
- 对象序列化后传输到远程executor节点;
- 远程executor节点反序列化对象
SerializationDebugger在日志中出问题的类和属性
kryo
- Kryo serialization 性能和序列化大小都比默认提供的 Java serialization 要好
.registerKryoClasses(Array(classOf[Student])) // 将自定义的类注册到Kryo
- Spark 2.0.0以来,sparkcontext初始化时某些类型已被注册进去
repartition和coalesce的区别、partitionby
- coalesce
- repartition底层为coalesce
- 一般增大rdd分区用repartition,减小用coalesce
- repartition一定涉及shuffle,coalesce根据传入参数判断是否发生shuffle
- partitionby
- repartition 和 partitionBy 都是对数据进行重新分区,默认都是使用 HashPartitioner,区别在于partitionBy 只能用于 PairRDD
- repartition 随机生成的数来当做 Key,partitionby是自己的key
- coalesce
map()和mapPartition()的区别
- map():每次处理一条数据;mapPartition():每次处一个分区的数据,可能导致OOM。
- 当内存空间较大的时候建议使用mapPartition(),以提高处理效率。
- mapPartitionsWithIndex类似mapPartitions,但func带有一个整数参数表示分片的索引值
glom
- glom会把每个批次每个分区的数据从Iterator类型转换为Array类型,所以如果每个分区的数据非常大的话会出现OOM的情况。
pipe
- 每个分区一个脚本
比hive快
- HQL 引擎还比 Spark SQL 的引擎更快
- Hadoop 每次 shuffle 操作后,必须写到磁盘,spark内存
- 消除了冗余的 MapReduce 阶段
- Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM
Spark 基于线程,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。
aggregateByKey foldByKey combineByKey
- aggregateByKey
rdd.aggregateByKey(0)(math.max(_,_),_+_)
- 每个分区的各个key的value和初始值0,进行max,获得到每个key对应value的max
然后每个分区进行combine即相加
- 计算相同key对应值的相加结果:
rdd.foldByKey(0)(_+_)
- combineByKey
input.combineByKey((_,1), (acc:(Int,Int),v)=>(acc._1+v,acc._2+1),//v为当前值 (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))
- key对应的value先映射成一个二元组(value,1),同一个分区内的相同key将二元组相加
再将不同分区的相同key的二元组相加。 - 相加时,元组的第一位和第二位都分别累加
- aggregateByKey
cogroup
- 第一个RDD元素是(1,”Allen”),(2,”Bob”),(3,”Carl”),第二个RDD元素是(1,10000),(1,5000),(2,11000),(2,6000),(3,12000),(3,6000)
- join的结果是: (1,(Allen,10000)) (1,(Allen,5000)) (2,(Bob,11000)) (2,(Bob,6000)) (3,(Carl,12000)) (3,(Carl,6000))
- cogroup的结果是: (1,([Allen],[10000, 5000])) (2,([Bob],[11000, 6000])) (3,([Carl],[12000, 6000]))
mapValues
- mapValues(_+”|||”)–每个value后加上”|||”
T级别数据
单个executor进程内RDD的分片数据是用Iterator流式访问的
RDD lineage上各个transformation携带的闭包函数复合而成的Iterator,
每访问一个元素,就对该元素应用相应的复合函数,得到的结果再流式地落地
对于shuffle stage是落地到本地文件系统留待后续stage访问,
对于result stage是落地到HDFS或送回driver端等等,视选用的action而定用户要求Spark cache该RDD,且storage level要求在内存中cache时,
Iterator计算出的结果才会被保留,通过cache manager放入内存池
本文链接: https://satyrswang.github.io/2021/04/05/spark/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!