12. Flink Basics - Source: Reading from a File

1. Prepare the File

sensor.txt

sensor_1 1547718199 35.8
sensor_6 1547718201 15.4
sensor_7 1547718202 6.7
sensor_10 1547718205 38.1
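Each line of sensor.txt holds a sensor id, a timestamp and a temperature reading. The program below only prints the raw lines, but if you later want typed records (for example in a map step), a minimal plain-Java sketch of the parsing could look like this. The class SensorLineParser and its nested SensorReading type are hypothetical names, and treating the timestamp as epoch seconds is an assumption. The split also tolerates a stray comma between fields.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper, not part of the original program: parses one line of
 * sensor.txt, assumed to have the layout "id timestamp temperature".
 */
public class SensorLineParser {

    /** Simple value holder for one sensor reading. */
    public static final class SensorReading {
        public final String id;
        public final long timestamp;      // assumption: epoch seconds
        public final double temperature;

        public SensorReading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "SensorReading{id=" + id + ", timestamp=" + timestamp
                    + ", temperature=" + temperature + "}";
        }
    }

    /** Splits on any run of commas and/or whitespace, then converts each field. */
    public static SensorReading parse(String line) {
        String[] fields = line.trim().split("[,\\s]+");
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 fields but got " + fields.length + ": " + line);
        }
        return new SensorReading(fields[0],
                Long.parseLong(fields[1]),
                Double.parseDouble(fields[2]));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "sensor_1 1547718199 35.8",
                "sensor_6 1547718201 15.4",
                "sensor_7 1547718202 6.7",
                "sensor_10 1547718205 38.1");
        for (String line : lines) {
            System.out.println(parse(line));
        }
    }
}
```

A malformed line raises IllegalArgumentException instead of silently producing a partial record, which makes bad input easy to spot.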

Upload the file to HDFS:

hadoop fs -copyFromLocal sensor.txt /user/hive/warehouse

2. Prepare the Program

SourceTest2_File

package org.example;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @remark  Flink Source: reading from a file
 */

public class SourceTest2_File {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a single subtask
        env.setParallelism(1);

        // Read the file from HDFS; each line becomes one String element
        DataStream<String> dataStream = env.readTextFile("hdfs://hp1:8020/user/hive/warehouse/sensor.txt");

        // Print each element to the TaskManager's standard output
        dataStream.print();

        // Nothing runs until execute() submits the job
        env.execute();
    }
}
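Why env.setParallelism(1)? Besides running the whole job in one subtask, it keeps the print() output clean: as far as I know, Flink's print sink prefixes each line with the 1-based subtask number (for example "2> ") only when the sink runs with more than one subtask. The plain-Java sketch below mimics that formatting rule; PrintPrefixSketch and format are hypothetical names, not Flink API, and the sketch ignores the optional identifier accepted by print(String sinkIdentifier).

```java
/**
 * Hypothetical illustration, not Flink code: mimics the line format
 * DataStream.print() uses for each record it writes.
 */
public class PrintPrefixSketch {

    /**
     * @param subtaskIndex 0-based index of the sink subtask that received the record
     * @param parallelism  number of parallel sink subtasks
     * @param record       the record's toString() value
     */
    public static String format(int subtaskIndex, int parallelism, String record) {
        // With more than one subtask, the 1-based subtask number is prepended.
        if (parallelism > 1) {
            return (subtaskIndex + 1) + "> " + record;
        }
        // With parallelism 1, the record is printed as-is.
        return record;
    }

    public static void main(String[] args) {
        // Parallelism 1, as in SourceTest2_File: no prefix
        System.out.println(format(0, 1, "sensor_1 1547718199 35.8"));
        // Parallelism 4, record handled by the second subtask: "2> " prefix
        System.out.println(format(1, 4, "sensor_6 1547718201 15.4"));
    }
}
```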

3. Run the Flink Program

Run command:

flink run -m yarn-cluster -c org.example.SourceTest2_File FlinkStudy-1.0-SNAPSHOT.jar

Screenshot of the run: