朱涛 · Kotlin 编程第一课
朱涛
Google 认证的 Kotlin、Android 开发者专家,博客“Kotlin Jetpack 实战”作者
6717 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 50 讲
朱涛 · Kotlin 编程第一课
15
15
1.0x
00:00/00:00
登录|注册

32 | 图解Flow:原来你是只纸老虎?

你好,我是朱涛。今天我们来研究 Flow 的源代码。
经过前面的学习,我们已经知道了,Channel 和 Flow 都是数据流,Channel 是“热”的,Flow 则是“冷”的。这里的冷,代表着 Flow 不仅是“冷淡”的,而且还是“懒惰”的。
除了“冷”这个特性以外,Flow 从 API 的角度分类,主要分为:构造器、中间操作符、终止操作符。今天这节课,我们将会从这几个角度来分析 Flow 的源码,来看看它的这几类 API 是如何实现的。
经过这节课的学习,你会发现:虽然 Flow 的功能看起来非常高大上,然而它的原理却非常的简单,是一只名副其实的“纸老虎”。

Flow 为什么是冷的?

在正式开始研究 Flow 源代码之前,我们首先需要确定研究的对象。这里,我写了一段 Demo 代码,接下来我们就以这个 Demo 为例,来分析 Flow 的整个执行流程:
// 代码段1
fun main() {
val scope = CoroutineScope(Job())
scope.launch {
testFlow()
}
Thread.sleep(1000L)
logX("end")
}
private suspend fun testFlow() {
// 1
flow {
emit(1)
emit(2)
emit(3)
emit(4)
emit(5)
}.collect { // 2
logX(it)
}
}
/**
* 控制台输出带协程信息的log
*/
fun logX(any: Any?) {
println(
"""
================================
$any
Thread:${Thread.currentThread().name}
================================""".trimIndent()
)
}
/*
输出结果
================================
1
Thread:DefaultDispatcher-worker-1
================================
================================
2
Thread:DefaultDispatcher-worker-1
================================
================================
3
Thread:DefaultDispatcher-worker-1
================================
================================
4
Thread:DefaultDispatcher-worker-1
================================
================================
5
Thread:DefaultDispatcher-worker-1
================================
================================
end
Thread:main
================================
*/
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入分析了Kotlin协程中的Flow概念,重点探讨了Flow的特性和工作原理。文章首先介绍了Flow的“冷”特性,即Flow对象只有在调用collect()方法时才会触发执行其内部的逻辑。通过对Flow构造器、中间操作符和终止操作符的分析,揭示了Flow的实现原理。作者通过示例代码演示了Flow的创建和执行流程,以及上游数据如何传递给下游。通过对Flow源码的解析,读者可以深入了解Flow的内部工作机制,从而更好地理解和应用Kotlin协程中的Flow概念。文章还对FlowCollector的作用进行了详细解释,阐述了其在上游与下游之间的桥梁作用。此外,文章还探讨了上下文保护的重要性,阐述了Kotlin官方为何不允许直接使用withContext{},以及如何通过flowOn{}来处理上下文问题。整体而言,本文通过深入的技术分析,让读者对Kotlin协程中的Flow概念有了更清晰的认识,为进一步的应用提供了有力的支持。 文章通过分析Flow的源码,深入探讨了其API的实现原理,解释了Flow的冷数据流特性以及上游Flow构造器、中间操作符、下游FlowCollector之间的关系。同时,强调了Flow的设计基于挂起函数和高阶函数,使其API看似简单,实际上建立在强大的基础之上。读者通过本文可以全面了解Flow的内部工作机制,以及如何利用Flow实现结构化并发。整体而言,本文为读者提供了深入理解Kotlin协程中Flow概念的重要参考,为其进一步应用提供了有力支持。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《朱涛 · Kotlin 编程第一课》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(15)

  • 最新
  • 精选
  • Paul Shan
    Flow 接口引用了FlowCollector接口,并封装了一段调用逻辑,作为将来FlowCollector使用的来源。FlowCollector,带有emit函数的接口,统一了上游的发送方的数据输出和下游接收方的数据输入。FlowCollector的做法和通常扩展函数不太一样,通常的扩展函数是先有核心类,然后扩展函数扩充核心类的功能。FlowCollector是先在上游的构造器里构建了高阶的扩展函数,然后在下游collect里实现了带有emit的核心类。下游collect触发流程,然后上游的emit驱动下游的emit。这么设计原因应该是上游的构造器,相对复杂,而且是推迟执行的,需要给开发人员以足够的灵活性,所以采用了扩展函数的格式,下游接受数据相对固定,而且是同步执行的,采用固定的FlowCollector接口。

    作者回复: 不错的答案,赞~

    2022-04-06
    10
  • 大土豆
    老师,下节课的目录得改下。。。应该是Android开发者还有未来吗?市场基本都没需求了

    作者回复: 哈哈哈……Android开发的领域整体确实趋向于饱和了,不过中、高级的Android开发者还是比较稀缺的。

    2022-04-06
    3
  • zyaire
    老师,“Flow 上游与下游的协程上下文就会不一致,它们整体的结构也会被破坏,从而导致“结构化并发”的特性也被破坏。”这句话不是很能理解,以代码11来说,即使在flow中调用withContext切换了上下文,当外部协程取消时,不也是会响应取消操作吗

    作者回复: 如果你在flow当中,通过withContext()改变了它其中的Job层级,Flow就无法正常响应取消了。当然,withContext{}也可能只切换线程池,并不修改协程的父子结构,但Kotlin官方目前的做法是干脆全都禁止,想要切线程,就用统一的flowOn{}。否则,flow源码当中的判断条件会更加的复杂,这是Kotlin官方不愿意看见的。

    2022-04-11
    2
    1
  • 再前进一点
    同问:为啥transform{}这方法在IDE里点击跳转到的源码是unsafeTransform这个方法呢

    作者回复: 请留意它的import,Kotlin源码中经常这么用: ``` import kotlinx.coroutines.flow.unsafeTransform as transform ``` 类似的,还有:unsafeFlow as flow。

    2022-04-08
    1
  • dawn
    @PublishedApi internal inline fun <T, R> Flow<T>.unsafeTransform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use 这里的作用域应该是FlowCollector,为什么可以调用collect函数 collect { value -> // kludge, without it Unit will be returned and TCE won't kick in, KT-28938 return@collect transform(value) } } @PublishedApi internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> { return object : Flow<T> { override suspend fun collect(collector: FlowCollector<T>) { collector.block() } } }

    作者回复: 因为它的扩展接收者是Flow呀,说实话,这部分的API挺巧妙的。

    2022-04-08
    3
    1
  • 神佑小鹿
    // 2 internal inline fun <T, R> Flow<T>.unsafeTransform( crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R> = unsafeFlow { // 6 collect { value -> // 7 return@collect transform(value) } } 老师,可以解答下么: 1、transform 的接受者是哪个实例呢?是 flow {}.filter {}.collect {} 中终止操作符号 collect 传入的 FlowCollector 还是注释 6 处 collect 传入的 FlowCollector 实例呢? 按照逻辑的话,应该是终止操作符号 collect {} 传入的 FlowCollector。 但是看逻辑,注释 6 处的 collect 的参数也是个 FlowCollector 实例,那么 transform 的接受者应该是它??

    作者回复: 答案:是 flow {}.filter {}.collect {} 中终止操作符号 collect 传入的 FlowCollector。 这里用文字描述可能比较费劲,我试着解释一下: flow {}.filter {}.collect {}当中的 collect{} ,它传入的FlowCollector,是通过下面注释处,跟transform建立关系的。 ``` internal inline fun <T> unsafeFlow( crossinline block: suspend FlowCollector<T>.() -> Unit ): Flow<T> { return object : Flow<T> { override suspend fun collect(collector: FlowCollector<T>) { // 注意这里 collector.block() } } } ```

    2022-04-26
  • 神佑小鹿
    // 2 internal inline fun <T, R> Flow<T>.unsafeTransform ( crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R> = unsafeFlow { // 6 collect { value -> // 7 return@collect transform (value) } }

    作者回复: 在你另一个回复里解答了哈~

    2022-04-26
  • 神佑小鹿
    // 代码段8 // 1 inline fun <T> Flow<T>.filter( crossinline predicate: suspend (T) -> Boolean ): Flow<T> = transform { value -> // 8 if (predicate(value)) return@transform emit(value) } // 2 internal inline fun <T, R> Flow<T>.unsafeTransform( crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R> = unsafeFlow { // 6 collect { value -> // 7 return@collect transform(value) } } // 3 internal inline fun <T> unsafeFlow( crossinline block: suspend FlowCollector<T>.() -> Unit ): Flow<T> { // 4 return object : Flow<T> { // 5 override suspend fun collect(collector: FlowCollector<T>) { collector.block() } } } 老师好,想在问一个问题: unsafeTransform 中的 collect 参数是一个 FlowCollector 匿名内部类实例 那 return@collect transform(value) 中的 transform 的接收者是这个 FlowCollector 匿名内部类实例 还是 flow{}.filter{}.collect{} 中,终止操作符传入的 FlowCollector 匿名内部类实例呢??

    作者回复: 答案是:是 flow{}.filter{}.collect{} 中,终止操作符传入的 FlowCollector 匿名内部类实例。 我在你另一个评论里解答了哈~

    2022-04-26
  • 神佑小鹿
    老师好,对于 Flow 的 fliter 的源码,确实没看懂,可以详细讲下么?? 直接看源码不直观,反编译看 java 有很凌乱 // unsafeTransform // 2 inline fun <T> Flow<T>.unsafeTransform( crossinline transform: suspend FlowCollector<T>.(value: T) -> Unit // FlowCollector.transform ): Flow<T> { return unsafeFlow { // 这里的作用域应该是 FlowCollector,为什么可以调用 collect 函数 collect(object : FlowCollector<T> { override suspend fun emit(value: T) { // 7 transform(value) } }) // 6 // collect { value -> // // 7 // return@collect transform(value) // } } }

    作者回复: 请留意你注释2处的,扩展接收者类型,它也是Flow。

    2022-04-24
  • 神佑小鹿
    难道说是因为 unsafeFlow 创建的是一个匿名内部类的实例,匿名内部类的实例是持有外部对象 SafeFlow 的引用???

    作者回复: 是的

    2022-04-23
    2
收起评论
显示
设置
留言
15
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部