协程的取消机制
取消协程需要协程内部配合,这点和线程一样,本质上也是协作式的取消,就是将状态设置为取消,协程内部根据状态的变化来响应。
完善 Job 的状态流转与取消通知
我们基于上一篇博客中的代码,来完善协程的取消逻辑。
首先支持协程取消回调的注册:
1// [AbstractCoroutine.kt] 2override fun invokeOnCancel(onCancel: OnCancel): Disposable { 3 // 1. 创建回调包装对象,以便后续可以手动解绑 4 val disposable = CancellationHandleDisposable(job = this@AbstractCoroutine, onCancel = onCancel) 5 // 2. 原子更新协程状态 6 val newState = state.fetchAndUpdate { prev -> 7 when (prev) { 8 is CoroutineState.Incomplete -> { 9 // 如果协程还在运行中,就追加回调 10 CoroutineState.Incomplete().from(state = prev).with(disposable = disposable) 11 } 12 13 is CoroutineState.Cancelling, 14 is CoroutineState.Complete<*> -> { 15 // 已取消或已完成,无需注册(无意义),保持当前状态 16 prev 17 } 18 } 19 } 20 21 // 3. 如果注册时正在取消,立即同步触发回调 22 (newState as? CoroutineState.Cancelling)?.let { 23 onCancel() 24 } 25 26 return disposable 27} 28 29// [CancellationHandleDisposable.kt] 30/** 31 * 可移除的取消回调 32 */ 33class CancellationHandleDisposable( 34 val job: Job, 35 val onCancel: OnCancel 36) : Disposable { 37 override fun dispose() { 38 // 从当前绑定的协程实例中移除自身 39 job.remove(disposable = this@CancellationHandleDisposable) 40 } 41} 42
接着是 cancel 函数的实现:
1// [AbstractCoroutine.kt] 2override fun cancel() { 3 // 1. 原子流转状态 4 val prevState = state.fetchAndUpdate { prev -> 5 when (prev) { 6 is CoroutineState.Cancelling, 7 is CoroutineState.Complete<*> -> { 8 // 保持不变,意味着重复调用 cancel 无任何副作用 9 prev 10 } 11 12 is CoroutineState.Incomplete -> { 13 // 流转为取消状态 14 CoroutineState.Cancelling().from(prev) 15 } 16 } 17 } 18 19 // 2. 只有在取消之前是未完成状态,才去通知注册的取消回调 20 if (prevState is CoroutineState.Incomplete) { 21 prevState.notifyCancellation() 22 } 23} 24 25// [CoroutineState.kt] 26/** 27 * 遍历并触发所有已注册的取消回调 28 */ 29fun notifyCancellation() { 30 this@CoroutineState.disposableList.loopOn<CancellationHandleDisposable> { 31 it.onCancel() 32 } 33} 34
注意,这里并不能这样实现:
1// 先更新后获取新状态 2val newState = state.updateAndFetch { 3 // ... 4} 5 6if (newState is CoroutineState.Cancelling) { 7 // 旧状态可能是 Incomplete 或 Cancelling 8 // 但我们只要从 Incomplete 转变为 Cancelling 的情况 9 prevState.notifyCancellation() 10} 11
因为 cancel 允许多次调用,在协程第一次调用 cancel 到结束之前,每次调用 cancel 得到的新状态都会是 Cancelling,这就会导致后续的 if 语句一定为 true,存在取消回调被多次重复通知的情况。
就算你在通知后将回调列表清空,也可能会出现这种情况:协程 A 通知后、清空前,协程 B 又进行了并发通知,还是无法避免,因为通知回调和清空回调就不是原子性的。
挂起函数响应取消的底层支撑:CancellableContinuation
怎么让挂起函数支持取消呢?其实我们之前提到过了,就是让它能够检查当前取消状态,并在取消时停止耗时任务。
参考标准库中的 suspendCoroutine 函数来实现 suspendCancellableCoroutine 函数:
1import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn 2import kotlin.coroutines.intrinsics.intercepted 3 4// [CancellableContinuation.kt] 5suspend inline fun <T> suspendCancellableCoroutine( 6 crossinline block: (CancellableContinuation<T>) -> Unit 7) = suspendCoroutineUninterceptedOrReturn { continuation: Continuation<T> -> 8 // 拦截原始的 continuation,并用 CancellableContinuation 包装以接管挂起与取消逻辑 9 val cancellable = CancellableContinuation(continuation.intercepted()) 10 block(cancellable) 11 cancellable.getResult() 12} 13
suspendCoroutineUninterceptedOrReturn我们早已见过,调用它能够获取到一个未被拦截(Unintercepted)的原始Continuation实例,在其 Lambda 中我们需要返回(OrReturn)一个值:如果结果已经准备好了(快路径),我们直接返回结果,否则(慢路径)返回一个特殊的编译器标记常量COROUTINE_SUSPENDED。
其中的 CancellableContinuation 应该能够注册取消回调(invokeOnCancellation)、能够监听取消状态。
首先来定义状态:
1// [CancellableContinuation.kt] 2/** 3 * 挂起点内部状态 4 */ 5sealed class CancelState { 6 object Incomplete : CancelState() 7 class CancelHandler(val onCancel: OnCancel) : CancelState() // 只允许注册一个取消回调 8 class Complete<T>( 9 val value: T? = null, 10 val exception: Throwable? = null 11 ) : CancelState() 12 13 object Cancelled : CancelState() 14} 15 16// 挂起决策 17enum class CancelDecision { 18 UNDECIDED, // 初始状态:还未决定 19 SUSPENDED, // 确定挂起 20 RESUMED // 确定已拿到结果 21} 22
CancellableContinuation 的实现只需静态代理 Continuation 即可:在完成(resumeWith)时流转状态,支持取消回调的注册。
1import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED 2 3// [CancellableContinuation.kt] 4@OptIn(ExperimentalAtomicApi::class) 5class CancellableContinuation<T>( 6 private val continuation: Continuation<T> 7) : Continuation<T> by continuation { 8 private val state = AtomicReference<CancelState>(CancelState.Incomplete) 9 private val decision = AtomicReference(CancelDecision.UNDECIDED) 10 11 // 是否完成 12 val isCompleted: Boolean 13 get() = when (state.load()) { 14 CancelState.Incomplete, 15 is CancelState.CancelHandler -> false 16 17 is CancelState.Complete<*>, 18 CancelState.Cancelled -> true 19 } 20 21 /** 22 * 注册取消回调 23 */ 24 fun invokeOnCancellation(onCancel: OnCancel) { 25 val newState = state.updateAndFetch { prev -> 26 when (prev) { 27 CancelState.Incomplete -> CancelState.CancelHandler(onCancel) 28 is CancelState.CancelHandler -> 29 throw IllegalStateException("Prohibited.") 30 31 is CancelState.Complete<*>, 32 CancelState.Cancelled -> prev 33 } 34 } 35 if (newState is CancelState.Cancelled) { 36 onCancel() 37 } 38 } 39 40 /** 41 * 将当前挂起点绑定到所在的协程上 42 */ 43 private fun installCancelHandler() { 44 if (isCompleted) return 45 // 获取当前所在协程 46 val currentJob = continuation.context[Job] ?: return 47 currentJob.invokeOnCancel { 48 doCancel() 49 } 50 } 51 52 private fun doCancel() { 53 val prevState = state.fetchAndUpdate { prev -> 54 when (prev) { 55 is CancelState.CancelHandler, 56 CancelState.Incomplete -> { 57 // 流转为取消状态 58 CancelState.Cancelled 59 } 60 61 CancelState.Cancelled, 62 is CancelState.Complete<*> -> { 63 prev 64 } 65 } 66 } 67 if (prevState is CancelState.CancelHandler) { 68 // 执行取消回调 69 prevState.onCancel() 70 resumeWithException(CancellationException("Cancelled.")) // 抛出异常响应取消 71 } 72 } 73 74 /** 75 * 决定是真正挂起,还是同步返回结果 76 */ 77 @Suppress("UNCHECKED_CAST") 78 fun getResult(): Any? { 79 // 此时才绑定,为的是在真正的挂起点注册取消回调,提升性能 80 installCancelHandler() 81 if (decision.compareAndSet(CancelDecision.UNDECIDED, CancelDecision.SUSPENDED)) 82 // 结果尚未就绪,返回挂起标志 83 return COROUTINE_SUSPENDED 84 85 // 此时没有真正挂起(decision 为 RESUMED),同步完成,可以获取结果 86 return when (val currentState = state.load()) { 87 is CancelState.CancelHandler, 88 CancelState.Incomplete -> COROUTINE_SUSPENDED 89 90 CancelState.Cancelled -> 91 throw CancellationException("Continuation is cancelled.") 92 93 is CancelState.Complete<*> -> { 94 (currentState as CancelState.Complete<T>).let { 95 it.exception?.let { e -> throw e } ?: it.value 96 } 97 } 98 } 99 } 100 101 /** 102 * 完成回调 103 */ 104 override fun resumeWith(result: Result<T>) { 105 when { 106 // 同步完成:抢先修改决策,将结果存入状态 107 decision.compareAndSet(CancelDecision.UNDECIDED, CancelDecision.RESUMED) -> { 108 state.store( 109 CancelState.Complete( 110 result.getOrNull(), 111 result.exceptionOrNull() 112 ) 113 ) 114 } 115 116 // 异步恢复:当前已挂起,直接唤醒底层的 continuation 117 decision.compareAndSet(CancelDecision.SUSPENDED, CancelDecision.RESUMED) -> { 118 state.updateAndFetch { prev -> 119 when (prev) { 120 is CancelState.Complete<*> -> { 121 throw IllegalStateException("Already completed.") 122 } 123 124 else -> { 125 CancelState.Complete( 126 result.getOrNull(), 127 result.exceptionOrNull() 128 ) 129 } 130 } 131 } 132 continuation.resumeWith(result) 133 } 134 } 135 } 136} 137
getResult() 和 resumeWith() 处理了协程底层的并发竞态问题,尝试挂起(getResult) 与任务完成尝试唤醒 (resumeWith),需要进行“赛跑”:
- 最常见的情况是慢路径(真正挂起),此时异步任务耗时较长,
getResult()会先执行,将decision置为SUSPEND,协程会交出线程的执行权进行挂起。当异步任务完成后,触发resumeWith,发现协程挂起,就会调用底层的continuation.resumeWith将其唤醒。 - 如果异步任务很快就完成,此时
getResult()还没来得及执行,resumeWith会将状态改为RESUME并将结果放在状态中,此时协程还没有挂起。接着getResult()会尝试挂起,发现此时已经得到了结果,就会取出结果同步返回,不会挂起和切线程。
为什么需要两种取消回调?
这两种取消回调的代码是不是重复了?其实没有,因为它们的作用对象和职责完全不同。
Job 的取消回调是全局的,它代表的是一整个协程的生命周期。当它取消时,会向外通知这个协程将要灭亡。它的关心对象包括子协程、调用了 join 等待它的其他协程,关心对象很多,所以使用的是回调列表。
而 CancellableContinuation 更为局部,它代表的仅是协程内部的某一个挂起点,当整个协程要“完蛋”时,它会完成资源的清理操作。关心的对象只有挂起函数的底层实现,因为一个挂起点同一时刻只能完成一件事,所以只允许注册一个回调。
改造具体挂起函数以支持取消
有了 CancellableContinuation 后,我们就可以让挂起函数感知到取消了。
改造 delay
首先是 delay,添加取消响应:
1suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) { 2 if (time <= 0) { 3 return 4 } 5 6 suspendCancellableCoroutine { continuation -> 7 val future = executor.schedule({ continuation.resume(Unit) }, time, unit) 8 continuation.invokeOnCancellation { 9 // 当所在协程被取消时,取消底层的 future 定时任务 10 future.cancel(true) 11 } 12 } 13} 14
改造 join
接着改造 join,响应取消时不再等待。
1private suspend fun joinSuspend() = suspendCancellableCoroutine { continuation -> 2 val disposable = doOnCompleted { _ -> 3 continuation.resume(value = Unit) 4 } 5 continuation.invokeOnCancellation { 6 // 所在协程被取消时,移除需要等待的目标协程注册的完成回调,防止在完成状态下,恢复执行抛出移除 7 disposable.dispose() 8 } 9} 10
如果 join 函数对应的协程已经完成,我们可以直接返回;但此时,我们可以检查一下当前所在协程的状态,如果已经取消,则抛出取消异常予以响应。
1override suspend fun join() { 2 when (state.load()) { 3 is CoroutineState.Incomplete, 4 is CoroutineState.Cancelling -> { 5 // 被等待的协程尚未完成,挂起等待 6 return joinSuspend() 7 } 8 9 is CoroutineState.Complete<*> -> { 10 // 目标协程已完成,但需主动检查当前所在协程的活跃状态以响应取消 11 val isActive = currentCoroutineContext()[Job]?.isActive ?: return 12 if (!isActive) { 13 // 此时一定是取消状态(而非完成) 14 throw CancellationException("Coroutine is already cancelled") 15 } 16 return 17 } 18 } 19} 20
为什么要在目标协程完成时,检查当前协程是否被取消?
因为如果同一时间外部取消了当前协程,而我们不做任何检查,那么当前协程会直接返回,接着执行后续的代码,无法响应取消。
所以即使当前协程没有挂起,但是在挂起函数中,就需要主动看看自己有没有被取消,来保证取消机制,官方也是这么做的,封装了一个扩展函数 Job.ensureActive()。
示例代码
1private suspend fun testLaunch() { 2 // launch 示例代码 3 val job = launch { 4 try { 5 println("1. 协程开始执行") 6 delay(1000) 7 println("3. 我不应该被打印,因为 delay 被取消了") 8 } catch (e: CancellationException) { 9 println("3. 捕获到取消异常:delay 过程被中断了!") 10 } 11 } 12 13 delay(500) 14 println("2. 外部触发取消") 15 job.cancel() 16} 17
可以看到:结果中并不会打印 1s 后的日志,在 500ms 取消协程时,delay 会先将线程回调解除,并抛出取消异常,结束当前协程。
1private suspend fun testJoin() { 2 // join 示例代码 3 // 协程 B:目标协程,执行一个耗时任务 4 val jobB = launch(CoroutineName("JobB")) { 5 println("JobB: 开始执行长任务...") 6 delay(5000) 7 println("JobB: 任务执行完毕!") 8 } 9 10 // 协程 A:所在协程,等待 B 完成 11 val jobA = launch(CoroutineName("JobA")) { 12 try { 13 println("JobA: 我正在等待 JobB 完成...") 14 jobB.join() 15 println("JobA: 终于等到 JobB 跑完了!") 16 } catch (e: CancellationException) { 17 println("JobA: 我被取消了,不再等 JobB 了,我要退出!") 18 } 19 } 20 21 delay(1000) 22 println("Main: 取消 JobA") 23 jobA.cancel() 24 25 delay(6000) 26} 27
JobA 在取消时,会先移除 JobB 的完成回调(无需等待),再抛出取消异常来退出。而 JobB 并不会受到任何影响,还是会打印“任务执行完毕”。
协程的异常处理机制
我们接着来讲清楚协程中的异常应该怎么处理:
- 在协程体内,无论是挂起函数还是普通函数抛出的异常,都可以通过 try...catch 来捕获。
- 对于未捕获的异常,则在获取结果时处理。
定义与简化异常处理器
为了统一拦截和处理第二种未捕获异常,我们来定义一个异常处理器。
1interface CoroutineExceptionHandler : CoroutineContext.Element { 2 companion object Key : CoroutineContext.Key<CoroutineExceptionHandler> 3 4 /** 5 * 处理异常 6 */ 7 fun handleException(context: CoroutineContext, exception: Throwable) 8} 9
然后创建一个函数,来简化它的创建:
1inline fun CoroutineExceptionHandler( 2 crossinline handler: (CoroutineContext, Throwable) -> Unit 3): CoroutineExceptionHandler { 4 return object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler { 5 override fun handleException(context: CoroutineContext, exception: Throwable) { 6 handler.invoke(context, exception) 7 } 8 } 9} 10
未捕获异常的分发与处理
我们在 AbstractCoroutine 定义一个 handleJobException 函数,返回 true 表示异常已处理。它的子类可以根据自身需要来实现特有的异常处理逻辑。
1/** 2 * 处理未捕获异常 3 */ 4protected open fun handleJobException(e: Throwable) = false 5
它的子类有两个,异常处理逻辑有些不同:
- StandaloneCoroutine: 由
launch启动,自身无返回结果。我们希望它在遇到未捕获的异常时,优先调用自身的异常处理器进行处理,如果没有进行配置,则将异常抛给completion调用时所在线程的uncaughtExceptionHandler来兜底。 - DeferredCoroutine: 由
async启动,通常有返回结果。由于它需要将结果返回给调用者,所以我们无需覆写该方法去主动处理异常,而是将未捕获的异常存放在最终状态中,等外部调用await获取结果时,才将异常抛出。
StandaloneCoroutine 的具体实现如下:
1// [StandaloneCoroutine.kt] 2override fun handleJobException(e: Throwable): Boolean { 3 super.handleJobException(e) 4 // 优先由自身的异常处理器处理,无配置则由线程的 uncaughtExceptionHandler 兜底 5 context[CoroutineExceptionHandler]?.handleException(context, e) 6 ?: Thread.currentThread().let { 7 it.uncaughtExceptionHandler.uncaughtException(it, e) 8 } 9 return true 10} 11
处理的逻辑完成后,在协程执行完成处进行异常的尝试分发:
1override fun resumeWith(result: Result<T>) { 2 // ... 3 4 // 若最终状态携带异常,进入异常分发流程 5 (newState as? CoroutineState.Complete<T>)?.exception?.let { e -> 6 tryHandleException(e = e) 7 } 8 9 newState.notifyCompletion(result) 10 newState.clear() 11} 12 13/** 14 * 尝试处理异常,分发前先做静默过滤 15 */ 16private fun tryHandleException(e: Throwable): Boolean { 17 return when (e) { 18 is CancellationException -> { 19 // 忽略正常的取消控制流,避免将其视为程序崩溃 20 false 21 } 22 23 else -> { 24 // 将常规未捕获异常交由子类处理 25 handleJobException(e) 26 } 27 } 28} 29
为什么要过滤掉
CancellationException?挂起函数取消时,会通过抛出取消异常来实现取消响应,它是正常的流程,类似于线程的中断异常。所以需要忽略,不能让它触发崩溃处理逻辑。而未捕获异常为什么要向上传递,这就涉及到了协程的作用域了,我将会在下一篇博客中详细讲解。
异常处理器的实战演示
最后我们来演示一下异常处理器是如何接受崩溃的:
1suspend fun main() { 2 val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> 3 println("CoroutineExceptionHandler got ${throwable.message}, from ${coroutineContext[CoroutineName]}") 4 } 5 6 launch(context = exceptionHandler + CoroutineName("main")) { 7 println("Started main coroutine") 8 throw ArithmeticException("Dive by zero") 9 println("Ended main coroutine") 10 }.join() 11 12 delay(time = 200L) 13} 14
因为我们之前有注入默认的调度器,协程完成时会在守护线程中执行,此时主线程可能会提前退出,所以加一个延迟保证异常处理器的执行。
结果输出为:
1Started main coroutine 2CoroutineExceptionHandler got Divide by zero, from main 3


