Flink+Kafka:数据流处理实战指南

作者:渣渣盟日期:2026/4/27

目录

代码结构

代码解析

(1) 主程序入口

(2) 定义数据流

(3) 使用旧版 Kafka Sink

(4) 使用新版 Kafka Sink

(5) 将数据写入 Kafka

(6) 执行任务

代码优化

交付保证

异常处理

动态 Topic

优化后的代码

这段代码展示了如何使用 Apache Flink 将数据流写入 Kafka,并提供了两种不同的 Kafka Sink 实现方式。以下是对代码的详细解析和说明:

代码结构

  • 包声明package sink
    定义了代码所在的包。
  • 导入依赖
    导入了必要的 Flink 和 Kafka 相关类库,包括:
    • org.apache.flink.api.common.serialization.SimpleStringSchema:用于将数据序列化为字符串。
    • org.apache.flink.connector.kafka.sink:Flink 的 Kafka Sink 相关类。
    • org.apache.flink.streaming.api.scala._:Flink 流处理 API。
    • org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer:旧版的 Kafka Sink 实现。
  • sinkToKafka 对象
    主程序入口,包含 Flink 流处理逻辑和 Kafka Sink 的配置。
1package sink
2
3import org.apache.flink.api.common.serialization.SimpleStringSchema
4import org.apache.flink.connector.base.DeliveryGuarantee
5import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
6import org.apache.flink.streaming.api.scala._
7import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
8
9/**
10 *
11 * @PROJECT_NAME: flink1.13
12 * @PACKAGE_NAME: sink
13 * @author: 赵嘉盟-HONOR
14 * @data: 2023-11-19 23:46
15 * @DESCRIPTION
16 *
17 */
18object sinkToKafka {
19  def main(args: Array[String]): Unit = {
20    val env = StreamExecutionEnvironment.getExecutionEnvironment
21    env.setParallelism(4)
22
23    val data = env.fromElements(
24      Event("Mary", "./home", 100L),
25      Event("Sum", "./cart", 500L),
26      Event("King", "./prod", 1000L),
27      Event("King", "./root", 200L)
28    )
29
30    data.map(_.toString).addSink(new FlinkKafkaProducer[String]("","",new SimpleStringSchema()))
31
32    val kafkaSink=KafkaSink.builder()
33      .setBootstrapServers("")
34      .setRecordSerializer(KafkaRecordSerializationSchema.builder()
35      .setTopic("")
36      .setValueSerializationSchema(new SimpleStringSchema())
37      .build()
38      )
39      .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
40      .build()
41
42    data.map(_.toString).sinkTo(kafkaSink)
43
44    env.execute("sinkKafka")
45
46  }
47}
48

基于scala使用flink将读取到的数据写入到kafka

  1. val kafkaSink=KafkaSink.builder():这行代码创建了一个KafkaSink对象的构建器。
  2. .setBootstrapServers(""):这行代码设置了Kafka集群的地址,这里为空字符串,表示没有指定具体的Kafka集群地址。
  3. .setRecordSerializer(KafkaRecordSerializationSchema.builder():这行代码创建了一个KafkaRecordSerializationSchema对象的构建器,用于序列化要发送到Kafka的数据。
  4. .setTopic(""):这行代码设置了要发送数据的Kafka主题,这里为空字符串,表示没有指定具体的Kafka主题。
  5. .setValueSerializationSchema(new SimpleStringSchema()):这行代码设置了要发送数据的序列化方式,这里使用了SimpleStringSchema,表示将数据直接转换为字符串进行发送。
  6. .build():这行代码完成了KafkaRecordSerializationSchema对象的构建。
  7. .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) 是一个设置消息传递保证的代码片段。它指定了消息传递的可靠性,即至少传递一次。
    在分布式系统中,消息传递是一种常见的通信方式,用于在不同的节点之间传递数据或指令。为了保证消息传递的可靠性,通常会使用一些机制来确保消息至少被传递一次。
    在这个代码片段中,.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) 是设置消息传递保证的方法调用。其中,DeliveryGuarantee 是一个枚举类型,表示不同的消息传递保证级别。AT_LEAST_ONCE 是其中一个级别,表示至少传递一次。
    通过将 DeliveryGuarantee.AT_LEAST_ONCE 作为参数传递给 .setDeliverGuarantee() 方法,可以确保消息至少被传递一次,即使中间出现了故障或其他问题。这样可以提高系统的可靠性和容错能力。

代码解析

(1) 主程序入口
1def main(args: Array[String]): Unit = {
2  val env = StreamExecutionEnvironment.getExecutionEnvironment
3  env.setParallelism(4)
  • 创建 Flink 流处理环境 StreamExecutionEnvironment
  • 设置并行度为 4。
(2) 定义数据流
1val data = env.fromElements(
2  Event("Mary", "./home", 100L),
3  Event("Sum", "./cart", 500L),
4  Event("King", "./prod", 1000L),
5  Event("King", "./root", 200L)
6)
  • 使用 fromElements 方法生成一个包含 4 个 Event 对象的流。
(3) 使用旧版 Kafka Sink

data.map(_.toString).addSink(new FlinkKafkaProducer[String]("","",new SimpleStringSchema()))

  • Event 对象转换为字符串。
  • 使用 FlinkKafkaProducer 将数据写入 Kafka。
    • 参数说明:
      * 第一个参数:Kafka 的 bootstrap.servers(未填写)。
      * 第二个参数:Kafka 的 topic(未填写)。
      * 第三个参数:序列化器 SimpleStringSchema
(4) 使用新版 Kafka Sink
1val kafkaSink = KafkaSink.builder()
2  .setBootstrapServers("") // Kafka 服务器地址
3  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
4    .setTopic("") // Kafka topic
5    .setValueSerializationSchema(new SimpleStringSchema()) // 值序列化器
6    .build()
7  )
8  .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 交付保证
9  .build()
  • 使用 KafkaSink.builder() 创建一个 Kafka Sink:
    • setBootstrapServers:设置 Kafka 服务器地址(未填写)。
    • setRecordSerializer:配置记录序列化器:
      * setTopic:设置 Kafka topic(未填写)。
      * setValueSerializationSchema:设置值序列化器为 SimpleStringSchema
    • setDeliverGuarantee:设置交付保证为 AT_LEAST_ONCE(至少一次)。
(5) 将数据写入 Kafka

data.map(_.toString).sinkTo(kafkaSink)

  • Event 对象转换为字符串。
  • 使用 sinkTo 方法将数据写入 Kafka。
(6) 执行任务

env.execute("sinkKafka")

  • 启动 Flink 流处理任务,任务名称为 sinkKafka

代码优化

交付保证
  • DeliveryGuarantee.AT_LEAST_ONCE 是默认的交付保证,如果需要更高的可靠性,可以设置为 EXACTLY_ONCE
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
异常处理
动态 Topic
  • 如果需要根据数据动态选择 Kafka topic,可以实现 KafkaRecordSerializationSchema

优化后的代码

以下是优化后的完整代码:

1package sink
2
3import org.apache.flink.api.common.serialization.SimpleStringSchema
4import org.apache.flink.connector.base.DeliveryGuarantee
5import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
6import org.apache.flink.streaming.api.scala._
7
8object sinkToKafka {
9  def main(args: Array[String]): Unit = {
10    val env = StreamExecutionEnvironment.getExecutionEnvironment
11    env.setParallelism(4)
12
13    val data = env.fromElements(
14      Event("Mary", "./home", 100L),
15      Event("Sum", "./cart", 500L),
16      Event("King", "./prod", 1000L),
17      Event("King", "./root", 200L)
18    )
19
20    val kafkaSink = KafkaSink.builder()
21      .setBootstrapServers("localhost:9092") // Kafka 服务器地址
22      .setRecordSerializer(KafkaRecordSerializationSchema.builder()
23        .setTopic("test-topic") // Kafka topic
24        .setValueSerializationSchema(new SimpleStringSchema()) // 值序列化器
25        .build()
26      )
27      .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 交付保证
28      .build()
29
30    data.map(_.toString).sinkTo(kafkaSink)
31
32    env.execute("sinkKafka")
33  }
34}

Flink+Kafka:数据流处理实战指南》 是转载文章,点击查看原文


相关推荐


OpenClaw——让龙虾像真人一样控制桌面的SKILL(macOS版)
KD2026/4/19

一、背景 工作中要做一个桌面控制相关需求,试了下ClawHub现有Desktop Control skill,发现都有一些不好用的地方,或者与macOS系统不够适配,因此写了一个新skill供大家使用和交流 二、概述 这个Skill主要链路如下: 三、具体步骤实现拆解 1.初始化 这一步是最关键的,也是很多现有skill缺失的一步。第一版本先只做Retina屏兼容 在 macOS 上,即使截图和点击都用 Python,也仍然需要先确认几件事情: 截图图像尺寸是多少 屏幕逻辑尺寸是多少 鼠标


AI Agent 智能体开发入门:AutoGen 多智能体协作实战教程
Halcyon.平安2026/4/10

本文通过 AutoGen 框架,从单智能体到多智能体协作,循序渐进地讲解如何构建 AI Agent 系统,包含完整的代码示例和架构设计。 1. 多智能体协作架构 #mermaid-svg-TX83Bcl6adrsEqiY{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}@keyframes edge-animation-frame{from{stroke-dashoffset:0;}}


Claude Code 防上下文爆炸:源码级深度解析
lizhongxuan2026/4/2

基于 Claude Code v2.1.88 源码还原分析。本文从源码层面拆解 Claude Code 如何在长对话中管理上下文窗口,防止 token 爆炸,同时保持用户意图不被稀释。 问题:为什么上下文会爆炸? Claude Code 是一个 agentic coding 工具。一次典型的编码会话中,模型会: 读取十几个文件(每个几百到几千行) 执行 shell 命令并获取输出 搜索代码库(grep/glob 结果可能很大) 编辑文件并查看 diff 调用子 agent 处理子任务 每一


【万字长文】从 AI SDK 到 mini-opencode:一次很巧的 Go Agent 架构实践
mCell2026/3/25

同步更新至个人站点:从 AI SDK 到 mini-opencode:一次很巧的 Go Agent 架构实践 相关链接: 从零构建 Mini Claude Code:stack.mcell.top/blog/2026/a… 本次 Mini OpenCode 仓库地址:github.com/minorcell/m… Memo Code:memo.mcell.top/ 前阵子,我写过一篇 从零构建 Mini Claude Code 的 Agent 开发入门教程。 那次基本是顺着 AI SDK


Rust宏编程完全指南:用元编程解锁Rust的终极力量
土豆12502026/3/17

"宏就像是编译器的魔法棒,挥一挥,重复的代码就消失了。" —— 某位深夜 debug 的 Rustacean 目录 Why:为什么需要宏? What:宏是什么? How:如何使用宏? 声明宏 (macro_rules!) 派生宏 (Derive Macros) 属性宏 (Attribute Macros) 函数式宏 (Function-like Macros) 最佳实践 常见误区 总结 Why:为什么需要宏? 想象一下,你正在写一个 Web 框架,需要为 50 个不同的结构体实现相


【毕设】前后端(无模型训练)
2301_815389372026/3/8

后端 第一步,先建一个项目文件夹。 打开你电脑上任意一个地方,新建一个文件夹,就叫 ebike-detection,然后把你的 best.pt 复制进去。 第二步,安装Flask和相关依赖。 打开命令提示符(按 Win+R,输入 cmd,回车),然后把下面这行命令复制进去运行: pip install flask flask-cors ultralytics pillow 好,第三步,创建Flask后端文件。 在你的 ebike-detection


Node.js 安装与配置完全指南:从零开始搭建开发环境
张3蜂2026/2/28

目录 引言 第一部分:Node.js 简介与版本选择 1.1 什么是 Node.js? 1.2 Node.js 版本介绍 第二部分:Node.js 安装方式详解 2.1 方式一:官方安装包(最简单,适合初学者) Windows/macOS 安装步骤: 2.2 方式二:包管理器安装(适合 Linux 用户) Ubuntu/Debian 系统安装步骤: CentOS/RHEL 系统安装步骤: macOS 使用 Homebrew 安装: 2.3 方式三:使用 NVM 安装(最推


ThreadForge v1.1.0 发布:让 Java 并发更接近 Go 的开发体验
一只叫煤球的猫2026/2/20

正好春节放假,自驾去了陕西、河南、安徽,一路上走走停停。 白天基本在路上,晚上在酒店或者服务区休息时,抽一些时间继续打磨 ThreadForge。 一点点补了个 v1.1.0 出来。 仍然保持 ThreadForge 的目标: 让 Java 能写出更简单、更可推理、更可观测的并发代码。 这次版本,重点补齐了并发开发里几个还算常见的能力。 v1.1.0 核心更新 Retry Policy(失败重试) 支持 scope 级默认重试,也支持任务级覆盖,不再到处手写 while/try-catch


git pull拉取的时候碰到报错:error: 您对下列文件的本地修改将被合并操作覆盖 请在合并前提交或贮藏您的修改。
skywalk81632026/2/11

git pull拉取的时候碰到报错: error: 您对下列文件的本地修改将被合并操作覆盖:         data/processed/acnes_related_data.csv         data/processed/activity_data.csv         data/processed/hemolytic_data.csv         data/raw/active_peptides.csv         data/raw/hemolytic.csv      


自己搭邮件服务器有多难?我用 Mailu 跑通了整套企业邮箱
GetcharZp2026/2/3

从“为什么要自建邮箱”讲起,拆解 Mailu 的架构、优缺点,以及普通人也能照着做的安装实战。 Github:github.com/Mailu/Mailu 官网:mailu.io/ 这几年,越来越多团队开始重新审视一件事:邮箱,到底要不要掌握在自己手里? 用第三方企业邮箱当然省事,但账号封禁、功能限制、隐私不可控的问题,一旦遇到,几乎没有回旋余地。于是,自建邮件服务器这件事,又被不少技术团队重新捡了起来。 而在一堆方案里,Mailu 是被频繁提到的一个名字。 Mailu 是什么?一句话先讲明

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2026 XYZ博客