
目录
代码结构
代码解析
(1) 主程序入口
(2) 定义数据流
(3) 使用旧版 Kafka Sink
(4) 使用新版 Kafka Sink
(5) 将数据写入 Kafka
(6) 执行任务
代码优化
交付保证
异常处理
动态 Topic
优化后的代码
这段代码展示了如何使用 Apache Flink 将数据流写入 Kafka,并提供了两种不同的 Kafka Sink 实现方式。以下是对代码的详细解析和说明:
代码结构
- 包声明:
package sink
定义了代码所在的包。 - 导入依赖:
导入了必要的 Flink 和 Kafka 相关类库,包括:org.apache.flink.api.common.serialization.SimpleStringSchema:用于将数据序列化为字符串。org.apache.flink.connector.kafka.sink:Flink 的 Kafka Sink 相关类。org.apache.flink.streaming.api.scala._:Flink 流处理 API。org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer:旧版的 Kafka Sink 实现。
sinkToKafka对象:
主程序入口,包含 Flink 流处理逻辑和 Kafka Sink 的配置。
1package sink 2 3import org.apache.flink.api.common.serialization.SimpleStringSchema 4import org.apache.flink.connector.base.DeliveryGuarantee 5import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink} 6import org.apache.flink.streaming.api.scala._ 7import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer 8 9/** 10 * 11 * @PROJECT_NAME: flink1.13 12 * @PACKAGE_NAME: sink 13 * @author: 赵嘉盟-HONOR 14 * @data: 2023-11-19 23:46 15 * @DESCRIPTION 16 * 17 */ 18object sinkToKafka { 19 def main(args: Array[String]): Unit = { 20 val env = StreamExecutionEnvironment.getExecutionEnvironment 21 env.setParallelism(4) 22 23 val data = env.fromElements( 24 Event("Mary", "./home", 100L), 25 Event("Sum", "./cart", 500L), 26 Event("King", "./prod", 1000L), 27 Event("King", "./root", 200L) 28 ) 29 30 data.map(_.toString).addSink(new FlinkKafkaProducer[String]("","",new SimpleStringSchema())) 31 32 val kafkaSink=KafkaSink.builder() 33 .setBootstrapServers("") 34 .setRecordSerializer(KafkaRecordSerializationSchema.builder() 35 .setTopic("") 36 .setValueSerializationSchema(new SimpleStringSchema()) 37 .build() 38 ) 39 .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) 40 .build() 41 42 data.map(_.toString).sinkTo(kafkaSink) 43 44 env.execute("sinkKafka") 45 46 } 47} 48
基于scala使用flink将读取到的数据写入到kafka
val kafkaSink=KafkaSink.builder():这行代码创建了一个KafkaSink对象的构建器。.setBootstrapServers(""):这行代码设置了Kafka集群的地址,这里为空字符串,表示没有指定具体的Kafka集群地址。.setRecordSerializer(KafkaRecordSerializationSchema.builder():这行代码创建了一个KafkaRecordSerializationSchema对象的构建器,用于序列化要发送到Kafka的数据。.setTopic(""):这行代码设置了要发送数据的Kafka主题,这里为空字符串,表示没有指定具体的Kafka主题。.setValueSerializationSchema(new SimpleStringSchema()):这行代码设置了要发送数据的序列化方式,这里使用了SimpleStringSchema,表示将数据直接转换为字符串进行发送。.build():这行代码完成了KafkaRecordSerializationSchema对象的构建。.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)是一个设置消息传递保证的代码片段。它指定了消息传递的可靠性,即至少传递一次。
在分布式系统中,消息传递是一种常见的通信方式,用于在不同的节点之间传递数据或指令。为了保证消息传递的可靠性,通常会使用一些机制来确保消息至少被传递一次。
在这个代码片段中,.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)是设置消息传递保证的方法调用。其中,DeliveryGuarantee是一个枚举类型,表示不同的消息传递保证级别。AT_LEAST_ONCE是其中一个级别,表示至少传递一次。
通过将DeliveryGuarantee.AT_LEAST_ONCE作为参数传递给.setDeliverGuarantee()方法,可以确保消息至少被传递一次,即使中间出现了故障或其他问题。这样可以提高系统的可靠性和容错能力。
代码解析
(1) 主程序入口
1def main(args: Array[String]): Unit = { 2 val env = StreamExecutionEnvironment.getExecutionEnvironment 3 env.setParallelism(4)
- 创建 Flink 流处理环境
StreamExecutionEnvironment。 - 设置并行度为 4。
(2) 定义数据流
1val data = env.fromElements( 2 Event("Mary", "./home", 100L), 3 Event("Sum", "./cart", 500L), 4 Event("King", "./prod", 1000L), 5 Event("King", "./root", 200L) 6)
- 使用
fromElements方法生成一个包含 4 个Event对象的流。
(3) 使用旧版 Kafka Sink
data.map(_.toString).addSink(new FlinkKafkaProducer[String]("","",new SimpleStringSchema()))
- 将
Event对象转换为字符串。 - 使用
FlinkKafkaProducer将数据写入 Kafka。- 参数说明:
* 第一个参数:Kafka 的bootstrap.servers(未填写)。
* 第二个参数:Kafka 的topic(未填写)。
* 第三个参数:序列化器SimpleStringSchema。
- 参数说明:
(4) 使用新版 Kafka Sink
1val kafkaSink = KafkaSink.builder() 2 .setBootstrapServers("") // Kafka 服务器地址 3 .setRecordSerializer(KafkaRecordSerializationSchema.builder() 4 .setTopic("") // Kafka topic 5 .setValueSerializationSchema(new SimpleStringSchema()) // 值序列化器 6 .build() 7 ) 8 .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 交付保证 9 .build()
- 使用
KafkaSink.builder()创建一个 Kafka Sink:setBootstrapServers:设置 Kafka 服务器地址(未填写)。setRecordSerializer:配置记录序列化器:
*setTopic:设置 Kafka topic(未填写)。
*setValueSerializationSchema:设置值序列化器为SimpleStringSchema。setDeliverGuarantee:设置交付保证为AT_LEAST_ONCE(至少一次)。
(5) 将数据写入 Kafka
data.map(_.toString).sinkTo(kafkaSink)
- 将
Event对象转换为字符串。 - 使用
sinkTo方法将数据写入 Kafka。
(6) 执行任务
env.execute("sinkKafka")
- 启动 Flink 流处理任务,任务名称为
sinkKafka。
代码优化
交付保证
DeliveryGuarantee.AT_LEAST_ONCE是默认的交付保证,如果需要更高的可靠性,可以设置为EXACTLY_ONCE:
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
异常处理
- 在 Sink 中添加异常处理逻辑,避免程序因 Kafka 写入失败而崩溃:
data.map(_.toString).sinkTo(kafkaSink).setParallelism(1).name("KafkaSink")
动态 Topic
- 如果需要根据数据动态选择 Kafka topic,可以实现
KafkaRecordSerializationSchema。
优化后的代码
以下是优化后的完整代码:
1package sink 2 3import org.apache.flink.api.common.serialization.SimpleStringSchema 4import org.apache.flink.connector.base.DeliveryGuarantee 5import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink} 6import org.apache.flink.streaming.api.scala._ 7 8object sinkToKafka { 9 def main(args: Array[String]): Unit = { 10 val env = StreamExecutionEnvironment.getExecutionEnvironment 11 env.setParallelism(4) 12 13 val data = env.fromElements( 14 Event("Mary", "./home", 100L), 15 Event("Sum", "./cart", 500L), 16 Event("King", "./prod", 1000L), 17 Event("King", "./root", 200L) 18 ) 19 20 val kafkaSink = KafkaSink.builder() 21 .setBootstrapServers("localhost:9092") // Kafka 服务器地址 22 .setRecordSerializer(KafkaRecordSerializationSchema.builder() 23 .setTopic("test-topic") // Kafka topic 24 .setValueSerializationSchema(new SimpleStringSchema()) // 值序列化器 25 .build() 26 ) 27 .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 交付保证 28 .build() 29 30 data.map(_.toString).sinkTo(kafkaSink) 31 32 env.execute("sinkKafka") 33 } 34}
《Flink+Kafka:数据流处理实战指南》 是转载文章,点击查看原文。


