Kotlin Coroutine 的 CPS 变换
04 Jul 2020 by fleuria
一般 coroutine 的实现大多基于某种运行时的 yield 原语,然而 kotlin 并没有这种下层支持,在没有 yield 指令支持的 jvm 上仍基于 CPS 变换实现出了 coroutine,其实很厉害,想了解一下是怎样做到的。
CPS Transformation
CPS 是 Continuation Passing Style 的缩写,大致上是将原本长这样的代码:
function auth(token, resourceName) {
let userId = loginUser(token)
let ok = checkPermission(userId, resourceName)
return ok ? 'success': 'failed'
}
转换为这个画风:
function auth(token, resourceName, callback) {
loginUser(token, function(userId) {
checkPermission(userId, resourceName, function(ok) {
callback(ok ? 'success': 'failed')
})
})
}
也就是俗称的「callback hell」,这种风格一度曾在 node js 中发扬光大了一把。大致上是将本来函数该返回的值通过参数中的回调函数进行返回。其中「continuation」 词义较晦涩,实际上它大约就是「callback」的同义词。
流程上大致是:
- 调用异步方法,注册回调函数到 io loop;
- io loop 确认 io 事件(如收到服务端响应)后,回调注册在 io loop 中的回调函数,在参数中传递异步返回的结果,如果有异常,也同样在回调函数的参数中;
- 回调函数继续调别的回调函数,调用到新的异步方法时注册新的回调函数到 io loop。
真正驱动整个控制流的是 io loop,即调度器。
回调函数的问题是缺少结构化,每一次异步调用,都必然涉及到异常处理流程,这代码就没法看了。Future / Promise 在结构化方面能稍微进步一些,对 onSuccess(result) 与 onError(err) 进行标准化的包装,这一来代码结构方面能相对漂亮一点:
function Promise auth(token, resourceName) {
loginUser(token)
.then(userId => checkPermission(userId, resourceName))
.then(ok => ok ? 'success': 'failed')
.catch(err => console.log(err))
})
}
到这里朝 async / await 式语法的演进还差一步,便是 yield。与回调函数风格异步方法的不同在于,async / await 模式下将函数上次执行的点位,作为回调入口登记到 io loop。io loop 调度器发现有收到响应,便找出对应的函数,按 resume(result) 推动后续的执行。
但是 jvm 明显没有 yield 支持,那么 kotlin 是怎样做到的?
CPS in kotlin
回来看 kotlin。异步方法在 kotlin 中增加 suspend 关键字做标记。
suspend fun auth(token: String, resourceName: String): String {
val userId = loginUser(token) // suspending fun
val ok = checkPermission(userId, resourceName) // suspending fun
return ok ? 'success': 'failed'
}
在编译时,大约会将 suspend 方法做 CPS 转换,增加一个 cont 参数,用于回调返回值或者异常:
suspend fun auth(token: String, resourceName: String, cont: Continuation<Any?>) {
...
}
其中的 Continuation 便是个类似 Promise/Future 的接口:
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resume(value: T)
public fun resumeWithException(exception: Throwable)
}
其中 context 表示协程上下文,这里先略过。resume 与 resumeWithException 与 js 中 Promise 的 then() 与 catch() 完全一致。到这里缺少的就是 yield 的替代方案了。
StateMachine
替代 yield 的方案便是 StateMachine,它扮演两个功能:1. 暂停并记住执行点位;2. 记住函数暂停时刻的局部变量上下文。
每个 suspend 函数都会生成出一个内部类 StateMachine,用于保存函数的局部变量与执行点位。
怎样做到记录执行点位的?
首先所有的暂停点位,都必然是调用 suspend 函数的点位。kotlin 这里的做法很有意思,使用了一个 switch case,每个 suspend 执行的位置对应一个 label,下次恢复执行时,按递增的 label 找到下一个执行位置,大约的伪代码会长这样:
suspend fun auth(token: String, resourceName: String, cont: Continuation<Any?>) {
val sm = cont as? AuthSM ?: AuthSM(cont)
when (sm.label) {
0 -> {
sm.cont = cont
sm.label = 1
loginUser(token, sm)
return
}
1 -> {
throwOnFailure(sm.result)
sm.userId = sm.result as Int
sm.label = 2
checkPermission(sm.userId, sm)
return
}
2 -> {
throwOnFailure(sm.result)
sm.ok = sm.result as boolean
sm.cont.resume(sm.ok ? "success": "failed")
}
else -> throw IllegalStateException(...)
}
}
每个 StateMachine 对象也是 Continuation 接口的实现。每当 suspend 函数执行到 suspend 点位时,实际上会退出执行,函数的执行上下文会完整记录在 StateMachine 中,调用的异步方法有响应时,会回调 StateMachine 的 resume 方法,而 resume 方法的执行,相当于使用记录在 StateMachine 中的上下文作为参数,再执行一次该函数。
画图整理一下大致的流程:
总结
- kotlin 的协程支持大约是 CPS 变换和 StateMachine 两部分,CPS 变换使同步代码异步化,增加额外的 Continuation 类型的参数,用于函数结果值的返回;
- 通过 switch / case 配合 label,做到执行点位的记录与暂停;
- 每个 suspend 方法都会生成一个内部的 StateMachine 类,StateMachine 类中包含函数的所有局部变量,以及暂定点的返回值与异常值,扮演了 yield 语句的功能,即暂停函数的执行,为此需要记录下函数当前的局部变量上下文与执行点位。
- 被暂停函数的恢复执行,实现上等于将函数的局部变量上下文与点位作为参数,重新调用一次这个函数。