08. Flink Basics - Running a WordCount Program with Flink on YARN

Environment

The test server runs CDH 6.3.1 with Flink 1.9 installed.

Contents of hello.txt:

hello word
hello hdfs
hello mapreduce
hello yarn
hello hive
hello spark
hello flink

1. Maven Configuration

Flink dependencies:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>

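The Java code is written locally, packaged as a JAR, and then uploaded to the server to run. The main entry point therefore has to be set in the JAR manifest; otherwise flink run reports an error when no entry class is given on the command line.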

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <version>3.0.2</version>
  <configuration>
    <archive>
      <manifest>
        <addClasspath>true</addClasspath>
        <mainClass>org.example.wordCount</mainClass> <!-- main entry point -->
      </manifest>
    </archive>
  </configuration>
</plugin>
<plugin>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>

Adjust org.example.wordCount to match your own project:
org.example is the package name
wordCount is the class name
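
One way to confirm that the plugin configuration above took effect is to read the Main-Class attribute back out of the built JAR. The following is a minimal sketch using only the JDK's java.util.jar classes; the class name ManifestCheck and the helpers mainClassOf and writeDemoJar are hypothetical names introduced for illustration, and the demo JAR it writes stands in for the real FlinkStudy-1.0-SNAPSHOT.jar.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

// Hypothetical helper, not part of the original project.
public class ManifestCheck {

    /** Returns the Main-Class attribute of the JAR's manifest, or null if it is absent. */
    public static String mainClassOf(Path jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return null;
            }
            return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    /** Writes a JAR containing only a manifest that names the given main class (demo only). */
    public static Path writeDemoJar(String mainClass) throws IOException {
        Manifest manifest = new Manifest();
        // Manifest-Version must be present, otherwise the manifest is written empty.
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        manifest.getMainAttributes().put(Attributes.Name.MAIN_CLASS, mainClass);
        Path jar = Files.createTempFile("demo", ".jar");
        try (OutputStream out = Files.newOutputStream(jar);
             JarOutputStream jarOut = new JarOutputStream(out, manifest)) {
            // No further entries are needed; the constructor writes the manifest.
        }
        return jar;
    }

    public static void main(String[] args) throws IOException {
        // Pass the path of a real JAR as the first argument, or fall back to the demo JAR.
        Path jar = args.length > 0 ? Paths.get(args[0]) : writeDemoJar("org.example.wordCount");
        System.out.println("Main-Class: " + mainClassOf(jar));
    }
}
```

Run against the packaged JAR, this should print the class configured in mainClass. A null result would suggest the manifest configuration was not picked up during the build.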

2. Writing the Java Code

The code is as follows:

package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/*
 * @remark  The first Flink wordCount program
 */

public class wordCount {

    public static void main(String[] args) throws Exception {

        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the data from a file
        String inputPath = "hdfs://hp1:8020/user/hive/warehouse/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // Process the data set: split each line on spaces, emit (word, 1) tuples, then aggregate
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0) // group by the word in position 0
                .sum(1); // sum the counts in position 1

        // In the DataSet API, print() triggers execution itself, so env.execute() is not called afterwards
        resultSet.print();

        //env.execute();

        //env.execute("Word Count Example");
    }

    // Custom class implementing the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line on spaces
            String[] words = value.split(" ");
            // Wrap every word in a (word, 1) tuple and emit it
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
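
To make the flatMap, groupBy(0), sum(1) chain easier to follow, here is a minimal plain-Java sketch of the same computation on the hello.txt lines, with no Flink involved. It is only an analogy under stated assumptions: LocalWordCount and count are hypothetical names, and the sorted TreeMap is used purely to get a stable print order (Flink does not guarantee any particular order for the printed tuples).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical local analogue of the Flink job; it does not use the Flink API.
public class LocalWordCount {

    /** Splits each line on single spaces and adds 1 to the count of every word found. */
    public static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                // Same splitting rule as MyFlatMapper: one element per word.
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(
                        word -> word,                        // plays the role of groupBy(0)
                        TreeMap::new,                        // sorted keys, only for stable output
                        Collectors.summingInt(word -> 1)));  // plays the role of sum(1)
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "hello word", "hello hdfs", "hello mapreduce", "hello yarn",
                "hello hive", "hello spark", "hello flink");
        // Print in the same (word,count) shape that Tuple2 uses.
        count(lines).forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

For the seven input lines, hello is counted 7 times and each of the other words once, which is what the Flink job is expected to report as well.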

3. Packaging with Maven and Uploading

Package the project with the mvn package command:

C:\Users\Administrator\IdeaProjects\FlinkStudy>mvn package
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for org.example:FlinkStudy:jar:1.0-SNAPSHOT
[WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-compiler-plugin @ line 98, column 17
[WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-surefire-plugin @ line 107, column 17
[WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-jar-plugin @ line 116, column 17
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -----------------------< org.example:FlinkStudy >-----------------------
[INFO] Building FlinkStudy 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ FlinkStudy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 0 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:compile (default-compile) @ FlinkStudy ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to C:\Users\Administrator\IdeaProjects\FlinkStudy\target\classes
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ FlinkStudy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 0 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (default-testCompile) @ FlinkStudy ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.19:test (default-test) @ FlinkStudy ---
[INFO] Tests are skipped.
[INFO]
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ FlinkStudy ---
[INFO] Building jar: C:\Users\Administrator\IdeaProjects\FlinkStudy\target\FlinkStudy-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.848 s
[INFO] Finished at: 2021-08-25T09:41:03+08:00
[INFO] ------------------------------------------------------------------------

C:\Users\Administrator\IdeaProjects\FlinkStudy>

Then upload the generated FlinkStudy-1.0-SNAPSHOT.jar to the server.

4. Running the JAR

Command:

flink run  -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /home/flink/FlinkStudy-1.0-SNAPSHOT.jar 

Result: [screenshot]

Result in the web UI: [screenshot]

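5. Running a Different Class

The pom file specifies a main class, and if no class is given on the command line, that class is the one that runs. To run a different class in the same project, specify it with the -c or --class option.

Code: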

package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/*
 * @remark  The second Flink wordCount program
 */

public class wordCount2 {

    public static void main(String[] args) throws Exception {

        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the data from a file
        String inputPath = "hdfs://hp1:8020/user/hive/warehouse/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // Process the data set: split each line on spaces, emit (word, 1) tuples, then aggregate
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0) // group by the word in position 0
                .sum(1); // sum the counts in position 1

        // In the DataSet API, print() triggers execution itself, so env.execute() is not called afterwards
        resultSet.print();

        //env.execute();

        //env.execute("Word Count Example");

        // Marker line that shows this class, not wordCount, was executed
        System.out.println("This is the second test wordcount");
    }

    // Custom class implementing the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line on spaces
            String[] words = value.split(" ");
            // Wrap every word in a (word, 1) tuple and emit it
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

Commands:

-- Correct
flink run -m yarn-cluster -c org.example.wordCount2 FlinkStudy-1.0-SNAPSHOT.jar
-- Correct
flink run -m yarn-cluster --class org.example.wordCount2 FlinkStudy-1.0-SNAPSHOT.jar
-- Wrong: arguments after the JAR path are passed to the program, so the main class from the pom file still runs
flink run -m yarn-cluster FlinkStudy-1.0-SNAPSHOT.jar -c org.example.wordCount2

Execution result: [screenshot]
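
Why the position of -c matters can be pictured with a deliberately simplified model of how the flink run command line is divided: everything before the JAR path is treated as a CLI option, everything after it is handed to the program's main(String[] args). The sketch below is an assumption-laden illustration, not Flink's actual parser; RunArgsModel, Split and split are hypothetical names, and "first token ending in .jar" is a simplification chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified illustration only; this is NOT how Flink implements its command-line parsing.
public class RunArgsModel {

    /** Tokens before the JAR (CLI options), the JAR itself, and tokens after it (program arguments). */
    public static final class Split {
        public final List<String> cliOptions = new ArrayList<>();
        public String jar;
        public final List<String> programArgs = new ArrayList<>();
    }

    /** Assumption: the first token ending in ".jar" separates CLI options from program arguments. */
    public static Split split(List<String> tokens) {
        Split result = new Split();
        for (String token : tokens) {
            if (result.jar != null) {
                result.programArgs.add(token);   // after the JAR: goes to the user program
            } else if (token.endsWith(".jar")) {
                result.jar = token;              // the JAR path itself
            } else {
                result.cliOptions.add(token);    // before the JAR: seen by the CLI
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Split wrong = split(Arrays.asList(
                "-m", "yarn-cluster", "FlinkStudy-1.0-SNAPSHOT.jar", "-c", "org.example.wordCount2"));
        System.out.println("CLI options:  " + wrong.cliOptions);
        System.out.println("Program args: " + wrong.programArgs);
    }
}
```

In this model the third command leaves -c org.example.wordCount2 among the program arguments, so the CLI never sees an entry class and falls back to the Main-Class from the manifest.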