Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例(kotlin从入门到精通)
 南窗  分类:IT技术  人气:126  回帖:0  发布于1年前 收藏

“最近好像没啥热点,还是说太忙了没空摸鱼看新闻了?人大又要召开了,真心希望这一届的委员们能够提一些靠谱的提案,也不枉我上周网购的商品这周还没到北京了···

前一节(Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow)介绍完了两种热流的构造方法以及它们的特点,那有没有方法可以将冷流转化为热流呢?当然是有的。那为什么需要将冷流转化为热流呢?

假如有这么一个场景:一开始有一个冷流 coldFlow 和它对应的消费者,后来下游又有几个新来的消费者需要使用这个 coldFlow,并且还需要之前已发送过的数据。而冷流的生产者与消费者是一对一的关系,且没有 replay 缓存机制,为新的消费者再创建一个冷流开销较大,这种情况下将冷流转为热流就显得事半功倍了。

1. shareIn 操作符

Flow 中的 shareIn 操作符就可以将冷流转为热流,它的方法声明是:

// code 1
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

首先看返回值,最终确实会转化为一个热流 SharedFlow 实例。方法参数先来看最简单的 replay 参数,就是设置回播到每个新增消费者的数据个数,默认为 0。所以默认情况下,新增的消费者只能收到从它开始收集的时间点之后,生产者发送的数据。

再来看第一个 scope 参数,用于设置一个 CoroutineScope 作用域,注意其生命周期的长度需要比任何消费者都要长,保证被转化成的热流能在所有消费者收集数据进行消费时,都能处于活跃状态。新被转化的热流其实就是一个共享数据流,可以被所有的消费者共享使用。

第二个参数 started 复杂一些,它是用于设置被转化为共享数据流的启动方式,官方提供有 3 种方式,下面一个个说:

SharingStarted.Eagerly 勤快式启动方式。不等第一个消费者出现就会立即启动,需要注意的是,这种方式只会保留启动时数据流发送的前 replay 个数据,再之前的数据会立即丢弃。即不对数据流缓存区以外的数据负责,所以 replay 缓存区大小设置很重要。

SharingStarted.Lazily 懒汉式启动方式。需要等第一个消费者出现才会启动,第一个消费者可以接收到数据流所有发送的数据;但其他后面的消费者只能接收到最近的 replay 个数据。这种方式启动的数据流会一直保持活跃状态,甚至所有的的消费者都退出观察不再接收了,数据流仍然会缓存最近的 replay 个数据。

SharingStarted.WhileSubscribed() 灵活式启动方式。默认情况下就是有消费者来它就立即启动,没消费者接收了它就立即停止。所以在第一个消费者出现数据流就启动,当最后一个消费者退出它就立即停止,但它仍会永久缓存最近的 replay 个数据。此外,这种启动方式还可以根据需求自定义设置参数:

// code 2
public fun WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
    StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)

stopTimeoutMillis:设置最后一个消费者退出后,多长时间后再关闭数据流。默认是 0,即立即关闭。replayExpirationMillis:设置关闭流之后等待多长时间后,再重置清空缓存区 replay cache 的数据。默认是 Long.MAX_VALUE,即永远保存。

自定义 SharingStarted 其实还可以自定义启动方式,自己实现 SharingStarted 接口即可。如果看了前三种启动方式的源码,不难会发现,其实启动方式都是使用固定的几个 SharingCommand 实现的。SharingCommand 有三种:

// code 3
public enum class SharingCommand {
    /**
     * 开始启动,并开始收集上游数据流.
     * 多次发送这个命令并没有什么用(支持防抖),如果先发送 STOP 再发送 START 则是重启一个上游数据流。
     */
    START,

    /**
     * 停止数据流, 取消上游数据流的收集所在协程。
     */
    STOP,

    /**
     * 停止数据流, 取消上游数据流的收集所在协程。并且将 replayCache 缓冲区的值重置为初始状态。
     * 如果是 shareIn 操作符,则会调用 [MutableSharedFlow.resetReplayCache] 方法;
     * 如果是 stateIn 操作符,则会将缓冲数据重置为最初设置的初始值.
     */
    STOP_AND_RESET_REPLAY_CACHE
}

感兴趣的同学可以看看 SharingStarted.WhileSubscribed() 的具体实现类 StartedWhileSubscribed 里面的源码。如果需要自定义启动方式,照着葫芦画瓢即可。

既然有 shareIn,那自然就少不了 stateIn 了。

2. stateIn 操作符

方法声明:

// code 4
public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

首先可以看出返回值是一个热流 StateFlow 实例,那么自然而然就需要一个参数给它设置一个初始值,即第三个参数 initialValue。前两个参数与 shareIn 一样,这里就不再赘述。

3. shareIn 与 stateIn 使用指北

3.1 SharingStarted.WhileSubscribed() 实际使用

从上面的介绍可知,这种启动方式可以在没有消费者时自动取消上游数据流,从而避免资源的浪费。但在实际使用中,建议使用 SharingStarted.WhileSubscribed(5000),即在最后一个消费者停止后再保持数据流 5 秒钟的活跃状态。避免在某些特定情况下(如配置改变——最常见就是横竖屏切换、暗夜模式切换)重启上游的数据流。

3.2 shareIn、stateIn 适用于属性声明而非方法返回值

shareInstateIn 都会创建一个新的数据流,具体说就是 shareIn 会构建一个 ReadonlySharedFlow 实例;stateIn 则会构建一个 ReadonlyStateFlow 实例。而新创建的数据流会一直保存在内存中,直到传入数据流的作用域被取消或者没有任何引用时才会被 GC 回收。

所以下面代码中,前一部分代码是禁止使用的,正确的使用应该是如后一部分的代码,即在属性中使用。

// code 5
//错误示例:每次调用方法都会构建新的数据流
fun getUser(): Flow<User> =
    userLocalDataSource.getUser()
            .shareIn(externalScope, WhileSubscribed())    

//正确示例:在属性中使用 shareIn 或 stateIn 
 val user: Flow<User> = 
     userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())

3.3 MutableSharedFlow 的 subscriptionCount 参数

这个参数表示的是 MutableSharedFlow 中活跃的消费者数目,即订阅者的个数。可用于监听消费者的数目变更,下面就是一个例子:

// code 6
sharedFlow.subscriptionCount
    .map { count -> count > 0 } // count > 0 说明有消费者,返回 true;= 0 说明没有消费者了,返回 false
    .distinctUntilChanged() // only react to true<->false changes
    .onEach { isActive -> // configure an action
        if (isActive) { // do something } else { // do something }
    }
    .launchIn(scope) // launch it

这个例子可以在有消费者收集数据流时,做一些自己的操作;当所有消费者都停止收集时,再处理另外的一些操作,比如资源回收等。

distinctUntilChanged 操作符比较面生,它就是过滤掉前面接收到的重复值,从而使得后面只会接收到发生了变化的新值,和 StateFlow 特性一样。

onEach 操作符也比较常见,可以在流上新增一些处理操作,再发给下游。

3.4 与操作符的搭配使用

如果在实际使用中,需要得知上游数据流的一些状态,比如开始、完成等,则需要在上游数据流转为热流之前添加一些操作符起到监听的作用。

onStart、onCompletion 操作符监听启动和完成

// code 7
private fun shareInOnStartDemo() {
    val testFlow = flow {
        println("++++emit before")
        emit(4)
        delay(1000)
        emit(5)
        delay(1000)
        emit(6)
    }.onStart {
        emit(-1)
        println("++++ onStart")
    }.onCompletion {
        emit(-100)
        println("++++ onCompletion")
    }.shareIn(
        lifecycleScope,
        SharingStarted.WhileSubscribed(),
        8
    )
    lifecycleScope.launch {
        testFlow.collect {
            println("++++ collector receive $it")
        }
    }
}

从打印的 log 可以看到,确实可以监听状态。当然也可以在相同的位置添加 catch 操作符用于监听异常的发生,感兴趣的同学可以试试看。

4. StateFlow 代码实战

说了这么多 Flow 的东西,最后以一个实际的例子结束这一章节的学习笔记吧!

下面我将用一个应用实例来讲解 StateFlow 的实际应用。这个例子将会用到 debouncedistinctUnitChangedflatMapLatest 等操作符,用这些操作符去实现一个文本输入中实时查询的例子。

假设有个需求,要实现一个浏览器搜索的功能,根据用户不断输入的字符去查询相关的内容。如果不做任何处理,用户对键入的字符串做的任何修改,都会去请求一次接口,那后端服务器肯定是吃不消的;对于用户而言,在不断输入的过程中返回的结果用户并不会很关心,他只会关心最终输入完成之后请求的数据。那么,如何减少后端的接口请求次数是关键所在。

先来看看核心的代码:

// code 8   ViewModel.kt 文件
val queryStateFlow = MutableStateFlow("")

fun getQueryResult(): Flow<String> {
    return queryStateFlow
        .debounce(300L)
        .distinctUntilChanged()
        .flatMapLatest {
            if (it.isNullOrBlank()) {
                flow { emit("") }
            } else {
                dataFromNetwork(it).catch {
                    emitAll( flow { emit("") } )
                }
            }
        }
        .flowOn(Dispatchers.IO)
}

// 模拟网络请求的耗时操作
private fun dataFromNetwork(query: String): Flow<String> {
    return flow {
        delay(2000)
        emit(query) // 返回请求的结果
    }
}

首先可以直观地感受到,使用 Flow 去处理这一逻辑较为简单,代码量较少,这也是 Flow 的魅力所在。我们按顺序介绍一下所使用到的 Flow 操作符:

debounce 操作符 具体的操作符方法声明:

// code 9
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>

用于过滤掉最新的发射值之前 timeoutMillis 时间内发射的值,返回一个过滤后的 Flow。官方栗子非常清楚:

// code 10
flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.debounce(1000)
最终会发射出下面的三个值:
3, 4, 5

发射 1 之后不到 1000ms 又发射了 2,所以 1 就会被过滤掉不会发射了,以此类推。所以最后发射的值是一定可以发射成功的。通过这个操作符,我们就可以有效减少频繁请求接口的问题,这里设置的 timeout 为 300ms,即在用户连续输入过程中每间隔 300ms 才去请求一次数据。

distinctUntilChanged 操作符 具体操作符声明为:

// code 11
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>

用于过滤掉重复的发射值。虽然 StateFlow 本身就可过滤掉没有变化的发射值,但在这里还是需要的,因为用户可能会删除刚输入的字符,这一操作符可进一步减少不必要的接口请求。

flatMapLatest 操作符 我看的代码版本这个操作符还是实验性api,后续可能被移除。具体操作符声明为:

// code 12
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R>

这个操作符可以在原流的基础上生成一个新流,当原流依次发出 a、b 两值时,新流都会接收,但如果新流 a 值的相关操作还未结束,则会取消 a 值的相关操作,并用 b 值进行操作。简单说就是,丢弃旧值操作,换用新值操作。下面是一个例子:

// code 13
    fun flatMapLatestDemo() {
        val testFlow = flow {
            emit("a")
            delay(100)
            emit("b")
        }.flatMapLatest {
            flow {
                emit("receive $it")
                delay(200)
                emit("send $it")
            }
        }

        lifecycleScope.launch {
            testFlow.collect {
                println("----$it")
            }
        }
    }

通过打印的 log 可以看出,a,b 都被 flatMapLatest 操作符接收到了,只有 b 最终通过。这是因为 a 先到达,等待了 100ms 后新的值 b 也到了,但 a 还在等待中,这时 flatMapLatest 就会取消掉 a 后续的操作。如果把 delay(200) 改成 delay(50),那最终 a,b 都能被打印出来。

所以这个操作符在 code 8 中的作用就是进一步减少接口请求的次数。当输入的新字符串到来时,就会将之前旧字符串还未结束的请求操作取消掉,用新的字符串去请求数据。

ViewModel.kt 的代码终于说完了,其他的代码就比较常规了,直接上码:

// code 14  MainActivity.kt
binding.editText.addTextChangedListener(object : TextWatcher{
    override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) { }

    override fun onTextChanged(input: CharSequence?, start: Int, before: Int, count: Int) {
        viewModel.queryStateFlow.value = input.toString()
    }

    override fun afterTextChanged(s: Editable?) { }
})

lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.getQueryResult()
            .collect {
                binding.tvText.text = it
            }
    }
}

有关 Flow 的相关知识就到此结束了,来个简单总结吧~

总结

1)shareInstateIn 都可将冷流转化为热流,将数据共享给多个消费者,无需为每个消费者创建同一个数据流的新实例。两者通常用于提升性能,在没有消费者时缓存数据; 2)SharingStarted 启动方式有 EagerlyLazilyWhileSubscribed 三种,最常用的还是 WhileSubscribed,有消费者就启动,没有就停止,还能设置停止延时时长和缓存过期时长;3)注意 shareInstateIn 都会新建一个 Flow,不要用于方法的返回值,建议赋值给属性;4)shareInstateInonStartonCompletion 等搭配可监听转成的热流的状态;5)distinctUntilChanged 操作符可过滤重复数据,一般用于 SharedFlow;debounce 可用于在某一时间段内防抖;flatMapLatest 操作符可以用最新值替换旧值发送给下游,旧值直接被取消作废。

参考文献

  1. StateFlow 和 SharedFlow 官方文档 https://developer.android.google.cn/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn
  2. Flow 操作符 shareIn 和 stateIn 使用须知;Android开发者;https://mp.weixin.qq.com/s/PbqF-vzDrttYq-cSR6NDmQ
  3. Kotlin协程:冷流转换热流的使用与原理;LeeDuo;https://blog.csdn.net/LeeDuoZuiShuai/article/details/127145092

讨论这个帖子(0)垃圾回帖将一律封号处理……