spark job相关
groupby reduceby区别
- reduceByKey 会进行分区内聚合,函数内部调用了combineByKey,然后再进行网络传输
 - groupByKey 不会进行局部聚合
 
依赖
- NarrowDependency分为OneToOneDependency和RangeDependency两种
 - ShuffleDependency:子依赖多个父RDD
 
stage & task
DAGScheduler将Stage划分->把Stage转换为TaskSet->TaskScheduler将计算任务最终提交到集群
最后的Stage包含了一组ResultTask
task
- task和partition是一对一。一组Task就组成了一个Stage
 - Task
- ShuffleMapTask
- 根据Task的partitioner将计算结果放到不同的bucket
 
 - ResultTask
- 计算结果发送回Driver
 
 
 - ShuffleMapTask
 
如何将stage划分为taskset
Spark Standalone-cluster
- 流程
- 集群启动后worker向Master汇报资源,Master掌握集群资源
 - client提交sparkApplication,向Master申请启动Driver
 - Master收到请求后随机找一台节点启动Driver,Driver向Master申请executor进程资源
 - driver 里scheduler将DAG划分为stage,taskscheduler划分stage为taskset
 - Master找到满足资源的worker节点,启动Excutor,反向注册给Driver
 - Driver的context taskscheduler再把task发给executor,监控task,回收结果(collect方法)
 
 
- 流程
 
缓存
- persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中
 - 触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用
 - StorageLevel
 - shuffle会涉及到网络传输,可能会丢失数据,shuffle之前做persist,框架默认将数据持久化到磁盘
 
checkpoint
- 在checkpoint的时候强烈建议先进行cache
 - 当你checkpoint执行成功了,那么前面所有的RDD依赖都会被销毁
 - 区别persist
- persist可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理, blockManager stop,文件夹被删除
 - checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,是一直存在的,也就是说可以被下一个 driver program 使用
 
 
rdd
- 本质上是多个Partition组成的List。一个RDD可以包含多个分区,每个分区就是一个dataset片段。分区数决定了并行计算的数量
默认情况下,HDFS上面一个Block就是一个Partition。 - RDD如何保障数据处理效率
- 通过persist与patitionBy函数来控制RDD的分区与持久化
 - 底层接口则是基于迭代器的
 
 
- 本质上是多个Partition组成的List。一个RDD可以包含多个分区,每个分区就是一个dataset片段。分区数决定了并行计算的数量
 容错
rdd记住构建它的操作图(Graph of Operation),Worker失败时重新计算,无需replication
需要恢复执行过程的中间状态:通过Spark提供的checkpoint
lineAge:每次更新都会记录下来,比较复杂且比较耗费性能。适用于DAG中重算太耗费时间的
- 数据倾斜
- 先局部聚合,再全局聚合。
 - 并行度太少了,导致个别Task的压力太大
 - 自定义partition,分散key的分布,使其更加均匀
 
 
累加器和广播变量
累加器
- 累计计数等场景
 - Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。
 
广播变量
- 每个executor一个副本,普通变量每个task一个副本
 - driver定义和修改
 - 节点间高效分发大对象
 
序列化
java序列化
- static和transient修饰的变量不会被序列化
 - readObject()方法和writeObject()自定义实现序列化
 
spark序列反序列化过程
- 代码中对象在driver本地序列化;
 - 对象序列化后传输到远程executor节点;
 - 远程executor节点反序列化对象
 
SerializationDebugger在日志中出问题的类和属性
kryo
- Kryo serialization 性能和序列化大小都比默认提供的 Java serialization 要好
 .registerKryoClasses(Array(classOf[Student])) // 将自定义的类注册到Kryo- Spark 2.0.0以来,sparkcontext初始化时某些类型已被注册进去
 
repartition和coalesce的区别、partitionby
- coalesce
- repartition底层为coalesce
 - 一般增大rdd分区用repartition,减小用coalesce
 - repartition一定涉及shuffle,coalesce根据传入参数判断是否发生shuffle
 
 - partitionby
- repartition 和 partitionBy 都是对数据进行重新分区,默认都是使用 HashPartitioner,区别在于partitionBy 只能用于 PairRDD
 - repartition 随机生成的数来当做 Key,partitionby是自己的key
 
 
- coalesce
 map()和mapPartition()的区别
- map():每次处理一条数据;mapPartition():每次处一个分区的数据,可能导致OOM。
 - 当内存空间较大的时候建议使用mapPartition(),以提高处理效率。
 - mapPartitionsWithIndex类似mapPartitions,但func带有一个整数参数表示分片的索引值
 
glom
- glom会把每个批次每个分区的数据从Iterator类型转换为Array类型,所以如果每个分区的数据非常大的话会出现OOM的情况。
 
pipe
- 每个分区一个脚本
 
比hive快
- HQL 引擎还比 Spark SQL 的引擎更快
 - Hadoop 每次 shuffle 操作后,必须写到磁盘,spark内存
 - 消除了冗余的 MapReduce 阶段
 - Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM
Spark 基于线程,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。 
aggregateByKey foldByKey combineByKey
- aggregateByKey 
rdd.aggregateByKey(0)(math.max(_,_),_+_)- 每个分区的各个key的value和初始值0,进行max,获得到每个key对应value的max
然后每个分区进行combine即相加 
 -  计算相同key对应值的相加结果:
rdd.foldByKey(0)(_+_) - combineByKey
input.combineByKey((_,1), (acc:(Int,Int),v)=>(acc._1+v,acc._2+1),//v为当前值 (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))- key对应的value先映射成一个二元组(value,1),同一个分区内的相同key将二元组相加
再将不同分区的相同key的二元组相加。 - 相加时,元组的第一位和第二位都分别累加
 
 
- aggregateByKey 
 cogroup
- 第一个RDD元素是(1,”Allen”),(2,”Bob”),(3,”Carl”),第二个RDD元素是(1,10000),(1,5000),(2,11000),(2,6000),(3,12000),(3,6000)
 - join的结果是: (1,(Allen,10000)) (1,(Allen,5000)) (2,(Bob,11000)) (2,(Bob,6000)) (3,(Carl,12000)) (3,(Carl,6000))
 - cogroup的结果是: (1,([Allen],[10000, 5000])) (2,([Bob],[11000, 6000])) (3,([Carl],[12000, 6000]))
 
mapValues
- mapValues(_+”|||”)–每个value后加上”|||”
 
T级别数据
单个executor进程内RDD的分片数据是用Iterator流式访问的
RDD lineage上各个transformation携带的闭包函数复合而成的Iterator,
每访问一个元素,就对该元素应用相应的复合函数,得到的结果再流式地落地
对于shuffle stage是落地到本地文件系统留待后续stage访问,
对于result stage是落地到HDFS或送回driver端等等,视选用的action而定用户要求Spark cache该RDD,且storage level要求在内存中cache时,
Iterator计算出的结果才会被保留,通过cache manager放入内存池
本文链接: https://satyrswang.github.io/2021/04/05/spark/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!