avatar

目录
Spark学习笔记

Spark学习笔记

一、简介

Lightning-fast cluster computing。

快如闪电的集群计算。

大规模快速通用的计算引擎。

速度:比hadoop 100x,磁盘计算快10x

使用:java / Scala /R /python

​ 提供80+算子(操作符),容易构建并行应用。

通用:组合SQL ,流计算 + 复杂分析。

运行:Hadoop, Mesos, standalone, or in the cloud,local.

模块

  • Spark core:核心模块。通用执行引擎,提供内存计算和对外部数据集的引用。
  • Spark SQL:构建在core之上,引入新的抽象SchemaRDD,提供了结构化和半结构化支持。
  • Spark Streaming:流计算。小批量计算,RDD.
  • Spark MLlib:机器学习库。
  • Spark graph:图计算

二、部署

本地开发环境版本,一定要和spark运行环境一致!!!!!

2.1 Local 模式

local 模式 :通过多线程模拟分布式计算。

部署流程

  1. 下载 spark-2.4.3-bin-hadoop2.7.tgz

  2. 解压

    Code
    $tar -zxvf spark-2.4.3-bin-hadoop2.7.tgz -C /soft/
    $ln -s spark-2.4.3-bin-hadoop2.7 spark
  3. 配置环境变量

    Code
    # spark
    export SPARK_HOME=/soft/spark
    export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

    $source /etc/profile
  4. 验证spark

    Code
    [wbw@s201 /soft/spark/bin]$./spark-shell
  5. WebUI

    Code
    http://s201:4040/
  6. 在IDEA中编写代码运行

    注意:Spark2.4.3 对应 scala的SDK版本是2.11.12,以及pom依赖是spark-core_2.11。其他版本查看/soft/spark/jars/scala-compiler-X.jar和spark-core_X.jar。

    1. 添加MAVEN工程,添加SCALA支持,添加pom依赖

      xml
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.3</version>
      </dependency>
    2. 编写代码并运行

      scala
      package cn.wangbowen.spark.scala

      import org.apache.spark.{SparkConf, SparkContext}

      object WordCount {
      def main(args: Array[String]): Unit = {
      // 创建spark配置对象.【本地模式】
      val conf = new SparkConf().setAppName("WordCount").setMaster("local")

      val sc = new SparkContext(conf)
      // 加载文本文件
      val rdd = sc.textFile("D:\\tmp\\1.txt")
      // 分割并炸开
      rdd.flatMap(line => line.split(" "))
      // 映射二元组
      .map((_, 1))
      // 根据Key分组聚合
      .reduceByKey(_ + _)
      // 返回数组结果
      .collect()
      // 打印结果
      .foreach(println)
      }
      }

2.2 Standalone 模式

Standalone即Spark独立集群,由maste(分任务发到worker,相当于Yarn的RM)和worker(计算,返回结果。相当于Yarn的NM)组成。是Spark自带的资源调度(相当于Yarn)。因此可以脱离yarn独立存在,也可以理解为standalone替代了yarn的工作(后面会讲到spark on yarn)【也可以说是,MR作业交给yarn的RM,这里Spark作业交给Standalone的master】

部署流程

  1. 规划S201为MASTE节点,S202~S204为slave节点,在S201上以本地模式安装好spark。

  2. 配置master节点的slaves配置文件

    Code
    [wbw@s201 /soft/spark/conf]$cp slaves.template slaves
    $vi slaves

    # 清空所有内容添加以下内容
    s202
    s203
    s204
  3. 可能要去spark-env.sh配置一下JAVA_HOME

  4. 分发到s202~s204,设置好连接以及,环境变量。

  5. 启动集群

    Code
    [wbw@s201 /soft/spark/sbin]$./start-all.sh

    注意:

    1. 要加上./,不然会与HDFS的start-all脚本冲突!

    2. 可能会遇到启动失败问题

      Code
      s204:   JAVA_HOME is not set
      s204: full log in /soft/spark/logs/spark-wbw-org.apache.spark.deploy.worker.Worker-1-s204.out

      原因是:spark不能读取/etc/profile导致,在sbin/spark-env.sh里面配置JAVA_HOME:export JAVA_HOME=/soft/jdk

  6. 查看进程

    Code
    [wbw@s201 /soft/spark/sbin]$xcall.sh jps
    ============= s201 : jps ==============
    1737 QuorumPeerMain
    2938 Jps
    2811 Master
    ============= s202 : jps ==============
    1824 Worker
    1897 Jps
    1309 QuorumPeerMain
    ============= s203 : jps ==============
    1866 Worker
    1915 Jps
    1359 QuorumPeerMain
    ============= s204 : jps ==============
    1810 Worker
    1859 Jps
    ============= s205 : jps ==============
    1526 Jps
  7. 查看WEBUI

    Code
    http://s201:8080/

2.4 集成Hadoop HA

使spark集群可以访问HDFS。

集成流程

  1. 复制core-site.xml *和 *hdfs-site.xml到spark/conf目录下

  2. 分发到所有集群节点

  3. 启动spark集群

  4. 启动spark-shell,连接spark集群上

    Code
    $>spark-shell --master spark://s201:7077
    $scala>sc.textFile("hdfs://mycluster/user/centos/test.txt").collect();

2.5 Spark master HA

master的高可用配置,和Hadoop一样如果只有一个NameNode会造成单点故障的问题。

只针对standalone和mesos集群部署情况。

使用zk连接多个master并存储state。

配置流程

  1. 配置spark/conf/spark-env.sh,在末尾添加

    xml
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=s201:2181,s202:2181,s203:2181 -Dspark.deploy.zookeeper.dir=/spark"
  2. 分发配置,重启spark集群

  3. 在s204上单独开启master

    Code
    [wbw@s204 /soft/spark/sbin]$./start-master.sh

    查看8080端口WEB,可以看到s201处于ALIVE,而s204处于STANDBY

    avatar

    avatar

  4. 杀掉s201上的master进程

  5. 再次通过WEB查看状态(可能要1~2分钟才切过去)

    avatar

2.6 Yarn 模式

在yarn作为资源管理器的情况下,一个spark应用只是一个yarn下的应用而已,类似于mr一样,所以不需要启动master和worker。应用提交到yarn后,首先会申请一个container运行applicationmaster,而这个appmaster里运行的就是spark的driver,对应的executor也在container里运行。提交应用需要带上spark-assembly的jar包,包可以位于hdfs上,即使位于本地,也会被上传到hdfs进行分发。

部署流程

  1. 将spark的jars文件放到hdfs上

    Code
    [wbw@s201 /soft/spark]$hdfs dfs -mkdir -p /spark/jars
    [wbw@s201 /soft/spark]$hdfs dfs -put jars/* /spark/jars
  2. 配置spark配置文件

    • /conf/spark-default.conf

      Code
      $cp spark-defaults.conf.template spark-defaults.conf
      $vi spark-defaults.conf

      直接在文件末尾添加如下内容(spark的jars在HDFS上的位置,也可以在提交的时候加–jars 以及 –files)

      Code
      spark.yarn.jars=hdfs://mycluster/spark/jars/*
    • /conf/spark-env.sh

      Code
      $cp spark-env.sh.template spark-env.sh
      $vi spark-env.sh

      直接在文件末尾添加如下内容

      Code
      export JAVA_HOME=/soft/jdk
      export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
  3. 分发spark-default.conf *和 *spark-env.sh 到所有节点上。重启spark集群。(这里只要提交作业的那个机器就可以了)

  4. 提交作业(master改为yarn,即作业交给yarn来管理)

    • cluster 模式

      Code
      $spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master yarn --deploy-mode cluster hdfs://mycluster/demoJars/Spark-1.0-SNAPSHOT.jar
    • client 模式

      Code
      $spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master yarn --deploy-mode client Spark-1.0-SNAPSHOT.jar

注意:如果是用虚拟机搭建,可能会由于虚拟机内存过小而导致启动失败,比如内存资源过小,yarn会直接kill掉进程导致rpc连接失败。所以,我们还需要配置Hadoop的yarn-site.xml文件,加入如下两项配置:

xml
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>

2.7 mesos 模式

Spark客户端直接连接Mesos;不需要额外构建Spark集群。国内应用比较少,更多的是运用yarn调度。

2.8 启动脚本分析

start-all.sh:启动所有脚本

  • sbin/spark-config.sh:读取配置文件
  • sbin/start-master.sh:启动master进程
    • sbin/spark-config.sh
      • export JAVA_HOME 配置一些环境
    • bin/load-spark-env.sh
      • if [ -f “${SPARK_CONF_DIR}/spark-env.sh” ];
    • sbin/spark-daemon.sh
  • sbin/start-slaves.sh:启动worker进程
    • sbin/spark-config.sh
    • bin/load-spark-env.sh
    • sbin/slaves.sh
    • sbin/start-slave.sh
      • sbin/spark-config.sh
      • bin/load-spark-env.sh
      • sbin/spark-daemon.sh

三、快速入门

3.1 主要对象

  • SparkContext:Spark程序的入口点,封装了整个spark运行环境的信息。
  • RDD:resilient distributed dataset,弹性分布式数据集。等价于集合。

3.2 编程单词计数程序(Standalone模式)

3.2.1 Scala

  1. 添加scala编译插件(不然打出的jar包没有sacla的类)

    xml
    <plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.2.2</version>
    <configuration>
    <recompileMode>incremental</recompileMode>
    </configuration>
    <executions>
    <execution>
    <goals>
    <goal>compile</goal>
    <goal>testCompile</goal>
    </goals>
    </execution>
    </executions>
    </plugin>
  2. 编写Scala文件

    scala
    package cn.wangbowen.spark.scala

    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount {
    def main(args: Array[String]): Unit = {
    // 创建spark配置对象
    val conf = new SparkConf().setAppName("WordCount")
    // 本地模式
    //conf.setMaster("local")

    val sc = new SparkContext(conf)
    // 加载文本文件
    val rdd = sc.textFile(args(0))
    // 分割并炸开
    rdd.flatMap(line => line.split(" "))
    // 映射二元组
    .map((_, 1))
    // 根据Key分组聚合
    .reduceByKey(_ + _)
    // 返回数组结果
    .collect()
    // 打印结果
    .foreach(println)
    }
    }
  3. MAVEN打包,上传

  4. 创建一个文本文件,并上传到HDFS

    Code
    $vi wordcount.txt
    hello world
    hello scala
    hello spark
    hello spark

    $start-dfs.sh
    $hdfs dfs -put wordcount.txt /tmp/
  5. 运行作业(注意哪个namenode是激活态用哪个,这里是s201)

    Code
    $spark-submit --master spark://s201:7077 --class cn.wangbowen.spark.scala.WordCount Spark-1.0-SNAPSHOT.jar hdfs://s201:8020/tmp/wordcount.txt

3.2.2 Java

  1. 编写代码

    java
    package cn.wangbowen.spark.java;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    /**
    * WordCount class
    *
    * @author BoWenWang
    * @date 2020/4/29 22:06
    */
    public class WordCount {
    public static void main(String[] args) {
    // 创建SparkConf对象
    SparkConf conf = new SparkConf();
    conf.setAppName("WordCount");

    // 创建JAVA SC
    JavaSparkContext sc = new JavaSparkContext(conf);
    // 加载文本
    JavaRDD<String> rdd = sc.textFile(args[0]);
    // 压扁。这里的泛型,第一个都是输入类型,后面跟输出类型
    JavaRDD<String> rdd1 = rdd.flatMap(new FlatMapFunction<String, String>() {
    public Iterator<String> call(String s) throws Exception {
    String[] words = s.split(" ");
    List<String> list = new ArrayList<String>(Arrays.asList(words));
    return list.iterator();
    }
    });
    // 映射
    JavaPairRDD<String, Integer> rdd2 = rdd1.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) throws Exception {
    return new Tuple2<String, Integer>(s, 1);
    }
    });
    // reduce
    JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer integer, Integer integer2) throws Exception {
    return integer + integer2;
    }
    });
    List<Tuple2<String, Integer>> collect = rdd3.collect();
    for (Tuple2<String, Integer> tuple2 : collect) {
    System.out.println(tuple2._1 + ": " + tuple2._2);
    }
    }
    }
  2. MAVEN打包,上传

  3. 运行

    Code
    $spark-submit --master spark://s201:7077 --class cn.wangbowen.spark.java.WordCount Spark-1.0-SNAPSHOT.jar hdfs://s201:8020/tmp/wordcount.txt

四、RDD 弹性分布式数据集

是spark的基本数据结构,是不可变数据集。RDD中的数据集进行逻辑分区,每个分区可以单独在集群节点进行计算。可以包含任何java,scala,python和自定义类型。

  • RDD是只读的记录分区集合。

  • RDD具有容错机制。

  • 创建RDD方式:

    1. 并行化一个现有集合。
    2. 外部存储。
  • 内存处理计算。在job间进行数据共享。内存的IO速率高于网络和disk的10 ~ 100之间(hadoop 花费90%时间用户rw)。

  • 内部包含5个主要属性:

    1. 分区列表
    2. 针对每个split的计算函数。
    3. 对其他rdd的依赖列表
    4. 可选,如果是KeyValueRDD的话,可以带分区类。
    5. 可选,首选块位置列表(hdfs block location);

4.1 并发度

Code
// 5.
local.backend.defaultParallelism() = scheduler.conf.getInt("spark.default.parallelism", totalCores)
// 4.
taskScheduler.defaultParallelism = backend.defaultParallelism()
// 3.
sc.defaultParallelism =...; taskScheduler.defaultParallelism
// 2.取最小值
defaultMinPartitions = math.min(defaultParallelism, 2)
// 1.输入文本文件,第二个参数是并发度
sc.textFile(path,defaultMinPartitions) //1,2

4.2 RDD 变换

返回指向新rdd的指针,在rdd之间创建依赖关系。每个rdd都有计算函数和指向父RDD的指针。

函数名 参数 解释
map (T) => V 对每个元素进行变换,应用变换函数。即映射,将数据集中的每个元素进行变化出新的元素。
filter (T) => Boolean 过滤器。过滤掉返回false的数据。
flatMap (T) => TraversableOnce[U] 压扁。对于返回的一组数据,分别拆分成单个数据。
mapPartitions Iterator[T] => Iterator[U] 对每个分区进行应用变换,输入的Iterator,返回新的迭代器,可以对分区进行函数处理。迭代器内容是该分区里的数据集。
mapPartitionsWithIndex (Int, Iterator[T]) => Iterator[U] 同上,可以指定分区。
sample withReplacement: Boolean,fraction: Double,seed: Long = {} 采样。返回采样的RDD子集。
union RDD[T] 类似于mysql union操作【select * from persons where id < 10 union select * from id persons where id > 29 ;】
intersection RDD[T] 交集。提取两个rdd中都含有的元素。
distinct 可选[numTasks] 去重。去除重复的元素。
groupByKey (K,V) => (K,Iterable[V]) 根据Key分组,相同Key的值被放入一个迭代器。
reduceByKey (V, V) => V 按key聚合。
sortByKey 排序
join RDD[T], [numTasks] 连接。(K,V).join(K,W) =>(K,(V,W))。只会对两个RDD相同Key的数据进行合并成一个元组,一个RDD有,而另一个没有的,不会进行连接。
cogroup RDD[T] 协分组。对标JOIN,将相同KEY分到一组。(K,V).cogroup(K,W) =>(K, (Iterable[V], Iterable[W]))
cartesian RDD[T] 笛卡尔积。RDD[T] RDD[U] => RDD[(T,U)]
pipe cmd:String 将rdd的元素传递给脚本或者命令,执行结果返回形成新的RDD
coalesce numPartitions:Int 减少分区
repartition 可增可减

4.3 RDD 动作

一个RDD数据集经过RDD变换不会立即执行,只有遇到RDD动作的时候才会开始执行。

函数名 参数 解释
collect 收集rdd元素形成数组。
count 统计rdd元素的个数
reduce 聚合,返回一个值。
first 取出第一个元素take(1)
take n:String 取出前 N个元素
saveAsTextFile path:String 保存到文件
saveAsSequenceFile path:String 保存成序列文件
saveAsObjectFile path:String Java and Scala
countByKey 按照key,统计每个key下value的个数

五、Spark核心API

六、Spark任务提交流程

七、依赖

Code
NarrowDependency:	子RDD的每个分区依赖于父RDD的少量分区。
|
/ \
---
|---- OneToOneDependency //父子RDD之间的分区存在一对一关系。
|---- RangeDependency //父RDD的一个分区范围和子RDD存在一对一关系。
|---- OneToOneDependency //父子RDD之间的分区存在一对一关系。

ShuffleDependency //依赖,在shuffle阶段输出时的一种依赖。

PruneDependency //在PartitionPruningRDD和其父RDD之间的依赖
//子RDD包含了父RDD的分区子集。

八、持久化

九、共享变量

map(),filter()高级函数中访问的对象被串行化到各个节点。每个节点都有一份拷贝。变量值并不会回传到driver程序。spark通过广播变量和累加器实现共享变量。

9.1 广播变量

scala
//创建广播变量
val bc1 = sc.broadcast(Array(1,2,3))
bc1.value

9.2 累加器

scala
val ac1 = sc.longaccumulator("ac1")
ac1.value
sc.parell..(1 to 10).map(_ * 2).map(e=>{ac1.add(1) ; e}).reduce(_+_)
ac1.value //10

十、Spark SQL

10.1 Scala 版

10.1.1 创建 DataFrame 收据框

scala
// 创建样例类
scala> case class Person(id:Int, name:String, age:Int)
defined class Person
// 创建数据,并转换成RDD
scala> val arr = Array("1,tom,12", "2,wang,13", "3,li,16")
arr: Array[String] = Array(1,tom,12, 2,wang,13, 3,li,16)

scala> val rdd1 = sc.makeRDD(arr)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:26
// 创建对象RDD(即将字符串RDD转换成一个对象)
scala> val rdd2 = rdd1.map(e => {val pars = e.split(","); Person(pars(0).toInt, pars(1), pars(2).toInt)})
rdd2: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[2] at map at <console>:27
// 根据RDD创建DataFrame
scala> val df = spark.createDataFrame(rdd2)
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
// 打印表结构
scala> df.printSchema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)

scala> df.show
+---+----+---+
| id|name|age|
+---+----+---+
| 1| tom| 12|
| 2|wang| 13|
| 3| li| 16|
+---+----+---+

10.1.2 创建临时视图

scala
// 根据DataFrame在内存中创建一个临时的视图
scala> df.createOrReplaceTempView("persons")
// 利用sparkSQL,根据内存中的视图,返回一个DataFrame
scala> val df2 = spark.sql("select * from persons")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> df2.show
+---+----+---+
| id|name|age|
+---+----+---+
| 1| tom| 12|
| 2|wang| 13|
| 3| li| 16|
+---+----+---+

10.1.3 查询数据

通过SQL查询

scala
// 也可以添加条件查询,返回DataFrame
scala> val df3 = spark.sql("select * from persons where id > 1")
df3: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> val df4 = spark.sql("select * from persons where id < 1")
df4: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
// 然后创建视图
scala> df3.createOrReplaceTempView("p1")
scala> df4.createOrReplaceTempView("p2")
// 然后查询
scala> spark.sql("select * from p1 union select * from p2").show
+---+----+---+
| id|name|age|
+---+----+---+
| 3| li| 16|
| 2|wang| 13|
+---+----+---+

通过API查询

scala
// 通过函数,返回Dataset(DataFram实际上就是Dataset)
scala> df3.union(df4)
res8: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 1 more field]
// 显示
scala> res8.show
+---+----+---+
| id|name|age|
+---+----+---+
| 2|wang| 13|
| 3| li| 16|
+---+----+---+

// 其他API
// 查询指定字段
scala> df.selectExpr("id", "name").show
+---+----+
| id|name|
+---+----+
| 1| tom|
| 2|wang|
| 3| li|
+---+----+

// where条件查询
scala> df.where("name like 't%'").show
+---+----+---+
| id|name|age|
+---+----+---+
| 1| tom| 12|
+---+----+---+

// 聚合函数
scala> df.agg(sum("age"),max("age")).show
+--------+--------+
|sum(age)|max(age)|
+--------+--------+
| 41| 16|
+--------+--------+

10.2 Java 版

导入依赖

xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>

10.2.1 处理 json 数据

创建数据文件

json
{"id":1,"name":"tom1","age":11}
{"id":2,"name":"tom2","age":12}
{"id":3,"name":"tom3","age":13}
{"id":4,"name":"tom4","age":14}
{"id":5,"name":"tom5","age":15}
{"id":6,"name":"tom6","age":16}

编写代码

java
package cn.wangbowen.spark.java;

import org.apache.directory.shared.kerberos.codec.apRep.actions.ApRepInit;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
* JsonFileIO class
*
* @author BoWenWang
* @date 2020/5/5 11:47
*/
public class JsonFileIO {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("SQL-java-Json")
.config("spark.master", "local") // 本地模式
.getOrCreate();

// 读取文件
Dataset<Row> dataset = sparkSession.read().json("file:///d:/tmp/jsonSQL.dat");
dataset.show();
// 创建临时视图
dataset.createOrReplaceTempView("persons");
// SQL
Dataset<Row> result = sparkSession.sql("select * from persons where id > 2");
result.show();
// 写入文件(目录)
result.write().json("file:///d:/tmp/jsonSQLOutDir");


// DataSet 转 RDD (可以进行一些数据操作!)
JavaRDD<Row> javaRDD = dataset.toJavaRDD();
javaRDD.filter(new Function<Row, Boolean>() {
public Boolean call(Row row) throws Exception {
Long id = (Long) row.getAs("id");
if (id > 3) {
return true;
}
return false;
}
}).foreach(new VoidFunction<Row>() {
public void call(Row row) throws Exception {
System.out.println(row);
}
});

}
}

输出结果

Code
// 从文件读取的数据
+---+---+----+
|age| id|name|
+---+---+----+
| 11| 1|tom1|
| 12| 2|tom2|
| 13| 3|tom3|
| 14| 4|tom4|
| 15| 5|tom5|
| 16| 6|tom6|
+---+---+----+

// 经过SQL语句过滤的
+---+---+----+
|age| id|name|
+---+---+----+
| 13| 3|tom3|
| 14| 4|tom4|
| 15| 5|tom5|
| 16| 6|tom6|
+---+---+----+

// 最后输出到文件的
{"age":13,"id":3,"name":"tom3"}
{"age":14,"id":4,"name":"tom4"}
{"age":15,"id":5,"name":"tom5"}
{"age":16,"id":6,"name":"tom6"}

10.2.2 DataSet 转 RDD

完整代码在上面JSON代码里

java
// DataSet 转 RDD (可以进行一些数据操作!)
JavaRDD<Row> javaRDD = dataset.toJavaRDD();
javaRDD.filter(new Function<Row, Boolean>() {
public Boolean call(Row row) throws Exception {
Long id = (Long) row.getAs("id");
if (id > 3) {
return true;
}
return false;
}
}).foreach(new VoidFunction<Row>() {
public void call(Row row) throws Exception {
System.out.println(row);
}
});

输出结果

Code
[14,4,tom4]
[15,5,tom5]
[16,6,tom6]

10.2.3 处理 jdbc 数据

添加依赖

xml
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>

代码

java
package cn.wangbowen.spark.java;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**
* JdbcIO class
*
* @author BoWenWang
* @date 2020/5/5 13:30
*/
public class JdbcIO {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("SQL-java-jdbc")
.config("spark.master", "local") // 本地模式
.getOrCreate();

// 配置数据库信息
String url = "jdbc:mysql:///test";
String table = "user";
Properties prop = new Properties();
prop.put("user", "root");
prop.put("password", "Bow1024");
prop.put("driver", "com.mysql.jdbc.Driver");
// 读取数据库数据
Dataset<Row> dataset = sparkSession.read().jdbc(url, table, prop);
dataset.show();
// 查询操作
Dataset<Row> result = dataset.select(new Column("id"), new Column("name")).where("id > 1");
// 写入数据库
result.write().jdbc(url, "sparkSQL", prop);
}
}

输出结果

Code
// 数据库读取的内容
+---+--------+--------+----+
| id|username|password|name|
+---+--------+--------+----+
| 1|zhangsan| 123|张三|
| 2| lisi| 123|李四|
+---+--------+--------+----+

数据库查询结果

avatar

10.3 整合 Hive

10.3.1 linux环境

  1. 复制core-site.xml(hdfs) + hdfs-site.xml(hdfs) + hive-site.xml(hive)三个文件到spark/conf下。(分发到所有节点)

  2. 复制hive/lib下mysql驱动程序到/soft/spark/jars下(分发到所有节点)

  3. 启动spark-shell,指定启动模式

    Code
    $spark-shell --master spark://s201:7077

    # 创建HIVE表
    scala> spark.sql("CREATE TABLE IF NOT EXISTS mydb.sparkHive(id int, name string, age int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE")

    # 本地创建数据文件
    1,tom1,11
    2,tom2,12
    3,tom3,13
    4,tom4,14
    5,tom5,15

    # 加载数据到HIVE表
    scala> spark.sql("load data local inpath 'file:///home/wbw/tmp/data.txt' into table mydb.sparkHive")

    # 查看HIVE表
    scala> spark.sql("select * from mydb.sparkHive").show
    +----+----+----+
    | id|name| age|
    +----+----+----+
    | 1|tom1| 11|
    | 2|tom2| 12|
    | 3|tom3| 13|
    | 4|tom4| 14|
    | 5|tom5| 15|
    |null|null|null|
    |null|null|null|
    +----+----+----+

10.3.2 IDEA环境(JAVA)

  1. 复制core-site.xml(hdfs) + hdfs-site.xml(hdfs) + hive-site.xml(hive)三个文件到resources目录下

  2. 编码

    java
    package cn.wangbowen.spark.java;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    /**
    * SparkHive class
    *
    * @author BoWenWang
    * @date 2020/5/5 15:59
    */
    public class SparkHive {
    public static void main(String[] args) {
    SparkSession sparkSession = SparkSession
    .builder()
    .appName("SQL-java-hive")
    .config("spark.master", "local") // 本地模式
    .enableHiveSupport() // 这个是关键!
    .getOrCreate();

    Dataset<Row> dataset = sparkSession.sql("select * from mydb.sparkHive");
    dataset.show();
    }
    }

10.3.3 可能遇到的问题

Spark2.3.0集成hive3.1.1遇到的一个坑HikariCP

https://blog.csdn.net/weixin_44166276/article/details/85088998

TIP:我是修改了hive-site.xml中

  1. datanucleus.connectionPoolingType改成dbcp

  2. 然后关闭了版本验证hive.metastore.schema.verification

    xml
    <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
    </property>

IDEA下权限不足 Permission denied: user=BoWenWang, access=EXECUTE, inode=”/tmp”:wbw:supergroup:drwx——

Code
# 哪个缺权限,就给哪个
hdfs dfs -chmod 777 /tmp

10.4 SQL查询引擎

相当于嵌套了一层,通过JDBC的途径间接调用。

  1. 启动spark集群(完全分布式-standalone)

    $>/soft/spark/sbin/start-all.sh
       master        //201
       worker        //202 ~ 204
  2. 创建hive的数据表在默认库下。

    $>hive -e "create table tt(id int,name string , age int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile"
  3. 加载数据到hive表中.

    $>hive -e "load data local inpath 'file:///home/centos/data.txt' into table tt"
       $>hive -e "select * from tt"
  4. 分发三个文件到所有worker节点

  5. 启动spark集群

    $>soft/spark/sbin/start-all.sh
  6. 启动spark-shell

    $>spark-shell --master spark://s201:7077
  7. 启动thriftserver服务器

    $>start

10.5 拓展和总结

10.5.1 使用场景

处理结构化数据:

  • 即席/普通查询文件中的数据
  • 对流数据处理(SparkStreaming)
  • 使用SQL的方式处理ETL
  • 对外部数据库进行查询

10.5.2 加载数据

  • 直接加载数据到DataFrame中
  • 可以加载数据到RDD然后做相应的转换
  • 可以从本地或云端加载数据

10.5.3 DataFrame函数 和 SQL

  • DataFrame = RDD + Schema:表格 = 数据 + 结构

  • DataFrame只是DataSet的Row类型的别名

  • DataFrame可以处理Text、JSON、parquet等(即外部数据源)

  • 不管用DF的API还是SQL,最后生成的逻辑执行计划都是一样的,优化后效率都是一样的

10.5.4 Schema

  • 隐式:JSON、parquet、ORC等自动推导
  • 显示:
    • 先创建RDD
    • 然后创建一个schema
    • 然后整合

10.5.5 加载和保存结果

SaveMode:存在报错、Append追加、Overwrite覆盖、不理睬

spark.read.format(“json”).load(“file:///…/…”)

df.write.format(“parquet”).mode(“overwrite”).save(“file:///…/…”)

如果保存到数据库的话:saveAsTable

还可以分区写出去df.toDF.write.partitionBy(“year”,”month”).avor(“…”)

10.5.6 SQL函数覆盖

支持了好多SQL语法

10.5.7 复杂JSON处理

数组情况

select name,nums[1] from json_table

多层嵌套可以用A.B

10.5.8 外部数据源

  • 关系型数据库,需要JDBC jars

  • spark-packeages.org(就是format不一样.将原来JSON因为是自带的,可以直接写JSON,不然就要写对应的包。可以去那个网站上找)

    spark.read.format(“com.databricks.spark.avro”).load(“…”)

十一、Spark Streaming

11.1 介绍

是spark core的扩展,针对实时数据流处理,具有可扩展、高吞吐量、容错。数据可以是来自于kafka,flume,tcpsocket,使用高级函数(map reduce filter ,join , windows),
处理的数据可以推送到database,hdfs,针对数据流处理可以应用到机器学习和图计算中。

内部,spark接受实时数据流,分成batch(分批次)进行处理,最终在每个batch终产生结果stream。

discretized stream or DStream
离散流,表示的是连续的数据流。
通过kafka、flume等输入数据流产生,也可以通过对其他DStream进行高阶变换产生。
在内部,DStream是表现为RDD序列。

StreamingContext

  1. 启动上下文之后,不能启动新的流或者添加新的
  2. 上下文停止后不能restart.
  3. 同一JVM只有一个active的streamingcontext
  4. 停止streamingContext会一同stop掉SparkContext,如若只停止StreamingContext.ssc.stop(false|true);
  5. SparkContext可以创建多个StreamingContext,创建新的之前停掉旧的。

11.2 快速入门(单词计数)

添加pom依赖

xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.3</version>
</dependency>

11.2.1 Scala + 本地

代码

scala
package cn.wangbowen.spark.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingWordCountScala {
def main(args: Array[String]): Unit = {
// 注意这个local[2],线程数不能1个,因为要有1个用来接受数据,一个来处理
val conf = new SparkConf().setMaster("local[2]").setAppName("WC-Streaming-Scala")
// 创建SparkStreaming上下文,批次时长是1秒
val ssc = new StreamingContext(conf, Seconds(1))

// 创建socket文本流(即数据源)
val line = ssc.socketTextStream("localhost", 9999)

// 对数据进行处理
val words = line.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val count = pairs.reduceByKey(_ + _)
// 要有输出,程序才能执行
count.print()

// 启动作业
ssc.start()
// 等待结束(不会停下来,只有调用stop())
ssc.awaitTermination()
}
}

运行步骤

  1. 开启nc服务器(【安装教程】https://blog.csdn.net/weixin_38842096/article/details/85720559)

    Code
    cmd> nc -lL -p 9999
  2. 启动sparkStreaming程序

  3. 在nc命令行中输入

    Code
    hello spark streaming
  4. 查看IDEA控制台输出

    Code
    -------------------------------------------
    Time: 1588831126000 ms
    -------------------------------------------
    (hello,1)
    (streaming,1)
    (spark,1)

11.2.2 Java + 集群

代码

java
package cn.wangbowen.spark.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
* SparkStreamingWordCountJava class
*
* @author BoWenWang
* @date 2020/5/7 14:03
*/
public class SparkStreamingWordCountJava {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("WC-Streaming-Java").setMaster("spark://s201:7077");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(2));

final JavaReceiverInputDStream<String> line = jsc.socketTextStream("s201", 9999);

JavaDStream<String> words = line.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
String[] words = s.split(" ");
return Arrays.asList(words).iterator();
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
JavaPairDStream<String, Integer> count = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
count.print();

jsc.start();
jsc.awaitTermination();
}
}

运行步骤

  1. 打jar包,上传导集群

  2. 在s201打开nc服务器

    Code
    nc -l -p 9999
  3. 提交作业

    Code
    $spark-submit --class cn.wangbowen.spark.java.SparkStreamingWordCountJava Spark-1.0-SNAPSHOT.jar
  4. 等待启动成功后,在NC中输入

    Code
    hello world count
  5. 查看运行窗口

    Code
    -------------------------------------------
    Time: 1588833156000 ms
    -------------------------------------------
    (hello,1)
    (world,1)
    (count,1)

11.3 DStream 和 Receiver

11.3.1 DSteam

11.3.2 Receiver

  1. 介绍:Receiver是接受者,从source接受数据,存储在内存中共spark处理。
    • 基本源:fileSystem | socket,内置API支持。
    • 高级源:kafka | flume | …,需要引入pom.xml依赖.
  2. 注意:使用local模式时,不能使用一个线程.使用的local[n],n需要大于receiver的个数。

11.4 Kafka 集成

  1. 确保ZK集群开启

  2. 启动Kafka,在S202~S204分别运行

    Code
    $cd /soft/kafka
    $bin/kafka-server-start.sh -daemon config/server.properties
  3. 创建kafka主题,查看主题列表

    Code
    $kafka-topics.sh --create --bootstrap-server s202:9092 --replication-factor 2 --partitions 2 --topic mytopic1
    # 查看主题列表
    $kafka-topics.sh --list --bootstrap-server s202:9092
  4. 导入依赖

    xml
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.3</version>
    </dependency>
  5. 编写代码

    java
    package cn.wangbowen.spark.java;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import scala.Tuple2;

    import java.util.*;

    /**
    * SparkStreamingWordCountJava class
    *
    * @author BoWenWang
    * @date 2020/5/7 14:03
    */
    public class SparkStreamingWordCountJavaForKafka {
    public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setAppName("WC-Streaming-Java-Kafka").setMaster("local[2]");
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(2));

    // Kafka连接配置
    Map<String, Object> kafkaParams = new HashMap<>(8);
    kafkaParams.put("bootstrap.servers", "s202:9092,s203:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "g6");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    // 订阅主题列表
    Collection<String> topics = Collections.singletonList("mytopic1");

    // 数据源
    final JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
    jsc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );

    // 压扁操作(消费数据源)
    JavaDStream<String> words = stream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
    @Override
    public Iterator<String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {
    // consumerRecord.key(): kafka消息由K-V组成,这里是控制台发送所以Key为null
    String[] words = consumerRecord.value().split(" ");
    return Arrays.asList(words).iterator();
    }
    });
    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
    return new Tuple2<>(s, 1);
    }
    });
    JavaPairDStream<String, Integer> count = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
    return integer + integer2;
    }
    });
    count.print();

    jsc.start();
    jsc.awaitTermination();
    }
    }
  6. 开启Kafka控制台生产者

    Code
    $kafka-console-producer.sh --broker-list s202:9092 --topic mytopic1
  7. 运行程序(Run)

  8. 在生产者控制台发送

    Code
    >hello spark streaming for kafka
  9. 观察IDEA控制台输出

    Code
    -------------------------------------------
    Time: 1588835416000 ms
    -------------------------------------------
    (hello,1)
    (streaming,1)
    (kafka,1)
    (spark,1)
    (for,1)

11.5 状态更新

11.5.1 updateStateByKey

可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。

适用场景

updateStateByKey可以用来统计历史数据。例如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的访问量等指标。

11.5.2 mapWithState

只返回变化后的key的值,这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。

适用场景

mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里的余额信息。

版权声明:本文为CSDN博主「爱是与世界平行」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/an1090239782/article/details/102832444

11.5.3 代码示例

代码

java
package cn.wangbowen.spark.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
* SparkStreamingWordCountJava class
* 状态更新
*
* @author BoWenWang
* @date 2020/5/7 14:03
*/
public class SparkStreamingStatus {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("WC-Streaming-Status").setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(2));

final JavaReceiverInputDStream<String> line = jsc.socketTextStream("localhost", 9999);

JavaDStream<String> words = line.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
String[] words = s.split(" ");
return Arrays.asList(words).iterator();
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
JavaPairDStream<String, Integer> count = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
// 打印当前窗口数据
count.print();

// 设置检查点(使用updateStateByKey必须设置检查点目录)
jsc.checkpoint("file:///d:/tmp/sparkCheckPoint");
// 这里做状态更新
JavaPairDStream<String, Integer> historyCount = count.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
/**
* @param integers 这里是相同的Key为一组进来接受处理
* @param optional 这里是上一个旧的值
*/
@Override
public Optional<Integer> call(List<Integer> integers, Optional<Integer> optional) throws Exception {
// 如果没有上一个值(即Null),那么取0
Integer newCounter = optional.orElse(0);
// 累加值
for (Integer integer : integers) {
newCounter += integer;
}
return Optional.of(newCounter);
}
});
// 打印状态更新值,即累计值
historyCount.print();

jsc.start();
jsc.awaitTermination();
}
}

运行步骤

  1. 打开nc,间隔一段时间发送一次数据

    Code
    C:\Users\XXX>nc -lL -p 9999
    hello
    hello
    hello
    hello
  2. 查看IDEA控制台打印信息

    Code
    // count.print(); 当前窗口统计值
    -------------------------------------------
    Time: 1588837904000 ms
    -------------------------------------------
    (hello,1)

    // historyCount.print(); 历史累积值
    -------------------------------------------
    Time: 1588837904000 ms
    -------------------------------------------
    (hello,4)

11.6 窗口化操作

  • batch interval : 批次的间隔。

  • windows length : 窗口长度,跨批次。是批次的整数倍。

  • slide interval : 滑动间隔,窗口计算的间隔时间,是批次interval的整倍数。

比如:

batch interval = 2,windows length = 6,slide interval =4

统计6秒内的热词,每2秒计算一次接收到的数据,每4秒刷新一次。

avatar

代码

java
package cn.wangbowen.spark.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
* SparkStreamingWordCountJava class
*
* @author BoWenWang
* @date 2020/5/7 14:03
*/
public class SparkStreamingWindows {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("WC-Streaming-Windows").setMaster("local[2]");

// 批次的间隔:每2秒计算一次接收到的数据
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(2));

final JavaReceiverInputDStream<String> line = jsc.socketTextStream("localhost", 9999);

JavaDStream<String> words = line.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
String[] words = s.split(" ");
return Arrays.asList(words).iterator();
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});

// 窗口机制
JavaPairDStream<String, Integer> count = pairs.reduceByKeyAndWindow(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
},
new Duration(6 * 1000), // 窗口长度
new Duration(4 * 1000) // 滑动窗口长度
);
count.print();

jsc.start();
jsc.awaitTermination();
}
}

运行过程

  1. 开启nc

  2. 运行程序

  3. 发送数据(每隔1秒发一次)

    Code
    C:\Users\XXX>nc -lL -p 9999
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
  4. 查看输出结果(每次输出间隔 4s = 4000ms,每2秒计算一次,用[]表示2秒的数据,显示6秒内的数据)

    Code
    -------------------------------------------
    Time: 1588839240000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1588839244000 ms
    -------------------------------------------
    (2,1)
    (1,1)

    -------------------------------------------
    Time: 1588839248000 ms
    -------------------------------------------
    (4,1)
    (6,1)
    (2,1)
    (5,1)
    (3,1)
    (1,1)

    -------------------------------------------
    Time: 1588839252000 ms
    -------------------------------------------
    (8,1)
    (6,1)
    (7,1)
    (5,1)
    (9,1)

    -------------------------------------------
    Time: 1588839256000 ms
    -------------------------------------------
    (12,1)
    (13,1)
    (9,1)
    (11,1)
    (10,1)

    -------------------------------------------
    Time: 1588839260000 ms
    13
    -------------------------------------------
    (13,1)

11.7 容错处理

11.7.1 生产环境中spark streaming的job的注意事项

  • 避免单点故障
  • Driver:驱动,运行用户编写的程序代码的主机。
  • Excutors:执行的spark driver提交的job,内部含有附加组件比如receiver。receiver接受数据并以block方式保存在memory中,同时,将数据块复制到其他executor中,已备于容错。每个批次末端会形成新的DStream,交给下游处理。如果receiver故障,其他执行器中的receiver会启动进行数据的接收。

avatar

11.7.2 spark streaming中的容错实现

如果executor故障,所有未被处理的数据都会丢失,解决办法可以通过wal(hbase,hdfs/WALs)方式将数据预先写入到hdfs或者s3.

如果Driver故障,driver程序就会停止,所有executor都是丢失连接,停止计算过程。解决办法需要配置和编程。

流程

  1. 配置Driver程序自动重启,使用特定的clustermanager实现。

  2. 重启时,从宕机的地方进行重启,通过检查点机制可以实现该功能。

    Code
    // 设置检查点目录可以是本地,可以是hdfs.
    jsc.checkpoint("d://....");
    // 不再使用new方式创建SparkStreamContext对象,而是通过工厂方式.JavaStreamingContext.getOrCreate()方法创建上下文对象,首先会检查检查点目录,看是否有job运行,没有就new新的。

代码

java
package cn.wangbowen.spark.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.function.Function0;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
* SparkStreamingRestart class
* 容错处理
*
* @author BoWenWang
* @date 2020/5/7 17:22
*/
public class SparkStreamingRestart {
public static void main(String[] args) throws InterruptedException {

// 上下文工厂函数,用来创建上下文
Function0<JavaStreamingContext> contextFactory = new Function0<JavaStreamingContext>() {
@Override
public JavaStreamingContext call() throws Exception {
// 这里复用了之前的更新状态内容
SparkConf sparkConf = new SparkConf().setAppName("WC-Streaming-ReStart").setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(2));

final JavaReceiverInputDStream<String> line = jsc.socketTextStream("localhost", 9999);

JavaDStream<String> words = line.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
String[] words = s.split(" ");
return Arrays.asList(words).iterator();
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
JavaPairDStream<String, Integer> count = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});

// 这里做状态更新
JavaPairDStream<String, Integer> historyCount = count.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
/**
* @param integers 这里是相同的Key为一组进来接受处理
* @param optional 这里是上一个旧的值
*/
@Override
public Optional<Integer> call(List<Integer> integers, Optional<Integer> optional) throws Exception {
Integer newCounter = optional.orElse(0);
for (Integer integer : integers) {
newCounter += integer;
}
return Optional.of(newCounter);
}
});
historyCount.print();

// 设置检查点(要与下面的路径相同,启动的时候先检查该目录,有问题的时候也是写入到该目录)
jsc.checkpoint("file:///d:/tmp/ReStartCheckPoint");
return jsc;
}

};


// 每次启动的时候,先检查检查点,如果之间有断开,那么重启。否则新建一个上下文
JavaStreamingContext context = JavaStreamingContext.getOrCreate("file:///d:/tmp/ReStartCheckPoint",
contextFactory);

context.start();
context.awaitTermination();
}
}

运行步骤

  1. 开启NC

  2. 运行程序

  3. 发送数据

    Code
    C:\Users\BoWenWang>nc -lL -p 9999
    hello
    hello
    hello
    hello
    hello
  4. 观察控制台输出

    Code
    -------------------------------------------
    Time: 1588844410000 ms
    -------------------------------------------
    (hello,5)
  5. 直接杀掉程序(日志)

    Code
    20/05/07 17:44:20 INFO CheckpointWriter: Deleting file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844650000
    20/05/07 17:44:20 INFO CheckpointWriter: Checkpoint for time 1588844660000 ms saved to file 'file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000', took 4337 bytes and 23 ms
    20/05/07 17:44:20 INFO DStreamGraph: Clearing checkpoint data for time 1588844660000 ms
    20/05/07 17:44:20 INFO DStreamGraph: Cleared checkpoint data for time 1588844660000 ms
    20/05/07 17:44:20 INFO ReceivedBlockTracker: Deleting batches: 1588844638000 ms
    20/05/07 17:44:20 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Attempting to clear 0 old log files in file:/d:/tmp/ReStartCheckPoint/receivedBlockMetadata older than 1588844640000:
    20/05/07 17:44:20 INFO InputInfoTracker: remove old batch metadata: 1588844638000 ms
  6. 重启程序(检查日志)

    Code
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    20/05/07 17:45:48 INFO CheckpointReader: Checkpoint files found: file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000.bk,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844658000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844658000.bk,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844656000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844656000.bk,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844654000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844654000.bk,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844652000,file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844652000.bk
    20/05/07 17:45:48 INFO CheckpointReader: Attempting to load checkpoint from file file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000
    20/05/07 17:45:50 INFO Checkpoint: Checkpoint for time 1588844660000 ms validated
    20/05/07 17:45:50 INFO CheckpointReader: Checkpoint successfully loaded from file file:/d:/tmp/ReStartCheckPoint/checkpoint-1588844660000
    20/05/07 17:45:50 INFO CheckpointReader: Checkpoint was generated at time 1588844660000 ms
  7. 观察控制台输出(看到恢复了)

    Code
    -------------------------------------------
    Time: 1588844660000 ms
    -------------------------------------------
    (hello,5)

11.8 将结果写入MySQL

利用11.2.1的代码,其中count.print()替换成输出到数据库即可:

scala
// 先定义一个函数获取连接
def createConnextion() = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("url", "username", "password")
}
// 添加新代码
count.print()
count.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createConnextion()
partitionOfRecords.foreach(record => {
val sql = "insert into ..."
connection.createStatement().execute(sql)
})
})
})

十二、SparkApp 部署模式

决定spark作业入口程序的地方,Driver驱动。

Code
spark-submit --class xxx xx.jar --deploy-mode (client | cluster)

–deploy-mode:指定是否部署driver程序在worker节点上还是在client主机上。不论哪种方式,rdd的运算都在worker执行

12.1 闭包

RDD,resilient distributed dataset,弹性(容错)分布式数据集。

分区列表,function,dep Option(分区类, Pair[Key,Value]),首选位置。

运行job时,spark将rdd打碎变换成task,每个task由一个executor执行。执行之前,spark会进行task的闭包(closure)计算。闭包是指针对executor可见的变量和方法,以备在rdd的foreach中进行计算。闭包就是串行化后并发送给每个executor.

local模式下,所有spark程序运行在同一JVM中,共享对象,counter(变量)是可以累加的。原因是所有executor指向的是同一个引用。

cluster模式下,不可以,counter是闭包处理的。每个节点对driver上的counter是不可见的。只能看到自己内部串行化的counter副本。

12.2 client

driver运行在client主机上。client可以不在cluster中。

验证

  1. 启动spark集群

  2. 编程

    scala
    package cn.wangbowen.spark.scala

    import java.net.{InetAddress, Socket}

    import org.apache.spark.{SparkConf, SparkContext}

    object SubmitDeployMode {

    // 打印消息
    def printfInfo(str:String): Unit = {
    val ip = InetAddress.getLocalHost.getHostAddress
    val socket = new Socket("192.168.174.205", 9999)
    val out = socket.getOutputStream
    out.write((ip + ": " + str + "\r\n").getBytes())
    out.flush()
    socket.close()
    }

    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Submit-Deploy-Mode")
    val sc = new SparkContext(conf)
    printfInfo("Driver running on this node!")
    val rdd = sc.parallelize(1 to 10, 3)
    val rdd2 = rdd.map(num => {
    printfInfo("map => " + num)
    num * 2
    })
    val count = rdd2.reduce((num1, num2) => {
    printfInfo("reduce => " + num1 + "," + num2)
    num1 + num2
    })
    printfInfo("count: " + count)
    }
    }
  3. 随便找一台机子,开启nc(这里选s205)。如果没装。用命令 $sudo yum install nc -y 安装。

    Code
    nc -lk 9999
  4. 打jar包上传到s201并执行

    Code
    $spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master spark://s201:7077 --deploy-mode client Spark-1.0-SNAPSHOT.jar

    打印结果

    Code
    192.168.174.201: Driver running on this node!	# 入口程序在s201上
    192.168.174.203: map => 1
    192.168.174.203: map => 2
    192.168.174.203: reduce => 2,4
    192.168.174.203: map => 3
    192.168.174.203: reduce => 6,6
    192.168.174.203: map => 4
    192.168.174.203: map => 5
    192.168.174.203: reduce => 8,10
    192.168.174.203: map => 6
    192.168.174.203: reduce => 18,12
    192.168.174.201: reduce => 12,30
    192.168.174.203: map => 7
    192.168.174.203: map => 8
    192.168.174.203: reduce => 14,16
    192.168.174.203: map => 9
    192.168.174.203: reduce => 30,18
    192.168.174.203: map => 10
    192.168.174.203: reduce => 48,20
    192.168.174.201: reduce => 42,68
    192.168.174.201: count: 110 # 最终结果返回client节点s201
  5. 将jar包上传到s202并执行

    Code
    $spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master spark://s201:7077 --deploy-mode client Spark-1.0-SNAPSHOT.jar

    打印结果

    Code
    192.168.174.202: Driver running on this node!	# 入口程序在s201上
    192.168.174.203: map => 1
    192.168.174.203: map => 2
    192.168.174.203: reduce => 2,4
    192.168.174.203: map => 3
    192.168.174.203: reduce => 6,6
    192.168.174.204: map => 4
    192.168.174.204: map => 5
    192.168.174.204: reduce => 8,10
    192.168.174.203: map => 7
    192.168.174.204: map => 6
    192.168.174.203: map => 8
    192.168.174.204: reduce => 18,12
    192.168.174.203: reduce => 14,16
    192.168.174.203: map => 9
    192.168.174.203: reduce => 30,18
    192.168.174.203: map => 10
    192.168.174.203: reduce => 48,20
    192.168.174.202: reduce => 12,68
    192.168.174.202: reduce => 80,30
    192.168.174.202: count: 110 # 最终结果返回client节点s202

12.3 cluster

driver程序提交给spark cluster的某个worker节点来执行。worker是cluster中的一员。导出的jar需要放置到所有worker节点都可见的位置(如hdfs)才可以。

  1. 上传jar包到hdfs

    Code
    $hdfs dfs -mkdir /demoJars
    $hdfs dfs -put Spark-1.0-SNAPSHOT.jar /demoJars
  2. 提交任务(注意模式的改变deply-mdoe和hdfs路径下的jar)

    Code
    $spark-submit --class cn.wangbowen.spark.scala.SubmitDeployMode --master spark://s201:7077 --deploy-mode cluster hdfs://mycluster/demoJars/Spark-1.0-SNAPSHOT.jar
  3. 查看结果(发现在s201上提交,结果Driver在s202)

    Code
    192.168.174.202: Driver running on this node!
    192.168.174.203: map => 1
    192.168.174.203: map => 2
    192.168.174.203: reduce => 2,4
    192.168.174.203: map => 3
    192.168.174.203: reduce => 6,6
    192.168.174.203: map => 7
    192.168.174.203: map => 8
    192.168.174.203: reduce => 14,16
    192.168.174.203: map => 9
    192.168.174.203: reduce => 30,18
    192.168.174.203: map => 10
    192.168.174.203: reduce => 48,20
    192.168.174.202: reduce => 12,68
    192.168.174.204: map => 4
    192.168.174.204: map => 5
    192.168.174.204: reduce => 8,10
    192.168.174.204: map => 6
    192.168.174.204: reduce => 18,12
    192.168.174.202: reduce => 80,30
    192.168.174.202: count: 110
  4. 再次提交(发现Driver又变了,在s204上)

    Code
    192.168.174.204: Driver running on this node!
    192.168.174.203: map => 1
    192.168.174.203: map => 2
    192.168.174.203: reduce => 2,4
    192.168.174.203: map => 3
    192.168.174.203: reduce => 6,6
    192.168.174.202: map => 4
    192.168.174.202: map => 5
    192.168.174.202: reduce => 8,10
    192.168.174.202: map => 6
    192.168.174.202: reduce => 18,12
    192.168.174.202: map => 7
    192.168.174.202: map => 8
    192.168.174.202: reduce => 14,16
    192.168.174.202: map => 9
    192.168.174.202: reduce => 30,18
    192.168.174.202: map => 10
    192.168.174.202: reduce => 48,20
    192.168.174.204: reduce => 30,68
    192.168.174.204: reduce => 98,12
    192.168.174.204: count: 110

十三、Spark集成Hive查询Hbase

假设已经按照之前的步骤HIVE集成了HBASE了。此时HBase中有一个表有两条记录,而Hive关联了HBase的表,且表名为t1。

13.1 local 模式 + spark-shell

  1. 复制hive的hive-hbase-handler-2.1.0.jar文件到spark/jars目录下。

    Code
    [wbw@s201 /soft/hive/lib]$cp hive-hbase-handler-3.1.2.jar /soft/spark/jars/
  2. 复制hive/下的metrics的jar文件到spark下

    Code
    $>cd /soft/hive/lib
    $>ls | grep metrics | cp `xargs` /soft/spark/jars
  3. 由于之间hive集成hbase时候修改了hive-site.xml这里,重新导入

    Code
    $cp hive-site.xml /soft/spark/conf/
  4. 拷贝hbase的包到spark/jars下(这部有争议,先跳过,如果有问题先看是不是13.4中的,没办法才全部导入hbase的相关包)

  5. 启动spark-shell 本地模式测试

    Code
    $spark-shell --master local[4]
    $scala>spark.sql("select * from t1").show


    +----+----+---+
    | key|name| id|
    +----+----+---+
    |row1| tom|100|
    |row2|toms| 18|
    +----+----+---+
  6. 如果报错,可能是版本验证问题和数据库连接问题。跟之前一样参考10.3.3。其他缺包等问题参考13.4

13.2 standalone 模式 + spark-shell

  1. 在spark集群上分发13.1 (1)模式下所有需要的jar包。

  2. 启动standalone模式,spark集群。

    Code
    [wbw@s201 /soft/spark/sbin]$./start-all.sh
  3. 启动spark-shell

    Code
    $spark-shell --master spark://s201:7077
    scala> spark.sql("select * from t1").show

    +----+----+---+
    | key|name| id|
    +----+----+---+
    |row1| tom|100|
    |row2|toms| 18|
    +----+----+---+

13.3 IDEA编程访问

  1. 导依赖

    xml
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.4.3</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-hbase-handler</artifactId>
    <version>3.1.2</version>
    </dependency>

    MAVEN发生错误?无法打包?

    不过去掉hive-hbase-handler后,打包。在standalone下可以运行。

13.4 整合常见问题

https://www.wandouip.com/t5i62606/

  1. 问题一

    Code
    spark 执行时报java.lang.ClassNotFoundException: org.apache.htrace.core.HTraceConfiguration

    解决方案

    Code
    将Hadoop中的htrace-core4-4.1.0-incubating.jar放入spark/jars下:

    [wbw@s201 /soft/hadoop-3.1.2/share/hadoop/common/lib]$cp htrace-core4-4.1.0-incubating.jar /soft/spark/jars/
  2. 问题二

    Code
    spark 执行时报java.lang.ClassNotFoundException: org.apache.hadoop.hbase.util.Bytes

    解决方案

    Code
    将hbase/lib下的hbase-common-2.2.4.jar放到spark/jars下

十四、优化

14.1 存储格式的选择

在sparkSQL中可以使用 parquet :一种文件格式类型,可以按照这个读取写出文件。

14.2 压缩格式

14.3 代码优化

  1. 选择高性能算子

    在将数据写入到数据库中的时候

    DataFrame变量.foreachPartition

    每一次对每一个分区进行插入。不要一条记录插一次

    先关掉自动提交,然后批量插入。最后commit。

  1. 复用已有数据

    通用的dataframe,可以XXX.cache(),不用了再XXX.unpersist(ture)

14.4 参数优化

  1. 并行度

    spark.sql.shuffle.partitions

  2. 分区字段类型推测

    spark.sql.sources.partionColumnTypeInference.enabled

文章作者: IT小王
文章链接: https://wangbowen.cn/2020/04/28/Spark%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 IT小王

评论