Use Flink for data processing: consume the sales topic from Kafka, then write to the MySQL sales order table (tbl_sales_order) and sales goods table (tbl_sales_goods). Both tables' primary key id is auto-incremented by the database, and they have a one-to-many relationship: the sales goods table has a foreign key that references the sales order table's primary key id. Also, the enterprise id in the sales data needs to be joined against the enterprise table in MySQL to fetch the enterprise name, which is stored in the sales order table as well. Please give me detailed, working code.
To consume the sales topic from Kafka with Apache Flink and write the data into the MySQL sales order table (tbl_sales_order) and sales goods table (tbl_sales_goods), while joining the enterprise table to fetch the enterprise name, follow these steps:
1. Set up dependencies: make sure your project includes the Flink, Kafka, and MySQL connector dependencies.
2. Configure the Kafka consumer: set up a Kafka consumer to read the sales data (assumed to be JSON; see the sample message after this list).
3. Process the data: parse the sales records and join the enterprise table to fetch the enterprise name.
4. Write to MySQL: write the processed data into the MySQL sales order and sales goods tables.
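The code below assumes each Kafka message is a JSON document shaped roughly as follows; the field names match what the mapper reads, but the values are only illustrative:

{
  "order_id": "SO-20240101-0001",
  "enterprise_id": 42,
  "order_date": "2024-01-01 10:15:30",
  "goods": [
    {"goods_id": "G-1001", "product_name": "Widget", "quantity": 3, "price": 9.99},
    {"goods_id": "G-1002", "product_name": "Gadget", "quantity": 1, "price": 19.5}
  ]
}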
Here is a detailed code example:
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.json.JSONArray;
import org.json.JSONObject;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class SalesDataProcessor {
public static void main(String[] args) throws Exception {
// Set up the Flink execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka consumer configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "sales-consumer-group");
// Create the Kafka consumer
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"sales_topic",
new SimpleStringSchema(),
properties
);
// Read the Kafka stream
DataStream<String> salesStream = env.addSource(kafkaConsumer);
// Process the sales data: one order Row plus a list of goods Rows per message
DataStream<Tuple2<Row, List<Row>>> processedStream = salesStream.map(new SalesDataMapper());
// Write to the MySQL sales order table
processedStream.map(tuple -> tuple.f0)
        .returns(Types.GENERIC(Row.class)) // lambda return types are erased, so declare the output type explicitly
        .addSink(JdbcSink.sink(
"INSERT INTO tbl_sales_order (order_id, enterprise_id, enterprise_name, order_date) VALUES (?, ?, ?, ?)",
(JdbcStatementBuilder<Row>) (ps, row) -> {
ps.setString(1, row.getField(0).toString());
ps.setInt(2, (Integer) row.getField(1));
ps.setString(3, row.getField(2).toString());
ps.setString(4, row.getField(3).toString());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/your_database")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("your_username")
.withPassword("your_password")
.build()
));
// Write to the MySQL sales goods table, flattening the goods list so each item becomes one row
processedStream
        .flatMap((Tuple2<Row, List<Row>> tuple, Collector<Row> out) -> tuple.f1.forEach(out::collect))
        .returns(Types.GENERIC(Row.class))
        .addSink(JdbcSink.sink(
"INSERT INTO tbl_sales_goods (goods_id, order_id, product_name, quantity, price) VALUES (?, ?, ?, ?, ?)",
(JdbcStatementBuilder<Row>) (ps, row) -> {
ps.setString(1, row.getField(0).toString());
ps.setString(2, row.getField(1).toString());
ps.setString(3, row.getField(2).toString());
ps.setInt(4, (Integer) row.getField(3));
ps.setDouble(5, (Double) row.getField(4));
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/your_database")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("your_username")
.withPassword("your_password")
.build()
));
// Execute the Flink job
env.execute("Sales Data Processor");
}
// Custom RichMapFunction that parses a sales message and joins the enterprise name from MySQL
public static class SalesDataMapper extends RichMapFunction<String, Tuple2<Row, List<Row>>> {
private transient Connection connection;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
// Initialize the MySQL connection used for the enterprise-name lookup
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/your_database", "your_username", "your_password");
}
@Override
public Tuple2<Row, List<Row>> map(String value) throws Exception {
    // Parse the sales message, which is assumed to be JSON (see the sample above)
JSONObject json = new JSONObject(value);
String orderId = json.getString("order_id");
int enterpriseId = json.getInt("enterprise_id");
String orderDate = json.getString("order_date");
JSONArray goodsArray = json.getJSONArray("goods");
// Look up the enterprise name; try-with-resources closes the statement and result set
String enterpriseName = "";
try (PreparedStatement ps = connection.prepareStatement(
        "SELECT enterprise_name FROM tbl_enterprise WHERE enterprise_id = ?")) {
    ps.setInt(1, enterpriseId);
    try (ResultSet rs = ps.executeQuery()) {
        if (rs.next()) {
            enterpriseName = rs.getString("enterprise_name");
        }
    }
}
// Build the sales order Row
Row orderRow = new Row(4);
orderRow.setField(0, orderId);
orderRow.setField(1, enterpriseId);
orderRow.setField(2, enterpriseName);
orderRow.setField(3, orderDate);
// Build one sales goods Row per item in the goods array
List<Row> goodsRows = new ArrayList<>();
for (int i = 0; i < goodsArray.length(); i++) {
    JSONObject goods = goodsArray.getJSONObject(i);
    Row goodsRow = new Row(5);
    goodsRow.setField(0, goods.getString("goods_id"));
    goodsRow.setField(1, orderId);
    goodsRow.setField(2, goods.getString("product_name"));
    goodsRow.setField(3, goods.getInt("quantity"));
    goodsRow.setField(4, goods.getDouble("price"));
    goodsRows.add(goodsRow);
}
return new Tuple2<>(orderRow, goodsRows);
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
}
}
}
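One caveat: the requirement states that both primary keys are auto-incremented by the database and that tbl_sales_goods's foreign key references tbl_sales_order's generated primary key. JdbcSink cannot read back generated keys, so the code above links the two tables through the business order_id instead. If the foreign key must hold the generated primary key, a custom sink is needed. Below is a minimal sketch of an additional inner class for SalesDataProcessor, under these assumptions: tbl_sales_goods.order_id stores the generated tbl_sales_order.id, and the driver supports RETURN_GENERATED_KEYS (the MySQL driver does). It is not part of the listing above.

// Additional imports needed at the top of the file:
// import java.sql.Statement;
// import org.apache.flink.configuration.Configuration;
// import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Transactional sink: inserts the order, reads back its auto-increment id,
// and uses that id as the foreign key for the goods rows.
public static class OrderWithGoodsSink extends RichSinkFunction<Tuple2<Row, List<Row>>> {
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/your_database", "your_username", "your_password");
        connection.setAutoCommit(false); // commit order and goods atomically
    }

    @Override
    public void invoke(Tuple2<Row, List<Row>> value, Context context) throws Exception {
        try {
            long orderPk;
            try (PreparedStatement orderPs = connection.prepareStatement(
                    "INSERT INTO tbl_sales_order (order_id, enterprise_id, enterprise_name, order_date) VALUES (?, ?, ?, ?)",
                    Statement.RETURN_GENERATED_KEYS)) {
                Row order = value.f0;
                orderPs.setString(1, order.getField(0).toString());
                orderPs.setInt(2, (Integer) order.getField(1));
                orderPs.setString(3, order.getField(2).toString());
                orderPs.setString(4, order.getField(3).toString());
                orderPs.executeUpdate();
                try (ResultSet keys = orderPs.getGeneratedKeys()) {
                    keys.next();
                    orderPk = keys.getLong(1); // the auto-increment id of tbl_sales_order
                }
            }
            try (PreparedStatement goodsPs = connection.prepareStatement(
                    "INSERT INTO tbl_sales_goods (order_id, goods_id, product_name, quantity, price) VALUES (?, ?, ?, ?, ?)")) {
                for (Row goods : value.f1) {
                    goodsPs.setLong(1, orderPk); // FK referencing tbl_sales_order.id
                    goodsPs.setString(2, goods.getField(0).toString());
                    goodsPs.setString(3, goods.getField(2).toString());
                    goodsPs.setInt(4, (Integer) goods.getField(3));
                    goodsPs.setDouble(5, (Double) goods.getField(4));
                    goodsPs.addBatch();
                }
                goodsPs.executeBatch();
            }
            connection.commit();
        } catch (Exception e) {
            connection.rollback();
            throw e;
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}

With it, processedStream.addSink(new OrderWithGoodsSink()); would replace the two JdbcSink pipelines above. The trade-off is one transaction per order instead of JdbcSink's batching, so throughput is lower.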
Dependencies: make sure your project includes the following:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
</dependency>
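The mapper parses messages with org.json, so that library is needed as well:
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20231013</version>
</dependency>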
Kafka consumer: configures a Kafka consumer to read the sales data.
Data processing: the SalesDataMapper class parses each sales message and looks up the enterprise name in MySQL.
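Note that SalesDataMapper issues one synchronous MySQL query per record. If the enterprise table is small and changes rarely, a simple in-memory cache inside the mapper avoids most of those lookups. A minimal sketch follows; the cache field and helper method are additions, not part of the listing above, and also need java.util.Map, java.util.HashMap, and java.sql.SQLException imports:

// Inside SalesDataMapper: cache enterprise names so each id is queried only once per subtask
private final Map<Integer, String> enterpriseNameCache = new HashMap<>();

private String lookupEnterpriseName(int enterpriseId) throws SQLException {
    String cached = enterpriseNameCache.get(enterpriseId);
    if (cached != null) {
        return cached;
    }
    String name = "";
    try (PreparedStatement ps = connection.prepareStatement(
            "SELECT enterprise_name FROM tbl_enterprise WHERE enterprise_id = ?")) {
        ps.setInt(1, enterpriseId);
        try (ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                name = rs.getString("enterprise_name");
            }
        }
    }
    enterpriseNameCache.put(enterpriseId, name);
    return name;
}

map() would then call lookupEnterpriseName(enterpriseId) instead of querying directly. For larger or frequently changing dimension tables, Flink's broadcast state or async I/O are more robust options.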
Writing to MySQL: JdbcSink writes the processed data into the MySQL sales order and sales goods tables.
Executing the Flink job: env.execute("Sales Data Processor") submits and runs the job.
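For reference, the code assumes table structures roughly like the following. This is a sketch with guessed column types, matching the JdbcSink variant where order_id is a business key:

CREATE TABLE tbl_sales_order (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    order_id VARCHAR(64) NOT NULL,
    enterprise_id INT NOT NULL,
    enterprise_name VARCHAR(255),
    order_date DATETIME
);

CREATE TABLE tbl_sales_goods (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    goods_id VARCHAR(64) NOT NULL,
    order_id VARCHAR(64) NOT NULL, -- business order id; BIGINT referencing tbl_sales_order.id in the custom-sink variant
    product_name VARCHAR(255),
    quantity INT,
    price DECIMAL(10, 2)
);

CREATE TABLE tbl_enterprise (
    enterprise_id INT PRIMARY KEY,
    enterprise_name VARCHAR(255)
);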
Adjust the database connection settings and table structures in the code to match your actual environment.