JAVA ·

kotlin 协程探索

什么是协程?为什么要用协程?在kotlin中如何使用协程?搞清楚这三个问题,不说你能完全弄懂协程,至少面试中问到协程,你就可以吹一波了。

什么是协程?

协程与子例程一样,协程(coroutine)也是一种程序组件。相对子例程而言,协程更为一般和灵活,但在实践中使用没有子例程那样广泛。

上面这段官方说法来自百度百科,我看了以后还是有点懵逼,这个概念最早由Melvin Conway于1958提出。至于本人也是最近在学习kotlin的时候才接触,那么到底什么是协程?我更倾向于以下说法:它就是一种用户状态的轻量级线程,由协程构建器启动。 通过上面这段描述可以知道协程是用户状态的,是一种比线程还轻量的东西,由协程构建器赋值启动。

为什么使用协程?

当在程序中需要执行耗时操作,如:文件I/O,网络I/O等,在不堵塞主线程的情况下,就需要重新启动一个子线程去完成这些任务。

线程:

系统级别的,以抢占CPU时间片来执行,是同过kernel进行调度的,线程的创建,销毁,切换都是由kernel处理,还有线程的各种状态维护,多线程的同步,数据交换等等,如此就降低了程序的执行效率,在高并发的情况下显的更加明显。

协程

用户级别的,由用户空间负责切换,不需要陷入内核,非抢占式调度。同一时刻同一调度器中的协程只有一个处于运行状态。协程提供了一种避免阻塞线程并用更廉价、更可控的操作替代线程阻塞的方法:协程挂起。
协程通过将复杂性放入库来简化异步编程。程序的逻辑可以在协程中顺序地表达,而底层库会解决其异步性。该库可以将用户代码的相关部分包装为回调、订阅相关事件、在不同线程(甚至不同机器)上调度执行,而代码则保持如同顺序执行一样简单。
协程完全通过编译技术实现,不需要VM和OS的支持。

协程的使用

简单的协程
override fun onResume() {
        super.onResume()
        launch {
            delay(3*1000L, TimeUnit.MILLISECONDS)
            Log.e(TAG,"hello coroutines")
        }
        Log.e(TAG,"hello KOTLIN")
    }
    
//log 输出
hello KOTLIN
hello coroutines

以上看到,协程并不会堵塞主线程,而是以一种类似子线程的方式运行。源码中的launcher函数:

public actual fun launch(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context, parent)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

其中需要注意的是最后一个参数,必须是suspend修饰的函数而且是无返回值的,即挂起函数。挂起函数只能由协程或另外一个挂起函数调用。
以上就是一个简单的协程创建运行的示例,那么在使用的过程中和线程相比有哪些类型和不同呢?

问题一:是否和线程一样有主线程和子线程之分?

在kotlin的协程里面,提供了runBlocking函数来实现类似主协程的功能:

runBlocking {
            launch {
                //子协程delay 3s
                delay(3*1000L, TimeUnit.MILLISECONDS)
                Log.e(TAG,"hello coroutines")
            }
            //主协程 delay 4s
            delay(4*1000L, TimeUnit.MILLISECONDS)
            Log.e(TAG,"hello KOTLIN")
        }
问题二:是否能等待子协程执行完毕,再执行,类似线程的join()函数?

在kotlin中也同样提供join的函数,来等待一个子协程完成任务

runBlocking {
            val job = launch { // launch new coroutine and keep a reference to its Job
                delay(1000L)
                println("World!")
            }
            println("Hello,")
            job.join()
            println("Hello777777")
        }
    //log 顺序
    Hello,
    World!
    Hello77777
问题三:在jvm中批量创建线程会触发java.lang.OutOfMemoryError异常,协程是否也有限制?

在java中创建一个线程,同样的会为这个线程在系统的剩余内存中创建一个线程栈用来运行,当创建的线程太多导致系统剩余内存不足。那么我们创建多个协程看看情况。

runBlocking {
        val jobs = List(100_000) {
            // create a lot of coroutines and list their jobs
            launch(CommonPool) {
                delay(1000L)
                print(".")
            }
        }
        jobs.forEach { it.join() } // wait for all jobs to complete
    }

以上代码完全可以无异常运行,但是如果换成线程毫无疑问就会异常退出。所以协程比线程更具轻量。

问题四:是否可以取消正在运行的协程?

在kotlin中提供一个cancel的函数用来取消正在运行的协程。

runBlocking {
            val jobs = launch(CommonPool) {
                repeat(1000){
                    println("I'm sleeping ")
                }
                delay(500)
            }
            delay(3000)
            jobs.cancel()
        }

但是这里有一个需要注意的地方,计算代码的协程取消会失效。在这就不详细阐述了。

问题五:是否可以设置协程的超时?

withTimeout 函数来给一个协程任务的执行设定最大执行时间,超出这个时间,就直接终止掉。

//3秒后超时
withTimeout(3000L) {
            repeat(100) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
问题六:能否异步并发执行协程?

在协程中执行中,挂起函数默认是按顺序执行,

private suspend fun fc2() {
        Log.e(TAG, "onResume 222222222")
    }

    private suspend fun fc3() {
        Log.e(TAG, "onResume 333333333")
    }

    private suspend fun fc1() {
        delay(2000)
        Log.e(TAG, "onResume 111111111")
    }
    override fun onResume() {
        super.onResume()
        runBlocking {
           fc1()
            fc2()
            fc3()
        }
    }

在以上代码中,会依次执行fc1,fc2,fc3。那么如果你想并发执行着三个函数呢?那就需要使用async来实现异步调用。

override fun onResume() {
        super.onResume()
        runBlocking {
           async { fc1() }
            async { fc2() }
            async { fc3() }

        }
    }

查看async的源码如下:

public actual fun <T> async(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context, parent)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

可以看到async和launch类似,launch返回一个任务Job对象, 不带任何结果值;而async返回一个延迟任务对象Deferred,一种轻量级的非阻塞性future, 它表示后面会提供结果。

问题七:协程之间是如何通信?

延迟对象提供了一种在协程之间传输单个值的方法。而通道(Channel)提供了一种传输数据流的方法。
通道跟阻塞队列一个关键的区别是:通道有挂起的操作, 而不是阻塞的, 同时它可以关闭。

runBlocking {
            val channel = Channel<Int>()
            launch() {
                for (x in 1..10) channel.send(x * x)
            }
            println("channel = ${channel}"
            repeat(10) { println(channel.receive()) }
            println("Done!")
        }

Channel<Int>()背后调用的是会合通道RendezvousChannel(),会合通道中没有任何缓冲区。send函数被挂起直到另外一个协程调用receive函数, 然后receive函数挂起直到另外一个协程调用send函数。它是一个完全无锁的实现。 kotlin的生成-消费模型:

fun produceSquares() = produce<Int>(CommonPool) {
        for (x in 1..7) send(x * x)
    }

    fun consumeSquares() = runBlocking{
        val squares = produceSquares()
        squares.consumeEach { println(it) }
        println("Done!")
    }

produce函数会启动一个新的协程, 协程中发送数据到通道来生成数据流,并以 ProducerJob对象返回对协程的引用。ProducerJob继承了Job, ReceiveChannel类型,因此在kotlin中协程之间的通信主要可以使用channel和produce来进行。

总结:使用协程能使程序更有效率,降低异步编程的复杂性,提高程序的并发能力等。

参与评论