本文共 8049 字,大约阅读时间需要 26 分钟。
Spark是Apache的一个顶级项目,Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Spark的计算速度也要比MapReduce快得多,它有一个先进的DAG执行引擎,支持非循环的数据流和内存计算。官网介绍说在使用内存的情况下快100倍,而使用磁盘的情况下快10倍。
而且Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。
Spark也比MapReduce要易于使用,并且可以使用Java, Scala, Python, R等语言进行开发。Spark 提供了80多个高级API,可以很容易地实现并行计算的应用程序。还可以通过Scala、Python和R shells等交互式命令行,交互地使用它。
Spark 主要有四个特点:
高级 API 剥离了对集群本身的关注,Spark 应用开发者可以专注于应用所要做的计算本身。下图是python使用Spark API的代码:
Spark 很快,支持交互式计算和复杂算法以及非循环的数据流和内存计算。下图是官网上展示的MapReduce与Spark进行回归计算时,计算速度的对比图:
Spark官网地址:
Spark的生态系统简称BDAS。如下图:
Hadoop生态圈对比Spark BDAS:
Hadoop对比Spark:
MapReduce对比Spark:
Spark支持的开发语言:
Spark运行模式:
安装Scala时,需要先准备好JDK环境,而我这里已经准备好jdk1.8的环境了。
Scala官网下载地址:
下载Scala:
[root@study-01 ~]# cd /usr/local/src[root@study-01 /usr/local/src]# wget https://downloads.lightbend.com/scala/2.12.5/scala-2.12.5.tgz
解压:
[root@study-01 /usr/local/src]# tar -zxvf scala-2.12.5.tgz -C /usr/local/[root@study-01 /usr/local/src]# cd ../[root@study-01 /usr/local]# lsbin etc games include lib lib64 libexec sbin scala-2.12.5 share src[root@study-01 /usr/local]# cd scala-2.12.5/[root@study-01 /usr/local/scala-2.12.5]# lsbin doc lib man[root@study-01 /usr/local/scala-2.12.5]#
配置环境变量:
[root@study-01 ~]# vim .bash_profile # 更改如下内容export SCALA_HOME=/usr/local/scala-2.12.5PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$SCALA_HOME/binexport PATH[root@study-01 ~]# source .bash_profile[root@study-01 ~]# scala # 测试能否执行scala命令Welcome to Scala 2.12.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161).Type in expressions for evaluation. Or try :help.scala>
Maven官网下载地址:
下载并解压:
[root@study-01 ~]# cd /usr/local/src/[root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/maven/maven-3/3.5.2/binaries/apache-maven-3.5.2-bin.tar.gz[root@study-01 /usr/local/src]# tar -zxvf apache-maven-3.5.2-bin.tar.gz -C /usr/local[root@study-01 /usr/local/src]# cd ../apache-maven-3.5.2/[root@study-01 /usr/local/apache-maven-3.5.2]# lsbin boot conf lib LICENSE NOTICE README.txt[root@study-01 /usr/local/apache-maven-3.5.2]#
配置环境变量:
[root@study-01 ~]# vim .bash_profile # 更改如下内容export MAVEN_HOME=/usr/local/apache-maven-3.5.2PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$SCALA_HOME/bin:$MAVEN_HOME/bin[root@study-01 ~]# source .bash_profile[root@study-01 ~]# mvn --version # 测试能否执行mvn命令Apache Maven 3.5.2 (138edd61fd100ec658bfa2d307c43b76940a5d7d; 2017-10-18T15:58:13+08:00)Maven home: /usr/local/apache-maven-3.5.2Java version: 1.8.0_161, vendor: Oracle CorporationJava home: /usr/local/jdk1.8/jreDefault locale: zh_CN, platform encoding: UTF-8OS name: "linux", version: "3.10.0-327.el7.x86_64", arch: "amd64", family: "unix"[root@study-01 ~]#
Spark官网下载地址:
我这里下载的是2.1.0版本的源码包,官网的编译安装文档:
从官网的介绍,我们得知:
下载Spark2.1.0版本的源码包:
下载并解压:
[root@study-01 /usr/local/src]# wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0.tgz[root@study-01 /usr/local/src]# tar -zxvf spark-2.1.0.tgz -C /usr/local[root@study-01 /usr/local/src]# cd ../spark-2.1.0/[root@study-01 /usr/local/spark-2.1.0]# lsappveyor.yml common data external licenses NOTICE R scalastyle-config.xml yarnassembly conf dev graphx mesos pom.xml README.md sqlbin CONTRIBUTING.md docs launcher mllib project repl streamingbuild core examples LICENSE mllib-local python sbin tools[root@study-01 /usr/local/spark-2.1.0]#
安装完成之后我们还需要使用Spark源码目录中的dev下的make-distribution.sh脚本进行编译,官方提供的编译命令如下:
./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pmesos -Pyarn
参数说明:
那么我们就可以根据具体的条件来编译Spark,比如我们使用的Hadoop版本是2.6.0-cdh5.7.0,并且我们需要将Spark运行在YARN上、支持对Hive的操作,那么我们的Spark源码编译脚本就是:
[root@study-01 /usr/local/spark-2.1.0]# ./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0
但是在执行这个命令之前我们先需要编辑pom.xml文件,增加cdh的maven repository:
[root@study-01 /usr/local/spark-2.1.0]# vim pom.xml # 在标签内,加入如下内容 [root@study-01 /usr/local/spark-2.1.0]# cloudera https://repository.cloudera.com/artifactory/cloudera-repos/
然后还需要更改编译脚本的mvn命令路径,因为使用自带的mvn编译有些慢:
[root@study-01 /usr/local/spark-2.1.0]# vim dev/make-distribution.shMVN="$MAVEN_HOME/bin/mvn"[root@study-01 /usr/local/spark-2.1.0]#
完成以上的修改后,就可以执行编译命令了,编译的过程会有些慢(我这里编译了半个多小时)。而且内存尽量分配得大一些,避免内存不足导致编译中断。
编译完成之后,spark目录下会增加一个.tgz的文件,把这个文件解压到/usr/local/目录下:
[root@study-01 /usr/local/spark-2.1.0]# ls |grep *.tgzspark-2.1.0-bin-2.6.0-cdh5.7.0.tgz[root@study-01 /usr/local/spark-2.1.0]# tar -zxvf spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz -C /usr/local[root@study-01 /usr/local/spark-2.1.0]# cd ../spark-2.1.0-bin-2.6.0-cdh5.7.0/[root@study-01 /usr/local/spark-2.1.0-bin-2.6.0-cdh5.7.0]# lsbin conf data examples jars LICENSE licenses NOTICE python README.md RELEASE sbin yarn[root@study-01 /usr/local/spark-2.1.0-bin-2.6.0-cdh5.7.0]#
到此为止,我们的spark就安装完成了。接下来我们尝试一下启动Spark的shell终端:
[root@study-01 /usr/local/spark-2.1.0-bin-2.6.0-cdh5.7.0]# ./bin/spark-shell --master local[2]
命令说明:
关于启动spark shell的官方文档说明:
启动成功:
启动成功后,我们来实现wordcount的案例。官网的快速入门文档:
现在有一个文件,内容如下:
[root@study-01 /data]# cat hello.txt hadoop welcomehadoop hdfs mapreducehadoop hdfshello hadoopspark vs mapreduce[root@study-01 /data]#
在spark shell里完成对该文件的wordcount:
scala> val file = sc.textFile("file:///data/hello.txt") # 读取文件file: org.apache.spark.rdd.RDD[String] = file:///data/hello.txt MapPartitionsRDD[1] at textFile at:24scala> file.collect # 打印读取的数据res1: Array[String] = Array(hadoop welcome, hadoop hdfs mapreduce, hadoop hdfs, hello hadoop, spark vs mapreduce)scala> val a = file.flatMap(line => line.split(" ")) # 按空格进行拆分a: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at :26scala> a.collectres2: Array[String] = Array(hadoop, welcome, hadoop, hdfs, mapreduce, hadoop, hdfs, hello, hadoop, spark, vs, mapreduce)scala> val b = a.map(word => (word,1)) # 进行map操作,给每个单词附上1b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at :28scala> b.collectres3: Array[(String, Int)] = Array((hadoop,1), (welcome,1), (hadoop,1), (hdfs,1), (mapreduce,1), (hadoop,1), (hdfs,1), (hello,1), (hadoop,1), (spark,1), (vs,1), (mapreduce,1))scala> val c = b.reduceByKey(_ + _) # 进行Reduce操作,把每个相同key的值相加,并整合在一起c: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :30scala> c.collectres4: Array[(String, Int)] = Array((mapreduce,2), (hello,1), (welcome,1), (spark,1), (hadoop,4), (hdfs,2), (vs,1))scala>
如上,可以看到,通过简单的交互式的代码我们就完成了对文件的词频统计,并且这些方法都可以形成一个方法链的调用,所以其实一句代码就可以完成wordcount了,如下示例:
scala> sc.textFile("file:///data/hello.txt").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).collectres5: Array[(String, Int)] = Array((mapreduce,2), (hello,1), (welcome,1), (spark,1), (hadoop,4), (hdfs,2), (vs,1))scala>
我们还可以在web页面上看到任务执行的信息,访问主机ip的4040端口即可,如下:
转载于:https://blog.51cto.com/zero01/2096162