一、Sink之JDBC概述

Flink的Sink支持的数据库:
 
Bahir中支持的数据库:
 

从上两图可以看到,Flink的Sink并支持类似MySQL的这种关系型数据库,那么如果我需要通过Flink连接MySQL,该如何操作呢?

这个时候我们可以使用Flink Sink的JDBC连接。

二、pom文件配置

此处,我本地MySQL版本是 8.0.19

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>

三、MySQL配置

新建数据库及表

CREATE DATABASE flink_test DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

CREATE TABLE sensor_temp (
  id varchar(32) NOT NULL,
  temp double NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

四、编写Java代码

package org.flink.sink;

import org.flink.beans.SensorReading;
import org.example.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @remark Sink之JDBC
 */

public class SinkTest4_Jdbc {
   
     
    public static void main(String[] args) throws Exception {
   
     
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件读取数据
//        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");
//
//        // 转换成SensorReading类型
//        DataStream<SensorReading> dataStream = inputStream.map(line -> {
   
     
//            String[] fields = line.split(",");
//            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
//        });

        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // 实现自定义的SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
   
     
        // 声明连接和预编译语句
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
   
     
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // 每来一条数据,调用连接,执行sql
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
   
     
            // 直接执行更新语句,如果没有更新那么就插入
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if( updateStmt.getUpdateCount() == 0 ){
   
     
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
   
     
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}

五、运行Flink程序查看数据

mysql> select * from sensor_temp;
+-----------+-------------------+
| id        | temp              |
+-----------+-------------------+
| sensor_3  | 65.31089123002162 |
| sensor_10 | 20.23454807781744 |
| sensor_4  | 79.87349739590283 |
| sensor_1  | 68.79742249825429 |
| sensor_2  |  44.1766638371653 |
| sensor_7  | 99.47000620947128 |
| sensor_8  |  68.7360059804266 |
| sensor_5  |  69.9135258264366 |
| sensor_6  | 38.85722751176939 |
| sensor_9  | 69.97758295030204 |
+-----------+-------------------+
10 rows in set (0.00 sec)

mysql>