Fork me on GitHub

Spark实践

简介

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎,一个用来实现快速而通用的集群计算的平台,扩展了广泛使用的MapReduce计算模型,能高效支持更多的计算模式,包括交互式查询和流处理。在处理大规模数据集的时候有优势,Spark的一个重要特点就是能够在内存中计算,因而比MapReduce更快,即使在磁盘上进行的复杂计算,Spark依然更加高效。

软件 下载地址
jdk-8u161-linux-x64.tar.gz 点击下载
hadoop-2.7.2.tar.gz 点击下载
scala-2.11.7.tgz 点击下载
spark-2.2.0-bin-hadoop2.7.tgz 点击下载
hadoop-native-64-2.7.0.tar 点击下载
服务器 节点
192.168.4.50 SparkMaster
192.168.4.237 SparkWorker1
192.168.4.238 SparkWorker2
192.168.4.48 SparkWorker3
192.168.4.49 SparkWorker4

Hadoop

运行环境配置

请务必关闭防火墙,包括iptablesselinux

java

三台虚拟机均需配置java环境,下文所示环境是包括hadoop,scala,spark软件在内的环境变量最终形态,如果新玩家要入手,直接配置成这样纸,后面的软件注意存放在指定路径即可。

$ tar xvf jdk-8u161-linux-x64.tar.gz  -C /usr/local/ 
$ vi /root/.bash_profile
$ source /root/.bash_profile
export PATH
export JAVA_HOME=/usr/local/jdk1.8.0_161
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib
export SCALA_HOME=/usr/local/scala-2.11.7
export SPARK_HOME=/usr/local/spark-2.2.0-bin-hadoop2.7
export PATH=$JAVA_HOME/bin:/usr/local/hadoop-2.7.2/bin:/usr/local/hadoop-2.7.2/sbin:$SCALA_HOME/bin:$SPARK_HOME/bin:$PATH
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native  
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"

主机名称

各节点均要更改,图为主节点示例,需要修改/etc/hosts/etc/sysconfig/network两个文件,注意hosts文件各节点一致,network文件各节点需分别修改成自己的hostname

ssh

在各虚拟机上执行命令生成密钥

# ssh-keygen 

ssh将各个机器的密钥拿来主节点(以下语句皆在SparkMaster上执行)合成authorized_keys,然后分发到各个节点

$ ssh root@192.168.4.237 cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys

237替换成238,50,48,49依次执行,完成后会在主节点生成/root/.ssh/authorized_keys,是一份包含了五台机器公钥信息的文件

$ scp authorized_keys root@192.168.4.237:/root/.ssh/

237替换成238,50,48,49依次执行,将包含了五台服务器的公钥文件分发到所有服务器,所有节点之间都可以无密码ssh操作

配置互信的原因:namenode(即主节点)如果没有对datanode(即数据节点)的ssh免登陆权限,那么namenode起每个datanode的服务都需要输入密码,多节点的情况极为影响效率,所以namenode与datanode间必须配置互信,而datanode之间没有ssh通信的需求,所以上文是一种绝对可行但相对麻烦和不必要的操作,如果多节点的大规模hadoop配置互信建议使用下面这种ssh自带的方法,可以使用类似ansible这样的工具批量执行。

$ ssh-keygen 

生成密钥

$ ssh-copy-id -i /root/.ssh/id_rsa.pub root@192.168.4.50 

把datanode生成的公钥发送到namenode,namedode拥有所有datanode的公钥即可,datanode不必持有其他datanode或者namenode的公钥

软件包解压

在namenode解压并修改配置文件,用rsync把修改后的/usr/local/hadoop-2.7.2文件夹同步到其他节点

$ tar xvf hadoop-2.7.2.tar.gz -C /usr/local/

$ mkdir -p {tmp,hdfs/{data,name}}

tmp用来存储临时生成的文件

hdfs用来存储集群数据

hdfs/data用来存储真正的数据

hdfs/name用来存储文件系统元数据

修改配置文件

下列文件绝对路径:/usr/local/hadoop-2.7.2/etc/hadoop/

hadoop-env.sh

将 export JAVA_HOME=${JAVA_HOME}

改成 export JAVA_HOME=/usr/local/jdk1.8.0_161

core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://SparkMaster:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/usr/local/hadoop-2.7.2/tmp</value>
    </property>
    <property>
        <name>io.file.buffer.size</name>
        <value>131072</value>
    </property>
</configuration>

mapred-site.xml

$ cp mapred-site.xml.template mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>SparkMaster:10020</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>SparkMaster:19888</value>
    </property>
</configuration>

hdfs-site.xml

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/usr/local/hadoop-2.7.2/hdfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/usr/local/hadoop-2.7.2/hdfs/data</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>2</value>
    </property>
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>SparkMaster:9001</value>
    </property>
    <property>
    <name>dfs.webhdfs.enabled</name>
    <value>true</value>
    </property>
</configuration>

yarn-site.xml

<configuration>
        <property>
               <name>yarn.nodemanager.aux-services</name>
               <value>mapreduce_shuffle</value>
        </property>
        <property>
               <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
               <value>org.apache.hadoop.mapred.ShuffleHandler</value>
        </property>
        <property>
               <name>yarn.resourcemanager.address</name>
               <value>SparkMaster:8032</value>
       </property>
       <property>
               <name>yarn.resourcemanager.scheduler.address</name>
               <value>SparkMaster:8030</value>
       </property>
       <property>
               <name>yarn.resourcemanager.resource-tracker.address</name>
               <value>SparkMaster:8031</value>
       </property>
       <property>
               <name>yarn.resourcemanager.admin.address</name>
               <value>SparkMaster:8033</value>
       </property>
       <property>
               <name>yarn.resourcemanager.webapp.address</name>
               <value>SparkMaster:8088</value>
       </property>
</configuration>

slaves

SparkWorker1
SparkWorker2
SparkWorker3
SparkWorker4

所有节点的文件都需要是经过修改后的状态,利用rsync将namenode修改过的文件夹直接同步到其他节点

$ rsync -av /usr/local/hadoop-2.7.2 SparkWorker1:/usr/local/hadoop-2.7.2

SparkWorker1替换成slaves里其他节点依次执行,所有节点都有了修改过配置文件的hadoop文件夹后集群准备工作完成。

启动并验证服务

hadoop namenode -format  格式化

start-all.sh 或者 start-dfs.sh && start-yarn.sh   
每个节点都有完整的配置文件和命令,任何节点都可以执行集群的起停操作,都可以成功启动hdfs和yarn   

jps  查看各个节点进程状态

hadoop集群已经正常启动

简单测试

$ hadoop fs -put /tmp/input.txt /

上传待处理文件到hdfs中

$ hadoop jar /usr/local/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /input.txt /output.txt

调用安装包自带jar包对hdfs中存储的待处理文件执行词频统计,这里就是诸位大数据开发工程师大显身手的地方,写好的程序打成jar包放在这里供hadoop调用,examples是hadoop自带练手用的测试包。

$ hadoop fs -ls /output.txt

可以看到已经生成了一个包含着若干文件的output.txt文件夹,存放着调用jar包函数后输出的处理结果,给文件夹起output.txt这种名字,皮一下就很开心

$ hadoop fs -text /output.txt/part-r-00000

Scala

安装包

$ tar xvf /tmp/scala-2.11.7.tgz -C /usr/local

上传解压安装包到指定目录

环境变量

$ vi /root/.bash_profile

添加 /usr/local/scala-2.11.7/bin 到PATH路径中

同步并验证

scala -version

一个词,rsync,有点灵性啊同学们!

Spark

安装包

$ tar xvf /tmp/spark-2.2.0-bin-hadoop2.7.tgz -C /usr/local

环境变量及配置文件

$ vi /root/.bash_profile

添加 /usr/local/spark-2.2.0-bin-hadoop2.7/bin到环境变量

spark-env.sh

export JAVA_HOME=/usr/local/jdk1.8.0_161

export SCALA_HOME=/usr/scala-2.11.7

export HADOOP_HOME=/usr/local/hadoop-2.7.2

export HADOOP_CONF_DIR=/usr/local/hadoop-2.7.2/etc/hadoop

export SPARK_MASTER_IP=SparkMaster

export SPARK_WORKER_MEMORY=4g

export SPARK_WORKER_CORES=2

export SPARK_WORKER_INSTANCES=1

slaves

同步并验证

将修改过配置文件的文件夹全部同步到数据节点

$ start-dfs.sh

spark只使用hdfs文件系统,并不用启动所有功能

$ /usr/local/spark-2.2.0-bin-hadoop2.7/sbin/start-all.sh

启动spark集群,建议使用绝对路径

成功启动后使用jps可以在namenode看到

30049 Jps
29655 ResourceManager
29501 SecondaryNameNode
29311 NameNode

可以在datanode看到

4208 Jps
3937 DataNode
4042 NodeManager

至此spark集群全部搭建完成

可以通过spark的webui界面访问控制台

可以通过spark-shell执行各种操作,执行transcation action任务,对存放在hdfs里的数据集进行分词,统计,排序

可以执行用python,scala,java等语言打包好的jar包程序

ENJOY IT!

遇到的坑

1.请务必关闭防火墙,包括iptables和selinux,否则会导致集群无法通信,页面无法访问等报错,如java.net.NoRouteToHostException:没有找到主机的路由

2.启动后有警告Unable to load native-hadoop library for your platform... using builtin-java classes where applicable,因为Apache提供的hadoop本地库是32位的,而在64位的服务器上就会有问题,因此需要自己编译64位的版本,解决方案是找个包,解压到目录并修改环境变量,操作后问题解决,原作者环境跟我的环境不一样,不确定是不是所有人都会碰到这个问题

$ tar -xvf hadoop-native-64-2.7.0.tar -C /usr/local/hadoop-2.7.2/lib/native 

$ tar -xvf hadoop-native-64-2.7.0.tar -C /usr/local/hadoop-2.7.2/lib

3.第一次启动后进程都正常,页面无法访问,查看监听状态发现监听到了ipv6的端口,hadoop的配置文件明明标注的是监听到ipv4的端口,解决方案如下。

vi /etc/sysctl.conf
添加以下内容并执行stop-all.sh和start-all.sh重启hadoop服务
#disable ipv6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1

4.数据节点jps没有datanode进程的情况

多次format文件系统可能会出现datanode无法启动的情况,解决方案是删除数据节点的hdfs及tmp目录,重新启动即可。

原因是执行hadoop namenode -format后会在namenode/usr/lcoal/hadoop-2.7.2/hdfs/name/current/文件夹下生成记录了namespaceIDclusterIDblockpoolIDVERSION文件,然后执行start-dfs.sh命令启动hdfs,这时datanode的/usr/local/hadoop-2.7.2/hdfs/data/文件夹下会生成记录了集群信息的文件。

hadoop启动关闭hdfs时需要读取namenode及datanode的集群信息,二次format的时候namenode的相关信息会自动更新,但datanode的原有信息不会自动删除,启动的时候namenode优先读取该节点原有信息,这样本来准备起datanode,一看有记录,要写入的信息不写了,按原有记录来,结果原有记录的信息是format之前集群的信息,hadoop以为那是其他集群的数据节点,自然不会带起来datanode进程。