【Kotlin】 数据流完全指南:冷流、热流与 Android 实战

作者:idealzouhu日期:2026/3/21

文章目录

    • 一、数据流简介
      • 1.1 Kotlin 数据流概述
        • 1.2 核心特性
        • 1.3 Flow 的基本组件
    • 二、数据流的使用方法
      • 2.1 正向流
          • 2.1.1 创建数据流
            * 2.1.2 修改数据流
            * 2.1.3 收集数据流
        • 2.2 反向流
          • 2.2.1 创建数据流
            * 2.2.2 修改并收集数据流
        • 2.3 数据流在 Jetpack 中的应用场景
    • 三、数据流的行为模式
      • 3.1 冷流(Cold Flow)
          • 3.1.1 什么是冷流
            * 3.1.2 为什么 Kotlin 默认使用冷流
        • 3.2 热流(Hot Flow)
          • 3.2.1 什么是热流
            * 3.2.2 Kotlin 如何支持热流
        • 3.3 使用哪种数据流
          • 3.3.1 如何选择
            * 3.3.2 冷流转热流:shareIn 与 stateIn
    • 参考资料

一、数据流简介

1.1 Kotlin 数据流概述

在 Kotlin 协程中,普通挂起函数只能返回单个值,调用一次即结束。但实际开发中往往需要处理持续产生的多值序列——如实时数据、用户输入、WebSocket 消息等。Kotlin 数据流(Flow)正是为此而生。

Flow 是基于协程的异步流处理 API,支持按顺序、异步地发射多个同类型值。其核心价值在于:用声明式、近乎同步的代码风格处理异步事件序列,同时享受协程的结构化并发与资源安全优势。

它类似于 RxJava 的 Observable,但完全基于协程设计,可与协程无缝集成。

1.2 核心特性

  • 声明式 & 可组合:通过 mapfilterzip 等链式操作,以近乎同步的代码风格处理异步事件序列。
  • 异步非阻塞emitcollect 均为挂起函数,可与协程无缝组合,后台计算不阻塞主线程。
  • 结构化并发:收集在协程作用域内运行,作用域取消时(如界面关闭)自动停止,避免内存泄漏。
  • 背压支持:生产者过快时,可通过缓冲、conflate 等机制处理速度不匹配,而回调无法做到。
  • 冷流(Cold Stream):默认模式,仅在 collect 时执行,每次收集都从头发射。详见 3.1 冷流

1.3 Flow 的基本组件

Flow 的基本组件有:

  • Flow:数据流接口,通过 flow { ... } 构建器定义。
  • 发射器(Emitter):在构建器内调用 emit() 发射数据。
  • 收集器(Collector):通过 collect() 接收数据,持续消费直到流结束或协程取消。

发射与收集均为挂起函数,以异步方式生成和消费值。示例:

1import kotlinx.coroutines.*
2import kotlinx.coroutines.flow.*
3
4// 使用 Flow 构建器创建冷数据流
5fun simpleFlow(): Flow<Int> = flow {
6    for (i in 1..3) {
7        delay(100) // 模拟异步操作
8        emit(i)    // 发射数据
9    }
10}
11
12fun main() = runBlocking {
13    
14    // 收集数据
15    simpleFlow().collect { value -> 
16        println(value) // 依次打印 1, 2, 3
17    }
18}
19

二、数据流的使用方法

数据流包含三个角色:

  • 提供方:生成数据,可借助协程异步生产。
  • 中介(可选):修改流中的值或流本身。
  • 使用方:消费流中的值。

在这里插入图片描述
(图源:Android 官方文档

在 Android 中常见两种流向:

  • 正向流:数据层(Repository)→ 领域层/ViewModel(map 转换)→ 界面层(收集并更新 UI)。
  • 反向流:界面层(用户输入)→ ViewModel(响应事件流)→ 更新界面状态。

2.1 正向流

2.1.1 创建数据流

使用 flow { } 构建器创建流,在内部通过 emit 发射值。下例中,数据源以固定间隔轮询资讯,作为提供方:

1class NewsRemoteDataSource(
2    private val newsApi: NewsApi,
3    private val refreshIntervalMs: Long = 5000
4) {
5    val latestNews: Flow<List<ArticleHeadline>> = flow {
6        while(true) {
7            val latestNews = newsApi.fetchLatestNews()
8            emit(latestNews) // Emits the result of the request to the flow
9            delay(refreshIntervalMs) // Suspends the coroutine for some time
10        }
11    }
12}
13
14// Interface that provides a way to make network requests with suspend functions
15interface NewsApi {
16    suspend fun fetchLatestNews(): List<ArticleHeadline>
17}
18

注意flow 在协程内执行,发射有序(挂起函数返回后才 emit 下一个值)。不可在 withContext 或新协程中调用 emit,否则需用 callbackFlow

2.1.2 修改数据流

中介通过中间运算符(如 mapfilter)链式修改流,这些操作是惰性的,仅在收集时执行。Repository 示例:

1class NewsRepository(
2    private val newsRemoteDataSource: NewsRemoteDataSource,
3    private val userData: UserData
4) {
5    /**
6     * Returns the favorite latest news applying transformations on the flow.
7     * These operations are lazy and don't trigger the flow. They just transform
8     * the current value emitted by the flow at that point in time.
9     */
10    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
11        newsRemoteDataSource.latestNews
12            // Intermediate operation to filter the list of favorite topics
13            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
14            // Intermediate operation to save the latest news in the cache
15            .onEach { news -> saveInCache(news) }
16}
17
2.1.3 收集数据流

终端运算符 collect 触发流执行并消费所有发射值。ViewModel 示例:

1class LatestNewsViewModel(
2    private val newsRepository: NewsRepository
3) : ViewModel() {
4
5    init {
6        viewModelScope.launch {
7            // Trigger the flow and consume its elements using collect
8            newsRepository.favoriteLatestNews.collect { favoriteNews ->
9                // Update View with the latest favorite news
10            }
11        }
12    }
13}
14

收集数据流会触发提供方刷新最新资讯,并以固定时间间隔发出网络请求。由于提供方始终通过 while(true) 循环保持活跃状态,因此,在清除 ViewModel 并取消 viewModelScope 数据流后,数据流将关闭。

2.2 反向流

在反向流中,界面层是提供方。使用 callbackFlow 将回调式 API(如 TextWatcher)转为 Flow,并通过 awaitClose 在流取消时清理监听:

2.2.1 创建数据流
1// --- 界面层(作为 提供方)---
2// 创建一个表示搜索框输入的 Flow
3val searchFlow: Flow<String> = callbackFlow {
4    val watcher = object : TextWatcher {
5        override fun onTextChanged(text: String) {
6            trySend(text) // 将每次输入作为一个事件“发射”到流中
7        }
8    }
9    editText.addTextChangedListener(watcher)
10    
11    // 当流被取消时(如界面销毁),移除监听,避免内存泄漏
12    awaitClose { editText.removeTextChangedListener(watcher) }
13}
14
15// 将这个流“暴露”给 ViewModel
16viewModel.bindSearchFlow(searchFlow)
17
2.2.2 修改并收集数据流

ViewModel 对输入流做防抖、过滤、去重,再触发搜索并收集结果:

1// --- ViewModel层(作为 中介/使用方)---
2fun bindSearchFlow(searchFlow: Flow<String>) {
3    viewModelScope.launch {
4        searchFlow
5            .debounce(300) // 中介操作1:防抖,等用户停止输入300ms
6            .filter { it.length >= 2 } // 中介操作2:过滤,只搜索至少2个字符
7            .distinctUntilChanged() // 中介操作3:去重,避免连续相同搜索
8            .flatMapLatest { query -> // 中介操作4:切换到最新搜索,取消之前的
9                repository.search(query) // 触发真正的搜索(返回一个 Flow<Result>)
10            }
11            .collect { result -> // 使用方:消费最终的搜索结果
12                _searchResults.value = result // 更新 LiveData/StateFlow
13            }
14    }
15}
16

2.3 数据流在 Jetpack 中的应用场景

Jetpack 已广泛支持 Flow。以 Room 为例,DAO 返回 Flow 即可监听数据库变更:

1@Dao
2abstract class ExampleDao {
3    @Query("SELECT * FROM Example")
4    abstract fun getExamples(): Flow<List<Example>>
5}
6

表数据变更时,Flow 自动发射最新列表。

三、数据流的行为模式

冷流像点播:你点才开始播,每人从头看。热流像直播:不管你看不看都在播,中途进入只能从当前看起。

3.1 冷流(Cold Flow)

3.1.1 什么是冷流

冷流类似于私人定制,具备如下特征:

  • 按需启动:仅当使用方调用终端操作符(如 collect)时,提供方(flow 构建器内的代码)才会开始执行。
  • 独立副本:每次调用 collect,都会获得完整、从头开始的值序列。
  • 无共享状态:不同收集者互不共享进度。

适用场景:网络请求、数据库查询等需独立数据源,或每个订阅者需从头消费完整数据。

3.1.2 为什么 Kotlin 默认使用冷流

Kotlin Flow 默认使用冷流,这主要是为了安全可预测

  1. 结构化并发友好:冷流的生命周期与收集它的协程完全绑定。协程取消,流的生产就停止。这完美契合了协程的结构化并发理念,避免了资源泄漏。
  2. 幂等性:每次收集都获得一份完整的、独立的副本,行为可预测。这对于网络请求、数据库查询、文件读取等场景至关重要。你希望每次查询都获取最新数据,而不是错过已经开始的数据。
  3. 资源管理简单:由于是独立的,不需要复杂的“多播”或“连接”管理。

3.2 热流(Hot Flow)

3.2.1 什么是热流

热流类似于直播,具备如下特征:

  • 主动发射:不管有无订阅者,数据都会产生。
  • 共享数据源:多订阅者共享同一份数据流,但只能收到订阅后的数据。

适用场景:IM 消息、定位更新、全局状态等需多订阅者共享的实时数据。

3.2.2 Kotlin 如何支持热流

Kotlin 通过 StateFlowSharedFlow 提供热流:

特性StateFlowSharedFlow
设计语义状态容器事件流
初始值必须提供可选,通过 replay 控制
重放机制固定重放 1 个值可配置 replay = 0, 1, ...
去重策略自动去重(equals 相同则不发射)不去重,每次 emit 都触发
当前值访问有 value 属性无,需通过缓存或收集获取
背压处理固定为 Conflated(只取最新)可配置:SUSPEND、DROP_OLDEST、DROP_LATEST
典型场景UI 状态一次性事件(Toast、导航)、实时消息流

(1) SharedFlow
举个例子,您可以使用 SharedFlow向应用程序的其他部分发送“滴答”信号,以便所有内容能够定期、同时刷新。

1// 集中管理应用程序内容何时需要刷新的类
2class TickHandler(
3    private val externalScope: CoroutineScope,
4    private val tickIntervalMs: Long = 5000
5) {
6    // 后备属性,避免从其他类发出数据流
7    private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
8    val tickFlow: SharedFlow<Unit> = _tickFlow
9
10    init {
11        externalScope.launch {
12            while(true) {
13                _tickFlow.emit(Unit) // 发出信号
14                delay(tickIntervalMs) // 延迟指定间隔
15            }
16        }
17    }
18}
19
20class NewsRepository(
21    ...,
22    private val tickHandler: TickHandler,
23    private val externalScope: CoroutineScope
24) {
25    init {
26        externalScope.launch {
27            // 监听滴答信号更新
28            tickHandler.tickFlow.collect {
29                refreshLatestNews() // 收集到信号时,刷新新闻
30            }
31        }
32    }
33
34    suspend fun refreshLatestNews() { ... }
35    ...
36}
37

(2) StateFlow

StateFlow 是一个特殊的 SharedFlow,专门为管理“状态”而设计。其内部维护了一个可变的状态,并在状态发生变化时通知所有观察者。

其典型案例是MVVM中保存界面的状态的 UiState。

1class LatestNewsViewModel(
2    private val newsRepository: NewsRepository
3) : ViewModel() {
4
5    // Backing property to avoid state updates from other classes
6    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
7    // The UI collects from this StateFlow to get its state updates
8    val uiState: StateFlow<LatestNewsUiState> = _uiState
9
10    init {
11        viewModelScope.launch {
12            newsRepository.favoriteLatestNews
13                // Update View with the latest favorite news
14                // Writes to the value property of MutableStateFlow,
15                // adding a new element to the flow and updating all
16                // of its collectors
17                .collect { favoriteNews ->
18                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
19                }
20        }
21    }
22}
23
24// Represents different states for the LatestNews screen
25sealed class LatestNewsUiState {
26    data class Success(val news: List<ArticleHeadline>): LatestNewsUiState()
27    data class Error(val exception: Throwable): LatestNewsUiState()
28}
29

3.3 使用哪种数据流

3.3.1 如何选择
  • 冷流:数据需按需计算、一次性、独立。
  • 热流:数据需多观察者共享、长期存活、活跃状态或事件。
3.3.2 冷流转热流:shareIn 与 stateIn

在 Android 开发中,一个常见的模式是:数据层用冷流(Flow)暴露数据。对于需要在多个观察者间共享的数据(特别是昂贵的查询结果),可以在数据层使用 shareIn转换为热流 SharedFlow以优化性能。对于 UI 状态,则在 ViewModel 层使用 stateIn(或直接管理 MutableStateFlow)将其转换为热流 StateFlow,供 UI 安全地观察,并与 ViewModel 的生命周期绑定。

举个例子,一个从远程(如 Firestore)或本地获取数据的 Flow冷流。如果多个 ViewModel 或屏幕都需要这个数据,使用 shareIn可以确保数据源只执行一次,然后将结果广播给所有订阅者,避免了重复的网络请求或数据库查询,从而显著提升应用性能和效率。

1//  DataSource/Repository 
2class NewsRepository(
3    private val newsApi: NewsApi,
4    private val externalScope: CoroutineScope // 通常是 applicationScope
5) {
6    val topHeadlines: Flow<List<Article>> = flow {
7        emit(newsApi.fetchTopHeadlines()) // 网络请求
8    }.shareIn(
9        scope = externalScope,
10        started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5000), // 有订阅者时才启动,无人订阅5秒后停止
11        replay = 1 // 新订阅者立即得到最新缓存
12    )
13}
14

参考资料

Android 上的 Kotlin 数据流 | Android Developers


【Kotlin】 数据流完全指南:冷流、热流与 Android 实战》 是转载文章,点击查看原文


相关推荐


特高压输变电工程全生命周期BIM+GIS数字化管理平台:重塑能源动脉的数字基石(WORD)
无忧智库2026/3/13

引言:能源互联网时代的数字化觉醒 在“双碳”目标的宏大叙事下,中国能源结构正经历着前所未有的深刻变革。作为能源配置的“大动脉”,特高压输变电工程以其输送容量大、距离远、损耗低的技术优势,成为了构建新型电力系统的核心骨架。然而,随着工程规模的指数级增长和地理环境的日益复杂,传统的管理模式正面临着严峻的挑战。设计阶段的各专业协同困难、施工阶段的进度与质量管控盲区、运维阶段的海量数据孤岛,如同一个个隐形的枷锁,制约着电网建设效率的提升和全生命周期价值的释放。 当物理世界的铁塔银线不断向天际延伸时,数字


windows下配置Qt arm32交叉编译环境
itas1092026/3/5

windows下配置Qt arm32交叉编译环境 环境: 系统:windows 11 Qt: 5.12.12(2021-11-25) 本地编译器:mingw73 64(2018-04-25) 交叉编译器: gcc-linaro-7.3.1-2018.05-i686-mingw32_arm-linux-gnueabi perl: strawberry perl 5.22.1(2016-01-07) python: 3.8.10(2021-05-03) 1. 安装Qt MinGW Perl Pyt


告别死板流程:OpenSpec OPSX 如何重塑 SDD 开发工作流
fundroid2026/2/25

引言:SDD 与 OpenSpec 规范驱动开发(SDD)是什么? 近两年,AI 编码助手已经能“听懂人话”,从一段自然语言描述里生成大段代码。但很多团队也发现:如果需求只是散落在聊天记录里、脑补在每个人的心里,AI 很容易“发挥过度”——代码写出来了,却不是你真正想要的系统行为。 规范驱动开发(Spec-Driven Development,SDD)试图解决的,就是这个问题。它把规范(spec)而不是代码当成系统的“单一事实来源”:先用结构化、机器可读的方式,把系统应该做什么、有哪些边界和不变


WebMCP 时代:在浏览器中释放 AI 的工作能力
CharlesYu012026/2/16

随着 AI Agent 的广泛应用,传统的 Web 自动化与 Web 交互模式正在迎来根本性变化。WebMCP 是一个未来派的技术提案,它不仅改变了 AI 访问 Web 的方式,还为 AI 与前端应用之间建立起了 协议级的交互通道。本文从WebMCP架构分层解析这项技术及其工程意义。 面对 GEO 与 Agent 应用逐步弱化浏览器入口价值的趋势,浏览器厂商必须主动跟进,通过技术升级与生态重构来守住自身核心阵地。 一、WebMCP 是什么? WebMCP(Web Model Context P


MCP (Model Context Protocol) 技术理解 - 第二篇
想用offer打牌2026/2/8

引言 我们第一篇讲了MCP的基础概念、MCP解决的问题以及MCP的架构,我相信大家已经对MCP有了一定的了解,那么接下来让我们深入MCP具体是如何实现的,这一篇我们的重点放在通信协议和数据传输上,让我们一起来看看吧 如果你对前面的内容感兴趣,可以点击这里跳转 MCP (Model Context Protocol) 技术理解 - 第一篇 MCP的层级 MCP由两层组成: 数据层:定义了基于 JSON-RPC 的客户端-服务器通信协议,包括生命周期管理和核心原语,如工具、资源、提示和通知。 传输


type-challenges(ts类型体操): 11 - 元组转换为对象
fxss2026/1/30

11 - 元组转换为对象 by sinoon (@sinoon) #简单 #object-keys 题目 将一个元组类型转换为对象类型,这个对象类型的键/值和元组中的元素对应。 例如: const tuple = ['tesla', 'model 3', 'model X', 'model Y'] as const type result = TupleToObject<typeof tuple> // expected { 'tesla': 'tesla', 'model 3': 'mode


Flutter艺术探索-Flutter国际化:多语言支持实现
kirk_wang2026/1/20

Flutter 国际化:从原理到实践的多语言支持方案 引言:为什么你的 Flutter 应用需要国际化? 如今,开发一款成功的应用就不得不考虑全球市场。国际化(i18n)和本地化(l10n)不再是可选项,而是连接不同文化用户的桥梁。对于使用 Flutter 的开发者来说,框架本身提供了强大的国际化支持,这不仅能显著提升用户体验,更是扩大应用市场份额的关键一步。想想看,当你的应用能够用用户的母语与其沟通时,下载量和用户留存率的提升是显而易见的。 Flutter 的国际化体系基于 Dart 的 in


mongodb的基本命令
豆浆粉牛奶2026/1/12

大家好我是小帅,今天学习mongodb的简单认识和基本命令。 本章内容: 理解MongoDB的业务场景、熟悉MongoDB的简介、特点和体系结构、数据类型等。能够在Windows和Linux下安装和启动MongoDB、图形化管理界面Compass的安装使用掌握MongoDB基本常用命令实现数据的CRUD 掌握MongoDB的索引类型、索引管理、执行计划。使用Spring DataMongoDB完成文章评论业务的开发 文章目录 1. MongoDB认识1.1 业务场景1.2 结构体系


AI 有你想不到,也它有做不到 | 2025 年深度使用 Cursor/Trae/CodeX 所得十条经验
Piper蛋窝2026/1/4

去年的今天,我还在奋笔疾书地写着 VS Code + Roo Cline 的评测心得:个人评测 | Cursor 免费平替:Roo Cline + DeepSeek-v3/Gemini-2.0 + RepoPrompt AI 辅助编程 。当时的我没有想过:在 2025 年, Roo Cline 会被我迅速淘汰,我也成为了 Cursor 这类 Vibe Coding 工具的稳定用户之一。 站在 2026 年伊始的节点上,审视自己的工作流,发现已经完全被锚定在了如下工具链上: 对话工具: Chat


别再让 AI 直接写页面了:一种更稳的中后台开发方式
月亮有石头2025/12/26

本文讨论的不是 Demo 级别的 AI 编码体验,而是面向真实团队、长期维护的中后台工程实践。 AI 能写代码,但不意味着它适合直接“产出页面”。 最近一年,大模型在前端领域的讨论几乎都围绕一个问题: “能不能让 AI 直接把页面写出来?” 在真实的中后台项目中,我的答案是: 不但不稳,而且很危险。 这篇文章想分享一种我在真实项目中实践过、可长期使用、可规模化的方式: 不是让 AI 写页面,而是把 AI 纳入中后台前端的工程体系中。 把 AI 的不确定性关进了笼子里,用工程流程保证可控性

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2026 XYZ博客