Spark Deployment and Quick Start
Environment Preparation
Spark: 3.3.2
Install Spark
Download the release tarball from the official Spark website, then extract it:
```bash
tar -zxvf spark-3.3.2-bin-hadoop3.tgz -C /mysoft/
```
Configure environment variables
Add the Spark environment variables to /etc/profile:
```bash
# Spark environment variables
```
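A typical set of entries, assuming the install path used in the extract step above (the PATH additions are an assumption, not taken from this guide):
```bash
export SPARK_HOME=/mysoft/spark-3.3.2-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin   # assumed: puts spark-shell and the sbin scripts on the PATH
```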
Reload the profile so the variables take effect:
```bash
source /etc/profile
```
Standalone Cluster Mode
```bash
cd $SPARK_HOME
```
Configure spark-env.sh
In the conf directory, create spark-env.sh from its template:
```bash
cp spark-env.sh.template spark-env.sh
```
Set the JDK location and the executor memory. The virtual machines here default to 2 GB+ of memory; if yours have less, lower the worker and executor memory to 900 MB or lower.
```bash
export JAVA_HOME=/usr/local/jdk1.8.0_341
export SPARK_EXECUTOR_MEMORY=512M
```
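spark-env.sh also needs the master host and port (the exact lines that are commented out later when enabling HA), plus a worker memory setting to match the note above; the 900M value is an assumption based on that note:
```bash
export SPARK_MASTER_HOST=hsq01
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=900M   # assumed value, per the memory note above
```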
Configure workers
Create the workers file from its template and edit it:
```bash
cp workers.template workers
```
```
hsq01
```
Delete the default localhost entry.
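The finished workers file presumably lists all three nodes; hsq02 and hsq03 are inferred from the scp loop and the ZooKeeper URL used below:
```
hsq01
hsq02
hsq03
```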
Distribute the Spark directory (and its configuration) to the other nodes:
```bash
for i in {2..3}; do scp -r /mysoft/spark-3.3.2-bin-hadoop3/ hsq0$i:/mysoft/;done
```
Start the Spark cluster (standalone mode)
Launch the daemons (start-all.sh is the counterpart of the stop-all.sh used later; it starts the master plus every worker listed in conf/workers):
```bash
cd $SPARK_HOME
sbin/start-all.sh
```
Check the web UI at hsq01:8081.

Enable HA
Edit `spark-env.sh` and comment out (prefix with #) the lines `export SPARK_MASTER_HOST=hsq01` and `export SPARK_MASTER_PORT=7077`.
Then add the following:
```bash
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hsq01:2181,hsq02:2181,hsq03:2181 -Dspark.deploy.zookeeper.dir=/spark"
```
Distribute the updated spark-env.sh to the other nodes:
```bash
for i in {2..3}; do scp /mysoft/spark-3.3.2-bin-hadoop3/conf/spark-env.sh hsq0$i:/mysoft/spark-3.3.2-bin-hadoop3/conf/spark-env.sh;done
```
Stop the running cluster:
```bash
sbin/stop-all.sh
```
Run jps on all three nodes and confirm a QuorumPeerMain process is present; otherwise restart ZooKeeper.
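With a standard ZooKeeper installation, a restart would look roughly like this on each of the three nodes (the install layout is an assumption):
```bash
zkServer.sh start    # starts the ZooKeeper daemon (the QuorumPeerMain process)
zkServer.sh status   # a healthy 3-node ensemble reports one leader and two followers
```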
Optionally, connect with the ZooKeeper CLI to verify it is reachable:
```bash
zkCli.sh
```

```bash
cd $SPARK_HOME
```
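With ZooKeeper running and the HA configuration distributed, the cluster is brought back up and a standby master is started on hsq02. A minimal sketch of the usual commands, with the node assignments inferred from the failover test below:
```bash
sbin/start-all.sh      # on hsq01: primary master plus the workers
sbin/start-master.sh   # on hsq02: standby master, registered through ZooKeeper
```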


Test a crash of the master process on hsq01 (hsq02 is the standby at this point):
```bash
jps                  # find the PID of the Master process
kill -9 <Master PID>
```
Keep refreshing both web UIs: hsq01's page stops responding, while hsq02's needs a few more refreshes (give it a moment) before it reflects the change.
In other words, hsq02, the standby, has completely taken over the role of the original master hsq01.
Bring the master on hsq01 back up (presumably with start-master.sh):
```bash
cd $SPARK_HOME
sbin/start-master.sh   # restart the Master daemon on hsq01
```
No matter how often you refresh, the roles do not switch back: the former standby (hsq02, previously STANDBY) is now the de facto master, and the recovered hsq01 (previously ALIVE) comes back as the standby. Leadership is not handed back when the original master recovers; this is how Spark standalone HA behaves.
Example: estimating pi with Monte Carlo
```bash
bin/spark-submit --master spark://hsq01:7077,hsq02:7077,hsq03:7077 --class org.apache.spark.examples.SparkPi /mysoft/spark-3.3.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.3.2.jar 10000
```

Watch the job in the web UI (on whichever node is currently the active master).
Quick Start
Interactive Analysis with the Spark Shell
Basics
Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively.
Launch the shell from the Spark directory (bin/spark-shell starts a Scala REPL with a SparkSession available as spark):
```bash
cd $SPARK_HOME
bin/spark-shell
```
Spark’s primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Let’s make a new Dataset from the text of the README file in the Spark source directory:
```scala
val textFile = spark.read.textFile("file:///mysoft/spark-3.3.2-bin-hadoop3/README.md")
```

You can get values from Dataset directly, by calling some actions, or transform the Dataset to get a new one. For more details, please read the API doc.
```scala
textFile.count()   // Number of items in this Dataset
textFile.first()   // First item in this Dataset
```
Now let’s transform this Dataset into a new one. We call filter to return a new Dataset with a subset of the items in the file.
```scala
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
```
We can chain together transformations and actions:
```scala
textFile.filter(line => line.contains("Spark")).count()   // How many lines contain "Spark"?
```
More on Dataset Operations
Dataset actions and transformations can be used for more complex computations. If we want to find the line with the most words:
```scala
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
```
This first maps a line to an integer value, creating a new Dataset.
reduce is called on that Dataset to find the largest word count.
The arguments to map and reduce are Scala function literals (closures), and can use any language feature or Scala/Java library.
For example, we can easily call functions declared elsewhere. We’ll use Math.max() function to make this code easier to understand:
```scala
import java.lang.Math

textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
```
One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
```scala
val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
```
Here, we call flatMap to transform a Dataset of lines to a Dataset of words, and then combine groupByKey and count to compute the per-word counts in the file as a Dataset of (String, Long) pairs.
To collect the word counts in our shell, we can call collect:
```scala
wordCounts.collect()
```
Caching
Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank. As a simple example, let’s mark our linesWithSpark dataset to be cached:
```scala
linesWithSpark.cache()
linesWithSpark.count()   // the first action after cache() computes the Dataset and caches it
```
It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting bin/spark-shell to a cluster, as described in the RDD programming guide.
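For example, to attach the shell to the standalone cluster built earlier, pass the same master URL used by the spark-submit examples in this guide:
```bash
bin/spark-shell --master spark://hsq01:7077,hsq02:7077,hsq03:7077
```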
Creating Your Own Application
Suppose we wish to write a self-contained application using the Spark API. We will walk through a simple application in Scala (with sbt), Java (with Maven), and Python (pip).
This example will use Maven to compile an application JAR, but any similar build system will work.
This program just counts the number of lines containing ‘a’ and the number containing ‘b’ in the Spark README.md.
Unlike the earlier examples with the Spark shell, which initializes its own SparkSession, we initialize a SparkSession as part of the program.
To build the program, we also write a Maven pom.xml file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.
```xml
<dependencies>
  <!-- presumably org.apache.spark:spark-sql_2.12:3.3.2, which provides SparkSession -->
```
```java
package com.hsq;
// SimpleApp: counts the lines containing "a" and the lines containing "b" in README.md
```
Now, we can package the application using Maven and execute it with ./bin/spark-submit.
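Packaging follows the standard Maven flow; assuming the usual project layout, this produces the jar referenced below under target/:
```bash
mvn clean package   # builds target/testSpark-1.0-SNAPSHOT.jar
```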
```bash
bin/spark-submit --master spark://hsq01:7077,hsq02:7077,hsq03:7077 --class "com.hsq.SimpleApp" testSpark-1.0-SNAPSHOT.jar
```

A Variety of Examples
Finally, Spark includes several sample programs in the examples directory (Scala, Java, Python, R). You can run them as follows:
```bash
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
```
The R example could not be run in this environment.

