文章

Kotlin:协程

协程和线程是有点类似的,可以简单的理解成一种轻量级的线程,我们之前所学的线程需要依靠系统的调度才能实现不同线程之间的切换。而使用协程可以仅在编程语言层面就能实现不同协程之间的切换,从而大大提升了并发编程的运行效率。

协程允许我们在单线程模式下模拟多线程编程的效果,代码执行时的挂起与恢复完全是由编程语言来控制的,和操作系统无关。这种特性使得高并发程序的运行效率得到了极大地提升,开启10万个线程是完全不可想象的事吧?而开启10万个协程就是完全可行的。

Kotlin并没有将协程纳入标准库的API当中,而是以依赖库形式提供的,需要现在app/build.gradle文件当众添加如下依赖库:

dependencies {
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1"
    // 第二个依赖库在Android中才会用到
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1"
}

一、协程的基本用法

1.Global.launch函数

GlobalScope.launch函数可以创建一个协程作用域,这样传递给launch函数的代码块(Lambda表达式)就是在协程中运行的了,代码如下所示:

fun main() {
    GlobalScope.launch {
        // 这段打印并不会执行
        // 代码还没来得及执行,程序就结束运行了
        println("codes run in coroutine scope")
    }
}

但是Global.launch函数每次创建的都是一个顶层协程,这一点和线程比较像,因为线程也没有层级这一说,永远都是顶层的。所以可能导致代码块中的代码还没有来得及运行,应用程序就已经结束了,所以要想让上面的打印执行,需要将线程阻塞1秒钟,如下所示:

fun main() {
    GlobalScope.launch {
        // 线程阻塞1秒,这段打印可以成功执行
        println("codes run in coroutine scope")
    }
    Thread.sleep(1000)
}

2.runBlocking函数

runBlocking函数同样会创建一个协程的作用域,但是它与Global.launch函数不同的是,它可以保证在协程作用域内的所有代码和子协程没有全部执行完之前一直阻塞当前线程。需要注意的是,runBlocking函数通畅只应该在测试环境下使用,在正式环境中使用容易产生一些性能上的问题。实例代码如下:

fun main() {
    runBlocking {
        println("codes run in coroutine scope")
    }
}

3.创建多个协程——launch函数

这里的launch函数和刚才上面的GlobalScope.launch函数不同,首先它必须要在协程的作用域中才能调用,其次它会在当前协程的作用域下创建子协程。子协程的特点是如果外层作用域的协程结束了,该作用域下的所有子协程也会一同结束。

相比而言GlobalScope.launch函数创建的永远是顶层协程,这一点和线程比较像,因为线程也没有层级这一说,永远都是顶层的。

下面代码在runBlocking的协程作用域中创建了两个子协程:

fun main() {
    runBlocking {
        // 在runBlocking作用域中创建第一个子协程
        launch {
            println("launch1")
            delay(1000)
            println("launch1 finished")
        }
        // 在runBlocking作用域中创建第二个子协程
        launch {
            println("launch2")
            delay(1000)
            println("launch2 finished")
        }
    }
}

4.声明挂起函数——suspend关键字

随着launch函数中的逻辑越来越复杂,可能需要将部分代码提取到一个单独的函数中,这个时候会产生一个问题:我们在launch函数中编写的代码是拥有协程作用域的,但是提取到一个单独的函数中就没有协程作用域了,那么该如果调用像delay()这样的挂起函数?

为此,Kotlin提供了一个suspend关键字,使用它可以将任何函数声明成挂起函数,而挂起函数之间都是可以互相调用的,如下所示:

suspend fun printlnDot() {
    println(".")
    // 这样就可以在函数中调用delay()函数了
    delay(1000)
}

但是,suspend关键字只能将一个函数声明成挂起函数,是无法给它提供协程作用域的。比如尝试在printlnDot()方法中调用launch函数是一定无法成功的,因为launch函数要求必须在协程作用域当中才能调用。这时又需要引入下一个函数概念了。

5.创建子作用域——coroutineScope函数

coroutineScope也是一个挂起函数,因此可以在任何其他挂起函数中调用。它的特点是会继承外部的协程作用域并创建一个子作用域,借助这个特性,我们就可以给任何挂起函数提供协程作用域了,代码如下所示:

suspend fun printDot() = coroutineScope {
    launch {
        println(".")
        delay(1000)
    }
}

5.1.与runBlocking函数类似之处

coroutineScope和runBlocking函数还有点类似,它可以保证其作用域内的所有代码和子协程在全部执行完成之前,会一直阻塞当前协程,而runBlocking在全部执行完成前会阻塞当前线程,实例代码如下:

fun main() {
    // 创建协程作用域
    // 全部完成之前前阻塞线程
    runBlocking {
        // 创建子协程作用域
        // 全部完成之前阻塞协程
        coroutineScope {
            // 创建子协程
            launch {
                for (i in 1..10) {
                    println(i)
                    delay(1000)
                }
            }
        }
        // coroutineScope执行完成后调用
        println("coroutineScope finished")
    }
    // runBlocking执行完成后调用
    println("runBlocking finished")
}

虽然看上去coroutineScope函数和runBlocking函数的作用是非常类似的,但是coroutineScope函数只会阻塞当前协程,既不影响其他协程,也不影响任何线程,因此是不会造成任何性能上的问题的而runBlocking函数由于会阻塞当前线程,如果恰好又在主线程中调用它的话,那么就有可能会导致界面卡死的情况,所有不太推荐在实际项目中使用。

二、更多的作用域构建器

在前面我们学习了GlobalScope.launch、runBlocking、launch、coroutineScope这几种作用域构建器,它们都可以创建一个新的协程作用域。不过GlobalScope.launch和runBlocking函数是可以在任何地方调用的,coroutineScope函数可以在协程作用域或挂起函数中调用,而launch函数只能在协程作用域中调用。

前面已经说了,runBlocking由于会阻塞线程,因此只建议在测试环境下使用,而GlobalScope.launch由于每次创建的都是顶层协程,一般也不太建议使用,除非非常明确就是要创建顶层协程。

那么为什么不太建议使用顶层协程呢,因为管理成本太高了,如果我们在Activity中使用协程发起一条网络请求,由于网络请求是耗时的,用户在服务器还没来得及响应的情况下就关闭了当前Activity,此时按理说应该取消这条网络请求,或者至少不应该进行回调,因为Activity已经不存在了,回调也没有意义。

1.取消协程

不管是GlobalScope.launch函数还是launch函数,它们都会返回一个Job对象,只需要调用Job对象的cancel()方法就可以取消协程,代码如下所示:

val job = GlobalScope.launch {
    // 处理具体逻辑
}
// 取消协程
job.cancel()

但是如果我们每次创建的都是顶层协程,当Activity关闭时就要逐个调用所有已经创建的协程的cancel()方法,这样根本无法维护。

2.实际项目中常用协程写法

GlobalScope.launch这种协程作用域构建器,在实际项目中也是不太常用的,下面代码演示了实际项目中比较常用的写法:

// 创建Job对象
val job = Job()
// CoroutineScope是个函数,不是类
// 它会返回一个CoroutineScope对象,可以随时调用launch()创建协程
val scope = CoroutineScope(job)
// 调用CoroutineScope的launch函数创建协程
scope.launch {
    // 处理具体的逻辑
}

// 所有调用CoroutineScope的launch函数创建的协程
// 都会关联在Job对象作用域下面
// 因此只需调用一次cancel()方法就可以将同一作用域内所有协程取消
job.cancel()

相比之下,CoroutineScope()函数更适合用于实际项目当中,如果只是在main()函数中编写一些学习测试用的代码,还是使用runBlocking函数最为方便.

3.创建子协程并获取执行结果——async

async函数必须在协程作用域中才能调用,它会创建一个新的子协程并返回一个Deferred对象,如果我们想要获取async函数代码块的执行结果,只需要调用Deferred对象的await()方法即可,代码如下所示:

fun main() {
    runBlocking {
        val result = async {
            5 + 5
        }.await()
        // 打印:10
        println(result)
    }
}

事实上,在调用了async函数之后,代码块中的代码就会立刻开始执行。当调用await()方法时,如果代码块中的代码还没执行完,那么await()方法会将当前协程阻塞住,直到可以获得async函数的执行结果。

4.特殊的作用域构建器——withContext

withContext()函数是一个挂起函数,大体可以将它理解成async函数的一种简化版写法,示例代码如下:

fun main() {
    runBlocking {
        // 调用withContext函数后会立即执行代码块中代码
        // 同时将当前协程阻塞。全部执行完毕后将最后一行结果返回
        // 等同于:val result = async{ 5 + 5 }.await()
        // 唯一不同是withContext要求传入一个线程参数
        // 线程参数看下文解释
        val result = withContext(Dispatchers.Default) {
            5 + 5
        }
        println(result)
    }
}

我们已经知道协程是一种轻量级的概念,因此很多传统编程情况下需要开启多线程执行的并发任务,现在只需要在一个线程下开启多个协程来执行就可以了。但是这并不意味着我们就永远不需要开启线程了。比如Android中要求网络请求必须在子线程中进行,即使你开启了协程去执行网络请求,假如它是主线程中的协程,那么程序任然会出错。这个时候我们就应该通过线程参数给协程指定一个具体的运行线程。

  1. Dispatchers.Default
    使用一种默认低并发的线程策略,当你要执行的代码属于计算密集型任务时,开启过高的并发反而可能会影响任务的运行效率,此时就可以使用Dispatchers.Default。

  2. Dispatchers.IO
    使用一种较高的并发编程策略,当你要执行的代码大多数时间是在阻塞和等待中,比如说执行网络请求时,为了能够支持更高的并发数量,此时就可以使用。

  3. Dispatchers.Main
    表示不会开启子线程,而是在Android主线程中执行代码,但是这个值只能在Android项目中使用,纯Kotlin程序使用这种类型的线程参数会出现错误。

事实上,在刚才所学的协程作用域构建器中,除了coroutineScope函数之外,其他所有的函数都是可以指定这样一个线程参数的,只不过withContext()函数是强制要求指定,其他函数则是可选的。

三、使用协程简化回调写法

在过去,实现获取异步网络请求数据响应的功能都是使用编程语言的回调机制实现的,回调机制基本上是依靠匿名类实现的,但是匿名类的写法通畅比较繁琐,比如如下代码:

HttpUtil.sendHttpRequest(address, object : HttpCallbackListener {
    override fun onFinish(response: String) {
        // 得到服务器返回的具体内容
    }
    
    override fun onError(e: Exception) {
        // 在这里对异常情况进行处理
    }
})

在过去可能确实没有什么更加简单的写法了,不过现在,Kotlin的协程使这成为可能,只需要借助suspendCoroutine函数就能将传统的回调机制的写法大幅简化。

1.suspendCoroutine函数

suspendCoroutine函数必须在协程作用域或者挂起函数中才能调用,它接收一个Lambda表达式参数,主要作用是将当前协程立即挂起,然后在一个普通的线程中执行Lambda表达式中的代码。Lambda表达式的参数列表上会传入一个Continuation参数,调用它的resume()方法或resumeWithException()可以让协程恢复执行。

1.1.传统回调优化

了解了用法之后,下面借助这个函数对传统的回调写法进行优化。首先定义一个request()函数,如下所示:

// request是一个挂起函数
suspend fun request(address: String): String {
    // 调用suspendCoroutine后当前协程立刻挂起
    return suspendCoroutine { continuation ->
        // Lambda中代码在普通线程中执行
        // 调用sendHttpRequest发起网络请求
        HttpUtil.sendHttpRequest(address, object : HttpCallbackListener {
            override fun onFinish(response: String) {
                // 恢复被挂起的协程,并传入服务器响应数据
               // 该数据会成为suspendCoroutine函数返回值
               continuation.resume(response)
           }
            
            override fun onError(e: Exception) {
                // 传入具体异常原因
                continuation.resumeWithException(e)
            }
        })
    }
}

使用了上面的suspendCoroutine函数后,再进行网络请求就都不需要再重复进行回调实现了。比如说获取百度首页的响应数据,就可以如下所示:

// getBaiduResponse()是一个挂起函数
suspend fun getBaiduResponse() {
    try {
        // 调用request()函数时,当前协程被立刻挂起
        // 等待网络请求成功或失败后,当前协程恢复运行
        val response = request("https://www.baidu.com")
        // 对服务器响应的数据进行处理
    } catch(e: Exception) {
        // 对异常情况进行处理
    }
}

getBaiduResponse()函数被声明成了挂起函数,这样它也只能在协程作用域或者其他挂起函数中调用了,因为suspendCoroutine函数本身就是要结合协程一起使用的。不过通过合理的项目架构设计,我们可以轻松地将各种协程的代码应用到一个普通的项目当中。

1.2.Retrofit回调优化

由于不同的Service接口返回的数据类型也不同,所以这次不能像刚才那样针对具体的类型进行编程了,而是要使用泛型方式。定义一个await()函数,代码如下所示:

// await()是一个挂起函数,是Call<T>的扩展函数
suspend fun <T> Call<T>.await(): T {
    // 挂起当前协程
    return suspendCoroutine { continuation ->
        // Call对象上下文,可以直接调用enqueue发起网络请求
        enqueue(object : Callback<T> {
            override fun onResponse(call: Call<T>, response: Response<T>) {
                val body = response.body()
                if (body != null) continuation.resume(body)
                else continuation.resumeWithException (
                    RuntimeException("response body is null"))
            }
            
            override fun onFailure(call: Call<T>, t: Throwable) {
                continuation.resumeWithException(t)
            }
        })
    }
}

使用await()优化Retrofit实例代码:

suspend fun getAppData() {
    try {
        val appList = ServiceCreator.create<AppService>()
            .getAppData().await()
        // 对服务器响应的数据进行处理
    } catch(e: Exception) {
        // 对异常情况进行处理
    }
}
License:  CC BY 4.0