02、Spark 教程 - Spark入门指南:基础概念

Spark-Submit

详细参数说明

参数名 参数说明
—master master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local。具体指可参考下面关于Master_URL的列表
—deploy-mode 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client
—class 应用程序的主类,仅针对 java 或 scala 应用
—name 应用程序的名称
—jars 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下
—packages 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标
—exclude-packages 为了避免冲突 而指定不包含的 package
—repositories 远程 repository
—conf PROP=VALUE 指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=256m”
—properties-file 加载的配置文件,默认为 conf/spark-defaults.conf
—driver-memory Driver内存,默认 1G
—driver-java-options 传给 driver 的额外的 Java 选项
—driver-library-path 传给 driver 的额外的库路径
—driver-class-path 传给 driver 的额外的类路径
—driver-cores Driver 的核数,默认是1。在 yarn 或者 standalone 下使用
—executor-memory 每个 executor 的内存,默认是1G
—total-executor-cores 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用
—num-executors 启动的 executor 数量。默认为2。在 yarn 下使用
—executor-core 每个 executor 的核数。在yarn或者standalone下使用

Master_URL的值

Master URL 含义
local 使用1个worker线程在本地运行Spark应用程序
local[K] 使用K个worker线程在本地运行Spark应用程序
local[*] 使用所有剩余worker线程在本地运行Spark应用程序
spark://HOST:PORT 连接到Spark Standalone集群,以便在该集群上运行Spark应用程序
mesos://HOST:PORT 连接到Mesos集群,以便在该集群上运行Spark应用程序
yarn-client 以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行。
yarn-cluster 以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运行。

Spark 共享变量

一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。

这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,所以,Spark提供了两种共享变量:「广播变量(broadcast variable)」和「累加器(accumulator)」。

广播变量

广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。说白了其实就是共享变量。

如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(List(1, 2, 3, 4, 5))

    // 创建一个广播变量
    val factor = sc.broadcast(2)

    // 使用广播变量
    val result = data.map(x => x * factor.value)

    result.collect().foreach(println)
  }
}

广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改

累加器

累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。

一个累加器可以通过调用SparkContext.accumulator(v)方法从一个初始变量v中创建。运行在集群上的任务可以通过add方法或者使用+=操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用value方法来读取累加器的值。

示例代码如下:

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("AccumulatorExample")
    val sc = new SparkContext(conf)

    val accum = sc.longAccumulator("My Accumulator")

    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

    println(accum.value) // 输出 10
  }
}

这个示例中,我们创建了一个名为 My Accumulator 的累加器,并使用 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) 来对其进行累加。最后,我们使用 println(accum.value) 来输出累加器的值,结果为 10

我们可以利用子类AccumulatorParam创建自己的累加器类型。AccumulatorParam接口有两个方法:zero方法为你的数据类型提供一个“0 值”(zero value),addInPlace方法计算两个值的和。例如,假设我们有一个Vector类代表数学上的向量,我们能够如下定义累加器:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

Spark SQL

Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame的编程抽象,并且可以充当分布式SQL查询引擎。

Spark SQL的特性

  • 集成:无缝地将SQL查询与Spark程序混合。 Spark SQL允许将结构化数据作为Spark中的分布式数据集(RDD)进行查询,在Python,Scala和Java中集成了API。这种紧密的集成使得可以轻松地运行SQL查询以及复杂的分析算法。
  • Hive兼容性:在现有仓库上运行未修改的Hive查询。 Spark SQL重用了Hive前端和MetaStore,提供与现有Hive数据,查询和UDF的完全兼容性。只需将其与Hive一起安装即可。
  • 标准连接:通过JDBC或ODBC连接。 Spark SQL包括具有行业标准JDBC和ODBC连接的服务器模式。
  • 可扩展性:对于交互式查询和长查询使用相同的引擎。 Spark SQL利用RDD模型来支持中查询容错,使其能够扩展到大型作业。不要担心为历史数据使用不同的引擎。

Spark SQL 数据类型

Spark SQL 支持多种数据类型,包括数字类型、字符串类型、二进制类型、布尔类型、日期时间类型和区间类型等。

数字类型包括:

  • ByteType:代表一个字节的整数,范围是 -128 到 127¹²。
  • ShortType:代表两个字节的整数,范围是 -32768 到 32767¹²。
  • IntegerType:代表四个字节的整数,范围是 -2147483648 到 2147483647¹²。
  • LongType:代表八个字节的整数,范围是 -9223372036854775808 到 9223372036854775807¹²。
  • FloatType:代表四字节的单精度浮点数¹²。
  • DoubleType:代表八字节的双精度浮点数¹²。
  • DecimalType:代表任意精度的十进制数据,通过内部的 java.math.BigDecimal 支持。BigDecimal 由一个任意精度的整型非标度值和一个 32 位整数组成¹²。

字符串类型包括:

  • StringType:代表字符字符串值。

二进制类型包括:

  • BinaryType:代表字节序列值。

布尔类型包括:

  • BooleanType:代表布尔值。

日期时间类型包括:

  • TimestampType:代表包含字段年、月、日、时、分、秒的值,与会话本地时区相关。时间戳值表示绝对时间点。
  • DateType:代表包含字段年、月和日的值,不带时区。

区间类型包括:

  • YearMonthIntervalType (startField, endField):表示由以下字段组成的连续子集组成的年月间隔:MONTH(月份),YEAR(年份)。
  • DayTimeIntervalType (startField, endField):表示由以下字段组成的连续子集组成的日时间间隔:SECOND(秒),MINUTE(分钟),HOUR(小时),DAY(天)。

复合类型包括:

  • ArrayType (elementType, containsNull):代表由 elementType 类型元素组成的序列值。containsNull 用来指明 ArrayType 中的值是否有 null 值。
  • MapType (keyType, valueType, valueContainsNull):表示包括一组键值对的值。通过 keyType 表示 key 数据的类型,通过 valueType 表示 value 数据的类型。valueContainsNull 用来指明 MapType 中的值是否有 null 值。
  • StructType (fields):表示一个拥有 StructFields (fields) 序列结构的值。
  • StructField (name, dataType, nullable):代表 StructType 中的一个字段,字段的名字通过 name 指定,dataType 指定 field 的数据类型,nullable 表示字段的值是否有 null 值。

DataFrame

DataFrame 是 Spark 中用于处理结构化数据的一种数据结构。它类似于关系数据库中的表,具有行和列。每一列都有一个名称和一个类型,每一行都是一条记录。

DataFrame 支持多种数据源,包括结构化数据文件、Hive 表、外部数据库和现有的 RDD。它提供了丰富的操作,包括筛选、聚合、分组、排序等。

DataFrame 的优点在于它提供了一种高级的抽象,使得用户可以使用类似于 SQL 的语言进行数据处理,而无需关心底层的实现细节。此外,Spark 会自动对 DataFrame 进行优化,以提高查询性能。

下面是一个使用DataFrame的代码例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
import spark.implicits._

val data = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
)

val df = data.toDF("name", "age")

df.show()

在这个示例中,我们首先创建了一个 SparkSession 对象,然后使用 toDF 方法将一个序列转换为 DataFrame。最后,我们使用 show 方法来显示 DataFrame 的内容。

创建 DataFrame

在Scala 中,可以通过以下几种方式创建 DataFrame:

  • 从现有的 RDD 转换而来。例如:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Int)

val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df = rdd.toDF()
df.show()

  • 从外部数据源读取。例如,从 JSON 文件中读取数据并创建 DataFrame:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()

val df = spark.read.json("path/to/json/file")
df.show()

  • 通过编程方式创建。例如,使用 createDataFrame 方法:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()

val schema = StructType(
  List(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  )
)

val data = Seq(Row("Alice", 25), Row("Bob", 30))
val rdd = spark.sparkContext.parallelize(data)

val df = spark.createDataFrame(rdd, schema)
df.show()

DSL & SQL

在Spark 中,可以使用两种方式对 DataFrame 进行查询:「DSL(Domain-Specific Language)」和「 SQL」。

DSL是一种特定领域语言,它提供了一组用于操作 DataFrame 的方法。例如,下面是一个使用 DSL 进行查询的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
).toDF("name", "age")

df.select("name", "age")
  .filter($"age" > 25)
  .show()

SQL是一种结构化查询语言,它用于管理关系数据库系统。在 Spark 中,可以使用 SQL 对 DataFrame 进行查询。例如,下面是一个使用 SQL 进行查询的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
).toDF("name", "age")

df.createOrReplaceTempView("people")

spark.sql("SELECT name, age FROM people WHERE age > 25").show()

DSL和 SQL 的区别在于语法和风格。DSL 使用方法调用链来构建查询,而 SQL 使用声明式语言来描述查询。选择哪种方式取决于个人喜好和使用场景。

Spark SQL 数据源

Spark SQL 支持多种数据源,包括 Parquet、JSON、CSV、JDBC、Hive 等。

下面是示例代码:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Data Sources Example").getOrCreate()
// Parquet
val df = spark.read.parquet("path/to/parquet/file")
// JSON 
val df = spark.read.json("path/to/json/file")
// CSV
val df = spark.read.option("header", "true").csv("path/to/csv/file")
// JDBC
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://host:port/database")
  .option("dbtable", "table")
  .option("user", "username")
  .option("password", "password")
  .load()

df.show()

load & save

在Spark 中,load 函数用于从外部数据源读取数据并创建 DataFrame,而 save 函数用于将 DataFrame 保存到外部数据源。

下面是从 Parquet 文件中读取数据并创建 DataFrame 的示例代码:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()

val df = spark.read.load("path/to/parquet/file")
df.show()

下面是将 DataFrame 保存到 Parquet 文件的示例代码:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
).toDF("name", "age")

df.write.save("path/to/parquet/file")

函数

Spark SQL 提供了丰富的内置函数,包括数学函数、字符串函数、日期时间函数、聚合函数等。你可以在 Spark SQL 的官方文档中查看所有可用的内置函数。

此外,Spark SQL 还支持「自定义函数(User-Defined Function,UDF)」,可以让用户编写自己的函数并在查询中使用。

下面是一个使用 SQL 语法编写自定义函数的示例代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder.appName("UDF Example").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
).toDF("name", "age")

df.createOrReplaceTempView("people")

val square = udf((x: Int) => x * x)
spark.udf.register("square", square)

spark.sql("SELECT name, square(age) FROM people").show()

在这个示例中,我们首先定义了一个名为 square 的自定义函数,它接受一个整数参数并返回它的平方。然后,我们使用 createOrReplaceTempView 方法创建一个临时视图,并使用 udf.register 方法注册自定义函数。

最后,我们使用 spark.sql 方法执行 SQL 查询,并在查询中调用自定义函数。

DataSet

DataSet 是 Spark 1.6 版本中引入的一种新的数据结构,它提供了 RDD 的强类型和 DataFrame 的查询优化能力。

创建DataSet

在Scala 中,可以通过以下几种方式创建 DataSet:

  • 从现有的 RDD 转换而来。例如:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Int)

val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val ds = rdd.toDS()
ds.show()

  • 从外部数据源读取。例如,从 JSON 文件中读取数据并创建 DataSet:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Long)

val ds = spark.read.json("path/to/json/file").as[Person]
ds.show()

  • 通过编程方式创建。例如,使用 createDataset 方法:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Int)

val data = Seq(Person("Alice", 25), Person("Bob", 30))
val ds = spark.createDataset(data)
ds.show()

DataSet VS DataFrame

DataSet 和 DataFrame 都是 Spark 中用于处理结构化数据的数据结构。它们都提供了丰富的操作,包括筛选、聚合、分组、排序等。

它们之间的主要区别在于类型安全性。DataFrame 是一种弱类型的数据结构,它的列只有在运行时才能确定类型。这意味着,在编译时无法检测到类型错误,只有在运行时才会抛出异常。

而DataSet 是一种强类型的数据结构,它的类型在编译时就已经确定。这意味着,如果你试图对一个不存在的列进行操作,或者对一个列进行错误的类型转换,编译器就会报错。

此外,DataSet 还提供了一些额外的操作,例如 mapflatMapreduce 等。

RDD & DataFrame & Dataset 转化

RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换。

  • DataFrame/Dataset 转 RDD
val rdd1=testDF.rdd
val rdd2=testDS.rdd

  • RDD 转 DataSet
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
      Coltest(line._1,line._2)
    }.toDS

可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可。

  • Dataset 转 DataFrame
import spark.implicits._
val testDF = testDS.toDF

  • DataFrame 转 Dataset
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]

这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型在DataFrame需要针对各个字段处理时极为方便。

注意:在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDFtoDS无法使用。

Spark Streaming

Spark Streaming 的工作原理是将实时数据流拆分为小批量数据,并使用 Spark 引擎对这些小批量数据进行处理。这种微批处理(Micro-Batch Processing)的方式使得 Spark Streaming 能够以近乎实时的延迟处理大规模的数据流。

下面是一个简单的 Spark Streaming 示例代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("Spark Streaming Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print()

ssc.start()
ssc.awaitTermination()

我们首先创建了一个 StreamingContext 对象,并指定了批处理间隔为 1 秒。然后,我们使用 socketTextStream 方法从套接字源创建了一个 DStream。接下来,我们对 DStream 进行了一系列操作,包括 flatMap、map 和 reduceByKey。最后,我们使用 print 方法打印出单词计数的结果。

Spark Streaming 优缺点

Spark Streaming 作为一种实时流处理框架,具有以下优点:

  • 高性能:Spark Streaming 基于 Spark 引擎,能够快速处理大规模的数据流。
  • 易用性:Spark Streaming 提供了丰富的 API,可以让开发人员快速构建实时流处理应用。
  • 容错性:Spark Streaming 具有良好的容错性,能够在节点故障时自动恢复。
  • 集成性:Spark Streaming 能够与 Spark 生态系统中的其他组件(如 Spark SQL、MLlib 等)无缝集成。

但是,Spark Streaming 也有一些缺点:

  • 延迟:由于 Spark Streaming 基于微批处理模型,因此它的延迟相对较高。对于需要极低延迟的应用场景,Spark Streaming 可能不是最佳选择。
  • 复杂性:Spark Streaming 的配置和调优相对复杂,需要一定的经验和技能。

DStream

DStream(离散化流)是 Spark Streaming 中用于表示实时数据流的一种抽象。它由一系列连续的 RDD 组成,每个 RDD 包含一段时间内收集到的数据。

在Spark Streaming 中,可以通过以下几种方式创建 DStream:

  • 从输入源创建。例如,从套接字源创建 DStream:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

ssc.start()
ssc.awaitTermination()

  • 通过转换操作创建。例如,对现有的 DStream 进行 map 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()

ssc.start()
ssc.awaitTermination()

  • 通过连接操作创建。例如,对两个 DStream 进行 union 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines1 = ssc.socketTextStream("localhost", 9999)
val lines2 = ssc.socketTextStream("localhost", 9998)
val lines = lines1.union(lines2)
lines.print()

ssc.start()
ssc.awaitTermination()

总结:简单来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。对于 DataFrame/DataSet/DStream 来说本质上都可以理解成 RDD。

窗口函数

在Spark Streaming 中,窗口函数用于对 DStream 中的数据进行窗口化处理。它允许你对一段时间内的数据进行聚合操作。

Spark Streaming 提供了多种窗口函数,包括:

  • window:返回一个新的 DStream,它包含了原始 DStream 中指定窗口大小和滑动间隔的数据。
  • countByWindow:返回一个新的单元素 DStream,它包含了原始 DStream 中指定窗口大小和滑动间隔的元素个数。
  • reduceByWindow:返回一个新的 DStream,它包含了原始 DStream 中指定窗口大小和滑动间隔的元素经过 reduce 函数处理后的结果。
  • reduceByKeyAndWindow:类似于 reduceByWindow,但是在进行 reduce 操作之前会先按照 key 进行分组。

下面是一个使用窗口函数的示例代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("Window Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

wordCounts.print()

ssc.start()
ssc.awaitTermination()

在这个示例中,我们首先创建了一个 DStream,并对其进行了一系列转换操作。然后,我们使用 reduceByKeyAndWindow 函数对 DStream 进行窗口化处理,指定了窗口大小为 30 秒,滑动间隔为 10 秒。最后,我们使用 print 方法打印出单词计数的结果。

输出操作

Spark Streaming允许DStream的数据输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用,该操作类似于RDD的输出操作。Spark Streaming支持以下输出操作:

  • **print() **: 打印DStream中每个RDD的前10个元素到控制台。
  • **saveAsTextFiles(prefix, [suffix] **: 将此DStream中每个RDD的所有元素以文本文件的形式保存。每个批次的数据都会保存在一个单独的目录中,目录名为:prefix-TIME_IN_MS[.suffix]。
  • **saveAsObjectFiles(prefix, [suffix])**: 将此DStream中每个RDD的所有元素以Java对象序列化的形式保存。每个批次的数据都会保存在一个单独的目录中,目录名为:prefix-TIME_IN_MS[.suffix]。
  • **saveAsHadoopFiles(prefix, [suffix])**:将此DStream中每个RDD的所有元素以Hadoop文件(SequenceFile等)的形式保存。每个批次的数据都会保存在一个单独的目录中,目录名为:prefix-TIME_IN_MS[.suffix]。
  • **foreachRDD(func)**:最通用的输出操作,将函数func应用于DStream中生成的每个RDD。通过此函数,可以将数据写入任何支持写入操作的数据源。

Structured Streaming

Structured Streaming 是 Spark 2.0 版本中引入的一种新的流处理引擎。它基于 Spark SQL 引擎,提供了一种声明式的 API 来处理结构化数据流。

与Spark Streaming 相比,Structured Streaming 具有以下优点:

  • 易用性:Structured Streaming 提供了与 Spark SQL 相同的 API,可以让开发人员快速构建流处理应用。
  • 高性能:Structured Streaming 基于 Spark SQL 引擎,能够快速处理大规模的数据流。
  • 容错性:Structured Streaming 具有良好的容错性,能够在节点故障时自动恢复。
  • 端到端一致性:Structured Streaming 提供了端到端一致性保证,能够确保数据不丢失、不重复。

下面是一个简单的 Structured Streaming 示例代码:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

import spark.implicits._

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

在这个示例中,我们首先创建了一个 SparkSession 对象。然后,我们使用 readStream 方法从套接字源创建了一个 DataFrame。接下来,我们对 DataFrame 进行了一系列操作,包括 flatMap、groupBy 和 count。最后,我们使用 writeStream 方法将结果输出到控制台。

Structured Streaming 同样支持 DSL 和 SQL 语法

DSL语法:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

import spark.implicits._

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

SQL语法:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

lines.createOrReplaceTempView("lines")

val wordCounts = spark.sql(
  """
    |SELECT value, COUNT(*) as count
    |FROM (
    |    SELECT explode(split(value, ' ')) as value
    |    FROM lines
    |)
    |GROUP BY value
  """.stripMargin)

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

Source

Structured Streaming 支持多种输入源,包括文件源(如文本文件、Parquet 文件、JSON 文件等)、Kafka、Socket 等。下面是一个使用 Scala 语言从 Kafka 中读取数据的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()

// 订阅一个主题
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Output

Structured Streaming 支持多种输出方式,包括控制台输出、内存输出、文件输出、数据源输出等。下面是将数据写入到 Parquet 文件中的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()

// 从 socket 中读取数据
val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 将数据写入到 Parquet 文件中
lines.writeStream
  .format("parquet")
  .option("path", "path/to/output/dir")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()

Output Mode

每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。

Output mode 指定了数据写入输出接收器的方式。Structured Streaming 支持以下三种 output mode:

Output Mode 描述
Append 只将流 DataFrame/Dataset 中的新行写入接收器。
Complete 每当有更新时,将流 DataFrame/Dataset 中的所有行写入接收器。
Update 每当有更新时,只将流 DataFrame/Dataset 中更新的行写入接收器。

Output Sink

Output sink 指定了数据写入的位置。Structured Streaming 支持多种输出接收器,包括文件接收器、Kafka 接收器、Foreach 接收器、控制台接收器和内存接收器等。下面是一些使用 Scala 语言将数据写入到不同输出接收器中的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()

// 从 socket 中读取数据
val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 将数据写入到 Parquet 文件中
lines.writeStream
  .format("parquet")
  .option("path", "path/to/output/dir")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()

// 将数据写入到 Kafka 中
//selectExpr 是一个 DataFrame 的转换操作,它允许你使用 SQL 表达式来选择 DataFrame 中的列。
//selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 表示选择 key 和 value 列,并将它们的类型转换为字符串类型。
//这是因为 Kafka 接收器要求数据必须是字符串类型或二进制类型。
lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// 将数据写入到控制台中
lines.writeStream
  .format("console")
  .start()

// 将数据写入到内存中
lines.writeStream
  .format("memory")
  .queryName("tableName")
  .start()

PV,UV统计

下面是用Structured Streaming实现PV,UV统计的例子,我们来感受实战下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object PVUVExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("PVUVExample").getOrCreate()
    import spark.implicits._

    // 假设我们有一个包含用户ID和访问的URL的输入流
    val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
    val data = lines.as[String].map(line => {
      val parts = line.split(",")
      (parts(0), parts(1))
    }).toDF("user", "url")

    // 计算PV
    val pv = data.groupBy("url").count().withColumnRenamed("count", "pv")
    val pvQuery = pv.writeStream.outputMode("complete").format("console").start()

    // 计算UV
    val uv = data.dropDuplicates().groupBy("url").count().withColumnRenamed("count", "uv")
    val uvQuery = uv.writeStream.outputMode("complete").format("console").start()

    pvQuery.awaitTermination()
    uvQuery.awaitTermination()
  }
}

这段代码演示了如何使用Structured Streaming对数据进行PV和UV统计。它首先从一个socket源读取数据,然后使用groupBycount对数据进行PV统计,最后使用dropDuplicatesgroupBycount对数据进行UV统计。

假设我们在本地启动了一个socket服务器,并向其发送以下数据:

user1,http://example.com/page1
user2,http://example.com/page1
user1,http://example.com/page2
user3,http://example.com/page1
user2,http://example.com/page2
user3,http://example.com/page2

那么程序将输出以下结果:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
|                 url| pv|
+--------------------+---+
|http://example.co...|  3|
|http://example.co...|  3|
+--------------------+---+

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
|                 url| uv|
+--------------------+---+
|http://example.co...|  2|
|http://example.co...|  3|
+--------------------+---+

总结

在此,我们对Spark的基本概念、使用方式以及部分原理进行了简单的介绍。Spark以其强大的处理能力和灵活性,已经成为大数据处理领域的一个重要工具。然而,这只是冰山一角。Spark的世界里还有许多深度和广度等待着我们去探索。

作为初学者,你可能会觉得这个领域庞大且复杂。但请记住,每个都是从初学者开始的。不断的学习和实践,你将能够更好的理解和掌握Spark,并将其应用于解决实际问题。这篇文章可能不能涵盖所有的知识点,但我希望它能带给你收获和思考。