Spark-Submit
详细参数说明
参数名 | 参数说明 |
---|---|
—master | master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local。具体指可参考下面关于Master_URL的列表 |
—deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client |
—class | 应用程序的主类,仅针对 java 或 scala 应用 |
—name | 应用程序的名称 |
—jars | 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 |
—packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 |
—exclude-packages | 为了避免冲突 而指定不包含的 package |
—repositories | 远程 repository |
—conf PROP=VALUE | 指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=256m” |
—properties-file | 加载的配置文件,默认为 conf/spark-defaults.conf |
—driver-memory | Driver内存,默认 1G |
—driver-java-options | 传给 driver 的额外的 Java 选项 |
—driver-library-path | 传给 driver 的额外的库路径 |
—driver-class-path | 传给 driver 的额外的类路径 |
—driver-cores | Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 |
—executor-memory | 每个 executor 的内存,默认是1G |
—total-executor-cores | 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用 |
—num-executors | 启动的 executor 数量。默认为2。在 yarn 下使用 |
—executor-core | 每个 executor 的核数。在yarn或者standalone下使用 |
Master_URL的值
Master URL | 含义 |
---|---|
local | 使用1个worker线程在本地运行Spark应用程序 |
local[K] | 使用K个worker线程在本地运行Spark应用程序 |
local[*] | 使用所有剩余worker线程在本地运行Spark应用程序 |
spark://HOST:PORT | 连接到Spark Standalone集群,以便在该集群上运行Spark应用程序 |
mesos://HOST:PORT | 连接到Mesos集群,以便在该集群上运行Spark应用程序 |
yarn-client | 以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行。 |
yarn-cluster | 以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运行。 |
Spark 共享变量
一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。
这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,所以,Spark提供了两种共享变量:「广播变量(broadcast variable)」和「累加器(accumulator)」。
广播变量
广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。说白了其实就是共享变量。
如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
一个广播变量可以通过调用SparkContext.broadcast(v)
方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1, 2, 3, 4, 5))
// 创建一个广播变量
val factor = sc.broadcast(2)
// 使用广播变量
val result = data.map(x => x * factor.value)
result.collect().foreach(println)
}
}
广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。
累加器
累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。
一个累加器可以通过调用SparkContext.accumulator(v)
方法从一个初始变量v中创建。运行在集群上的任务可以通过add方法或者使用+=
操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用value方法来读取累加器的值。
示例代码如下:
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorExample {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("AccumulatorExample")
val sc = new SparkContext(conf)
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value) // 输出 10
}
}
这个示例中,我们创建了一个名为 My Accumulator
的累加器,并使用 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
来对其进行累加。最后,我们使用 println(accum.value)
来输出累加器的值,结果为 10
。
我们可以利用子类AccumulatorParam
创建自己的累加器类型。AccumulatorParam接口有两个方法:zero方法为你的数据类型提供一个“0 值”(zero value),addInPlace方法计算两个值的和。例如,假设我们有一个Vector类代表数学上的向量,我们能够如下定义累加器:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
Spark SQL
Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame的编程抽象,并且可以充当分布式SQL查询引擎。
Spark SQL的特性
- 集成:无缝地将SQL查询与Spark程序混合。 Spark SQL允许将结构化数据作为Spark中的分布式数据集(RDD)进行查询,在Python,Scala和Java中集成了API。这种紧密的集成使得可以轻松地运行SQL查询以及复杂的分析算法。
- Hive兼容性:在现有仓库上运行未修改的Hive查询。 Spark SQL重用了Hive前端和MetaStore,提供与现有Hive数据,查询和UDF的完全兼容性。只需将其与Hive一起安装即可。
- 标准连接:通过JDBC或ODBC连接。 Spark SQL包括具有行业标准JDBC和ODBC连接的服务器模式。
- 可扩展性:对于交互式查询和长查询使用相同的引擎。 Spark SQL利用RDD模型来支持中查询容错,使其能够扩展到大型作业。不要担心为历史数据使用不同的引擎。
Spark SQL 数据类型
Spark SQL 支持多种数据类型,包括数字类型、字符串类型、二进制类型、布尔类型、日期时间类型和区间类型等。
数字类型包括:
- ByteType:代表一个字节的整数,范围是 -128 到 127¹²。
- ShortType:代表两个字节的整数,范围是 -32768 到 32767¹²。
- IntegerType:代表四个字节的整数,范围是 -2147483648 到 2147483647¹²。
- LongType:代表八个字节的整数,范围是 -9223372036854775808 到 9223372036854775807¹²。
- FloatType:代表四字节的单精度浮点数¹²。
- DoubleType:代表八字节的双精度浮点数¹²。
- DecimalType:代表任意精度的十进制数据,通过内部的 java.math.BigDecimal 支持。BigDecimal 由一个任意精度的整型非标度值和一个 32 位整数组成¹²。
字符串类型包括:
- StringType:代表字符字符串值。
二进制类型包括:
- BinaryType:代表字节序列值。
布尔类型包括:
- BooleanType:代表布尔值。
日期时间类型包括:
- TimestampType:代表包含字段年、月、日、时、分、秒的值,与会话本地时区相关。时间戳值表示绝对时间点。
- DateType:代表包含字段年、月和日的值,不带时区。
区间类型包括:
- YearMonthIntervalType (startField, endField):表示由以下字段组成的连续子集组成的年月间隔:MONTH(月份),YEAR(年份)。
- DayTimeIntervalType (startField, endField):表示由以下字段组成的连续子集组成的日时间间隔:SECOND(秒),MINUTE(分钟),HOUR(小时),DAY(天)。
复合类型包括:
- ArrayType (elementType, containsNull):代表由 elementType 类型元素组成的序列值。containsNull 用来指明 ArrayType 中的值是否有 null 值。
- MapType (keyType, valueType, valueContainsNull):表示包括一组键值对的值。通过 keyType 表示 key 数据的类型,通过 valueType 表示 value 数据的类型。valueContainsNull 用来指明 MapType 中的值是否有 null 值。
- StructType (fields):表示一个拥有 StructFields (fields) 序列结构的值。
- StructField (name, dataType, nullable):代表 StructType 中的一个字段,字段的名字通过 name 指定,dataType 指定 field 的数据类型,nullable 表示字段的值是否有 null 值。
DataFrame
DataFrame 是 Spark 中用于处理结构化数据的一种数据结构。它类似于关系数据库中的表,具有行和列。每一列都有一个名称和一个类型,每一行都是一条记录。
DataFrame 支持多种数据源,包括结构化数据文件、Hive 表、外部数据库和现有的 RDD。它提供了丰富的操作,包括筛选、聚合、分组、排序等。
DataFrame 的优点在于它提供了一种高级的抽象,使得用户可以使用类似于 SQL 的语言进行数据处理,而无需关心底层的实现细节。此外,Spark 会自动对 DataFrame 进行优化,以提高查询性能。
下面是一个使用DataFrame的代码例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
import spark.implicits._
val data = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
)
val df = data.toDF("name", "age")
df.show()
在这个示例中,我们首先创建了一个 SparkSession
对象,然后使用 toDF
方法将一个序列转换为 DataFrame。最后,我们使用 show
方法来显示 DataFrame 的内容。
创建 DataFrame
在Scala 中,可以通过以下几种方式创建 DataFrame:
- 从现有的 RDD 转换而来。例如:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df = rdd.toDF()
df.show()
- 从外部数据源读取。例如,从 JSON 文件中读取数据并创建 DataFrame:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
val df = spark.read.json("path/to/json/file")
df.show()
- 通过编程方式创建。例如,使用 createDataFrame 方法:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
val schema = StructType(
List(
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
)
)
val data = Seq(Row("Alice", 25), Row("Bob", 30))
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd, schema)
df.show()
DSL & SQL
在Spark 中,可以使用两种方式对 DataFrame 进行查询:「DSL(Domain-Specific Language)」和「 SQL」。
DSL是一种特定领域语言,它提供了一组用于操作 DataFrame 的方法。例如,下面是一个使用 DSL 进行查询的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.select("name", "age")
.filter($"age" > 25)
.show()
SQL是一种结构化查询语言,它用于管理关系数据库系统。在 Spark 中,可以使用 SQL 对 DataFrame 进行查询。例如,下面是一个使用 SQL 进行查询的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 25").show()
DSL和 SQL 的区别在于语法和风格。DSL 使用方法调用链来构建查询,而 SQL 使用声明式语言来描述查询。选择哪种方式取决于个人喜好和使用场景。
Spark SQL 数据源
Spark SQL 支持多种数据源,包括 Parquet、JSON、CSV、JDBC、Hive 等。
下面是示例代码:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Data Sources Example").getOrCreate()
// Parquet
val df = spark.read.parquet("path/to/parquet/file")
// JSON
val df = spark.read.json("path/to/json/file")
// CSV
val df = spark.read.option("header", "true").csv("path/to/csv/file")
// JDBC
val df = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://host:port/database")
.option("dbtable", "table")
.option("user", "username")
.option("password", "password")
.load()
df.show()
load & save
在Spark 中,load
函数用于从外部数据源读取数据并创建 DataFrame,而 save
函数用于将 DataFrame 保存到外部数据源。
下面是从 Parquet 文件中读取数据并创建 DataFrame 的示例代码:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
val df = spark.read.load("path/to/parquet/file")
df.show()
下面是将 DataFrame 保存到 Parquet 文件的示例代码:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.write.save("path/to/parquet/file")
函数
Spark SQL 提供了丰富的内置函数,包括数学函数、字符串函数、日期时间函数、聚合函数等。你可以在 Spark SQL 的官方文档中查看所有可用的内置函数。
此外,Spark SQL 还支持「自定义函数(User-Defined Function,UDF)」,可以让用户编写自己的函数并在查询中使用。
下面是一个使用 SQL 语法编写自定义函数的示例代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
val spark = SparkSession.builder.appName("UDF Example").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.createOrReplaceTempView("people")
val square = udf((x: Int) => x * x)
spark.udf.register("square", square)
spark.sql("SELECT name, square(age) FROM people").show()
在这个示例中,我们首先定义了一个名为 square
的自定义函数,它接受一个整数参数并返回它的平方。然后,我们使用 createOrReplaceTempView
方法创建一个临时视图,并使用 udf.register
方法注册自定义函数。
最后,我们使用 spark.sql
方法执行 SQL 查询,并在查询中调用自定义函数。
DataSet
DataSet 是 Spark 1.6 版本中引入的一种新的数据结构,它提供了 RDD 的强类型和 DataFrame 的查询优化能力。
创建DataSet
在Scala 中,可以通过以下几种方式创建 DataSet:
- 从现有的 RDD 转换而来。例如:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val ds = rdd.toDS()
ds.show()
- 从外部数据源读取。例如,从 JSON 文件中读取数据并创建 DataSet:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Long)
val ds = spark.read.json("path/to/json/file").as[Person]
ds.show()
- 通过编程方式创建。例如,使用 createDataset 方法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val data = Seq(Person("Alice", 25), Person("Bob", 30))
val ds = spark.createDataset(data)
ds.show()
DataSet VS DataFrame
DataSet 和 DataFrame 都是 Spark 中用于处理结构化数据的数据结构。它们都提供了丰富的操作,包括筛选、聚合、分组、排序等。
它们之间的主要区别在于类型安全性。DataFrame 是一种弱类型的数据结构,它的列只有在运行时才能确定类型。这意味着,在编译时无法检测到类型错误,只有在运行时才会抛出异常。
而DataSet 是一种强类型的数据结构,它的类型在编译时就已经确定。这意味着,如果你试图对一个不存在的列进行操作,或者对一个列进行错误的类型转换,编译器就会报错。
此外,DataSet 还提供了一些额外的操作,例如 map
、flatMap
、reduce
等。
RDD & DataFrame & Dataset 转化
RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换。
- DataFrame/Dataset 转 RDD
val rdd1=testDF.rdd
val rdd2=testDS.rdd
- RDD 转 DataSet
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可。
- Dataset 转 DataFrame
import spark.implicits._
val testDF = testDS.toDF
- DataFrame 转 Dataset
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]
这种方法就是在给出每一列的类型后,使用as
方法,转成Dataset,这在数据类型在DataFrame需要针对各个字段处理时极为方便。
注意:在使用一些特殊的操作时,一定要加上 import spark.implicits._
不然toDF
、toDS
无法使用。
Spark Streaming
Spark Streaming 的工作原理是将实时数据流拆分为小批量数据,并使用 Spark 引擎对这些小批量数据进行处理。这种微批处理(Micro-Batch Processing)的方式使得 Spark Streaming 能够以近乎实时的延迟处理大规模的数据流。
下面是一个简单的 Spark Streaming 示例代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("Spark Streaming Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
我们首先创建了一个 StreamingContext
对象,并指定了批处理间隔为 1 秒。然后,我们使用 socketTextStream
方法从套接字源创建了一个 DStream。接下来,我们对 DStream 进行了一系列操作,包括 flatMap、map 和 reduceByKey。最后,我们使用 print
方法打印出单词计数的结果。
Spark Streaming 优缺点
Spark Streaming 作为一种实时流处理框架,具有以下优点:
- 高性能:Spark Streaming 基于 Spark 引擎,能够快速处理大规模的数据流。
- 易用性:Spark Streaming 提供了丰富的 API,可以让开发人员快速构建实时流处理应用。
- 容错性:Spark Streaming 具有良好的容错性,能够在节点故障时自动恢复。
- 集成性:Spark Streaming 能够与 Spark 生态系统中的其他组件(如 Spark SQL、MLlib 等)无缝集成。
但是,Spark Streaming 也有一些缺点:
- 延迟:由于 Spark Streaming 基于微批处理模型,因此它的延迟相对较高。对于需要极低延迟的应用场景,Spark Streaming 可能不是最佳选择。
- 复杂性:Spark Streaming 的配置和调优相对复杂,需要一定的经验和技能。
DStream
DStream(离散化流)是 Spark Streaming 中用于表示实时数据流的一种抽象。它由一系列连续的 RDD 组成,每个 RDD 包含一段时间内收集到的数据。
在Spark Streaming 中,可以通过以下几种方式创建 DStream:
- 从输入源创建。例如,从套接字源创建 DStream:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()
ssc.start()
ssc.awaitTermination()
- 通过转换操作创建。例如,对现有的 DStream 进行 map 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()
ssc.start()
ssc.awaitTermination()
- 通过连接操作创建。例如,对两个 DStream 进行 union 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines1 = ssc.socketTextStream("localhost", 9999)
val lines2 = ssc.socketTextStream("localhost", 9998)
val lines = lines1.union(lines2)
lines.print()
ssc.start()
ssc.awaitTermination()
总结:简单来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。对于 DataFrame/DataSet/DStream 来说本质上都可以理解成 RDD。
窗口函数
在Spark Streaming 中,窗口函数用于对 DStream 中的数据进行窗口化处理。它允许你对一段时间内的数据进行聚合操作。
Spark Streaming 提供了多种窗口函数,包括:
- window:返回一个新的 DStream,它包含了原始 DStream 中指定窗口大小和滑动间隔的数据。
- countByWindow:返回一个新的单元素 DStream,它包含了原始 DStream 中指定窗口大小和滑动间隔的元素个数。
- reduceByWindow:返回一个新的 DStream,它包含了原始 DStream 中指定窗口大小和滑动间隔的元素经过 reduce 函数处理后的结果。
- reduceByKeyAndWindow:类似于 reduceByWindow,但是在进行 reduce 操作之前会先按照 key 进行分组。
下面是一个使用窗口函数的示例代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("Window Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
在这个示例中,我们首先创建了一个 DStream,并对其进行了一系列转换操作。然后,我们使用 reduceByKeyAndWindow
函数对 DStream 进行窗口化处理,指定了窗口大小为 30 秒,滑动间隔为 10 秒。最后,我们使用 print
方法打印出单词计数的结果。
输出操作
Spark Streaming允许DStream的数据输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用,该操作类似于RDD的输出操作。Spark Streaming支持以下输出操作:
- **print() **: 打印DStream中每个RDD的前10个元素到控制台。
- **saveAsTextFiles(prefix, [suffix] **: 将此DStream中每个RDD的所有元素以文本文件的形式保存。每个批次的数据都会保存在一个单独的目录中,目录名为:prefix-TIME_IN_MS[.suffix]。
- **saveAsObjectFiles(prefix, [suffix])**: 将此DStream中每个RDD的所有元素以Java对象序列化的形式保存。每个批次的数据都会保存在一个单独的目录中,目录名为:prefix-TIME_IN_MS[.suffix]。
- **saveAsHadoopFiles(prefix, [suffix])**:将此DStream中每个RDD的所有元素以Hadoop文件(SequenceFile等)的形式保存。每个批次的数据都会保存在一个单独的目录中,目录名为:prefix-TIME_IN_MS[.suffix]。
- **foreachRDD(func)**:最通用的输出操作,将函数func应用于DStream中生成的每个RDD。通过此函数,可以将数据写入任何支持写入操作的数据源。
Structured Streaming
Structured Streaming 是 Spark 2.0 版本中引入的一种新的流处理引擎。它基于 Spark SQL 引擎,提供了一种声明式的 API 来处理结构化数据流。
与Spark Streaming 相比,Structured Streaming 具有以下优点:
- 易用性:Structured Streaming 提供了与 Spark SQL 相同的 API,可以让开发人员快速构建流处理应用。
- 高性能:Structured Streaming 基于 Spark SQL 引擎,能够快速处理大规模的数据流。
- 容错性:Structured Streaming 具有良好的容错性,能够在节点故障时自动恢复。
- 端到端一致性:Structured Streaming 提供了端到端一致性保证,能够确保数据不丢失、不重复。
下面是一个简单的 Structured Streaming 示例代码:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
在这个示例中,我们首先创建了一个 SparkSession
对象。然后,我们使用 readStream
方法从套接字源创建了一个 DataFrame。接下来,我们对 DataFrame 进行了一系列操作,包括 flatMap、groupBy 和 count。最后,我们使用 writeStream
方法将结果输出到控制台。
Structured Streaming 同样支持 DSL 和 SQL 语法。
DSL语法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
SQL语法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
lines.createOrReplaceTempView("lines")
val wordCounts = spark.sql(
"""
|SELECT value, COUNT(*) as count
|FROM (
| SELECT explode(split(value, ' ')) as value
| FROM lines
|)
|GROUP BY value
""".stripMargin)
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
Source
Structured Streaming 支持多种输入源,包括文件源(如文本文件、Parquet 文件、JSON 文件等)、Kafka、Socket 等。下面是一个使用 Scala 语言从 Kafka 中读取数据的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 订阅一个主题
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Output
Structured Streaming 支持多种输出方式,包括控制台输出、内存输出、文件输出、数据源输出等。下面是将数据写入到 Parquet 文件中的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 从 socket 中读取数据
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 将数据写入到 Parquet 文件中
lines.writeStream
.format("parquet")
.option("path", "path/to/output/dir")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
Output Mode
每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。
Output mode 指定了数据写入输出接收器的方式。Structured Streaming 支持以下三种 output mode:
Output Mode | 描述 |
---|---|
Append | 只将流 DataFrame/Dataset 中的新行写入接收器。 |
Complete | 每当有更新时,将流 DataFrame/Dataset 中的所有行写入接收器。 |
Update | 每当有更新时,只将流 DataFrame/Dataset 中更新的行写入接收器。 |
Output Sink
Output sink 指定了数据写入的位置。Structured Streaming 支持多种输出接收器,包括文件接收器、Kafka 接收器、Foreach 接收器、控制台接收器和内存接收器等。下面是一些使用 Scala 语言将数据写入到不同输出接收器中的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 从 socket 中读取数据
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 将数据写入到 Parquet 文件中
lines.writeStream
.format("parquet")
.option("path", "path/to/output/dir")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
// 将数据写入到 Kafka 中
//selectExpr 是一个 DataFrame 的转换操作,它允许你使用 SQL 表达式来选择 DataFrame 中的列。
//selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 表示选择 key 和 value 列,并将它们的类型转换为字符串类型。
//这是因为 Kafka 接收器要求数据必须是字符串类型或二进制类型。
lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// 将数据写入到控制台中
lines.writeStream
.format("console")
.start()
// 将数据写入到内存中
lines.writeStream
.format("memory")
.queryName("tableName")
.start()
PV,UV统计
下面是用Structured Streaming实现PV,UV统计的例子,我们来感受实战下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object PVUVExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("PVUVExample").getOrCreate()
import spark.implicits._
// 假设我们有一个包含用户ID和访问的URL的输入流
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val data = lines.as[String].map(line => {
val parts = line.split(",")
(parts(0), parts(1))
}).toDF("user", "url")
// 计算PV
val pv = data.groupBy("url").count().withColumnRenamed("count", "pv")
val pvQuery = pv.writeStream.outputMode("complete").format("console").start()
// 计算UV
val uv = data.dropDuplicates().groupBy("url").count().withColumnRenamed("count", "uv")
val uvQuery = uv.writeStream.outputMode("complete").format("console").start()
pvQuery.awaitTermination()
uvQuery.awaitTermination()
}
}
这段代码演示了如何使用Structured Streaming对数据进行PV和UV统计。它首先从一个socket源读取数据,然后使用groupBy
和count
对数据进行PV统计,最后使用dropDuplicates
、groupBy
和count
对数据进行UV统计。
假设我们在本地启动了一个socket服务器,并向其发送以下数据:
user1,http://example.com/page1
user2,http://example.com/page1
user1,http://example.com/page2
user3,http://example.com/page1
user2,http://example.com/page2
user3,http://example.com/page2
那么程序将输出以下结果:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
| url| pv|
+--------------------+---+
|http://example.co...| 3|
|http://example.co...| 3|
+--------------------+---+
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
| url| uv|
+--------------------+---+
|http://example.co...| 2|
|http://example.co...| 3|
+--------------------+---+
总结
在此,我们对Spark的基本概念、使用方式以及部分原理进行了简单的介绍。Spark以其强大的处理能力和灵活性,已经成为大数据处理领域的一个重要工具。然而,这只是冰山一角。Spark的世界里还有许多深度和广度等待着我们去探索。
作为初学者,你可能会觉得这个领域庞大且复杂。但请记住,每个都是从初学者开始的。不断的学习和实践,你将能够更好的理解和掌握Spark,并将其应用于解决实际问题。这篇文章可能不能涵盖所有的知识点,但我希望它能带给你收获和思考。