Detailed implementation: consuming Kafka data with Flink and writing it to MySQL

Answered by ChatGPT 4o

Question:

Use Flink for data processing: first consume the sales topic in Kafka, then write the data into the MySQL sales order table (tbl_sales_order) and sales goods table (tbl_sales_goods). Both tables have an auto-increment primary key id generated by the database, and they are in a one-to-many relationship: the sales goods table has a foreign key referencing the sales order table's primary key id. Also, the enterprise id in the sales data needs to be joined against the enterprise table in MySQL to fetch the enterprise name, which should be stored in the sales order table as well. Please give me detailed, workable code.

Answer:

To use Apache Flink to consume the sales topic from Kafka, write the data into the MySQL sales order table (tbl_sales_order) and sales goods table (tbl_sales_goods), and join the enterprise table to fetch the enterprise name, proceed as follows:

  1. Set up dependencies: make sure your project includes the Flink, Kafka, and MySQL connector dependencies.

  2. Kafka consumer configuration: configure a Kafka consumer to read the sales data.

  3. Data processing logic: process the sales data and join the enterprise table to fetch the enterprise name.

  4. Write to MySQL: write the processed data into the MySQL sales order and sales goods tables.

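The code below assumes table structures roughly like the following DDL. The exact column names and types are illustrative assumptions, not taken from the question; adapt them to your real schema:

CREATE TABLE tbl_sales_order (
    id              BIGINT AUTO_INCREMENT PRIMARY KEY,
    order_id        VARCHAR(64),
    enterprise_id   INT,
    enterprise_name VARCHAR(255),
    order_date      VARCHAR(32)
);

CREATE TABLE tbl_sales_goods (
    id           BIGINT AUTO_INCREMENT PRIMARY KEY,
    goods_id     VARCHAR(64),
    order_id     VARCHAR(64),   -- joins back to tbl_sales_order.order_id
    product_name VARCHAR(255),
    quantity     INT,
    price        DOUBLE
);

CREATE TABLE tbl_enterprise (
    enterprise_id   INT PRIMARY KEY,
    enterprise_name VARCHAR(255)
);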
Here is a detailed code example:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;

import org.json.JSONArray;
import org.json.JSONObject;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class SalesDataProcessor {

    public static void main(String[] args) throws Exception {
        // Set up the Flink execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "sales-consumer-group");

        // Create the Kafka consumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "sales_topic",
                new SimpleStringSchema(),
                properties
        );

        // Read the Kafka data stream
        DataStream<String> salesStream = env.addSource(kafkaConsumer);

        // Parse each sales message into one order row plus a list of goods rows,
        // enriching the order with the enterprise name
        DataStream<Tuple2<Row, List<Row>>> processedStream = salesStream.map(new SalesDataMapper());

        // Write the order rows to MySQL's tbl_sales_order
        processedStream.map(tuple -> tuple.f0)
                .returns(Row.class)
                .addSink(JdbcSink.sink(
                        "INSERT INTO tbl_sales_order (order_id, enterprise_id, enterprise_name, order_date) VALUES (?, ?, ?, ?)",
                        (JdbcStatementBuilder<Row>) (ps, row) -> {
                            ps.setString(1, row.getField(0).toString());
                            ps.setInt(2, (Integer) row.getField(1));
                            ps.setString(3, row.getField(2).toString());
                            ps.setString(4, row.getField(3).toString());
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/your_database")
                                .withDriverName("com.mysql.cj.jdbc.Driver")
                                .withUsername("your_username")
                                .withPassword("your_password")
                                .build()
                ));

        // Write the goods rows to MySQL's tbl_sales_goods; an order can carry several
        // goods items, so flatten the per-order list into individual rows first
        processedStream
                .flatMap((Tuple2<Row, List<Row>> tuple, Collector<Row> out) -> tuple.f1.forEach(out::collect))
                .returns(Row.class)
                .addSink(JdbcSink.sink(
                        "INSERT INTO tbl_sales_goods (goods_id, order_id, product_name, quantity, price) VALUES (?, ?, ?, ?, ?)",
                        (JdbcStatementBuilder<Row>) (ps, row) -> {
                            ps.setString(1, row.getField(0).toString());
                            ps.setString(2, row.getField(1).toString());
                            ps.setString(3, row.getField(2).toString());
                            ps.setInt(4, (Integer) row.getField(3));
                            ps.setDouble(5, (Double) row.getField(4));
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/your_database")
                                .withDriverName("com.mysql.cj.jdbc.Driver")
                                .withUsername("your_username")
                                .withPassword("your_password")
                                .build()
                ));

        // Execute the Flink job
        env.execute("Sales Data Processor");
    }

    // Custom map function: parses one sales JSON message, looks up the enterprise
    // name in MySQL, and emits one order row plus one row per goods item
    public static class SalesDataMapper extends RichMapFunction<String, Tuple2<Row, List<Row>>> {
        private transient Connection connection;
        private transient PreparedStatement enterpriseQuery;

        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
            super.open(parameters);
            // Initialize the MySQL connection once per task and prepare the lookup statement
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/your_database", "your_username", "your_password");
            enterpriseQuery = connection.prepareStatement(
                    "SELECT enterprise_name FROM tbl_enterprise WHERE enterprise_id = ?");
        }

        @Override
        public Tuple2<Row, List<Row>> map(String value) throws Exception {
            // The sales message is assumed to be JSON; parse it
            JSONObject json = new JSONObject(value);
            String orderId = json.getString("order_id");
            int enterpriseId = json.getInt("enterprise_id");
            String orderDate = json.getString("order_date");
            JSONArray goodsArray = json.getJSONArray("goods");

            // Look up the enterprise name for this order
            String enterpriseName = "";
            enterpriseQuery.setInt(1, enterpriseId);
            try (ResultSet rs = enterpriseQuery.executeQuery()) {
                if (rs.next()) {
                    enterpriseName = rs.getString("enterprise_name");
                }
            }

            // Build the sales order row
            Row orderRow = new Row(4);
            orderRow.setField(0, orderId);
            orderRow.setField(1, enterpriseId);
            orderRow.setField(2, enterpriseName);
            orderRow.setField(3, orderDate);

            // Build one goods row per item; the order is one-to-many with its goods,
            // so a fresh Row is created inside the loop instead of reusing a single one
            List<Row> goodsRows = new ArrayList<>();
            for (int i = 0; i < goodsArray.length(); i++) {
                JSONObject goods = goodsArray.getJSONObject(i);
                Row goodsRow = new Row(5);
                goodsRow.setField(0, goods.getString("goods_id"));
                goodsRow.setField(1, orderId);
                goodsRow.setField(2, goods.getString("product_name"));
                goodsRow.setField(3, goods.getInt("quantity"));
                goodsRow.setField(4, goods.getDouble("price"));
                goodsRows.add(goodsRow);
            }

            return new Tuple2<>(orderRow, goodsRows);
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (enterpriseQuery != null) {
                enterpriseQuery.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}
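For reference, the SalesDataMapper above expects each Kafka message to be a JSON document shaped roughly like this. The field names come from the parsing code; the values are made up:

{
  "order_id": "SO-20240101-0001",
  "enterprise_id": 42,
  "order_date": "2024-01-01 10:30:00",
  "goods": [
    {"goods_id": "G-1001", "product_name": "Widget A", "quantity": 3, "price": 19.9},
    {"goods_id": "G-1002", "product_name": "Widget B", "quantity": 1, "price": 5.5}
  ]
}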

Notes:

  1. Dependencies: make sure your project includes the following (org.json is needed for the JSON parsing in SalesDataMapper):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.26</version>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20231013</version>
    </dependency>
    
  2. Kafka consumer: FlinkKafkaConsumer reads the sales data from the sales_topic topic.

  3. Data processing: the SalesDataMapper class parses each sales message and queries MySQL for the enterprise name.

  4. Writing to MySQL: JdbcSink writes the processed rows into the sales order table and the sales goods table in batches.

  5. Running the job: env.execute("Sales Data Processor") submits the Flink job.

Adjust the database connection settings and table structures in the code to match your environment. One caveat worth noting: because the primary keys are auto-generated by MySQL, JdbcSink has no way to read back the generated order id, so the code above links the two tables through the business order_id carried in the message rather than the auto-increment primary key.
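One practical refinement: SalesDataMapper issues one MySQL query per incoming message. If the enterprise table is small and changes rarely, a per-task in-memory cache removes most of those round trips. A minimal sketch, assuming the same tbl_enterprise schema as above; the class name and structure are illustrative, not part of the code above:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

// Small per-task cache around the enterprise-name lookup. Intended to be
// created in SalesDataMapper.open() and used from map(); illustrative only.
public class EnterpriseNameCache {
    private final PreparedStatement query;              // prepared once, reused
    private final Map<Integer, String> cache = new HashMap<>();

    public EnterpriseNameCache(Connection connection) throws Exception {
        this.query = connection.prepareStatement(
                "SELECT enterprise_name FROM tbl_enterprise WHERE enterprise_id = ?");
    }

    public String lookup(int enterpriseId) throws Exception {
        String cached = cache.get(enterpriseId);
        if (cached != null) {
            return cached;
        }
        String name = "";                               // fall back to empty if missing
        query.setInt(1, enterpriseId);
        try (ResultSet rs = query.executeQuery()) {
            if (rs.next()) {
                name = rs.getString("enterprise_name");
            }
        }
        cache.put(enterpriseId, name);
        return name;
    }
}

In SalesDataMapper you would then create one instance in open() and call lookup(enterpriseId) from map() instead of running the query directly. If enterprise names can change while the job runs, add an expiry policy or bound the cache size.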