Fork me on GitHub

Spark最佳实践(上)

简介

本书是Spark的实战指南,全书共8章,前四章介绍Spark部署,工作机制和内核,后四章通过实战项目介绍Spark SQL,Spark Streaming,Spark GraphX和Spark MLlib功能模块。

通过阅读,读者可以接收的信息和获取的能力包括:

Spark单机及集群部署,一键部署大规模集群的脚本;

对Spark运行过程,数据结构,内核函数进行深入解析;

hdfs解决了数据的分布式存储,Spark作为优秀的计算框架解决了数据处理的效率问题,那么把数据屯起来处理,想要干啥?这就是本书第三个重要的部分,对大量数据进行查询支持的Spark SQL、应对网站实时统计,数据流处理的Spark Streaming、在社交网络关系挖掘中举足轻重的图处理 Spark GraphX、在当前最火的机器学习上提供了Spark MLlib模块,实现了分类聚类回归等多种算法,这些可以处理实际问题的模块是Spark 业内热度飙升的重要原因。

作者是陈欢和林世飞,在腾讯的社交和效果广告部负责大数据处理和分析的相关工作,掌握着海量的服务器资源,利用成规模的Spark集群来捣腾海量数据。

成书时间2016年2月,技术迭代很快又很慢,随着市场发展新的功能需求不断产生,技术的深度和使用技巧快速演进,慢则体现在技术的稳定性,五年前十年前的应用和概念多得是,很多十多年前的技术书籍现在拿来看依然唇齿留香,经典绝伦。Spark作为已经相当成熟的大数据领域工业应用级框架,以后肯定会不断发展,但根基整个推翻是不太可能了,那意味着整个行业所有跑在Spark上面的应用全部废掉,本书作为资深人士刚写出来结合行业内当前发展基础性技术书籍,预期3-5年不会过时,作为入门书籍那是相当靠谱。

前言

互联网快速发展,最近几年全球数据量以每年50%的速度递增,大数据在事实上改变着我们的思维和生产方式。2015年9月国务院印发《促进大数据发展行动纲要》,大数据与各行各业的结合是大势所趋,大数据相关的企业创业和从业者的需求眼看着就是一波大发展。

03年开始谷歌陆续发表三大论文,在Apache软件基金会和雅虎等互联网公司的支持下,实现了大量了开源服务框架,包括hadoop,hive等重量级产品,hadoop拉开了IT行业使用大数据技术的序幕,后续发展如火如荼,数据量持续增长技术不断演化,10年美国加州伯克利分校陆续提出了多份RDD(Resilient Distributed Dataset,弹性分布式数据集)相关论文,随之推出了开源的Spark框架,对比传统的Hadoop,拥有深厚学术背景的Spark将以往的MapReduce,流式计算,机器学习等全部统合,提供了一站解决方案,让数据挖掘和机器学习的门槛大大降低,加速了大数据技术在各个行业的普及。

大数据热门之后出现了非常多的大数据技术,有Hadoop、Hive、HBase、Flume、Kafka、Lucene、Spark、Storm等,还有很多NoSQL技术可以归入大数据,比如MongoDB、CouchDB、Cassandra等。有人列举过大数据技术族谱,其中至少上百种技术。面对如此多的技术,我们往往会产生疑惑:到底该如何选择?这里有一个建议:在充分理解自身需求的基础上,挑选最合适的。

第一章 Spark和大数据

概述

数据只是工具,最终还是要用于创造价值,大数据只是一种新的实践。

介绍大数据的发展状况,以及Spark的起源、特点、优势、未来

大数据是什么

大数据大势所趋

“大数据”(big data)这个词最早出现在UNIX用户协会的会议上,来自SGI公司的科学家在其文章”大数据与下一代基础架构”中用它描述数据的快速增长,具有”4V”,大量(volume),多样(variety),快速(velocity),价值(value)的特征,从数据量上来看,通常认为当一个计算单元容纳不下要处理的数据,那就是大数据的场景。

电脑发明之前做人口统计,全国耕地面积普查,气象预报等;使用普通pc,服务器的时候碰到了互联网级别数据统计,如搜索引擎需要把网上的多数网页抓取来建立索引,这样我们搜索的时候才能毫秒级返回结果。假设每个网页平均大小20KB,大概有价值的中文网页有200亿,大概总共有400TB大小,那么一台计算机以30MB~40MB的速度从硬盘读写,大概需要4个月,还不包括做复杂的分析工作,这时候就需要计算机集群和大数据技术来存储处理。

大数据的两大问题:存储计算

X表示一台服务器是否正常工作,X=0表示正常工作,X=1表示不正常工作。服务器一个月内发生故障的概率是ε,P(X= 0)= 1 -ε,那么1000台电脑都正常工作的概率是(1-ε)^1000。假设故障率ε=0.001,所有1000台服务器1个月内正常工作的概率只有0.37,不到一半,所以随着数据的增长,机器会不断增加,数据存储和运行过程中都会产生宕机丢数据网络波动等意外状况。hadoop的hdfs从设计上解决了数据的分布式存储,集群的扩展性和容错性都非常完善。

计算框架包括很多优秀的平台,如hadoop的mapreduce做批处理任务,storm做流式处理,hive进行分布式的SQL查询,还有很多如mahout,mpi等,这些可以分别满足不同场景的优秀框架解决了海量数据的处理问题,使得从中进行数据挖掘,实现机器学习成为可能。

大数据发展的技术要素都已经存在,2015年9月5日国务院印发了《促进大数据发展行动纲要》,纲要提出要建设公共数据资源开放的统一开放平台,“2020年底前,逐步实现信用、交通、医疗、卫生、就业、社保、地理、文化、教育、科技、资源、农业、环境、安监、金融、质量、统计、气象、海洋、企业登记监管等民生保障服务相关领域的政府数据集向社会开放”,以及“到2020年,培育10家国际领先的大数据核心龙头企业,500家大数据应用、服务和产品制造企业”,大数据的浪潮已经不可阻挡!

这里引出一个概念,”云计算”,云计算是一种按使用量付费的模式,这种模式提供可用的、便捷的、按需的网络访问, 进入可配置的计算资源共享池(资源包括网络,服务器,存储,应用软件,服务),有不同层级的服务模式,IaaS(Infrastructure as Service):基础设施即服务,PaaS(Platform as Service):平台即服务,SaaS(Software as Service):软件即服务。其中IaaS支持按天付费,而且可以动态按需扩容。在云计算服务商的帮助下,如今一个小创业公司都能够快速开发和部署大数据应用。云计算服务商的国外领导者是AmazonAWS,国内主要是阿里云、腾讯云和UCloud。

大数据的重要解决方案hadoop

Hadoop开天辟地

谷歌的三大论文引发了人们在大数据领域的大量研究,直接导致了Hadoop的出现——MapReduce范式的开源实现。如今Hadoop,包括MapReduce与分布式文件系统(HDFS),已经成为数据处理的事实标准。大量的工业界应用,例如腾讯、百度、阿里巴巴、华为、迪斯尼、沃尔玛、AT&T都已经有自己的Hadoop集群。

如图,Hadoop框架的核心是HDFS和MapReduce,其中HDFS是分布式文件系统,MapReduce是分布式数据处理模型和执行环境。

各个组件的功能互相配合使得hadoop能够完成很多场景的大数据任务,本文主角是Spark,这里不再赘述,有兴趣了解的可以移步

但MapReduce不是唯一的大数据分析范式,有一些场景是不适合使用MapReduce的,比如处理网状的数据结构时,这要求能够处理顶点和边的增加和减少操作,并在所有节点进行运算,比如进行机器学习算法需要高速多次迭代的时候因为硬盘读写瓶颈会显得效率稍低,比如不擅长实时计算,需要使用storm,s4,akka等系统。

为什么要用Spark

Spark应运而生

Spark替代hadoop不太准确,如上面生态系统图,确切来说spark替代的是MapReduce,凭借自身的内存计算,RDD数据结构在批处理方面比MapReduce表现得更加出色,同时Spark本身拥有Spark SQL,streaming,MLlib,GraphX这些模块,使得诸多功能用Spark一站搞定,不用像之前那样根据需要分别使用MapReduce,hive,storm等,这让Spark在使用门槛,维护成本和性能表现上具有明显优势。

机器学习算法通常需要对同一个数据集合进行多次迭代计算,而MapReduce中每次迭代都会涉及HDFS的读写,以及缺乏一个常驻的MapReduce作业,因此每次迭代需要初始化新的MapReduce任务,效率不高,基于MR的HIVE,PIG等技术也有类似问题。

Spark作为一个研究项目,诞生于加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab),为了迭代算法和交互式查询两种典型的场景,Matei Zaharia和合作伙伴开发了Spark系统的最初版本。2009年Spark论文发布,Spark项目正式诞生,在某些任务表现上,Spark相对于Hadoop MapReduce有10~20倍的性能提升。2010年3月Spark开源,且在开源社区下发展迅速。2014年5月,Spark 1.0正式发布,如今已经是Apache基金会的顶级项目了。

如下图在官方博客的排序大赛可以有很清晰的对比,可以看看英文,还顺便创造了个世界纪录2333。最新的结果可以戳这里

Spark一直寻求保持Spark引擎小而紧凑。Spark 0.3版本只有3900行代码,其中1300行为Scala解释器,600行为示例代码,300行为测试代码。即使在今天,Spark核心代码也只有约50 000行,因此更容易为许多开发人员所理解和供我们改变和提高。

Spark未来发展

Spark在过去的5年里发展迅速,社区活跃程度一点儿不亚于Hadoop社区。

Spark峰会是社区分享Spark应用的重要交流会,在Spark Summit 2015上,来自Databricks、UC Berkeley AMPLab、百度、阿里巴巴、雅虎、英特尔、亚马逊、Red Hat、微软等的数十个机构共分享了近100个精彩的报告。

目前Databricks、英特尔、雅虎、加州大学伯克利分校是Spark主要的贡献者。

已经大规模应用在生产环境,2015年Spark最大的集群来自腾讯,一共有8000个节点,单个Job最大分别是阿里巴巴和Databricks的1 PB,国内的BAT都在用Spark,可以从这里查看完整的用户列表,Spark作为一种优秀的大数据处理技术已经得到广泛认可,在如此多的应用场景中承担着任务,逐渐会积累庞大的用户经验和文档。

道家看事物分为”道,法,术,器”四个层次,拿到大数据领域来看,大概可以这么理解,Spark是一种工具-器,掌握用Spark去解决实际问题的方法-术,能够根据场景和需求熟练选择合适的技术,不滞于某种技术,这就是掌握了大数据领域解决问题的基本规则-法,而道,可能就是那些在十几年前就已经从计算机技术和社会发展的层次看到了数据膨胀和物理发展的冲突,那个时候就开始提出理论试图拿出解决方案的人,是那些将理论转为现实,设计出分布式存储和各种算法的人,是站在大数据发展的最前端,了解技术走向并引领潮流的人所处的层次吧,难望项背,对先行者和开拓者唯有敬意!

第二章 Spark基础

君子藏器于身,待时而动

本章介绍Spark部署和编程。作者首先带领读者在本地单机下体验Spark的基本操作,然后部署一个包括ZooKeeper、Hadoop、Spark的可实际应用的高可用集群。并且,这一章还介绍了作者开发的一个自动化部署工具,最后介绍了Spark编程的基础知识,以及如何打包提交至集群上运行。

Spark集群部署

Spark真实的部署环境绝大部分都在Linux下,运行依赖JDK,大致思路就是在linux系统里配置JDK,将Spark的预编译包解压到指定路径即可使用。

笔者之前已经琢磨过Spark的安装部署,在本博客里另一篇博文Spark实践有详细介绍,所有搭建步骤都经过两遍以上实际操作,验证无误。本地已经有了现成的环境,如下。

这些基本操作没有什么太多的变化,建议按照笔者博文的步骤实际操作,亲手搭建环境后再深入学习原理,手头有环境操作原理并行学习效率会高很多。

本章里精华部分是作者对生产环境各种集群的配合部署和自动化脚本的详细介绍,看起来复杂,各种几千台服务器的初始化,自动化集群部署,实际上在自己实践并理解了整个流程之后会发现不是很难,下面介绍这部分内容,我猜想,作者写作的时候预定阅读群体是对Spark没有认识和实践经验的,所以过程会比较详细,而笔者是对Spark有了了解并且通读全书之后第二遍阅读并整理思路,所以假定本博文的阅读对象已经按照前文提到的博客内容有了实践经验,在此基础上不会那么详细(但保证不会出错!)。

集群总览

整个集群除了核心的Spark之外,还有多个子集群。为了实现集群的高可用性,引入zookeeper集群进行主备切换。Spark没有存储能力,引入HDFS。引入YARN让集群具备非常好的扩展性。

首先是zookeeper集群,集群机器数量为2n+1台,可以抵御n台机器宕机的风险,如果n取1,总共三台,可以抵御1台机器宕机的风险,所有节点都是相同角色,但运行中会自动选择一个作为leader,zookeeper集群担负终极仲裁者的角色,spark集群的master节点通过于zookeeper集群协商完成主备切换,hdfs文件系统的namenode及yarn中的ResourceManager也是通过ZooKeeper集群来协助完成主备切换。

其次是Spark集群,该集群有两类节点,master和slave。

master负责集群的资源管理,只要有一台正常工作即可,一般至少两台,一台主节点,当主节点异常下线时,其他master节点通过zookeeper仲裁竞选成为主节点

slave节点用于执行计算任务,机器数量可以是一台以上,几十几百或上千台。

hadoop集群从2.x开始把存储和计算分离开,分成hdfs和yarn两个子集群。

hdfs是一个分布式文件系统,为集群提供大文件的存储能力,namenode管理所有的文件信息,一主一备,通过zookeeper实现容灾切换,QJM节点数量为2n+1,一般三台即可,负责记录namenode的流水日志,确保数据不丢失,datanode担负集群的存储负载,用于数据存储。

yarn为MapReduce计算提供调度服务,也可为符合yarn编程接口的集群提供调度服务,比如Spark,该集群有两类节点,ResourceManager和NodeManager,RM一主多备,NM通常与datanode部署在一起,实现最高效的数据访问。

集群的设计理念就是利用普通服务器协同工作来实现大数据的计算,因此集群可以部署在任何普通服务器个人计算机甚至个人计算机的多个虚拟机上,但集群各个节点的工作性质不同,对硬件资源的消耗也不同,合理的搭配能更充分地发挥潜力。

“移动计算而不要移动数据”,计算离数据约近移动数据的开销越小,大部分场景移动数据的开销都会大于计算的开销,所以计算节点和存储节点一般会选择混合部署。

节点部署方面建议Spark slave,HDFS datanode,YARN NodeManager三类节点在一起,这些节点在集群中数量最为庞大,最消耗资源,CPU,硬盘,内存资源都可以倾斜。

其他节点的主要任务是管理,除了namenode对内存要求稍高,其他对硬件消耗都不大。可以考虑混合部署,比如笔者用了五台虚拟机,master,namenode,ResourceManager在一台服务器,其他四台服务器作为spark的slave,hdfs的datanode,yarn的nodemanager。机器资源充足的话也可以单独部署,毕竟这些管理节点的数目都不多,将存储,资源管理,计算的主节点按单节点容灾设计,分开部署消耗最多也不过12台服务器,当然会格外麻烦。建议选择稳定性较高的配置,比如双电源,raid,冗余。

具体到硬件配置作者结合Spark官方和实际项目经验,有如下建议

存储 : Spark Slave节点尽可能靠近存储系统(比如HDFS和HBase),如果不能部署在相同节点上,那么请尽可能让它们在物理位置上更近一些以减少网络IO的开销,比如尽可能在相同机架上、同一个路由器下、相同机房里。而且每个节点建议使用4块或更多独立挂载的硬盘,并且不要配置RAID,这样可以提升Spark读写本地缓存数据时的效率。

内存 : Spark节点可以适应几百吉字节大小的内存。更多的内存可以缓存更多的数据,在一些迭代计算场景下性能提升效果非常明显。另外,建议分配最多节点内存总量的75%给Spark使用,其他的留给操作系统。

网络 : 一些需要进行shuffle的byKey类操作,比如group-ByKey、reduceByKey、join,它们的瓶颈一般都在网络上,所以建议使用10 Gbit/s以上网络。在Spark程序的Web界面上,可以看到shuffle对于网络使用的状况。

CPU : Spark程序可以适应几十个内核的CPU,而且其计算性能与CPU核数量基本成正比。一般建议8~16个核,可以结合实际负载与成本综合考虑。一般情况下,如果数据已经在内存中了,CPU和网络IO会成为瓶颈。

单机硬件配置会严重影响集群性能,实际生产环境需要根据负载在成本和性能之间取得平衡。

部署思路

假设,有几千台服务器,刚刚装好系统配好ip,需要搭建一个Spark环境。

首先,需要初始化环境,包括创建账号,安装JDK,设置时间同步;

然后,依次部署zookeeper,hadoop,spark集群并启动服务。

上述操作在机器少的时候可以手动完成,但是需要在所有机器上操作,既要改配置文件,传软件包,还要执行一些命令,必须使用远程操作工具,笔者之前用的是rsync,还有ansible等等,工具只是实现目标的手段,殊途同归,高效解决问题就行,作者在这里用的是python的fabric。

创建账号

给集群所有机器创建账号

安装JDK

分发jdk软件包到指定路径并设置环境变量

设置时间同步

配置ntp服务器后设置集群所有机器时间同步

部署zookeeper集群

笔者部署自己集群的时候没有用到zookeeper,zookeeper主要是对master,namenode,ResourceManager节点进行仲裁,主备切换,属于容灾措施,生产环境必须要有,没有也不影响正常运行。笔者自己搭着玩所以当时没弄,有时间可以玩一玩(^_^)

部署hadoop集群

hadoop的部署没什么说的,解压安装包到指定路径,修改配置文件并同步,fabric,ansible,rsync都能搞定

部署spark集群

Spark集群的部署跟hadoop大同小异,解压,改配置文件,起服务,多了一个配置ssh免密码登陆,用ssh-keygen和ssh-copy命令就可以轻松搞定,在所有机器上执行这些命令可以使用fabric这些远程执行工具


一键部署高可用hadoop+Spark集群

相信认真看了的朋友发现了,这个一键部署就是将所有的软件分发,批量修改配置文件,顺序执行各种命令都写在了install.sh这个脚本里面,用到的材料就是准备做集群的那些服务器,具体的内容分解开来就是上述所有步骤,只是用脚本实现批量执行,大大提高效率并且减少人工失误,完美,这就是成熟的运维人员该干的事,然而很遗憾,作者给的链接失效了,笔者没有拿到这个脚本┑( ̄Д  ̄)┍

编程指南

集群工作方式

Spark用scala语言开发,scala语言的函数式编程特性让代码的可读性非常高,做spark开发和看spark源码都需要懂scala,Spark也支持Python,java和R,本节用scala语言为示例讲解运行在spark上的程序的编写。

交互式编程,以wordcount举例,执行spark-shell这个命令进入spark命令行,在屏幕滚动日志中可以看到Spark context available as sc,解释器已经初始化了一个名为scSpark context对象,可以直接使用。

spark-shell只适合调试,正式环境下Spark程序在调试好之后,需要编译,链接,打成jar包,最后提交给集群运行

编程核心思想

心之所向,一往无前

Spark工作的目的是调用经过清洗转换存储好的数据,用各种方法进行处理,实现查询,数据挖掘,流处理,机器学习等项目需求。

Spark怎么做这件事的?

第一步,将数据拿进来生成RDD。Spark可以从很多数据源拿到数据,最终都是生成RDD,只是类型不同;

第二步,Spark调用各种函数语法对RDD进行操作计算,transcation定义RDD上的操作,action触发操作,最终结果也是一个RDD,可以将结果按需要保存到指定位置或者进行下一次迭代处理。

RDD是Spark最核心的东西,hadoop的hdfs是为生成RDD的数据提供分布式存储,没有hdfs还能从其他渠道得到,hdfs是一种比较好的解决方案而已,yarn是资源调度,起到Spark触发计算的时候各个节点分配调用计算资源的作用,zookeeper是可以说是冗余容错机制,为了保证spark集群/hdfs存储/计算调度主节点的高可用,他们都是为”Spark生成RDD并调用函数进行处理最终得到预期结果”这一目标而服务的。

那么问题来了,RDD是什么,都可以对RDD进行什么操作?理解了这个问题,Spark可以说已经初窥门径,RDD的本质在后面Spark内核章节进行深入解析,循序渐进,这里先教大家怎么用,怎么创建RDD,怎么对它进行操作。

RDD创建

RDD生成方式总结起来两大类:由Driver程序的数据集生成,由外部数据集生成,如共享文件系统,HDFS文件和HBase等

1.从Driver程序的数据集生成

从Driver数据集生成RDD的直接方法是使用SparkContext对象的parallelize方法,其参数是一个Seq对象,或者其他可以被自动转换成Seq的对象也可以作为参数,比如Array,List,scala支持方便的隐式转换。

SparkContext对象还提供了range方法,通过指定长整数的开始值,结束值及步长,生成一定范围的长整型,最终调用parallelize生成RDD

这种方式的数据源收到Driver所在节点的资源限制,不适合处理特别大的数据,多适用于交互式环境下的程序调试,小数据量的数据集测试。

2.从外部数据集生成RDD

Spark可以从任何Hadoop支持的存储类型的数据源生成RDD,包括本地文件系统,HDFS,HBase,Amazom S3等,还支持文本文件,Sequence文件,以及其他任何Hadoop InputFormat格式的数据。这使得RDD从生成到计算全过程都是分布式的,不会形成资源瓶颈。

文本文件可以使用SparkContext对象的textFile方法生成,其参数是文件的地址,可以是本地文件系统的完整路径,也可以是以hdfs://、s3n://等开头的URL。

除了最基本的文本文件外,Spark还支持其他类型的文件。

•wholeTextFiles可以一次读取一个目录下的许多小文件,并返回<文件名、内容>二元组,而不是像textFile那样每条记录是一行文本,不保留文件信息。

•对于SequenceFile格式的文件(Hadoop针对大数据计算使用的一种文件格式),可以使用SparkContext对象的se-quenceFile[K, V]方法,其中K和V分别代表文件中key和value的类型,这些类型必须继承自Hadoop的Writable接口,比如IntWritable和Text。不过Spark提供了便利,可以直接使用Scala基本类型,比如Int和String,它们已经实现了Writable接口,所以我们可以直接这样使用:se-quenceFile[Int, String]。

对于HBase文件,因为HBase使用HDFS作为存储,也是一种Hadoop InputFormat,所以也是用hadoopRDD或newAPIHadoopRDD来读取数据的

此外,Spark还提供了RDD对象的序列化和反序列化功能,通过RDD.savaAsObjectFile保存的RDD序列化对象,可以使用SparkContext.objectFile重新加载进来,非常简单实用。

SparkContext对象还提供了binaryFiles方法,以二进制的形式直接读取Hadoop Mapreduce计算的结果文件。

所有RDD创建方法,都在SparkContext对象中提供,完整的信息可以参考官方API文档,值得注意的一点,从外部创建RDD并不会马上读进内存进行计算,只是保存读取它们的信息,计算时通过DAG就近分配计算资源,避免移动数据,基本不用担心单点资源瓶颈。

以上就是RDD创建的几种方法,这里只是让读者有个概念,知道有这些个方法即可,真正用到的时候根据实际场景采用合适的方法,命令什么的都是细节。

RDD操作

生成了RDD之后对其进行调用各种操作经过计算后得到预期的结果。这些操作分为两类:

Transformation(转换)

Action(动作)

Spark下所有的Transformation都是一种lazy模式,计算不会马上进行,而是先记录下计算方式,Action被触发时才会启动真正的计算向Driver程序返回结果。这种模式的好处是高效,不需要将每次Transformation的非常大的结果返回Driver,只需要记录下要对RDD进行的操作,当Action触发后将计算完成的最终小很多的结果集返回给Driver,避免了大量的网络IO,数据流动,反观mapreduce,每次迭代都需要将hdfs里的数据取出来进行计算,然后写回去,下次迭代再取,再存,即使现在hadoop也支持memory-cache,尽量避免数据频繁读取,加快计算速度,但是本质上的区别使spark和mapreduce还是有着很明显的效率差异。

默认情况下除了最终结果集之外的RDD都是临时的,被Transformation或Action使用过后即丢弃,可以对RDD进行持久化或cache操作,这也会触发Transformation进行真正的计算,而且Spark会将RDD的结果保持在集群的内存或磁盘中,甚至复制多份,这样再次访问时不需要重复计算,就可以获取更快的响应速度。

举个栗子┑( ̄Д  ̄)┍

上传测试文件到hdfs

第一行从hdfs目录下的文件生成一个名为ff的RDD

第二行生成一个对ff执行flatMap操作后名为words的RDD,flatMap这里的作用是将文档里所有的单词用空格分隔,生成一个连续的字符串。**没有执行实际文件操作**

第三行生成一个对words执行map操作后名为wordPairs的RDD,map在这里的作用是遍历文档里所有单词,如从 "a" 变成 "a,1"。**没有执行实际文件操作**

第四行生成一个对wordPairs执行reduceByKey操作后名为wordCounts的RDD,reduceByKey在这里的作用是将文档里 "key,value"相同"key"后面的value都累加起来,这里就已经实现了对hdfs中README.md文档进行词频统计的目标,该RDD就是期待最后返回的结果RDD,但是这里**仍然没有执行实际文件操作**

前面都是transcation操作,涵盖读取数据源生成RDD,中间调用各种函数对RDD进行转换,每次转换都生成一个新的RDD,最终用collect触发Action操作,这时候才会触发实际的文件操作,读数据到内存进行计算,最后返回一个结果集。

上面只是为了方便理解,实际没人那么写,一条命令直接搞定,完成词频统计就是这么简单干脆。

笔者同时还在学习一部小象学院的Spark视频教程,里面有详细讲解scala,具体形式是在spark环境中用scala做各种操作来演示scala的各种功能,因为Spark就是用scala开发的,编写程序也是用scala,那个scala的实际操作差不多就是spark日常操作的教程,到时候整理出来会在这里添加链接,敬请期待,不过那就是另一篇博客了O(∩_∩)O

2018/3/13 19:23:07

第三章 Spark工作机制

有生不生,有化不化。不生者能生生,不化者能化化。

本章主要内容是Spark底层的工作机制,包括调度管理、内存管理、容错机制、监控管理以及Spark程序配置管理,这对理解Spark程序的运行非常有帮助。

调度管理

普通程序运行在普通电脑上,比如QQ跑在笔记本上,由win10操作系统来进行统一调度分配可以使用的CPU,内存,硬盘资源,Spark集群运行在服务器的linux系统上,这时的资源主要是指CPU core数量和物理内存,Spark支持自身资源调度,master节点自身可以进行资源调度,同时支持与其他调度管理系统一起工作,yarn和mesos,这俩资源调度有俩好处,一是外部调度系统更强大,支持更灵活的调度策略,二是当Spark集群和其他分布式系统一起部署时,可以被统一统筹,避免Spark集群整体对资源的吞噬。

主要分为两类调度场景:主要是Spark程序之间的调度,另一个时Spark程序内部的调度。

集群名词解释

1.Driver程序

集群模式下,用户编写的Spark程序称为Driver程序,每个Driver程序包含一个代表集群环境的SparkContext对象并与之连接,程序的执行从Driver程序开始,中间过程会调用RDD操作(transcation和action),这些操作通过集群资源管理器来调度执行,一般在worker节点执行,所有操作执行结束后返回到Driver程序中,在Driver程序中结束。

2.SparkContext对象

每个驱动程序中都有一个SparkContext对象,担负着与集群沟通的职责。

  • sc对象联系集群管理器,分配CPU,内存等资源;

  • 集群管理器在工作节点(worker node)上启动一个执行器;

  • 程序代码被分发到相应的工作节点;

  • sc分发任务(task)至各执行器执行。

3.集群管理器

集群管理器负责集群的资源调度,Spark支持三种资源管理器

  • standalone模式,资源管理器是master节点。这是最简单的一种集群模式,不依赖于其他系统,调度策略相对单一,只支持先进先出模式(FIFO,First-In-First-Out)。
  • spark部署在hadoop上,资源管理器是yarn集群。yarn支持动态资源管理,更适合多用户场景下的集群管理,而且yarn可以同时调度Spark计算和MR计算,还可以调度其他实现了yarn调度接口的集群计算,是目前最主流的一种资源管理系统
  • Apache Mesos,资源管理器是Mesos,是一个专门用于分布式系统资源管理的开源系统,与yarn类似,用c++开发,可以对集群中的资源作弹性管理。

还有一些需要了解的名词

Allocation。即“分配”。

•App、Application、Spark程序。泛指用户编写的运行在Spark上的程序,不仅是Scala语言编写的,其他支持的语言编写的也是。

•节点、Worker节点。集群上的计算节点,一般对应一台物理机器,仅在测试时会出现一台物理机器上启动多个节点。

•Worker。每个节点上会启动一个进程,名为Worker,负责管理本节点,运行jps命令可以看到Worker进程在运行

•core。Spark标识CPU资源的方式,对应一个或多个物理CPU核心,每个Task运行时至少需要一个core。

•执行器。每个Spark程序在每个节点上启动的一个进程,专属于一个Spark程序,与Spark程序有相同的生命周期,负责Spark在节点上启动的Task,管理内存和磁盘。如果一个节点上有多个Spark程序在运行,那么相应地就会启动多个执行器。

•Job。一次RDD Action对应一次Job,会提交至资源管理器调度执行。

•Stage。Job在执行过程中被分为多个阶段。介于Job与Task之间,是按Shuffle分隔的Task集合。

•Task。在执行器上执行的最小单元。比如RDD Transfor-mation操作时对RDD内每个分区的计算都会对应一个Task。

Spark程序之间的调度

两种分配策略,静态分配和动态分配。

静态分配指Spark程序启动时一次性分配所有资源,运行过程中固定不变直至推出。

所有集群管理器都支持静态资源分配,每个Spark程序都分配一个最大可用的资源数量,在程序运行的整个过程都持有它。这种策略简单可靠,强烈建议,除非非常确定这种分配方式无法满足需求。

动态分配指运行过程中不断调整分配的资源,可以按需增加或减少,比静态分配复杂很多,需要在实践中不断调试才能达到最优。

Spark 1.2引入了基于负载情况的集群动态资源分配,分配给Spark程序的资源可以增加或减少。这意味着我们的程序可能会在不使用资源的时候将资源还给集群,需要的时候再从集群申请。这个特性在多个程序共享集群资源的时候特别有用。如果分配给Spark程序的资源中有一部分是空闲的,它就可以返还给集群放入资源池,这样可以被其他程序请求使用。在Spark中,动态资源分配的粒度是执行器,即增加或减少执行器。前面已经介绍过,Spark程序在机器的每个节点上只有一个执行器,所以增加或减少执行器意味着为Spark程序服务的节点数据量的增加或减少。通常,每台机器启动一个节点进程,但也不排除启动多个的情况发生。你可以通过设置spark.dynamicAllocation.en-abled来启动这个功能。
目前只有在yarn模式下可以使用,未来的版本也支持standalone和mesos模式。

Spark程序内部的调度

Spark程序内部,不同线程提交的Job可以并行执行,调度器是线程安全的,因此可以支持这种需要同时处理多个请求的服务型应用。

默认情况下,调度器以FIFO的方式运行Job,每个Job分成多个stage(如map和reduce阶段),如果最前面Job的Stage有task要运行,优先获取所有资源,然后才是第二个Job,依此类推。如果队列中第一个Job不需要太多资源,那么第二个Job可以马上运行,但如果第一个Job特别大,则后面的Job会明显延迟。

从Spark0.8开始,可以配置Job间公平共享资源,在公平共享方式下,采用”循环”(round robin)方式为不同Job之间的task分配资源,这样所有的Job可以获得差不多相同的资源,意味着有长时间的Job运行的情况下,短的Job也可以在提交后马上运行,不用等待长Job结束,特别适合多用户场景。

开启程序内公平调度,只需要在sc中设置spark.scheduler.mode值为FAIR。

val conf = new SparkConf().setMaster(...).setAppName(...)

conf.set("spark.scheduler.mode", "FAIR")

val sc = new SparkContext(conf)

公平调度还支持对多个Job进行分组,这个分组称为调度池,每个调度池可以设置不同的调度选项,比如想要为一些更重要的Job设置更高的优先级,比如为不同的用户设置不同的资源池,让各个资源池平等共享资源,而不是按Job来共享资源,不做设置的话,新提供的Job会自动进入默认调度池。我们可以指定让Job进入哪个调度池,具体方法是提交任务的线程在SparkContext中设置spark.scheduler.pool,这样该线程提交的所有Job都会使用这个调度池。设置按照线程来进行调度,可以很方便地让一个线程地所有Job在一个用户下。

sc.setLocalProperty("spark.scheduler.pool", "pool1")

sc.setLocalProperty("spark.scheduler.pool", null)

调度池的配置可以放在配置文件conf/fairscheduler.xml.template中,在sc中指定配置文件,每个调度池有如下三个属性。

schedulingMode,可以FIFO或FAIR,用于控制调度池内的Job是排队执行还是平均共享资源

weight,用于控制调度池相对于其他调度池的权重,是一个相对值。所有调度池的默认权重都是1,平均共享集群资源,如果某个调度池的是2,那么理论上它可以调用权重为1的资源池两倍的资源。设置一个非常高的权重,比如1k,相当于配置了各个调度池的优先级。实际上,权重为1k的调度池只要有一个Job就会优先启动。

minShare,设置最小资源值,默认值为0,公平调度器在按权限分配之前,会满足各个资源池的最小资源值,这样可以保证调度池总会获得一些资源,不会被其他高权重的调度池抢光。

。配置文件中没有出现的调度池都被设置为默认值(schedulingMode为FIFO,weight为1,minShare为0)。

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>    

内存管理

相比MR,spark具有巨大的性能优势,很大一部分原因是spark对内存的充分利用和缓存机制。

RDD持久化

如果一个RDD不止一次被用到,那就可以持久化它,将RDD缓存到内存中,内存不够时用磁盘顶上去,这样可以大幅提升程序性能,十倍甚至更多。

默认情况下,RDD只使用一次,用完即扔,再次使用需要重新计算得到,而持久化操作避免了重复计算,这就是Spark刚出现时被人称作内存计算的原因。

持久化的方法是调用persist()函数,RDD.unpersist()可以删除持久化。默认只持久化到内存中,可以在调用持久化函数时添加参数设置存储级别,有memory_and_disk,memory_only_ser,memore_and_disk_ser,disk_only,其中memory_only_ser类似默认的memory_only,格式是序列化后的数据,节省内存更消耗CPU

容错机制

分布式系统通常运行在一个机器集群,同时运行的几百台机器中有机器发生故障的概率大大增加,容错设计是分布式系统的重要能力。

Spark以前的集群容错机制,如MR,将计算转换为一个有向无环图(DAG)的任务集合,通过重复执行DAG里的一部分任务来完成容错。由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,增加了大量的消耗,所以分布式编程经常需要做检查点,将某个时刻的中间数据写到存储中。

RDD处理过程也是一个DAG,每个RDD都会记住创建该数据集需要的操作,自己是由哪个RDD转化而来,这个继承关系叫lineage(血统),由于创建RDD的操作是相对粗粒度的变换(如map,filter,join),很多单一的操作应用于许多数据元素,不需要存储真正的数据,当RDD的某个分区丢失时,RDD有足够的信息记录其如何通过其他RDD进行计算,且只需要计算该分区。

lineage不是完美的,因为RDD之间的依赖有两种,父分区对应多个子分区的宽依赖和父分区对应一个子分区的窄依赖。

对于窄依赖,只需要重新计算丢失的那一块数据,如map,filter,union,分区相同的join,容错成本较小,但宽依赖容错重算分区时,就会有大量冗余计算。

所以不同的应用有时需要使用doCheckPoint适当设置数据检查点,RDD的只读特性使得它很容易做检查点。

某些场景中容错要求更高更复杂,比如计费服务要求零丢失,流计算应用场景系统上游不断产生数据,容错过程会造成数据丢失。为了解决这些问题,spark又提供了预写日志,先将数据写入支持容错的文件系统,然后对数据进行操作。

这样,容错机制包括从检查点重新计算恢复,从日志恢复,从数据源重发,实现了零丢失。

Master使用zookeeper容错,在spark-env.sh中添加选项,设置master的恢复模式为zookeeper,设置zookeeper的集群地址和用于恢复的zookeeper目录。或者采用一种更简单的方式,设置恢复模式为filesystem并设置一个恢复目录,该目录会存储必要的恢复信息,当master进程异常时,重启master进程即可。

slave节点运行着worker,执行器,Driver程序。

Worker异常停止时,会先将自己启动的执行器停止,Driver需要有相应的程序来重启Worker进程。

执行器异常退出时,Driver没有在规定时间内收到执行器的StatusUpdate,于是Driver会将注册的执行器移除,Worker收到LaunchExecutor指令,再次启动执行器。

Driver异常退出时,一般要使用检查点重启Driver,重新构造上下文并重启接收器。第一步,恢复检查点记录的元数据块。第二步,未完成作业的重新形成。由于失败而没有处理完成的RDD,将使用恢复的元数据重新生成RDD,然后运行后续的Job重新计算后恢复。

监控管理

可以用多种方式来监控Spark程序的运行状况:web界面,metrics,外部系统。

web:每个Driver的SparkContext都会启动一个Web界面,默认端口是4040,用于显示程序的许多有用信息,包括:调度器Stage、Task列表,RDD大小和内存使用统计概况,环境信息,正在运行的执行器信息。浏览器输入http://xxx.xxx.xxx.xxx:4040即可访问,

自定义:开发人员可以通过JSON接口自己开发可视化展示形式,历史服务器和正在运行的spark程序的web界面都支持rest api,所有入口地址都有严格的版本控制,spark保证:
这些地址永远不会在某个版本中删除,任何地址中提供的每个字段都不会被删除;
可能添加新的入口地址;
现有入口地址可能添加新的字段;
每个入口地址在未来都可能添加新版本的API(比如api/V2),新版本不要求向下兼容老版本;
API版本可能被废弃,但前提是新的版本可以与之共存,而且新版本不只是发布主版本,至少有一个次版本发布。

metrics指标体系:spark支持基于Coda Hale Metrics Library的指标体系,可以主动将运行状态发送给其他系统,方便与其他监控系统进行集成,比如Ganglia。Spark支持的Metrics实例有Master、Applications、Worker、执行器和Driver,支持的接收者类型包含在org.apache.spark.metrics.sink包中,包括ConsoleSink、CSVSink、JmxSink、MetricsServlet、GraphiteSink和Slf4jSink。

其他监控工具:集群级别可以使用各种工具来监控各节点的CPU,网络,磁盘等负载情况,如zabbix,nagios,ganglia运维工具,操作系统级别可以使用dstat,iostat,iotop,top等linux工具对单点问题进行定位。

Spark程序配置管理

spark配置文件默认模式是${SPARK_HOME}/conf,可更改的配置项自行按需操作。

spark属性项的配置可以在三个地方进行设置,优先级依次是 sc对象,命令行参数,spark-defaults.conf。

可以在spark-defults.conf文件里面可以设置spark运行的诸多参数

可以在用命令行./bin/spark-submit提交程序时重新指定参数,

./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

在开发spark程序时可以把参数写在代码里,sc对象的优先级最高,不过灵活性差了点,改参数就得改代码。

val conf = new SparkConf()    
    .setMaster("local[2]")    
    .setAppName("CountingSheep")    
    .set("spark.executor.memory", "1g")

val sc = new SparkContext(conf)    

在web界面的environment选项里可以看到所有配置的spark属性

日志配置是另一项单独的配置,使用配置文件目录下的log4j.properties作为配置文件。

2018/3/14 20:19:48

第四章 Spark内核讲解

惟仁人能受直谏,不恶至情

深入Spark内核,并结合源码,介绍了核心结构RDD、RDD对象的Transformation和Action操作是如何实现的、SparkContext对象及初始化过程、DAG调度的工作流程。了解这些内容可以帮助读者编写出高质量的Spark程序代码。

RDD

前面介绍了RDD的创建途径和RDD上的函数都有哪些,那部分内容让读者会使用RDD,不过知其然也要知其所以然,这部分内容会详细介绍RDD的本质,函数实现。

RDD的定义

RDD全称”弹性分布式数据集”(Resilient Distributed Dataset)。

首先,它是一个数据集,就像scala语言中的Array,List,Tuple,set一样是一个数据集合,里面的数据是平铺的,可以顺序遍历,而Array,List这些对象拥有的许多操作RDD对象也有,如flatmap,map,filter,reduce,groupby等

其次,RDD是分布存储的,RDD的分区成员被水平切割成小的数据块,分散在集群的多个节点上,便于对RDD里的数据进行并行计算。

最后,RDD的分布是弹性的,RDD的一些操作可以被拆分成对各个数据块进行直接计算,不涉及其他节点,比如map,这样的操作在数据块所在节点上直接进行,不影响RDD分布,除非故障转换。但有些必须访问RDD所有数据块的操作,比如groupBy,做groupBy之前完全不知道key的分布,必须遍历RDD的所有数据块,将具有相同key的元素汇聚在一起,这样RDD的分布就完全重组。

RDD还有一些特点,如只读,一旦生成,内容不能修改;比如可指定还存在内存中,一般除了最终的结果RDD需要返回之外,其他计算中的RDD都是用过即丢弃,可以将以后需要多次使用的RDD缓存到内存中来避免重复计算,这在机器学习这种需要反复迭代的计算场景极为有用;比如RDD的血统特征,每个RDD都会记录如何通过父RDD计算得到,通过记录足够的计算过程,在需要时从头或者从某个缓存到内存中的RDD或者检查点重新计算来实现高可用性,而不是复制。

一个RDD包含五个核心属性。

一个分区列表,每个分区是RDD的部分数据

一个依赖列表,存储依赖的其他RDD

一个名为compute的计算函数,用于计算RDD各分区的值

分区器,用于键/值类型的RDD,比如某个RDD按散列(hash)来分区

计算各分区时优先的位置列表,比如从hdfs上的文件生成RDD时,RDD分区的位置优先选择数据所在的节点,避免数据移动带来的开销

分区与依赖:

// 依赖关系定义在一个Seq数据集中,类型是Dependency
// 有检查点时,这些信息会被重写,指向检查点private var dependencies_ : Seq[Dependency[_]] = null
// 分区定义在Array数据中,类型是Partition,没用Seq,这主要考虑到随时需要通过下标来访问或更新
// 分区内容,而dependencies_使用Seq是因为它的使用场景一般是取第一个成员或遍历

计算函数:

/**  
 * compute方法由子类来实现,对输入的RDD分区进行计算 
 */def compute(split: Partition, context: TaskContext): Iterator[T]

分区器:
/* 可选,子类可以重写以指定新的分区方式。Spark支持两种分区方式:Hash和Range /@transient val partitioner: Option[Partitioner] = None

优先计算位置:
/ 可选,子类可以指定分区优先的位置,比如HadoopRDD会重写此方法,让分区尽可能与数据在相同的节点上 /protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/

 * RDD提供统一的调用方法,统一处理检查点问题 
 */
final def preferredLocations(split: Partition): 
Seq[String] = {    

checkpointRDD.map(_.getPreferredLocations(split)).getOrElse 

{        
      getPreferredLocations(split)
}
               }

RDD的调度和计算都基于这五个属性,每个transformation操作都会生成一个新的RDD,不同操作也可能返回相同类型的RDD,只是计算方法参数不同。比如map,filter,flatmap都会生成MapPartitionsRDD类型的RDD。

transformation和action

RDD的transformation是指一个RDD生成新的RDD的过程,所有的RDD的transformation都只是生成了RDD之间的计算关系及计算方法,并没有进行真正的计算。

比如这个例子里前面的flatmap,map,reduceBykey都没有进行真正的计算,每个操作会生成新的一个RDD对象,它们按照依赖关系串在一起,像一个链表,每个对象有一个指向父节点的指针,保存有如何从父节点计算得到的信息。

执行map操作就会生成一个新的RDD对象,其类型是MapPartitionsRDD(它是RDD的子类),MapPartitionsRDD最主要的工作是用变量保存传入的计算函数,以便compute调用它来进行计算。其他4个重要属性基本保持不变:分区和优先计算位置没有重新定义,保持不变,依赖关系默认依赖调用的RDD,分区器优先使用上一级RDD的分区器,否则为None。

宽依赖和窄依赖有本质区别,窄依赖可以精确知道依赖的上级RDD的分区,一般情况下,会选择与自己在同一节点的上级RDD分区,这样计算过程都在同一节点进行,没有网络IO开销,非常高效,常见的map、flatMap、filter操作都是这一类,而宽依赖相当于依赖所有分区(想想reduceByKey计算,需要对所有的key重新排列)。计算时涉及所有节点之间的数据传输,开销巨大。所以,以Shuffle依赖为分隔,Task被分成Stage,方便计算时的管理。

RDD通过维持依赖关系来保证高可用,但如果依赖链太长,重新计算的代价就太大,用检查点机制对中间RDD存一份快照,比如7*24小时的流计算,如果没有检查点机制,依赖链无限扩充,容错将完全没有意义。

RDD的Action是相对于transformation的另一种操作,transformation代表计算的中间过程,从一个RDD生成新的RDD,而Action代表计算的结束,Action调用后,不再生成新的RDD,结果返回Driver程序。每个Action都会调用SparkContext的runJob方法向集群正式提交请求,所以每个Action对应一个Job。

shuffle

当对一个RDD的某个分区进行操作而无法精确知道依赖前一个RDD的哪个分区时,依赖关系变成了依赖前一个RDD的所有分区。比如,几乎所有类型的RDD操作,都涉及按key对RDD成员进行重组,将具有相同key但分布在不同节点上的成员聚合到一个节点上,以便对它们的value进行操作。这个重组的过程就是Shuffle操作。因为Shuffle操作会涉及数据的传输,所以成本特别高,而且过程复杂。

Shuffle是一个非常消耗资源的操作,除了会涉及大量网络IO操作并使用大量内存外,还会在磁盘上生成大量临时文件,以避免错误恢复时重新计算。因为Shuffle操作的结果其实是一次调度的Stage的结果,而一次Stage包含许多Task,缓存下来还是很划算的。Shuffle使用的本地磁盘目录由spark.local.dir属性项指定。

SparkContext

SparkContext是Spark程序最主要的入口,用于与Spark集群连接。与Spark集群的所有操作,都通过sc来进行。利用sc可以创建RDD,计数器及广播变量。

所有Spark程序必须创建一个SparkContext对象,进行流式计算的StreamingContext,进行SQL计算的SQLContext,都会关联一个现有的SparkContext或者隐式创建一个SparkContext。

初始化时只需要一个SparkConf配置对象作为参数。

每个JVM只允许启动一个SparkContext,否则默认会抛出异常

初始化过程

SparkContext在构造的过程中,已经完成各项服务的启动。因为Scala语法的特点,所有构造函数都会调用默认定义在类中的构造函数,除了初始化各类配置,日志之外,最重要的初始化操作之一时启动Task调度器和DAG调度器

// 创建并启动Task调度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.send(TaskSchedulerIsSet)
// 创建DAG调度器,并引用之前创建的Task调度器之后,
// 再启动Task调度器
_taskScheduler.start()

DAG调度与task调度的区别,

其他功能接口

SparkContext除了可以初始化环境,连接Spark集群外,还提供了非常多的功能入口,具体如下。

创建RDD,所有创建RDD的方法都在SparkContext中定义,比如parallelize和textFile,newAPIHadoopFile等

RDD持久化,persistRDD,unpersistRDD也在sc中定义

创建共享变量,包括计数器和广播变量

stop(),停止sc

runJob,提交RDD Action操作,这是所有调度执行的入口

DAG

SparkContext在初始化时,创建了DAG调度与Task调度来负责RDD Action操作的调度执行。

DAG是最高层级的调度,为每个Job绘制出一个有向无环图(简称DAG),跟踪各stage的输出,计算完成Job的最短路径,并将Task提交给Task调度器来执行,而Task调度器只接受DAG调度器的请求,负责Task的实际调度执行,所以DAGScheduler的初始化必须在Task调度器之后。这种设计的好处是Spark可以灵活设计自己的DAG调度,还能与其他资源调度系统结合,比如yarn和mesos,

DAGScheduler

初始化时除了需要一个SparkContext对象外,最重要的是需要输入一个TaskScheduler对象来负责Task的执行:

1.runJob过程
所有需要执行的RDD Action,都会调用SparkContext.run-Job来提交任务,而SparkContext.runJob调用的是DAGScheduler.runJob。下面是runJob的定义:

runJob调用submitJob提交任务,并等待任务结束。

任务提交后的处理过程如下:

submitJob生成新的JobID,发消息到JobSubmitted

DAG收到JobSubmitted消息,调用handleJobSubmitted来处理

handleJobSubmitted创建一个ResultStage,并使用submitStage来提交这个ResultStage

Spark的执行过程是“懒惰”(lazy)的,这在这里得到了完整的体现。任务提交时,不是按Job的先后顺序提交的,而是倒序的。每个Job的最后一个操作是Action操作,DAG把这最后的Action操作当作一个Stage,首先提交,然后逆向逐级递归填补缺少的上级Stage,从而生成一棵实现最后Action操作的最短的(因为都是必须的)有向无环图,然后再从头开始计算,这是一个逆向递归的过程,先查找所有缺失的上级Stage并提交,待所有上级Stage都提交执行了,才轮到执行当前Stage对应的Task。查找上级Stage的过程,其实就是递归向上遍历所有RDD依赖列表并生成Stage的过程,

TaskScheduler

TaskScheduler是低级别的调度接口,允许实现不同的Task调度器,目前,已经实现的Task调度器除了自带的以外,还有yarn和mesos,每个TaskScheduler对象只服务于一个SparkContext的Task调度,从DAGscheduler的每个stage接收一组task,并负责将它们发送到集群上运行,出错了重试,最后返回消息给DAG。

TaskScheduler的主要接口包括一个钩子接口(也称hook,表示定义好之后,不是用户主动调用的),被调用的时机是在初始化完成之后和调度启动之前:

2018/3/15 17:50:47 

本文标题:Spark最佳实践(上)

文章作者:Chaos

发布时间:2018年02月28日 - 13:02

最后更新:2018年03月16日 - 02:03

原始链接:http://www.lovepanda.tk/2018/02/28/Book-Spark最佳实践(上)/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。