01、Flink 实战 - Flink DataStream 创建数据源 转换算子

1. 前言

从前年开始,就被公众号上Flink文章频繁的刷屏,看来是时候了解下Flink了。

Flink官网第一句话介绍是数据流上的有状态计算。

我第一眼看这句话感觉很拗口,什么是流上的计算?什么是有状态?

作为菜鸟,我觉的学习Flink最好方法是看官网并敲代码实践,不会的百度些博客学学。

2. 创建Flink项目

废话不多说,我们来创建一个Flink项目吧。关于“Flink是什么”,“Flink应用场景”,“Flink安装部署”,“Flink架构原理”等话题,我感觉网上好的博客很多了,我默认此时你至少了解过Flink并安装过Flink吧,不然怎么会搜到我这篇博客?

2.1 在cmd窗口创建

打开cmd命令窗口,输入如下命令

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.1 -DarchetypeCatalog=internal

如果你有强迫症,你看不惯一行命令这么长,你可以粘贴下面的。
我的是win10系统,命令以^换行,如果你是Linux系统,要以\换行。

mvn archetype:generate ^
-DarchetypeGroupId=org.apache.flink ^
-DarchetypeArtifactId=flink-quickstart-java ^
-DarchetypeVersion=1.10.1 ^
-DarchetypeCatalog=internal

执行中途,它会询问你输入groupId和artifactId,然后一路回车输入Y,项目就创建好了。
 

2.2 WordCount例子

如果你项目创建成功,会有如下代码结构,然后再创建个BatchWordCount类,贴入下面Java代码。
 

package com.pigg.test01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
   
     

    public static void main(String[] args) throws Exception {
   
     

        //第1步:创建执行环境
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //第2步:创建数据源
        DataSource<String> lines = env.fromElements("I love coding", "I love flink", "study flink");

        //第3步:处理数据
        // 3.1:将每一行按照空格切分,并组成(word, 1)
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
   
     

            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
   
     
                String[] words = line.split(" ");

                for (String word : words) {
   
     
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        // 3.2:按照第一个字段分组,并按照第二个字段求和
        AggregateOperator<Tuple2<String, Integer>> result = wordAndOne.groupBy(0).sum(1);

        //第4步:输出打印到控制台
        result.print();

        //第5步:触发执行job,如果是实时流计算这是要的,这里是批处理,也可以不加
        env.execute("BatchWordCount");
    }
}

不要深究代码的意思,我们只是先了解下写一个Flink程序的代码结构。运行main方法时,如果报如下错误:

NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction

你需要把如下选项勾上
 
执行结果为:

(flink,2)
(love,2)
(I,2)
(coding,1)
(study,1)

2.3 分析Flink程序代码结构

在上面程序中,我注释了5个步骤,Flink的Job程序也基本分5个步骤:

  • 第1步:创建执行环境
  • 第2步:创建数据源
  • 第3步:处理数据流
  • 第4步:输出结果到指定位置
  • 第5步:触发执行Job

下面我们来逐步学习这5个步骤,其中第3步最为关键,是本博客的重点,更是平时开发的核心。

3. 第1步:创建执行环境

Flink程序最开始都是要创建执行环境,它会自动根据不同的运行场景创建对应的执行环境。

  • 如果你在IDEA里运行main方法,Flink创建的是本地执行环境
  • 如果你把程序打成jar包,提交到Flink集群上执行,Flink创建的是集群执行环境

创建执行环境很简单,就一句话:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//流数据源
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

4. 第2步:创建数据源

创建数据源是指定义我们的Flink程序从哪里获取数据。
如果是实时计算,一般工作中我们都是从Kafka中获取数据。
如果是跑批处理,一般是从txt,csv,hdfs上读取数据。
还记得一开始说的Flink自我介绍数据流上的有状态计算这句话吗?
我认为很有必要先理解下什么是流?

4.1 什么是流?

对Flink而言,不管是不停采集新增的事件还是已经固定大小的数据集合,它们都是流数据,只不过根据它们是否有界限,分为无界流和有界流。
 

4.1.1 无界流

例如温度传感器,它把采集的温度数据不停的推送到后台给Flink计算,如果触发某个规则,则报警。

无界流有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。

4.1.2 有界流

例如上1个月每天A股指数收盘的数据集合,这种有界的数据可以称为有界流。

有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

4.2 从指定的数据集合创建流(一般测试时用)

一般在测试自己代码时,可以这样用,以便快速验证自己写的转换算子是否对。

传入一组数字
DataSource<Integer> dataSource1 = env.fromElements(1, 2, 3, 4, 5);

传入一组字符串
DataSource<String> dataSource2 = env.fromElements("I love coding", "I love flink");

传入一个List集合
DataSource<String> dataSource3 = env.fromCollection(Arrays.asList("a", "b", "c"));

4.3 从文件里读取数据

widows上读取本地文件
DataSource<String> dataSource4 = env.readTextFile("D:\\FlinkTest.txt");

Linux上读取本地文件
DataSource<String> localLines = env.readTextFile("file:///usr/local/textfile");

从hdfs读取文件,需要写上hdfs的nameService(高可用集群),或者namenode ip及端口号
DataSource<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/textfile");

4.4 从WebSocket读取数据

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

第一个参数是hostname,第二个参数是port
DataStreamSource<String> lines = env.socketTextStream("com.pigg", 8888);

在对应机器上输入如下命令,flink程序就可以接收数据

nc -lk 8888

4.5 从Kafka读取数据

关于Kafka的安装和配置不在本文讨论范围内,网上博客很多,或参考我之前的博客Linux搭建kafka集群并测试

4.5.1 引入jar

Flink和Kafka结合的非常好,Flink官方也提供了Kafka的连接器。

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.11</artifactId>
	<version>1.10.1</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.2.0</version>
</dependency>

4.5.2 编写Kafka数据源

  • 一般开发用SpringBoot,Kafka的配置也放在yml文件里,这里作为演示写死了配置
  • 这里仅抛砖引玉,网上Flink整合Kafka完整的博客很多,可以自行搜索学习
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();

//指定kafka的broker地址
properties.setProperty("bootstrap.servers", "com.pigg:9092");

//指定组ID
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "earliest");

//定义kafka消费者
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
        "myTopicName",
        new SimpleStringSchema(),
        properties
);

DataStreamSource<String> lines = env.addSource(kafkaSource)

创建Kafka主题注意点:

Kafka在远程机器上,创建主题不要写localhost,这里用hostname: com.pigg
否则你本机的Flink任务读取不到localhost的Kafka消息,而导致错误

1.创建主题
kafka-topics.sh --create --zookeeper com.pigg:2181 --replication-factor 1 --partitions 1 --topic myTopicName

2.查看主题
kafka-topics.sh --list --zookeeper com.pigg:2181

3.创建生产者
kafka-console-producer.sh --broker-list com.pigg:9092 --topic myTopicName

5. 第3步:处理数据流

恭喜贺喜,终于来到第3步了,说实话第3步的内容真的太大太难,我也不知道如何讲起,毕竟我刚学Flink不到7天,还都是晚上迷迷糊糊看的。所以真的只能厚着脸皮写了,我觉得按照下面顺序写,写的不好,莫怪。

  • 什么是DataStream
  • 什么是元组
  • 基本转换算子(Map,FlatMap,Filter,groupBy,keyBy,Reduce)
  • 时间语义
  • 窗口和WaterMark
  • 聚合算子 (max,min,sum)
  • 分流算子
     

5.1 什么是DataStream

Flink提供了三层API,每层在简洁性和表达性之间进行了不同的权衡,DataStream API为许多通用的流处理操作提供原语,比如window。DataStream API适用于Java和Scala,它基于函数实现,比如map()、reduce()等。
 
往上翻看读取Kafka的代码,这边返回的类型是DataStreamSource

DataStreamSource<String> lines = env.addSource(kafkaSource)

查看DataStreamSource类图
 
说明上面代码也可以写成下面这样,对流数据lines的操作,就是调用DataStream API操作它。

DataStream<String> lines = env.addSource(kafkaSource)

有时间得看看DataStream.java这个类的代码。map,flatMap,filter等算子的定义都是在这个类里。

5.2 什么是元组(Tuple)

写Java的可能不知道元组,但是玩过Python的小朋友应该都知道。
元组与ArrayList类似,可以放一组数值,但是不同数据的类型可以不同。
从上面的WordCount代码里看,用到了Tuple2<String, Integer>,这表示定义一个元组,它有2个值,
第一个值是String类型,第二个值是Integer类型。
查看Flink里Tuple2类,可以发现Flink定义了Tuple0一直到Tuple25。

5.3 基本转换算子

5.3.1 map

  • 转换类型:DataStream → DataStream
  • 说明:读取一个元素并生成一个新的元素,例如
  • 举例:
输入 map转换 输出
1,2,3 乘以2 2,4,6
a,b,b 添加一个元素1,组成Tuple2<String, Integer> (a,1),(b,1),(b,1)

下面举例a,b,b -> (a,1),(b,1),(b,1)

public class DataSourceTest {
   
     

    public static void main(String[] args) throws Exception {
   
     

        //第1步:创建执行环境
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        //第2步:创建数据源
        DataSource<String> dataSource3 = env.fromCollection(Arrays.asList("a", "b", "b"));
        
        //第3步:执行转换算子
        MapOperator<String, Tuple2<String, Integer>> maped = dataSource3.map(new MapFunction<String, Tuple2<String, Integer>>() {
   
     

            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
   
     
                return new Tuple2<>(s, 1);
            }
        });

        //第4步:输出打印到控制台
        maped.print();

        //第5步:执行job,如果是实时流计算这是要的,这里是批处理,也可以不加
        env.execute("BatchWordCount");
    }
}

5.3.2 flatMap

写Java8多的小朋友估计对flatMap不陌生,暂时叫它扁平map吧。

  • 转换类型:DataStream → DataStream
  • 说明:多组数据->生成多个流->合并成一个流
  • 举例:
输入 flatMap转换 输出
“I love coding”, “I love flink” 切分后,组成Tuple2<String, Integer> (flink,1)2个(love,1)2个(I,1)(coding,1)
DataSource<String> lines = env.fromElements("I love coding", "I love flink");

//将每一行按照空格切分,并组成(word, 1)
FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
   
     

	@Override
	public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
   
     
            String[] words = line.split(" ");

            for (String word : words) {
   
     
                collector.collect(Tuple2.of(word, 1));
            }
        }
    });

5.3.3 filter

  • 转换类型:DataStream → DataStream
  • 说明:该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出
  • 举例:
输入 flatMap转换 输出
1, 2, 3, 4, 5, 6 找到奇数 1,3,5
DataStreamSource<Integer> nums = env.fromElements(1, 2, 3, 4, 5, 6);

SingleOutputStreamOperator<Long> filterd = nums.filter(new FilterFunction<Long>() {
   
     
    @Override
    public boolean filter(Long value) throws Exception {
   
     
        return value %2 != 0;
    }
}).setParallelism(1);

filterd.print();

5.3.4 keyBy

  • 转换类型:DataStream → KeyedStream
  • 说明:具有相同key的所有记录会分配给到同一分区,类似SQL的group by,在内部,keyBy()是使用hash分区实现
  • 举例:
    如果是DataSet用groupBy,是DataStream用keyBy
    接着上面4.5.2编写Kafka节,把从Kafka读取的数据进行WordCount
DataStreamSource<String> lines = env.addSource(kafkaSource);

SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
        lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
   
     
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
   
     
                String[] words = line.split(" ");
                for (String word : words){
   
     
                    Tuple2<String, Integer> tp = Tuple2.of(word, 1);
                    out.collect(tp);
                }
            }
        });
//注意下面的.keyBy(0).sum(1),说按照第一值分组,再把同组里第二个值求和
//聚合函数(min,max,sum)都是加在keyBy(DataSet时是groupBy)后面
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOne.keyBy(0).sum(1);

上面例子是在元组类型上keyBy,所以传的是数字,如果是POJO类型,可以传入字段名

dataStream.keyBy("someKey")

5.3.5 reduce

  • 转换类型:KeyedStream→DataStream
  • 说明:在分区的数据流上调用reduce函数:将当前元素与最后一个reduce的值合并生成新值。
    reduce函数是将KeyedStream转换为DataStream,也就是reduce调用前必须进行分区,即得先调用keyBy()函数
  • 举例:
keyedStream.reduce(new ReduceFunction<Integer>() {
   
     
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
   
     
        return value1 + value2;
    }
});

后续

写到这,发现把时间语义,窗口,聚合,分流也写在这一博客,会显的博客太长了,而且质量会更低(我承认我基本是把IDEA的代码贴过来)。而且时间语义和窗口特别重要的知识,我还是放到下一篇博客吧(该贴的代码还是得贴)。
Flink 实战(二) DataStream聚合 keyBy sum min和minBy区别