文章目录
-
- 一、数据流简介
-
- 1.1 Kotlin 数据流概述
- 1.2 核心特性
- 1.3 Flow 的基本组件
- 1.1 Kotlin 数据流概述
- 二、数据流的使用方法
-
- 2.1 正向流
-
- 2.1.1 创建数据流
* 2.1.2 修改数据流
* 2.1.3 收集数据流
- 2.1.1 创建数据流
- 2.2 反向流
-
- 2.2.1 创建数据流
* 2.2.2 修改并收集数据流
- 2.2.1 创建数据流
- 2.3 数据流在 Jetpack 中的应用场景
-
- 2.1 正向流
- 三、数据流的行为模式
-
- 3.1 冷流(Cold Flow)
-
- 3.1.1 什么是冷流
* 3.1.2 为什么 Kotlin 默认使用冷流
- 3.1.1 什么是冷流
- 3.2 热流(Hot Flow)
-
- 3.2.1 什么是热流
* 3.2.2 Kotlin 如何支持热流
- 3.2.1 什么是热流
- 3.3 使用哪种数据流
-
- 3.3.1 如何选择
* 3.3.2 冷流转热流:shareIn 与 stateIn
- 3.3.1 如何选择
-
- 3.1 冷流(Cold Flow)
- 参考资料
一、数据流简介
1.1 Kotlin 数据流概述
在 Kotlin 协程中,普通挂起函数只能返回单个值,调用一次即结束。但实际开发中往往需要处理持续产生的多值序列——如实时数据、用户输入、WebSocket 消息等。Kotlin 数据流(Flow)正是为此而生。
Flow 是基于协程的异步流处理 API,支持按顺序、异步地发射多个同类型值。其核心价值在于:用声明式、近乎同步的代码风格处理异步事件序列,同时享受协程的结构化并发与资源安全优势。
它类似于 RxJava 的
Observable,但完全基于协程设计,可与协程无缝集成。
1.2 核心特性
- 声明式 & 可组合:通过
map、filter、zip等链式操作,以近乎同步的代码风格处理异步事件序列。 - 异步非阻塞:
emit与collect均为挂起函数,可与协程无缝组合,后台计算不阻塞主线程。 - 结构化并发:收集在协程作用域内运行,作用域取消时(如界面关闭)自动停止,避免内存泄漏。
- 背压支持:生产者过快时,可通过缓冲、
conflate等机制处理速度不匹配,而回调无法做到。 - 冷流(Cold Stream):默认模式,仅在
collect时执行,每次收集都从头发射。详见 3.1 冷流。
1.3 Flow 的基本组件
Flow 的基本组件有:
- Flow:数据流接口,通过
flow { ... }构建器定义。 - 发射器(Emitter):在构建器内调用
emit()发射数据。 - 收集器(Collector):通过
collect()接收数据,持续消费直到流结束或协程取消。
发射与收集均为挂起函数,以异步方式生成和消费值。示例:
1import kotlinx.coroutines.* 2import kotlinx.coroutines.flow.* 3 4// 使用 Flow 构建器创建冷数据流 5fun simpleFlow(): Flow<Int> = flow { 6 for (i in 1..3) { 7 delay(100) // 模拟异步操作 8 emit(i) // 发射数据 9 } 10} 11 12fun main() = runBlocking { 13 14 // 收集数据 15 simpleFlow().collect { value -> 16 println(value) // 依次打印 1, 2, 3 17 } 18} 19
二、数据流的使用方法
数据流包含三个角色:
- 提供方:生成数据,可借助协程异步生产。
- 中介(可选):修改流中的值或流本身。
- 使用方:消费流中的值。

(图源:Android 官方文档)
在 Android 中常见两种流向:
- 正向流:数据层(
Repository)→ 领域层/ViewModel(map转换)→ 界面层(收集并更新 UI)。 - 反向流:界面层(用户输入)→ ViewModel(响应事件流)→ 更新界面状态。
2.1 正向流
2.1.1 创建数据流
使用 flow { } 构建器创建流,在内部通过 emit 发射值。下例中,数据源以固定间隔轮询资讯,作为提供方:
1class NewsRemoteDataSource( 2 private val newsApi: NewsApi, 3 private val refreshIntervalMs: Long = 5000 4) { 5 val latestNews: Flow<List<ArticleHeadline>> = flow { 6 while(true) { 7 val latestNews = newsApi.fetchLatestNews() 8 emit(latestNews) // Emits the result of the request to the flow 9 delay(refreshIntervalMs) // Suspends the coroutine for some time 10 } 11 } 12} 13 14// Interface that provides a way to make network requests with suspend functions 15interface NewsApi { 16 suspend fun fetchLatestNews(): List<ArticleHeadline> 17} 18
注意:flow 在协程内执行,发射有序(挂起函数返回后才 emit 下一个值)。不可在 withContext 或新协程中调用 emit,否则需用 callbackFlow。
2.1.2 修改数据流
中介通过中间运算符(如 map、filter)链式修改流,这些操作是惰性的,仅在收集时执行。Repository 示例:
1class NewsRepository( 2 private val newsRemoteDataSource: NewsRemoteDataSource, 3 private val userData: UserData 4) { 5 /** 6 * Returns the favorite latest news applying transformations on the flow. 7 * These operations are lazy and don't trigger the flow. They just transform 8 * the current value emitted by the flow at that point in time. 9 */ 10 val favoriteLatestNews: Flow<List<ArticleHeadline>> = 11 newsRemoteDataSource.latestNews 12 // Intermediate operation to filter the list of favorite topics 13 .map { news -> news.filter { userData.isFavoriteTopic(it) } } 14 // Intermediate operation to save the latest news in the cache 15 .onEach { news -> saveInCache(news) } 16} 17
2.1.3 收集数据流
终端运算符 collect 触发流执行并消费所有发射值。ViewModel 示例:
1class LatestNewsViewModel( 2 private val newsRepository: NewsRepository 3) : ViewModel() { 4 5 init { 6 viewModelScope.launch { 7 // Trigger the flow and consume its elements using collect 8 newsRepository.favoriteLatestNews.collect { favoriteNews -> 9 // Update View with the latest favorite news 10 } 11 } 12 } 13} 14
收集数据流会触发提供方刷新最新资讯,并以固定时间间隔发出网络请求。由于提供方始终通过 while(true) 循环保持活跃状态,因此,在清除 ViewModel 并取消 viewModelScope 数据流后,数据流将关闭。
2.2 反向流
在反向流中,界面层是提供方。使用 callbackFlow 将回调式 API(如 TextWatcher)转为 Flow,并通过 awaitClose 在流取消时清理监听:
2.2.1 创建数据流
1// --- 界面层(作为 提供方)--- 2// 创建一个表示搜索框输入的 Flow 3val searchFlow: Flow<String> = callbackFlow { 4 val watcher = object : TextWatcher { 5 override fun onTextChanged(text: String) { 6 trySend(text) // 将每次输入作为一个事件“发射”到流中 7 } 8 } 9 editText.addTextChangedListener(watcher) 10 11 // 当流被取消时(如界面销毁),移除监听,避免内存泄漏 12 awaitClose { editText.removeTextChangedListener(watcher) } 13} 14 15// 将这个流“暴露”给 ViewModel 16viewModel.bindSearchFlow(searchFlow) 17
2.2.2 修改并收集数据流
ViewModel 对输入流做防抖、过滤、去重,再触发搜索并收集结果:
1// --- ViewModel层(作为 中介/使用方)--- 2fun bindSearchFlow(searchFlow: Flow<String>) { 3 viewModelScope.launch { 4 searchFlow 5 .debounce(300) // 中介操作1:防抖,等用户停止输入300ms 6 .filter { it.length >= 2 } // 中介操作2:过滤,只搜索至少2个字符 7 .distinctUntilChanged() // 中介操作3:去重,避免连续相同搜索 8 .flatMapLatest { query -> // 中介操作4:切换到最新搜索,取消之前的 9 repository.search(query) // 触发真正的搜索(返回一个 Flow<Result>) 10 } 11 .collect { result -> // 使用方:消费最终的搜索结果 12 _searchResults.value = result // 更新 LiveData/StateFlow 13 } 14 } 15} 16
2.3 数据流在 Jetpack 中的应用场景
Jetpack 已广泛支持 Flow。以 Room 为例,DAO 返回 Flow 即可监听数据库变更:
1@Dao 2abstract class ExampleDao { 3 @Query("SELECT * FROM Example") 4 abstract fun getExamples(): Flow<List<Example>> 5} 6
表数据变更时,Flow 自动发射最新列表。
三、数据流的行为模式
冷流像点播:你点才开始播,每人从头看。热流像直播:不管你看不看都在播,中途进入只能从当前看起。
3.1 冷流(Cold Flow)
3.1.1 什么是冷流
冷流类似于私人定制,具备如下特征:
- 按需启动:仅当使用方调用终端操作符(如
collect)时,提供方(flow构建器内的代码)才会开始执行。 - 独立副本:每次调用
collect,都会获得完整、从头开始的值序列。 - 无共享状态:不同收集者互不共享进度。
适用场景:网络请求、数据库查询等需独立数据源,或每个订阅者需从头消费完整数据。
3.1.2 为什么 Kotlin 默认使用冷流
Kotlin Flow 默认使用冷流,这主要是为了安全和可预测:
- 结构化并发友好:冷流的生命周期与收集它的协程完全绑定。协程取消,流的生产就停止。这完美契合了协程的结构化并发理念,避免了资源泄漏。
- 幂等性:每次收集都获得一份完整的、独立的副本,行为可预测。这对于网络请求、数据库查询、文件读取等场景至关重要。你希望每次查询都获取最新数据,而不是错过已经开始的数据。
- 资源管理简单:由于是独立的,不需要复杂的“多播”或“连接”管理。
3.2 热流(Hot Flow)
3.2.1 什么是热流
热流类似于直播,具备如下特征:
- 主动发射:不管有无订阅者,数据都会产生。
- 共享数据源:多订阅者共享同一份数据流,但只能收到订阅后的数据。
适用场景:IM 消息、定位更新、全局状态等需多订阅者共享的实时数据。
3.2.2 Kotlin 如何支持热流
Kotlin 通过 StateFlow 与 SharedFlow 提供热流:
| 特性 | StateFlow | SharedFlow |
|---|---|---|
| 设计语义 | 状态容器 | 事件流 |
| 初始值 | 必须提供 | 可选,通过 replay 控制 |
| 重放机制 | 固定重放 1 个值 | 可配置 replay = 0, 1, ... |
| 去重策略 | 自动去重(equals 相同则不发射) | 不去重,每次 emit 都触发 |
| 当前值访问 | 有 value 属性 | 无,需通过缓存或收集获取 |
| 背压处理 | 固定为 Conflated(只取最新) | 可配置:SUSPEND、DROP_OLDEST、DROP_LATEST |
| 典型场景 | UI 状态 | 一次性事件(Toast、导航)、实时消息流 |
(1) SharedFlow
举个例子,您可以使用 SharedFlow向应用程序的其他部分发送“滴答”信号,以便所有内容能够定期、同时刷新。
1// 集中管理应用程序内容何时需要刷新的类 2class TickHandler( 3 private val externalScope: CoroutineScope, 4 private val tickIntervalMs: Long = 5000 5) { 6 // 后备属性,避免从其他类发出数据流 7 private val _tickFlow = MutableSharedFlow<Unit>(replay = 0) 8 val tickFlow: SharedFlow<Unit> = _tickFlow 9 10 init { 11 externalScope.launch { 12 while(true) { 13 _tickFlow.emit(Unit) // 发出信号 14 delay(tickIntervalMs) // 延迟指定间隔 15 } 16 } 17 } 18} 19 20class NewsRepository( 21 ..., 22 private val tickHandler: TickHandler, 23 private val externalScope: CoroutineScope 24) { 25 init { 26 externalScope.launch { 27 // 监听滴答信号更新 28 tickHandler.tickFlow.collect { 29 refreshLatestNews() // 收集到信号时,刷新新闻 30 } 31 } 32 } 33 34 suspend fun refreshLatestNews() { ... } 35 ... 36} 37
(2) StateFlow
StateFlow 是一个特殊的 SharedFlow,专门为管理“状态”而设计。其内部维护了一个可变的状态,并在状态发生变化时通知所有观察者。
其典型案例是MVVM中保存界面的状态的 UiState。
1class LatestNewsViewModel( 2 private val newsRepository: NewsRepository 3) : ViewModel() { 4 5 // Backing property to avoid state updates from other classes 6 private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList())) 7 // The UI collects from this StateFlow to get its state updates 8 val uiState: StateFlow<LatestNewsUiState> = _uiState 9 10 init { 11 viewModelScope.launch { 12 newsRepository.favoriteLatestNews 13 // Update View with the latest favorite news 14 // Writes to the value property of MutableStateFlow, 15 // adding a new element to the flow and updating all 16 // of its collectors 17 .collect { favoriteNews -> 18 _uiState.value = LatestNewsUiState.Success(favoriteNews) 19 } 20 } 21 } 22} 23 24// Represents different states for the LatestNews screen 25sealed class LatestNewsUiState { 26 data class Success(val news: List<ArticleHeadline>): LatestNewsUiState() 27 data class Error(val exception: Throwable): LatestNewsUiState() 28} 29
3.3 使用哪种数据流
3.3.1 如何选择
- 冷流:数据需按需计算、一次性、独立。
- 热流:数据需多观察者共享、长期存活、活跃状态或事件。
3.3.2 冷流转热流:shareIn 与 stateIn
在 Android 开发中,一个常见的模式是:数据层用冷流(Flow)暴露数据。对于需要在多个观察者间共享的数据(特别是昂贵的查询结果),可以在数据层使用 shareIn转换为热流 SharedFlow以优化性能。对于 UI 状态,则在 ViewModel 层使用 stateIn(或直接管理 MutableStateFlow)将其转换为热流 StateFlow,供 UI 安全地观察,并与 ViewModel 的生命周期绑定。
举个例子,一个从远程(如 Firestore)或本地获取数据的 Flow是冷流。如果多个 ViewModel 或屏幕都需要这个数据,使用 shareIn可以确保数据源只执行一次,然后将结果广播给所有订阅者,避免了重复的网络请求或数据库查询,从而显著提升应用性能和效率。
1// 在 DataSource/Repository 层 2class NewsRepository( 3 private val newsApi: NewsApi, 4 private val externalScope: CoroutineScope // 通常是 applicationScope 5) { 6 val topHeadlines: Flow<List<Article>> = flow { 7 emit(newsApi.fetchTopHeadlines()) // 网络请求 8 }.shareIn( 9 scope = externalScope, 10 started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5000), // 有订阅者时才启动,无人订阅5秒后停止 11 replay = 1 // 新订阅者立即得到最新缓存 12 ) 13} 14
参考资料
Android 上的 Kotlin 数据流 | Android Developers
《【Kotlin】 数据流完全指南:冷流、热流与 Android 实战》 是转载文章,点击查看原文。