Spark学习笔记
一、简介
Lightning-fast cluster computing。
快如闪电的集群计算。
大规模快速通用的计算引擎。
速度:比hadoop 100x,磁盘计算快10x
使用:java / Scala /R /python
提供80+算子(操作符),容易构建并行应用。
通用:组合SQL ,流计算 + 复杂分析。
运行:Hadoop, Mesos, standalone, or in the cloud,local.
模块
- Spark core:核心模块。通用执行引擎,提供内存计算和对外部数据集的引用。
- Spark SQL:构建在core之上,引入新的抽象SchemaRDD,提供了结构化和半结构化支持。
- Spark Streaming:流计算。小批量计算,RDD.
- Spark MLlib:机器学习库。
- Spark graph:图计算
二、部署
本地开发环境版本,一定要和spark运行环境一致!!!!!
2.1 Local 模式
local 模式 :通过多线程模拟分布式计算。
部署流程
下载 spark-2.4.3-bin-hadoop2.7.tgz
解压
Code$tar -zxvf spark-2.4.3-bin-hadoop2.7.tgz -C /soft/
$ln -s spark-2.4.3-bin-hadoop2.7 spark配置环境变量
Code# spark
export SPARK_HOME=/soft/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
$source /etc/profile验证spark
Code[wbw@s201 /soft/spark/bin]$./spark-shell
WebUI
Codehttp://s201:4040/
在IDEA中编写代码运行
注意:Spark2.4.3 对应 scala的SDK版本是2.11.12,以及pom依赖是spark-core_2.11。其他版本查看/soft/spark/jars/scala-compiler-X.jar和spark-core_X.jar。
添加MAVEN工程,添加SCALA支持,添加pom依赖
xml<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>编写代码并运行
scalapackage cn.wangbowen.spark.scala
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
// 创建spark配置对象.【本地模式】
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
// 加载文本文件
val rdd = sc.textFile("D:\\tmp\\1.txt")
// 分割并炸开
rdd.flatMap(line => line.split(" "))
// 映射二元组
.map((_, 1))
// 根据Key分组聚合
.reduceByKey(_ + _)
// 返回数组结果
.collect()
// 打印结果
.foreach(println)
}
}
2.2 Standalone 模式
Standalone即Spark独立集群,由maste(分任务发到worker,相当于Yarn的RM)和worker(计算,返回结果。相当于Yarn的NM)组成。是Spark自带的资源调度(相当于Yarn)。因此可以脱离yarn独立存在,也可以理解为standalone替代了yarn的工作(后面会讲到spark on yarn)【也可以说是,MR作业交给yarn的RM,这里Spark作业交给Standalone的master】
部署流程
规划S201为MASTE节点,S202~S204为slave节点,在S201上以本地模式安装好spark。
配置master节点的slaves配置文件
Code[wbw@s201 /soft/spark/conf]$cp slaves.template slaves
$vi slaves
# 清空所有内容添加以下内容
s202
s203
s204可能要去spark-env.sh配置一下JAVA_HOME
分发到s202~s204,设置好连接以及,环境变量。
启动集群
Code[wbw@s201 /soft/spark/sbin]$./start-all.sh
注意:
要加上./,不然会与HDFS的start-all脚本冲突!
可能会遇到启动失败问题
Codes204: JAVA_HOME is not set
s204: full log in /soft/spark/logs/spark-wbw-org.apache.spark.deploy.worker.Worker-1-s204.out原因是:spark不能读取/etc/profile导致,在sbin/spark-env.sh里面配置JAVA_HOME:export JAVA_HOME=/soft/jdk
查看进程
Code[wbw@s201 /soft/spark/sbin]$xcall.sh jps
============= s201 : jps ==============
1737 QuorumPeerMain
2938 Jps
2811 Master
============= s202 : jps ==============
1824 Worker
1897 Jps
1309 QuorumPeerMain
============= s203 : jps ==============
1866 Worker
1915 Jps
1359 QuorumPeerMain
============= s204 : jps ==============
1810 Worker
1859 Jps
============= s205 : jps ==============
1526 Jps查看WEBUI
Codehttp://s201:8080/
2.4 集成Hadoop HA
使spark集群可以访问HDFS。
集成流程
复制core-site.xml *和 *hdfs-site.xml到spark/conf目录下
分发到所有集群节点
启动spark集群
启动spark-shell,连接spark集群上
Code$>spark-shell --master spark://s201:7077
$scala>sc.textFile("hdfs://mycluster/user/centos/test.txt").collect();
2.5 Spark master HA
master的高可用配置,和Hadoop一样如果只有一个NameNode会造成单点故障的问题。
只针对standalone和mesos集群部署情况。
使用zk连接多个master并存储state。
配置流程
配置spark/conf/spark-env.sh,在末尾添加
xmlexport SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=s201:2181,s202:2181,s203:2181 -Dspark.deploy.zookeeper.dir=/spark"
分发配置,重启spark集群
在s204上单独开启master
Code[wbw@s204 /soft/spark/sbin]$./start-master.sh
查看8080端口WEB,可以看到s201处于ALIVE,而s204处于STANDBY
杀掉s201上的master进程
再次通过WEB查看状态(可能要1~2分钟才切过去)
2.6 Yarn 模式
在yarn作为资源管理器的情况下,一个spark应用只是一个yarn下的应用而已,类似于mr一样,所以不需要启动master和worker。应用提交到yarn后,首先会申请一个container运行applicationmaster,而这个appmaster里运行的就是spark的driver,对应的executor也在container里运行。提交应用需要带上spark-assembly的jar包,包可以位于hdfs上,即使位于本地,也会被上传到hdfs进行分发。
部署流程
将spark的jars文件放到hdfs上
Code[wbw@s201 /soft/spark]$hdfs dfs -mkdir -p /spark/jars
[wbw@s201 /soft/spark]$hdfs dfs -put jars/* /spark/jars配置spark配置文件
/conf/spark-default.conf
Code$cp spark-defaults.conf.template spark-defaults.conf
$vi spark-defaults.conf直接在文件末尾添加如下内容(spark的jars在HDFS上的位置,也可以在提交的时候加–jars 以及 –files)
Codespark.yarn.jars=hdfs://mycluster/spark/jars/*
/conf/spark-env.sh
Code$cp spark-env.sh.template spark-env.sh
$vi spark-env.sh直接在文件末尾添加如下内容
Codeexport JAVA_HOME=/soft/jdk
export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
分发spark-default.conf *和 *spark-env.sh 到所有节点上。重启spark集群。(这里只要提交作业的那个机器就可以了)
提交作业(master改为yarn,即作业交给yarn来管理)
cluster 模式
Code$spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master yarn --deploy-mode cluster hdfs://mycluster/demoJars/Spark-1.0-SNAPSHOT.jar
client 模式
Code$spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master yarn --deploy-mode client Spark-1.0-SNAPSHOT.jar
注意:如果是用虚拟机搭建,可能会由于虚拟机内存过小而导致启动失败,比如内存资源过小,yarn会直接kill掉进程导致rpc连接失败。所以,我们还需要配置Hadoop的yarn-site.xml文件,加入如下两项配置:
<property> |
2.7 mesos 模式
Spark客户端直接连接Mesos;不需要额外构建Spark集群。国内应用比较少,更多的是运用yarn调度。
2.8 启动脚本分析
start-all.sh:启动所有脚本
- sbin/spark-config.sh:读取配置文件
- sbin/start-master.sh:启动master进程
- sbin/spark-config.sh
- export JAVA_HOME 配置一些环境
- bin/load-spark-env.sh
- if [ -f “${SPARK_CONF_DIR}/spark-env.sh” ];
- sbin/spark-daemon.sh
- sbin/spark-config.sh
- sbin/start-slaves.sh:启动worker进程
- sbin/spark-config.sh
- bin/load-spark-env.sh
- sbin/slaves.sh
- sbin/start-slave.sh
- sbin/spark-config.sh
- bin/load-spark-env.sh
- sbin/spark-daemon.sh
三、快速入门
3.1 主要对象
- SparkContext:Spark程序的入口点,封装了整个spark运行环境的信息。
- RDD:resilient distributed dataset,弹性分布式数据集。等价于集合。
3.2 编程单词计数程序(Standalone模式)
3.2.1 Scala
添加scala编译插件(不然打出的jar包没有sacla的类)
xml<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>编写Scala文件
scalapackage cn.wangbowen.spark.scala
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
// 创建spark配置对象
val conf = new SparkConf().setAppName("WordCount")
// 本地模式
//conf.setMaster("local")
val sc = new SparkContext(conf)
// 加载文本文件
val rdd = sc.textFile(args(0))
// 分割并炸开
rdd.flatMap(line => line.split(" "))
// 映射二元组
.map((_, 1))
// 根据Key分组聚合
.reduceByKey(_ + _)
// 返回数组结果
.collect()
// 打印结果
.foreach(println)
}
}MAVEN打包,上传
创建一个文本文件,并上传到HDFS
Code$vi wordcount.txt
hello world
hello scala
hello spark
hello spark
$start-dfs.sh
$hdfs dfs -put wordcount.txt /tmp/运行作业(注意哪个namenode是激活态用哪个,这里是s201)
Code$spark-submit --master spark://s201:7077 --class cn.wangbowen.spark.scala.WordCount Spark-1.0-SNAPSHOT.jar hdfs://s201:8020/tmp/wordcount.txt
3.2.2 Java
编写代码
javapackage cn.wangbowen.spark.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* WordCount class
*
* @author BoWenWang
* @date 2020/4/29 22:06
*/
public class WordCount {
public static void main(String[] args) {
// 创建SparkConf对象
SparkConf conf = new SparkConf();
conf.setAppName("WordCount");
// 创建JAVA SC
JavaSparkContext sc = new JavaSparkContext(conf);
// 加载文本
JavaRDD<String> rdd = sc.textFile(args[0]);
// 压扁。这里的泛型,第一个都是输入类型,后面跟输出类型
JavaRDD<String> rdd1 = rdd.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
String[] words = s.split(" ");
List<String> list = new ArrayList<String>(Arrays.asList(words));
return list.iterator();
}
});
// 映射
JavaPairRDD<String, Integer> rdd2 = rdd1.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
// reduce
JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
List<Tuple2<String, Integer>> collect = rdd3.collect();
for (Tuple2<String, Integer> tuple2 : collect) {
System.out.println(tuple2._1 + ": " + tuple2._2);
}
}
}MAVEN打包,上传
运行
Code$spark-submit --master spark://s201:7077 --class cn.wangbowen.spark.java.WordCount Spark-1.0-SNAPSHOT.jar hdfs://s201:8020/tmp/wordcount.txt
四、RDD 弹性分布式数据集
是spark的基本数据结构,是不可变数据集。RDD中的数据集进行逻辑分区,每个分区可以单独在集群节点进行计算。可以包含任何java,scala,python和自定义类型。
RDD是只读的记录分区集合。
RDD具有容错机制。
创建RDD方式:
- 并行化一个现有集合。
- 外部存储。
内存处理计算。在job间进行数据共享。内存的IO速率高于网络和disk的10 ~ 100之间(hadoop 花费90%时间用户rw)。
内部包含5个主要属性:
- 分区列表
- 针对每个split的计算函数。
- 对其他rdd的依赖列表
- 可选,如果是KeyValueRDD的话,可以带分区类。
- 可选,首选块位置列表(hdfs block location);
4.1 并发度
// 5. |
4.2 RDD 变换
返回指向新rdd的指针,在rdd之间创建依赖关系。每个rdd都有计算函数和指向父RDD的指针。
函数名 | 参数 | 解释 |
---|---|---|
map | (T) => V | 对每个元素进行变换,应用变换函数。即映射,将数据集中的每个元素进行变化出新的元素。 |
filter | (T) => Boolean | 过滤器。过滤掉返回false的数据。 |
flatMap | (T) => TraversableOnce[U] | 压扁。对于返回的一组数据,分别拆分成单个数据。 |
mapPartitions | Iterator[T] => Iterator[U] | 对每个分区进行应用变换,输入的Iterator,返回新的迭代器,可以对分区进行函数处理。迭代器内容是该分区里的数据集。 |
mapPartitionsWithIndex | (Int, Iterator[T]) => Iterator[U] | 同上,可以指定分区。 |
sample | withReplacement: Boolean,fraction: Double,seed: Long = {} | 采样。返回采样的RDD子集。 |
union | RDD[T] | 类似于mysql union操作【select * from persons where id < 10 union select * from id persons where id > 29 ;】 |
intersection | RDD[T] | 交集。提取两个rdd中都含有的元素。 |
distinct | 可选[numTasks] | 去重。去除重复的元素。 |
groupByKey | (K,V) => (K,Iterable[V]) | 根据Key分组,相同Key的值被放入一个迭代器。 |
reduceByKey | (V, V) => V | 按key聚合。 |
sortByKey | 排序 | |
join | RDD[T], [numTasks] | 连接。(K,V).join(K,W) =>(K,(V,W))。只会对两个RDD相同Key的数据进行合并成一个元组,一个RDD有,而另一个没有的,不会进行连接。 |
cogroup | RDD[T] | 协分组。对标JOIN,将相同KEY分到一组。(K,V).cogroup(K,W) =>(K, (Iterable[V], Iterable[W])) |
cartesian | RDD[T] | 笛卡尔积。RDD[T] RDD[U] => RDD[(T,U)] |
pipe | cmd:String | 将rdd的元素传递给脚本或者命令,执行结果返回形成新的RDD |
coalesce | numPartitions:Int | 减少分区 |
repartition | 可增可减 |
4.3 RDD 动作
一个RDD数据集经过RDD变换不会立即执行,只有遇到RDD动作的时候才会开始执行。
函数名 | 参数 | 解释 |
---|---|---|
collect | 收集rdd元素形成数组。 | |
count | 统计rdd元素的个数 | |
reduce | 聚合,返回一个值。 | |
first | 取出第一个元素take(1) | |
take | n:String | 取出前 N个元素 |
saveAsTextFile | path:String | 保存到文件 |
saveAsSequenceFile | path:String | 保存成序列文件 |
saveAsObjectFile | path:String | Java and Scala |
countByKey | 按照key,统计每个key下value的个数 |
五、Spark核心API
六、Spark任务提交流程
七、依赖
NarrowDependency: 子RDD的每个分区依赖于父RDD的少量分区。 |
八、持久化
九、共享变量
map(),filter()高级函数中访问的对象被串行化到各个节点。每个节点都有一份拷贝。变量值并不会回传到driver程序。spark通过广播变量和累加器实现共享变量。
9.1 广播变量
//创建广播变量 |
9.2 累加器
val ac1 = sc.longaccumulator("ac1") |
十、Spark SQL
10.1 Scala 版
10.1.1 创建 DataFrame 收据框
// 创建样例类 |
10.1.2 创建临时视图
// 根据DataFrame在内存中创建一个临时的视图 |
10.1.3 查询数据
通过SQL查询
// 也可以添加条件查询,返回DataFrame |
通过API查询
// 通过函数,返回Dataset(DataFram实际上就是Dataset) |
10.2 Java 版
导入依赖
<dependency> |
10.2.1 处理 json 数据
创建数据文件
{"id":1,"name":"tom1","age":11} |
编写代码
package cn.wangbowen.spark.java; |
输出结果
// 从文件读取的数据 |
10.2.2 DataSet 转 RDD
完整代码在上面JSON代码里
// DataSet 转 RDD (可以进行一些数据操作!) |
输出结果
[14,4,tom4] |
10.2.3 处理 jdbc 数据
添加依赖
<dependency> |
代码
package cn.wangbowen.spark.java; |
输出结果
// 数据库读取的内容 |
数据库查询结果
10.3 整合 Hive
10.3.1 linux环境
复制core-site.xml(hdfs) + hdfs-site.xml(hdfs) + hive-site.xml(hive)三个文件到spark/conf下。(分发到所有节点)
复制hive/lib下mysql驱动程序到/soft/spark/jars下(分发到所有节点)
启动spark-shell,指定启动模式
Code$spark-shell --master spark://s201:7077
# 创建HIVE表
scala> spark.sql("CREATE TABLE IF NOT EXISTS mydb.sparkHive(id int, name string, age int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE")
# 本地创建数据文件
1,tom1,11
2,tom2,12
3,tom3,13
4,tom4,14
5,tom5,15
# 加载数据到HIVE表
scala> spark.sql("load data local inpath 'file:///home/wbw/tmp/data.txt' into table mydb.sparkHive")
# 查看HIVE表
scala> spark.sql("select * from mydb.sparkHive").show
+----+----+----+
| id|name| age|
+----+----+----+
| 1|tom1| 11|
| 2|tom2| 12|
| 3|tom3| 13|
| 4|tom4| 14|
| 5|tom5| 15|
|null|null|null|
|null|null|null|
+----+----+----+
10.3.2 IDEA环境(JAVA)
复制core-site.xml(hdfs) + hdfs-site.xml(hdfs) + hive-site.xml(hive)三个文件到resources目录下
编码
javapackage cn.wangbowen.spark.java;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* SparkHive class
*
* @author BoWenWang
* @date 2020/5/5 15:59
*/
public class SparkHive {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("SQL-java-hive")
.config("spark.master", "local") // 本地模式
.enableHiveSupport() // 这个是关键!
.getOrCreate();
Dataset<Row> dataset = sparkSession.sql("select * from mydb.sparkHive");
dataset.show();
}
}
10.3.3 可能遇到的问题
Spark2.3.0集成hive3.1.1遇到的一个坑HikariCP
https://blog.csdn.net/weixin_44166276/article/details/85088998
TIP:我是修改了hive-site.xml中
datanucleus.connectionPoolingType改成dbcp。
然后关闭了版本验证hive.metastore.schema.verification
xml<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
IDEA下权限不足 Permission denied: user=BoWenWang, access=EXECUTE, inode=”/tmp”:wbw:supergroup:drwx——
# 哪个缺权限,就给哪个 |
10.4 SQL查询引擎
相当于嵌套了一层,通过JDBC的途径间接调用。
启动spark集群(完全分布式-standalone)
$>/soft/spark/sbin/start-all.sh master //201 worker //202 ~ 204
创建hive的数据表在默认库下。
$>hive -e "create table tt(id int,name string , age int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile"
加载数据到hive表中.
$>hive -e "load data local inpath 'file:///home/centos/data.txt' into table tt" $>hive -e "select * from tt"
分发三个文件到所有worker节点
启动spark集群
$>soft/spark/sbin/start-all.sh
启动spark-shell
$>spark-shell --master spark://s201:7077
启动thriftserver服务器
$>start
10.5 拓展和总结
10.5.1 使用场景
处理结构化数据:
- 即席/普通查询文件中的数据
- 对流数据处理(SparkStreaming)
- 使用SQL的方式处理ETL
- 对外部数据库进行查询
10.5.2 加载数据
- 直接加载数据到DataFrame中
- 可以加载数据到RDD然后做相应的转换
- 可以从本地或云端加载数据
10.5.3 DataFrame函数 和 SQL
DataFrame = RDD + Schema:表格 = 数据 + 结构
DataFrame只是DataSet的Row类型的别名
DataFrame可以处理Text、JSON、parquet等(即外部数据源)
不管用DF的API还是SQL,最后生成的逻辑执行计划都是一样的,优化后效率都是一样的
10.5.4 Schema
- 隐式:JSON、parquet、ORC等自动推导
- 显示:
- 先创建RDD
- 然后创建一个schema
- 然后整合
10.5.5 加载和保存结果
SaveMode:存在报错、Append追加、Overwrite覆盖、不理睬
spark.read.format(“json”).load(“file:///…/…”)
df.write.format(“parquet”).mode(“overwrite”).save(“file:///…/…”)
如果保存到数据库的话:saveAsTable
还可以分区写出去df.toDF.write.partitionBy(“year”,”month”).avor(“…”)
10.5.6 SQL函数覆盖
支持了好多SQL语法
10.5.7 复杂JSON处理
数组情况
select name,nums[1] from json_table
多层嵌套可以用A.B
10.5.8 外部数据源
关系型数据库,需要JDBC jars
spark-packeages.org(就是format不一样.将原来JSON因为是自带的,可以直接写JSON,不然就要写对应的包。可以去那个网站上找)
spark.read.format(“com.databricks.spark.avro”).load(“…”)
十一、Spark Streaming
11.1 介绍
是spark core的扩展,针对实时数据流处理,具有可扩展、高吞吐量、容错。数据可以是来自于kafka,flume,tcpsocket,使用高级函数(map reduce filter ,join , windows),
处理的数据可以推送到database,hdfs,针对数据流处理可以应用到机器学习和图计算中。
内部,spark接受实时数据流,分成batch(分批次)进行处理,最终在每个batch终产生结果stream。
discretized stream or DStream
离散流,表示的是连续的数据流。
通过kafka、flume等输入数据流产生,也可以通过对其他DStream进行高阶变换产生。
在内部,DStream是表现为RDD序列。
StreamingContext
- 启动上下文之后,不能启动新的流或者添加新的
- 上下文停止后不能restart.
- 同一JVM只有一个active的streamingcontext
- 停止streamingContext会一同stop掉SparkContext,如若只停止StreamingContext.ssc.stop(false|true);
- SparkContext可以创建多个StreamingContext,创建新的之前停掉旧的。
11.2 快速入门(单词计数)
添加pom依赖
<dependency> |
11.2.1 Scala + 本地
代码
package cn.wangbowen.spark.scala |
运行步骤
开启nc服务器(【安装教程】https://blog.csdn.net/weixin_38842096/article/details/85720559)
Codecmd> nc -lL -p 9999
启动sparkStreaming程序
在nc命令行中输入
Codehello spark streaming
查看IDEA控制台输出
Code-------------------------------------------
Time: 1588831126000 ms
-------------------------------------------
(hello,1)
(streaming,1)
(spark,1)
11.2.2 Java + 集群
代码
package cn.wangbowen.spark.java; |
运行步骤
打jar包,上传导集群
在s201打开nc服务器
Codenc -l -p 9999
提交作业
Code$spark-submit --class cn.wangbowen.spark.java.SparkStreamingWordCountJava Spark-1.0-SNAPSHOT.jar
等待启动成功后,在NC中输入
Codehello world count
查看运行窗口
Code-------------------------------------------
Time: 1588833156000 ms
-------------------------------------------
(hello,1)
(world,1)
(count,1)
11.3 DStream 和 Receiver
11.3.1 DSteam
11.3.2 Receiver
- 介绍:Receiver是接受者,从source接受数据,存储在内存中共spark处理。
- 源
- 基本源:fileSystem | socket,内置API支持。
- 高级源:kafka | flume | …,需要引入pom.xml依赖.
- 注意:使用local模式时,不能使用一个线程.使用的local[n],n需要大于receiver的个数。
11.4 Kafka 集成
确保ZK集群开启
启动Kafka,在S202~S204分别运行
Code$cd /soft/kafka
$bin/kafka-server-start.sh -daemon config/server.properties创建kafka主题,查看主题列表
Code$kafka-topics.sh --create --bootstrap-server s202:9092 --replication-factor 2 --partitions 2 --topic mytopic1
# 查看主题列表
$kafka-topics.sh --list --bootstrap-server s202:9092导入依赖
xml<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
</dependency>编写代码
javapackage cn.wangbowen.spark.java;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.util.*;
/**
* SparkStreamingWordCountJava class
*
* @author BoWenWang
* @date 2020/5/7 14:03
*/
public class SparkStreamingWordCountJavaForKafka {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("WC-Streaming-Java-Kafka").setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(2));
// Kafka连接配置
Map<String, Object> kafkaParams = new HashMap<>(8);
kafkaParams.put("bootstrap.servers", "s202:9092,s203:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "g6");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
// 订阅主题列表
Collection<String> topics = Collections.singletonList("mytopic1");
// 数据源
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jsc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
// 压扁操作(消费数据源)
JavaDStream<String> words = stream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
public Iterator<String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {
// consumerRecord.key(): kafka消息由K-V组成,这里是控制台发送所以Key为null
String[] words = consumerRecord.value().split(" ");
return Arrays.asList(words).iterator();
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
JavaPairDStream<String, Integer> count = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
count.print();
jsc.start();
jsc.awaitTermination();
}
}开启Kafka控制台生产者
Code$kafka-console-producer.sh --broker-list s202:9092 --topic mytopic1
运行程序(Run)
在生产者控制台发送
Code>hello spark streaming for kafka
观察IDEA控制台输出
Code-------------------------------------------
Time: 1588835416000 ms
-------------------------------------------
(hello,1)
(streaming,1)
(kafka,1)
(spark,1)
(for,1)
11.5 状态更新
11.5.1 updateStateByKey
可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。
适用场景
updateStateByKey可以用来统计历史数据。例如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的访问量等指标。
11.5.2 mapWithState
只返回变化后的key的值,这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。
适用场景
mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里的余额信息。
版权声明:本文为CSDN博主「爱是与世界平行」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/an1090239782/article/details/102832444
11.5.3 代码示例
代码
package cn.wangbowen.spark.java; |
运行步骤
打开nc,间隔一段时间发送一次数据
CodeC:\Users\XXX>nc -lL -p 9999
hello
hello
hello
hello查看IDEA控制台打印信息
Code// count.print(); 当前窗口统计值
-------------------------------------------
Time: 1588837904000 ms
-------------------------------------------
(hello,1)
// historyCount.print(); 历史累积值
-------------------------------------------
Time: 1588837904000 ms
-------------------------------------------
(hello,4)
11.6 窗口化操作
batch interval : 批次的间隔。
windows length : 窗口长度,跨批次。是批次的整数倍。
slide interval : 滑动间隔,窗口计算的间隔时间,是批次interval的整倍数。
比如:
batch interval = 2,windows length = 6,slide interval =4
统计6秒内的热词,每2秒计算一次接收到的数据,每4秒刷新一次。
代码
package cn.wangbowen.spark.java; |
运行过程
开启nc
运行程序
发送数据(每隔1秒发一次)
CodeC:\Users\XXX>nc -lL -p 9999
1
2
3
4
5
6
7
8
9
10
11
12
13查看输出结果(每次输出间隔 4s = 4000ms,每2秒计算一次,用[]表示2秒的数据,显示6秒内的数据)
Code-------------------------------------------
Time: 1588839240000 ms
-------------------------------------------
-------------------------------------------
Time: 1588839244000 ms
-------------------------------------------
(2,1)
(1,1)
-------------------------------------------
Time: 1588839248000 ms
-------------------------------------------
(4,1)
(6,1)
(2,1)
(5,1)
(3,1)
(1,1)
-------------------------------------------
Time: 1588839252000 ms
-------------------------------------------
(8,1)
(6,1)
(7,1)
(5,1)
(9,1)
-------------------------------------------
Time: 1588839256000 ms
-------------------------------------------
(12,1)
(13,1)
(9,1)
(11,1)
(10,1)
-------------------------------------------
Time: 1588839260000 ms
13
-------------------------------------------
(13,1)
11.7 容错处理
11.7.1 生产环境中spark streaming的job的注意事项
- 避免单点故障
- Driver:驱动,运行用户编写的程序代码的主机。
- Excutors:执行的spark driver提交的job,内部含有附加组件比如receiver。receiver接受数据并以block方式保存在memory中,同时,将数据块复制到其他executor中,已备于容错。每个批次末端会形成新的DStream,交给下游处理。如果receiver故障,其他执行器中的receiver会启动进行数据的接收。
11.7.2 spark streaming中的容错实现
如果executor故障,所有未被处理的数据都会丢失,解决办法可以通过wal(hbase,hdfs/WALs)方式将数据预先写入到hdfs或者s3.
如果Driver故障,driver程序就会停止,所有executor都是丢失连接,停止计算过程。解决办法需要配置和编程。
流程
配置Driver程序自动重启,使用特定的clustermanager实现。
重启时,从宕机的地方进行重启,通过检查点机制可以实现该功能。
Code// 设置检查点目录可以是本地,可以是hdfs.
jsc.checkpoint("d://....");
// 不再使用new方式创建SparkStreamContext对象,而是通过工厂方式.JavaStreamingContext.getOrCreate()方法创建上下文对象,首先会检查检查点目录,看是否有job运行,没有就new新的。
代码
package cn.wangbowen.spark.java; |
运行步骤
开启NC
运行程序
发送数据
CodeC:\Users\BoWenWang>nc -lL -p 9999
hello
hello
hello
hello
hello观察控制台输出
Code-------------------------------------------
Time: 1588844410000 ms
-------------------------------------------
(hello,5)直接杀掉程序(日志)
Code20/05/07 17:44:20 INFO CheckpointWriter: Deleting file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844650000
20/05/07 17:44:20 INFO CheckpointWriter: Checkpoint for time 1588844660000 ms saved to file 'file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000', took 4337 bytes and 23 ms
20/05/07 17:44:20 INFO DStreamGraph: Clearing checkpoint data for time 1588844660000 ms
20/05/07 17:44:20 INFO DStreamGraph: Cleared checkpoint data for time 1588844660000 ms
20/05/07 17:44:20 INFO ReceivedBlockTracker: Deleting batches: 1588844638000 ms
20/05/07 17:44:20 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Attempting to clear 0 old log files in file:/d:/tmp/ReStartCheckPoint/receivedBlockMetadata older than 1588844640000:
20/05/07 17:44:20 INFO InputInfoTracker: remove old batch metadata: 1588844638000 ms重启程序(检查日志)
CodeUsing Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/05/07 17:45:48 INFO CheckpointReader: Checkpoint files found: file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000.bk,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844658000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844658000.bk,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844656000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844656000.bk,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844654000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844654000.bk,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844652000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844652000.bk
20/05/07 17:45:48 INFO CheckpointReader: Attempting to load checkpoint from file file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000
20/05/07 17:45:50 INFO Checkpoint: Checkpoint for time 1588844660000 ms validated
20/05/07 17:45:50 INFO CheckpointReader: Checkpoint successfully loaded from file file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000
20/05/07 17:45:50 INFO CheckpointReader: Checkpoint was generated at time 1588844660000 ms观察控制台输出(看到恢复了)
Code-------------------------------------------
Time: 1588844660000 ms
-------------------------------------------
(hello,5)
11.8 将结果写入MySQL
利用11.2.1的代码,其中count.print()替换成输出到数据库即可:
// 先定义一个函数获取连接 |
十二、SparkApp 部署模式
决定spark作业入口程序的地方,Driver驱动。
spark-submit --class xxx xx.jar --deploy-mode (client | cluster) |
–deploy-mode:指定是否部署driver程序在worker节点上还是在client主机上。不论哪种方式,rdd的运算都在worker执行
12.1 闭包
RDD,resilient distributed dataset,弹性(容错)分布式数据集。
分区列表,function,dep Option(分区类, Pair[Key,Value]),首选位置。
运行job时,spark将rdd打碎变换成task,每个task由一个executor执行。执行之前,spark会进行task的闭包(closure)计算。闭包是指针对executor可见的变量和方法,以备在rdd的foreach中进行计算。闭包就是串行化后并发送给每个executor.
local模式下,所有spark程序运行在同一JVM中,共享对象,counter(变量)是可以累加的。原因是所有executor指向的是同一个引用。
cluster模式下,不可以,counter是闭包处理的。每个节点对driver上的counter是不可见的。只能看到自己内部串行化的counter副本。
12.2 client
driver运行在client主机上。client可以不在cluster中。
验证
启动spark集群
编程
scalapackage cn.wangbowen.spark.scala
import java.net.{InetAddress, Socket}
import org.apache.spark.{SparkConf, SparkContext}
object SubmitDeployMode {
// 打印消息
def printfInfo(str:String): Unit = {
val ip = InetAddress.getLocalHost.getHostAddress
val socket = new Socket("192.168.174.205", 9999)
val out = socket.getOutputStream
out.write((ip + ": " + str + "\r\n").getBytes())
out.flush()
socket.close()
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Submit-Deploy-Mode")
val sc = new SparkContext(conf)
printfInfo("Driver running on this node!")
val rdd = sc.parallelize(1 to 10, 3)
val rdd2 = rdd.map(num => {
printfInfo("map => " + num)
num * 2
})
val count = rdd2.reduce((num1, num2) => {
printfInfo("reduce => " + num1 + "," + num2)
num1 + num2
})
printfInfo("count: " + count)
}
}随便找一台机子,开启nc(这里选s205)。如果没装。用命令 $sudo yum install nc -y 安装。
Codenc -lk 9999
打jar包上传到s201并执行
Code$spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master spark://s201:7077 --deploy-mode client Spark-1.0-SNAPSHOT.jar
打印结果
Code192.168.174.201: Driver running on this node! # 入口程序在s201上
192.168.174.203: map => 1
192.168.174.203: map => 2
192.168.174.203: reduce => 2,4
192.168.174.203: map => 3
192.168.174.203: reduce => 6,6
192.168.174.203: map => 4
192.168.174.203: map => 5
192.168.174.203: reduce => 8,10
192.168.174.203: map => 6
192.168.174.203: reduce => 18,12
192.168.174.201: reduce => 12,30
192.168.174.203: map => 7
192.168.174.203: map => 8
192.168.174.203: reduce => 14,16
192.168.174.203: map => 9
192.168.174.203: reduce => 30,18
192.168.174.203: map => 10
192.168.174.203: reduce => 48,20
192.168.174.201: reduce => 42,68
192.168.174.201: count: 110 # 最终结果返回client节点s201将jar包上传到s202并执行
Code$spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master spark://s201:7077 --deploy-mode client Spark-1.0-SNAPSHOT.jar
打印结果
Code192.168.174.202: Driver running on this node! # 入口程序在s201上
192.168.174.203: map => 1
192.168.174.203: map => 2
192.168.174.203: reduce => 2,4
192.168.174.203: map => 3
192.168.174.203: reduce => 6,6
192.168.174.204: map => 4
192.168.174.204: map => 5
192.168.174.204: reduce => 8,10
192.168.174.203: map => 7
192.168.174.204: map => 6
192.168.174.203: map => 8
192.168.174.204: reduce => 18,12
192.168.174.203: reduce => 14,16
192.168.174.203: map => 9
192.168.174.203: reduce => 30,18
192.168.174.203: map => 10
192.168.174.203: reduce => 48,20
192.168.174.202: reduce => 12,68
192.168.174.202: reduce => 80,30
192.168.174.202: count: 110 # 最终结果返回client节点s202
12.3 cluster
driver程序提交给spark cluster的某个worker节点来执行。worker是cluster中的一员。导出的jar需要放置到所有worker节点都可见的位置(如hdfs)才可以。
上传jar包到hdfs
Code$hdfs dfs -mkdir /demoJars
$hdfs dfs -put Spark-1.0-SNAPSHOT.jar /demoJars提交任务(注意模式的改变deply-mdoe和hdfs路径下的jar)
Code$spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master spark://s201:7077 --deploy-mode cluster hdfs://mycluster/demoJars/Spark-1.0-SNAPSHOT.jar
查看结果(发现在s201上提交,结果Driver在s202)
Code192.168.174.202: Driver running on this node!
192.168.174.203: map => 1
192.168.174.203: map => 2
192.168.174.203: reduce => 2,4
192.168.174.203: map => 3
192.168.174.203: reduce => 6,6
192.168.174.203: map => 7
192.168.174.203: map => 8
192.168.174.203: reduce => 14,16
192.168.174.203: map => 9
192.168.174.203: reduce => 30,18
192.168.174.203: map => 10
192.168.174.203: reduce => 48,20
192.168.174.202: reduce => 12,68
192.168.174.204: map => 4
192.168.174.204: map => 5
192.168.174.204: reduce => 8,10
192.168.174.204: map => 6
192.168.174.204: reduce => 18,12
192.168.174.202: reduce => 80,30
192.168.174.202: count: 110再次提交(发现Driver又变了,在s204上)
Code192.168.174.204: Driver running on this node!
192.168.174.203: map => 1
192.168.174.203: map => 2
192.168.174.203: reduce => 2,4
192.168.174.203: map => 3
192.168.174.203: reduce => 6,6
192.168.174.202: map => 4
192.168.174.202: map => 5
192.168.174.202: reduce => 8,10
192.168.174.202: map => 6
192.168.174.202: reduce => 18,12
192.168.174.202: map => 7
192.168.174.202: map => 8
192.168.174.202: reduce => 14,16
192.168.174.202: map => 9
192.168.174.202: reduce => 30,18
192.168.174.202: map => 10
192.168.174.202: reduce => 48,20
192.168.174.204: reduce => 30,68
192.168.174.204: reduce => 98,12
192.168.174.204: count: 110
十三、Spark集成Hive查询Hbase
假设已经按照之前的步骤HIVE集成了HBASE了。此时HBase中有一个表有两条记录,而Hive关联了HBase的表,且表名为t1。
13.1 local 模式 + spark-shell
复制hive的hive-hbase-handler-2.1.0.jar文件到spark/jars目录下。
Code[wbw@s201 /soft/hive/lib]$cp hive-hbase-handler-3.1.2.jar /soft/spark/jars/
复制hive/下的metrics的jar文件到spark下
Code$>cd /soft/hive/lib
$>ls | grep metrics | cp `xargs` /soft/spark/jars由于之间hive集成hbase时候修改了hive-site.xml这里,重新导入
Code$cp hive-site.xml /soft/spark/conf/
拷贝hbase的包到spark/jars下(这部有争议,先跳过,如果有问题先看是不是13.4中的,没办法才全部导入hbase的相关包)
启动spark-shell 本地模式测试
Code$spark-shell --master local[4]
$scala>spark.sql("select * from t1").show
+----+----+---+
| key|name| id|
+----+----+---+
|row1| tom|100|
|row2|toms| 18|
+----+----+---+如果报错,可能是版本验证问题和数据库连接问题。跟之前一样参考10.3.3。其他缺包等问题参考13.4
13.2 standalone 模式 + spark-shell
在spark集群上分发13.1 (1)模式下所有需要的jar包。
启动standalone模式,spark集群。
Code[wbw@s201 /soft/spark/sbin]$./start-all.sh
启动spark-shell
Code$spark-shell --master spark://s201:7077
scala> spark.sql("select * from t1").show
+----+----+---+
| key|name| id|
+----+----+---+
|row1| tom|100|
|row2|toms| 18|
+----+----+---+
13.3 IDEA编程访问
导依赖
xml<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-hbase-handler</artifactId>
<version>3.1.2</version>
</dependency>MAVEN发生错误?无法打包?
不过去掉hive-hbase-handler后,打包。在standalone下可以运行。
13.4 整合常见问题
https://www.wandouip.com/t5i62606/
问题一
Codespark 执行时报java.lang.ClassNotFoundException: org.apache.htrace.core.HTraceConfiguration
解决方案
Code将Hadoop中的htrace-core4-4.1.0-incubating.jar放入spark/jars下:
[wbw@s201 /soft/hadoop-3.1.2/share/hadoop/common/lib]$cp htrace-core4-4.1.0-incubating.jar /soft/spark/jars/问题二
Codespark 执行时报java.lang.ClassNotFoundException: org.apache.hadoop.hbase.util.Bytes
解决方案
Code将hbase/lib下的hbase-common-2.2.4.jar放到spark/jars下
十四、优化
14.1 存储格式的选择
在sparkSQL中可以使用 parquet
:一种文件格式类型,可以按照这个读取写出文件。
14.2 压缩格式
14.3 代码优化
选择高性能算子
在将数据写入到数据库中的时候
DataFrame变量.foreachPartition
每一次对每一个分区进行插入。不要一条记录插一次
先关掉自动提交,然后批量插入。最后commit。
复用已有数据
通用的dataframe,可以XXX.cache(),不用了再XXX.unpersist(ture)
14.4 参数优化
并行度
spark.sql.shuffle.partitions
分区字段类型推测
spark.sql.sources.partionColumnTypeInference.enabled