1. 1. 代码规范
    1. 1.1. 源码组织
      1. 1.1.1. 目录结构
      2. 1.1.2. 文件命名
      3. 1.1.3. 文件组织
      4. 1.1.4. 类的内容排布
      5. 1.1.5. 实现接口时的排布
      6. 1.1.6. 重载方法的排布
    2. 1.2. 命名规则
      1. 1.2.1. 方法名
      2. 1.2.2. 测试方法的命名
      3. 1.2.3. 属性名
      4. 1.2.4. 幕后属性的命名
        1. 1.2.4.1. 幕后字段
        2. 1.2.4.2. 幕后属性
      5. 1.2.5. 如何选择名称
    3. 1.3. 格式化
      1. 1.3.1. 缩进
      2. 1.3.2.
      3. 1.3.3. 方法
      4. 1.3.4. 表达体
      5. 1.3.5. 属性
      6. 1.3.6. 控制流程语句
      7. 1.3.7. 方法调用
      8. 1.3.8. 链式调用
      9. 1.3.9. Lambdas
      10. 1.3.10. 拖尾逗号
    4. 1.4. 文档注释
    5. 1.5. 习惯用法
      1. 1.5.1. 不可变性
      2. 1.5.2. 默认参数
      3. 1.5.3. 类型别名
      4. 1.5.4. Lambda 参数
      5. 1.5.5. Lambda 返回值
      6. 1.5.6. 命名参数
      7. 1.5.7. 条件表达式
      8. 1.5.8. Loops
      9. 1.5.9. Loops on range
      10. 1.5.10. String
      11. 1.5.11. Functions vs properties
      12. 1.5.12. 扩展方法
      13. 1.5.13. 中缀方法 (Infix functions)
      14. 1.5.14. 工厂方法
  2. 2. 标准库
    1. 2.1. Scope functions
      1. 2.1.1. Distinctions
        1. 2.1.1.1. Context object: this or it
        2. 2.1.1.2. Return value
      2. 2.1.2. let
      3. 2.1.3. with
      4. 2.1.4. run
      5. 2.1.5. apply
      6. 2.1.6. also
      7. 2.1.7. takeIf and takeUnless
  3. 3. 类与对象
    1. 3.1. 泛型
      1. 3.1.1. 型变
        1. 3.1.1.1. 声明处型变
        2. 3.1.1.2. 使用处型变
        3. 3.1.1.3. Star-projections
      2. 3.1.2. 泛型函数
      3. 3.1.3. 泛型约束
      4. 3.1.4. 类型擦除
  4. 4. 函数与 Lambda 表达式
    1. 4.1. 高阶函数
      1. 4.1.1. 函数类型
      2. 4.1.2. 函数类型实例化
      3. 4.1.3. 函数类型实例调用
    2. 4.2. Lambda 表达式
      1. 4.2.1. 语法
      2. 4.2.2. 传递末尾的 lambda 表达式
      3. 4.2.3. it:单个参数的隐式名称
      4. 4.2.4. 从 lambda 表达式中返回值
      5. 4.2.5. 下划线用于未使用的变量
      6. 4.2.6. lambda 表达式中的解构声明
    3. 4.3. 匿名函数
      1. 4.3.1. 闭包
    4. 4.4. 内联函数
      1. 4.4.1. 禁用内联
      2. 4.4.2. 非局部返回
      3. 4.4.3. 具体化的类型参数
      4. 4.4.4. 内联属性
  5. 5. 协程
    1. 5.1. 异步编程技术
      1. 5.1.1. 线程
      2. 5.1.2. 回调
      3. 5.1.3. Futures / Promises 及其它
      4. 5.1.4. 响应式插件
      5. 5.1.5. 协程
    2. 5.2. 协程基础
      1. 5.2.1. 结构化并发
        1. 5.2.1.1. CoroutineScope vs CoroutineContext
      2. 5.2.2. 挂起函数
      3. 5.2.3. 作用域构建器
      4. 5.2.4. 全局协程像守护线程
    3. 5.3. 取消与超时
      1. 5.3.1. 取消
      2. 5.3.2. 超时
    4. 5.4. 组合挂起函数
      1. 5.4.1. 使用 async 并发
      2. 5.4.2. 惰性启动的 async
      3. 5.4.3. async 风格的函数
      4. 5.4.4. 使用 async 的结构化并发
    5. 5.5. 协程上下文与调度器
      1. 5.5.1. 调度器与线程
    6. 5.6. 异步流
      1. 5.6.1. 序列与流
      2. 5.6.2. 流的取消
      3. 5.6.3. 流是冷的
      4. 5.6.4. 流是连续的
      5. 5.6.5. 流上下文
        1. 5.6.5.1. withContext 发出错误
      6. 5.6.6. 流构建器
      7. 5.6.7. 流操作符
        1. 5.6.7.1. 过渡操作符 map
        2. 5.6.7.2. 转换操作符 transform
        3. 5.6.7.3. 限长操作符 take
        4. 5.6.7.4. 末端操作符
        5. 5.6.7.5. flowOn 操作符
        6. 5.6.7.6. 缓冲操作符
          1. 5.6.7.6.1. conflate
          2. 5.6.7.6.2. collectLatest
        7. 5.6.7.7. 合并操作符
          1. 5.6.7.7.1. Zip
          2. 5.6.7.7.2. combine
        8. 5.6.7.8. 展平操作符
          1. 5.6.7.8.1. flatMapConcat
          2. 5.6.7.8.2. flatMapMerge
          3. 5.6.7.8.3. flatMapLatest
        9. 5.6.7.9. 异常操作符
          1. 5.6.7.9.1. 使用 try..catch 捕获异常
          2. 5.6.7.9.2. catch 操作符
          3. 5.6.7.9.3. 声明式捕获 onEach
        10. 5.6.7.10. 完成操作符
          1. 5.6.7.10.1. 命令式 finally 块
          2. 5.6.7.10.2. 声明式处理 onCompletion
        11. 5.6.7.11. 启动操作符
          1. 5.6.7.11.1. 可取消的流
      8. 5.6.8. Flow 与 Rx
    7. 5.7. 通道
      1. 5.7.1. 通道基础
        1. 5.7.1.1. 通道的关闭与迭代
        2. 5.7.1.2. 构建通道生产者
      2. 5.7.2. 管道
      3. 5.7.3. 扇出
      4. 5.7.4. 扇入
      5. 5.7.5. 带缓冲的通道
      6. 5.7.6. 通道是公平的
      7. 5.7.7. 计时器通道
    8. 5.8. 异常处理与监督
      1. 5.8.1. 异常的传播
      2. 5.8.2. CoroutineExceptionHandler
      3. 5.8.3. 取消与异常
      4. 5.8.4. 异常聚合
      5. 5.8.5. 监督
        1. 5.8.5.1. 监督作业
        2. 5.8.5.2. 监督作用域
        3. 5.8.5.3. 监督协程中的异常
    9. 5.9. 共享的可变状态与并发
      1. 5.9.1. 线程安全的数据结构
      2. 5.9.2. 以细粒度限制线程
      3. 5.9.3. 以粗粒度限制线程
      4. 5.9.4. 互斥

Kotlin 学习笔记

kotlin_logo

阅读文档和《Kotlin in Action》做的一些笔记。

代码规范

源码组织

目录结构

在纯 Kotlin 的项目中,推荐的目录结构是省略根包名。如果包名是 com.example.kotlin,那么所有的代码都应该在这个根目录之下,比如 org.example.kotlin.network.socket 中的文件就应该放在 network/socket 子目录下。

文件命名

如果文件中只包含一个类(包括顶层声明),则它的文件名应该和类名保持一致。如果文件包含多个类、顶层声明等,则应该选择最能描述这些类作用的命名,尽量选择清晰易懂的名称,如果做不到就应该使用多个文件分别保存。对于命名风格应该选择大驼峰命名法,并且应该避免使用一些无意义的后缀,比如 util 等。

文件组织

如果多个类相互关系密切,并且描述的是同一个功能,那么推荐将它们放在同一个文件,只要最终的文件不是太长就可以。尤其是当我们需要为类定义一些扩展方法的时候,如果扩展方法只和当前类有关,那么就应该把这些扩展方法放到类一起,而不是单独创建一个文件用于保存扩展方法。

类的内容排布

类的各部分排列应该按照以下顺序:

  1. 属性声明和初始化代码块
  2. 从构造器
  3. 方法声明
  4. 伴生对象

初次之外,不要将方法根据首字母顺序排序,或者根据可见性排序,也不要将普通方法和扩展方法分开来,而是应该将相关的方法放在一起,根据功能依次排列,这样阅读你的代码的人才能方便地从上到下阅读你的代码,而不是频繁地来回跳转寻找相关代码。推荐将关键方法放到上面,然后是较基础和底层的方法。

对于嵌套类来说,推荐将它们放到使用到这些类的地方。如果嵌套类只是被外界使用,则可以将它们放到类的底部,位于伴生对象之后。

实现接口时的排布

保持类中实现的各个方法的顺序和接口中方法定义顺序一致,如果有私有方法的话,应该放在实现的方法附近。

重载方法的排布

永远要把重载方法放在一起,方便阅读者看到一个方法全部的重载方法。

命名规则

包名和类名的命名规则:

  • 包名由小写单词和 . 构成,且不能使用下划线,多个单词推荐使用 . 分割或者直接拼接。
  • 类名使用 UpperCamelCase,大驼峰命名。

方法名

方法名、属性名、局部变量都应该使用小写字母开头,并且使用驼峰命名法,且不能使用下划线。只有工厂方法才可以使用与类名相同的方法名。

1
fun Foo(): Foo { return FooImpl() }

测试方法的命名

只有在测试方法中才可以使用反引号,但是注意安卓运行环境下并不支持这种方式,不过可以使用下划线。

1
2
3
4
5
class MyTestCase {
@Test fun `ensure everything works`() { /*...*/ }

@Test fun ensureEverythingWorks_onAndroid() { /*...*/ }
}

属性名

常量,被标记为 const val 的属性,以及没有自定义 getter、数据不可变的 val 属性应该使用大写字符+下划线分割:

1
2
const val MAX_COUNT = 8
val USER_NAME_FIELD = "UserName"

顶层变量或者类的成员属性如果携带可变数据,则应该使用小驼峰命名法:

1
val mutableCollection: MutableSet<String> = HashSet()

如果表示单例对象,则命名应该和对象声明保持一致:

1
val PersonComparator: Comparator<Person> = /*...*/

枚举类的命名应该和 Java 中的命名保持一致:

1
enum class Color { RED, GREEN, LIGHT_BLUE }

幕后属性的命名

幕后字段

一个类中通常包含属性(Property)和字段(Field),外界不可直接访问字段,一般是通过属性提供对字段的访问,有时属性中还包括对字段的计算和转换,然后再返回。Kotlin 中的属性分为只读的(通过 val 声明)和可变的(通过 var 声明),除此之外,我们还可以为属性提供自定义的访问器(getters & setters)。

Kotlin 中的字段无法显式被声明(我们只能创建属性),但是,当属性需要用到字段的时候,Kotlin 会默认它自动生成幕后字段(backing field),这个幕后字段可以在访问器通过 field 关键字进行调用。

1
2
3
4
5
6
7
var counter = 0 // the initializer assigns the backing field directly
set(value) {
if (value >= 0)
field = value
// ERROR StackOverflow: Using actual name 'counter' would make setter recursive
// counter = value
}

只有在使用了至少一个默认的访问器或者自定义访问器中引用了 field 才会生成幕后字段,下面这个例子中就没有幕后字段:

1
2
3
// 没有初始化器而且自定义访问器中没有调用 field,此时就不会有幕后字段。
val isEmpty: Boolean // 只要有初始化器,则必定会生成幕后字段;没有幕后字段就不能被初始化。
get() = this.size == 0
幕后属性

如果你不喜欢这种隐式的幕后字段,则可以使用幕后属性,也就是通过在类的内部维护一个可读可写的属性,然后对外界提供一个只读的属性:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 外部只能通过 table 来访问,但实际上访问的是 _table
* 类似于 Java 中通过设置 Getter 和 Setter 来控制对属性的访问
* */
private var _table: Map<String, Int>? = null
val table: Map<String, Int>
get() {
if (_table == null) {
_table = HashMap()
}
return _table ?: throw AssertionError("Set to null by another thread")
}

在命名幕后字段时,私有属性应该使用下划线作为开头。

如何选择名称

类名应该以名词为结尾,描述该类的作用或功能。类名应该避免使用一些无意义的单词,比如前面提到过的 Util,还有 Manager/Wrapper 等等。

方法名应该是动词或者动词短语,描述方法的功能或执行哪些操作,并且应该表明是否会修改对象或者返回一个新的对象,比如 sort 和 sorted。

对于缩写词,应该尽量少用,除非是特别常见的,比如 IO/TV 等等,或者是在当前项目的文档中注明过的常见缩写。如果是两个字母以上的缩写,应该当做普通的单词使用,比如 HttpConnection/XmlParser 等等。

格式化

打开 IDE (IDEA/Android Studio) 的 Preferences > Editor > Code Style > Kotlin,使用默认设置,全部看一遍就够了。

缩进

使用四个空格作为缩进,不要使用 Tab。

如果类的签名很长,主构造函数中的参数很多,实现的接口很多时,都应该使用换行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Person(
id: Int,
name: String,
surname: String
) : Human(id, name),
KotlinMaker,
SomeOtherInterface,
AndAnotherOne {

// 这种情况下类的第一行应该使用空白行增加可读性
fun foo() {
// ...
}
}

方法

同理,如果方法签名很长,应该使用换行:

1
2
3
4
5
6
7
fun longMethodName(
argument: ArgumentType = defaultValue,
argument2: AnotherArgumentType,
): ReturnType {

// body
}

表达体

如果方法体只有一行代码,或者只有返回值,则应该使用表达体:

1
2
3
4
5
6
// = 后面就是表达体(expression bodies)
fun foo() = value.size()

// 当表达体很长时,使用换行并添加 4 个空格
fun f(x: String, y: String, z: String) =
veryLongFunctionCallWithManyWords(andLongParametersToo(), x, y, z)

属性

如果是比较简单的属性,应该放在同一行:

1
val isEmpty: Boolean get() = size == 0

如果 getters/setters 比较复杂,则应该换行并使用缩进:

1
2
3
4
val foo: String
get() {
// body
}

如果初始化器比较长,同样应该换行并使用缩进:

1
2
private val defaultCharset: Charset? =
EncodingRegistry.getInstance().getDefaultCharsetForPropertiesFiles(file)

控制流程语句

如果 if 或 when 中包含多个语句,必须使用花括号,并且每个条件或者语句都要使用缩进,与第一个语句条件对齐:

1
2
3
4
5
6
if (!component.isSyncing &&
!hasAnyKotlinRuntimeInScope(module) ||
(someOtherCondition && andSomeMore)
) {
return createKotlinNotConfiguredPanel(module)
}

when 语句中,只有在分支包含多个表达式时才使用花括号:

1
2
3
4
5
6
7
8
9
10
11
private fun parsePropertyValue(propName: String, token: Token) {
when (token) {
condition1 -> shortCondition()
is Token.ValueToken ->
callback.visitValue(propName, token.value)

Token.LBRACE -> {
// long body
}
}
}

方法调用

如果方法的参数较多,使用换行,并将多个相近的参数放在同一行:

1
2
3
4
5
drawSquare(
x = 10, y = 10,
width = 100, height = 100,
fill = true
)

链式调用

1
2
3
4
val anchor = owner
?.firstChild!!
.siblings(forward = true)
.dropWhile { it is PsiComment || it is PsiWhiteSpace }

Lambdas

如果方法只接收一个 lambda 表达式,方法体应该放在括号外面并省略括号:

1
list.filter { it > 10 }

如果给 lambda 指定了标签,标签和花括号之间不能有空格:

1
2
3
4
5
fun foo() {
ints.forEach lit@{
// ...
}
}

如果 lambda 中指定了参数名,则应该使用换行:

1
2
3
4
appendCommaSeparated(properties) { prop ->
val propertyValue = prop.get(obj)
// ...
}

如果 lambda 中的参数列表很长,则应该使用换行并将 -> 单独放在一行:

1
2
3
4
5
6
foo {
context: Context,
environment: Env
->
context.configureEnv(environment)
}

拖尾逗号

当参数或者值的列表很长时,应该使用拖尾逗号 (Trailing commas):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@ApplicableFor([
"serializer",
"balancer",
"database",
"inMemoryCache", // trailing comma
])
class Person(
val firstName: String,
val lastName: String,
val age: Int, // trailing comma
) {

val colors = listOf(
"red",
"green",
"blue", // trailing comma
)
}

文档注释

当文档注释较长时,应该使用换行:

1
2
3
4
5
6
7
/**
* This is a summary, what this class does, blablabla...
*
* More detailed description, explain how this works,
* how to use it, and maybe add some sample codes.
*/
class AwesomeClass { /* ... */ }

文档注释中,尽量不要使用 @param 或者 @return,而是尽可能地使用叙述性的文字,这样可以增加可读性。只有在参数特别多、解释性文字特别长时,才使用它们。

1
2
3
4
/**
* Returns the absolute value of the given [number].
*/
fun abs(number: Int) { /*...*/ }

习惯用法

不可变性

倾向于使用不可变的数据。对于局部变量而言,如果初始化之后不会再进行修改的话,尽量使用 val 声明而不是 var。尽量创建不可变的集合以及使用不可变的集合作为参数,以避免在使用过程中集合被意外改变后产生的各种错误。

1
2
3
4
5
6
7
8
9
10
11
// Bad: use of mutable collection type for value which will not be mutated
fun validateValue(actualValue: String, allowedValues: HashSet<String>) { ... }

// Good: immutable collection type used instead
fun validateValue(actualValue: String, allowedValues: Set<String>) { ... }

// Bad: arrayListOf() returns ArrayList<T>, which is a mutable collection type
val allowedValues = arrayListOf("a", "b", "c")

// Good: listOf() returns List<T>
val allowedValues = listOf("a", "b", "c")

默认参数

尽量为方法创建默认参数。

类型别名

如果某个方法签名或者类型参数在你的代码库中被多次使用,那就应该为它创建类型别名:

1
2
typealias MouseClickHandler = (Any, MouseEvent) -> Unit
typealias PersonIndex = Map<String, Person>

尽量使用 import xxx as xxx 避免命名冲突。

Lambda 参数

Lambda 中尽量使用 it 而不是显式指定参数名称,但是如果是在嵌套的 lambda 中,则应该为每个 lambda 表达式指定参数。

Lambda 返回值

尽量使得 lambda 表达式的返回值只有一个出口,而不是在多处使用 return,如果无法做到则应该使用匿名方法代替 lambda 表达式。

命名参数

如果调用的方法参数很多且有多个相同类型的参数,或者包含多个布尔值,则应该使用命名参数:

1
drawSquare(x = 10, y = 10, width = 100, height = 100, fill = true)

条件表达式

尽量使用条件表达式的返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// if 
return if (x) foo() else bar()

// when
return when(x) {
0 -> "zero"
else -> "nonzero"
}

// try..catch
val readFromInput: Int? = try {
parseInt(input)
} catch (e: NumberFormatException) {
null
}

Loops

尽量使用高阶函数 (filter, map 等) 而不是循环,除了 forEach

在决定使用高阶函数还是循环时,需要考虑使用场景和操作的性能消耗。

Loops on range

需要关闭区间时使用 until 而不是 x..n-1

1
2
3
4
// bad:
for (i in 0..n - 1) { /*...*/ }
// good:
for (i in 0 until n) { /*...*/ }

String

尽量使用字符串模板而不是字符串拼接。当需要换行时,优先使用多行字符串而不是 \n

Functions vs properties

对于没有参数的方法而言,其作用和自定义 getter 的属性是类似的。如果满足下列条件,则应该使用属性:

  • 不会抛出异常;
  • 结果的计算是比较轻量级的操作;
  • 只要对象的状态没有发生变化,调用的结果就不会发生变化;

扩展方法

请自由使用扩展方法,如果一个方法在某个对象上被多次调用,那就应该把它设为扩展方法。为了减少 API 污染,应该注意扩展方法的可见性。

中缀方法 (Infix functions)

只有在两个对象角色(功能)类似时才使用中缀方法,正确示例:and, to, zip,错误示例:add

不要将一个会改变接收对象的方法定义为中缀方法。

工厂方法

工厂方法名尽量不要定义成和类名一样,尽量使用能体现其作用的命名,比如 fromXxx 等等。

标准库

Scope functions

Kotlin 标准库中包含了一些在对象的上下文中执行代码块的方法,比如 let, run, with, apply, also 等,在调用这些方法时,会在 lambda 表达式中形成一个临时的 scope,在这个 scope 中,你可以直接访问对象的属性和方法。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 传统方式调用,需要创建一个变量,然后通过变量来调用其方法
val alice = Person("Alice", 20, "Amsterdam")
println(alice)
alice.moveTo("London")
alice.incrementAge()
println(alice)

// 使用 let
Person("Alice", 20, "Amsterdam").let {
println(it)
it.moveTo("London")
it.incrementAge()
println(it)
}

可以看到使用 scope function 可以使我们的代码更简短以及具有可读性。

本质上来看,所有这些方法的执行效果都是相同的,唯一不同的是对象被调用的方式以及返回的结果不同。所以,我们了解并区分它们的区别,并在合适的场景下调用它们。

最简单的使用原则可以参考以下几点:

  • 执行一段 lambda 表达式:let

  • 在当前上下文中,创建一个表达式并使用其结果作为变量:let

  • 对象构建,比如使用 Builder 模式:apply

  • 配置对象属性并计算结果:run

  • 执行需要含有表达式的语句:run

  • 添加更多效果:also

  • 将多个方法调用合并到一起:with

可以看到,有些功能并不是只有一种方法才能实现,以上只是推荐的做法。另外,虽然使用 scope functions 可以使你的代码更灵活,但是如果过度使用比如使用嵌套等也会降低你的代码的可读性。另外,也要注意链式调用时 context 的变化,比如对象的参数由 it 变成了 this 或者由 this 变成了 it

Distinctions

Scope functions 之间最明显的区别主要有两个:如何引用 context 对象以及返回值。

Context object: this or it

run, with, apply 都通过 this 引用当前的 context 对象,因此,我们可以像在对象中一样访问它的属性和方法。当然,我们也可以省略 this 关键字,但是,这样做有时候会造成与外部其它代码造成名字冲突,所以,推荐的做法是加上 this. 来访问对象的属性和方法。

letalso 则通过 it 引用当前的 context 对象,而且我们需要手动调用 it 来访问对象上的属性和方法,不过这样可以使我们的代码显得更清晰。

Return value

apply, also 返回的是当前的 context 对象本身。因此,它们可以使用链式调用:

1
2
3
4
5
6
7
8
9
val numberList = mutableListOf<Double>()
numberList.also { println("Populating the list") }
.apply {
add(2.71)
add(3.14)
add(1.0)
}
.also { println("Sorting the list") }
.sort()

也可以被用在 return 语句中:

1
2
3
4
5
fun getRandomInt(): Int {
return Random.nextInt(100).also {
writeToLog("getRandomInt() generated value $it")
}
}

let, run, with 返回的 lambda 表达式的结果。所以,你可以用它们给变量赋值,也可以使用链式调用:

1
2
3
4
5
6
val numbers = mutableListOf("one", "two", "three")
val countEndsWithE = numbers.run {
add("four")
add("five")
count { it.endsWith("e") }
}

let

在 let 方法中,context 对象以参数 it 的形式存在,返回值是 lambda 表达式的结果。

使用 let 方法可以帮我们减少创建一些临时变量,尤其是不打破链式调用的结构:

1
2
3
4
5
val numbers = mutableListOf("one", "two", "three", "four", "five")
numbers.map { it.length }.filter { it > 3 }.let {
println(it)
// and more function calls if needed
}

另外,如果 let 中只包含一个方法调用,可以使用方法引用:

1
numbers.map { it.length }.filter { it > 3 }.let(::println)

第三种用法是为变量创建一个局部 scope 提升代码的可读性:

1
2
3
4
5
// 需要在第一个数字上做一些操作
val modifiedFirstItem = numbers.first().let { firstItem ->
println("The first item of the list is '$firstItem'")
if (firstItem.length >= 5) firstItem else "!" + firstItem + "!"
}.uppercase()

with

with 方法是一个非扩展函数,context 对象通过 this 访问,返回值是 lambda 表达式的结果。但是不推荐在 with 方法中返回结果,最好只做一些操作,表示 “with this object, do the following.

1
2
3
4
5
val numbers = mutableListOf("one", "two", "three")
with(numbers) {
println("'with' is called with argument $this")
println("It contains $size elements")
}

另一种使用场景是使用对象的属性和方法计算值:

1
2
3
4
5
6
val numbers = mutableListOf("one", "two", "three")
val firstAndLast = with(numbers) {
"The first element is ${first()}," +
" the last element is ${last()}"
}
println(firstAndLast)

run

context 对象通过 this 访问,返回值是 lambda 表达式的结果。

run 方法的效果和 with 的效果一致,但是调用方式和 let 一样,作为扩展方法。run 适合在既需要对对象进行初始化配置,也需要对结果进行计算的情况:

1
2
3
4
5
6
7
8
9
10
11
12
val service = MultiportService("https://example.kotlinlang.org", 80)

val result = service.run {
port = 8080
query(prepareRequest() + " to port $port")
}

// 使用 let
val letResult = service.let {
it.port = 8080
it.query(it.prepareRequest() + " to port ${it.port}")
}

除此之外,run 方法还有一个非扩展方法,我们可以用它执行一些需要执行表达式的语句,比如变量声明时:

1
2
3
4
5
6
7
val hexNumberRegex = run {
val digits = "0-9"
val hexDigits = "A-Fa-f"
val sign = "+-"

Regex("[$sign]?[$digits$hexDigits]+")
}

apply

context 对象通过 this 访问,返回值是对象本身。

当不需要返回值,且主要对对象的成员和方法进行操作的时候可以使用 apply,最常见的是对象初始化配置:

1
2
3
4
val adam = Person("Adam").apply {
age = 32
city = "London"
}

由于返回的是对象本身,所以我们可以很方便地进行链式调用。

also

context 对象通过 it 访问,返回值是对象本身。

also 适用于需要对象的引用而不是其属性和方法的场景下,以及你不想污染 this 关键字的时候,可以将其理解为 “and also do the following with the object.

1
2
3
4
val numbers = mutableListOf("one", "two", "three")
numbers
.also { println("The list elements before adding new one: $it") }
.add("four")

takeIf and takeUnless

除了 scope functions 之外,标准库中还提供了 takeIftakeUnless 方法,可以让我们在使用对象之前对其状态进行检查。

takeIf 只有在对象满足断言时才返回对象,否则返回 null,takeUnless 则恰恰相反,只有在对象不满足断言时才返回对象,否则返回 null,所以 takeIftakeUnless 是对单个对象的筛选方法。

1
2
3
4
5
6
7
8
9
10
11
val number = Random.nextInt(100)

val evenOrNull = number.takeIf { it % 2 == 0 }
val oddOrNull = number.takeUnless { it % 2 == 0 }

evenOrNull?.let {
println("even: $evenOrNull")
}
oddOrNull?.let {
println("odd: $oddOrNull")
}

由于返回值可能为空,所以必须使用 ?.。而且可以看到 takeIftakeUnless 非常适合配合 scope functions 使用。

类与对象

泛型

和 Java 中一样,Kotlin 中也有类型参数:

1
2
3
class Box<T>(t: T) {
var value = t
}

如果通过构造器创建对象,类型参数也可以被推断出来,所以可以省略:

1
val box = Box(1)

型变

Java 中的泛型不是型变的,因此会带来很多问题,所以我们一般需要使用通配符来为泛型确定上下边界。Kotlin 中没有通配符类型,而是引入了声明处型变(declaration-site variance)和类型投影(type projections)。

声明处型变

先看个例子:

1
2
3
public interface List<E> extends Collection<E> { /* ... */ }

List<Number> numberList = new ArrayList<Integer>(); // Incompatible types

类似 List 这样的声明在 Java 中是不被允许的,因为 Java 中泛型不支持协变(Covariant,即 AB 的父类,同时 List<A> 也是 List<B> 的父类,则称 List 类是协变的),我们需要使用通配符来告诉编译器这种声明是安全的:

1
List<? extends Number> numbers = new ArrayList<Integer>();

而在 Kotlin 中,我们通过 out 标注类型参数来支持协变,并且确保它只是被返回(生产)从不被消费:

1
2
3
4
5
6
7
8
9
10
public interface List<out T> : Collection<T> {
fun get(index: Int): T // 返回类型叫 out 位置,生产类型为 T 的元素

// 参数类型叫 in 位置,它消费类型为 T 的值。使用 @UnsafeVariance 是为了避免编译器报错
fun indexOf(element: @UnsafeVariance T): Int
}

fun copyList(list: List<Int>) {
var source: List<Number> = list // 现在可以被允许了
}

这里有一个原则,如果一个类 C 的类型参数 T 被声明为 out 时,它就只能出现在 C 的成员的输出位置(返回类型),回报是 C<Base> 可以安全地作为 C<Derived> 的父类。这样,我们就可以称类型参数 T 在 C 上是协变的。你可以认为 C 是 T 的生产者,而不是 T 的消费者。

out 修饰符称为型变注解,并且由于它在类型参数声明处提供,所以我们称之为声明处型变。Java 中是在使用处通过通配符使得类型型变,Kotlin 与之正好相反。

另外,与之相对的,Kotlin 中还提供了另一个型变注释 in,它的作用是使得类型参数逆变(Contravariant,如果 AB 的父类,那么 List<A> 就是 List<B> 的子类型),只可以被消费而不能被生产。逆变类型的一个很好的例子是 Comparable

1
2
3
4
5
6
7
8
9
interface Comparable<in T> {
operator fun compareTo(other: T): Int
}

fun showContravariant(x: Comparable<Number>) {
x.compareTo(1.0) // 参数 1.0 的类型 Double,它是 Number 的子类型
// 因此,我们可以将 x 赋给类型为 Comparable<Double> 的变量
val y: Comparable<Double> = x // OK!
}

总结一下,我们使用 out 关键字把类声明成是协变 的,并且要求 T 只能在 out 位置(被生产),只有这样才能确保子类型才是安全的:List<Int>List<Number> 的子类型。使用 in 关键字使得类是逆变的,并且要求 T 只能在 in 位置(被消费)。

使用处型变

同样先看个例子:

1
2
3
4
5
6
7
8
9
10
11
12
fun copy(from: Array<Number>, to: Array<Number>) {
assert(from.size == to.size)
for (i in from.indices)
to[i] = from[i]
}

fun main() {
val ints: Array<Int> = arrayOf(1, 2, 3)
val nums = Array<Number>(3) { 3.3 }
// 无法调用 copy 方法,因为 Array<Int> 不是 Array<Number>!
// copy(ints, nums) // Type mismatch
}

Array 类需要能被读和写,因此既不能是协变的也不能是逆变的,这就带来了一个问题,Array<Number> 不能被转换为 Array<Int>,因此,我们需要在使用处 也就是 from 的类型参数前添加 out 关键字:

1
fun copy(from: Array<out Number>, to: Array<Number>) { …… }

我们把 from 称为一个受限制 (projected) 的数组,只可以调用返回类型为 T 的方法,这被叫做 type projection(不知道怎么翻译,Kotlin 中文网翻译为类型投影 )。

这其实就是 Kotlin 中的使用处型变,对应与 Java 中的 Array<? extends Number>,限制泛型类型的上边界。当然,我们也可以使用 in 关键字,它对应于 Java 中的 Array<? super Number>,限制泛型类型的下边界。不过,和 Java 中不同,我们可以在这样的限制了下边界的数组中,添加任何父类及子类元素:

1
val arr: Array<in String> = arrayOf('a', "abc", 123, Origin(), null) // 连 null 也可以添加
Star-projections

如果你对泛型参数的类型一无所知,但是依旧想要使用它,则可以使用 star-projections,用 * 表示。我觉得有点类似于 Java 中的捕获转换,使用无界通配符 <?> 去捕捉类型。

  • 如果用 House<*> 去捕获签名为 House<out T : Human> 的类,则捕获到的具体类型为 House<out Human>,这样你就可以安全地调用 House 中的成员方法和属性了。
  • 如果用 House<*> 去捕获签名为 House<in T> 的类,由于类型参数 T 是逆变的,而且没有任何有关 T 的类型,所以捕获到的类型为 House<in Nothing>,此时往 House 添加任何对象都是不安全的。
  • 如果用 House<*> 去捕获签名为 House<T : Human> 的类,则对于读取而言捕获到的是 House<out Human>,对于写入而言捕获到的是 House<in Nothing>

如果类的类型参数有多个,则每个类型参数都可以被单独 projected。比如类型声明为 interface Function<in T, out U>,则它的 proection 可以分为以下几种情况:

  • Function<*, String> 表示 Function<in Nothing, String>
  • Function<String, *> 表示 Function<String, out Any?>
  • Function<*, *> 表示 Function<in Nothing, out Any?>

泛型函数

Kotlin 中的泛型函数和 Java 中的泛型方法类似,类型参数要放在方法名之前:

1
2
3
4
5
// 声明泛型方法
fun <T> singletonList(item: T): List<T> { /* ... */ }

// 调用方法
val list = singletonList<Int>(1)

泛型约束

我们可以给泛型参数限定其可能的类型。

1
2
3
4
5
6
7
8
9
// 使用 : 限定其上界,如果没有指定,默认的上界是 Any?
fun <T : Comparable<T>> sort(list: List<T>) { /* ... */ }

// 如果同一个类型参数需要使用多个上界,则应该使用 where-clause
fun <T> copyWhenGreater(list: List<T>, threshold: T): List<String>
where T : CharSequence,
T : Comparable<T> {
// ...
}

类型擦除

与 Java 中类似,泛型的类型安全检测仅在编译期有效,运行时泛型类的实例不保留其类型实参的任何信息,其类型信息被擦除了,比如 Foo<Bar>Foo<Baz> 都会被擦除为 Foo<*>

1
2
3
4
5
6
7
val foo: Box<String> = Box("foo")
val bar: Box<Number> = Box(1)

println(foo is Box<*>) // true
println(bar is Box<*>) // true
println(foo.javaClass.toGenericString()) // Box<T>
println(bar.javaClass.toGenericString()) // Box<T>

函数与 Lambda 表达式

Kotlin 中函数是头等的,这意味着函数可以存储在变量、参数、数据结构中,或者从其它高等函数中被返回。可以简单理解为,函数也可以被当做变量使用。

高阶函数

高阶函数是指接收函数作为参数或者返回一个函数的函数。最具代表性的是 fold 函数:

1
2
3
4
5
6
7
8
9
10
fun <T, R> Collection<T>.fold(
initial: R,
combine: (acc: R, nextElement: T) -> R
): R {
var accumulator: R = initial
for (element: T in this) {
accumulator = combine(accumulator, element)
}
return accumulator
}

它接收一个初始值和一个计算函数。计算函数中包含一个累积值和 next 元素值,在方法体中将集合遍历,将遍历到的元素通过计算函数计算,得到新的累积值,并替换原有的累积值,最终返回结果。

我们可以通过以下方式调用该方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
items.fold(0, { acc: Int, i: Int -> // 参数后用 -> 分割
// 方法主体
print("acc = $acc, i = $i, ")
val result = acc + i
println("result = $result")
// 最后一个表达式作为返回值
result
})

// 参数类型如果可以被推断出来则可以省略
items.fold("Elements:") { acc, i -> "$acc $i" }

// 也可以使用方法引用
items.fold(1, Int::times)

函数类型

Kotlin 中用 (Int) -> String 这样的形式声明一个函数类型的变量,比如:

1
val onClick: () -> Unit = // ...

函数类型主要形式如下:

  • 所有函数类型都有一个圆括号括起来的参数类型列表以及一个返回类型:(A, B) -> C,表示它接受类型分别为 AB 的两个参数,并返回一个 C 类型的值。参数类型列表可以为空,比如 () -> A;如果没有返回值则必须注明,比如 (A, B) -> Unit
  • 函数类型可以有一个额外的接收者 类型,通过 A. 这样的形式表示。当调用函数时,第一个参数是该接收者,然后才是函数参数。另外,函数中可以通过 this 关键字引用该对象。带与不带接收者的函数类型可以互换。
  • 挂起函数是一种特殊的函数类型,表示法中包含一个 suspend 修饰符,如 suspend () -> Unit

声明函数类型时,函数的参数名是可选的。下面看几个例子:

1
2
3
4
5
// 可为空的函数类型
val nullableFun : ((Int, Int) -> Int)? = // ...

// 函数的返回值也为函数类型,使用 () 括起来就可以了
val returnFun : (Int) -> ((Int) -> Unit) = // ...

我们还可以通过类型别名给函数起一个别名:

1
typealias ClickHandler = (View) -> Unit

函数类型实例化

我们主要可以通过以下几种方法获得函数类型的实例:

  • 使用函数字面值的代码块

    • lambda 表达式:{ a, b -> a + b }
    • 匿名函数:fun(s: String): Int { return s.toIntOrNull() ?: 0 }
  • 使用已声明的可调用引用:

    • 顶层、局部、成员、扩展函数:::isOddString::toInt
    • 顶层、成员、扩展属性:List<Int>::size
    • 构造函数:::Regex
  • 使用实现函数类型接口的自定义类的实例:

    1
    2
    3
    4
    5
    class IntTransformer: (Int) -> Int {
    override operator fun invoke(x: Int): Int = TODO()
    }

    val intFunction: (Int) -> Int = IntTransformer() // 实例化

如果有足够信息,编译器可以推断出具体的函数类型:

1
val a = { i: Int -> i + 1 } // 推断出的类型是 (Int) -> Int

不过,函数类型推断默认推断出的是没有接收者的函数类型:

1
val a = { i: Int, s: String -> s + i }

上面的例子中,默认推断出的是 (String, Int) -> String 而不是 String.(Int) -> String。如果这不符合你的需要,请显式指定函数类型。

函数类型实例调用

除了直接调用之外,我们还可以使用 invoke() 对函数进行调用:

1
2
3
4
val stringPlus: (String, String) -> String = String::plus

println(stringPlus("Hello, ", "world!"))
println(stringPlus.invoke("<-", "->"))

Lambda 表达式

语法

完整语法如下:

1
2
// 声明一个名称为 sum 的函数类型,并使用 lambda 表达式初始化
val sum: (Int, Int) -> Int = { x: Int, y: Int -> x + y }

Lambda 表达式总是括在花括号中,完整语法形式的参数声明放在花括号内,并有可选的类型标注,函数体跟在一个 -> 符号之后。如果推断出的该 lambda 的返回类型不是 Unit,那么该 lambda 主体中的最后一个表达式会被视为返回值:

1
2
3
4
5
val result = {
val s = "hi"
s.length
}
println(result is Function<Int>) // 返回值是 Int

传递末尾的 lambda 表达式

如果函数的最后一个参数是函数类型,那么作为参数传入的 lambda 表达式可以放在圆括号之外:

1
val product = items.fold(1) { acc, e -> acc * e }

这种语法叫做拖尾 lambda (trailing lambdas),如果该 lambda 表达式是调用时唯一的参数,那么圆括号可以省略:run { println("...") }

it:单个参数的隐式名称

如果 lambda 表达式中只有一个参数,那么我们可以省略它以及 ->,该参数会被隐式声明为 it

1
ints.filter { it > 0 }

从 lambda 表达式中返回值

我们可以使用标签返回的语法从 lambda 显式返回一个值。 否则,将隐式返回最后一个表达式的值。

下划线用于未使用的变量

如果 lambda 表达式的参数未使用,那么可以用下划线取代其名称:

1
map.forEach { _, value -> println("$value!") }

lambda 表达式中的解构声明

1
map.mapValues { (key, value) -> "$value!" }

匿名函数

Lambda 表达式缺少指定函数的返回类型的能力。在大多数情况下,这是不必要的,因为返回值类型可以被推断出来,但是如果你需要显式指定返回值类型,就可以使用匿名函数代替 lambda 表达式。

匿名函数与普通函数的唯一区别是匿名函数省略了函数名称。其使用方式和 lambda 表达式基本一致:

1
val add = fun(x: Int, y: Int): Int = x + y

不过,不同的是,匿名函数的参数必须在括号内才能传递。

1
ints.filter(fun(item) = item > 0)

除此之外,还有非局部返回的不同。非 lambda 表达式中,不带标签的 return 总是在函数中直接返回,而 lambda 表达式是从包含它的函数返回,所以在 lambda 表达式中如果要正确地 return 需要使用标签:

1
2
3
4
5
ints.filter {
val mold = it % 2
println(mold)
return@filter mold == 0
}

闭包

Lambda 表达式或者匿名函数(以及局部函数对象表达式) 可以访问其闭包 ,即在外部作用域中声明的变量。与 Java 不同,Kotlin 中不但允许访问闭包中的非 final 变量,还允许直接修改它们:

1
2
3
4
5
6
var sum = 0
ints.filter { it > 0 }.forEach {
// 修改闭包中的变量
sum += it
}
print(sum)

其实现原理是,Kotlin 为我们捕捉并保存了可变变量的引用,然后在我们修改其值的时候改变引用。所以,如果变量被 lambda 表达式捕捉,其声明周期会和 lambda 的表达式的生命周期一致。

内联函数

使用高阶函数会带来一些运行时的效率损失:每一个函数都是一个对象,并且会捕获一个闭包,即在那些函数体内会访问到的变量,对于函数对象和类的内存分配和虚拟调用会引入运行时开销。举个例子:

1
lock(l) { foo() }

在这个方法中,foo() 方法其实只有在被调用到时才起作用,其他时候只是作为参数传递,所以我们期望的行为是希望编译器可以帮我们生成这样一个方法:

1
2
3
4
5
6
lock.lock()
try {
body() // 对目标方法进行调用
} finally {
lock.unlock()
}

为了让编译器这么做,我们需要使用 inline 关键字。

1
inline fun <T> lock(lock: Lock, body: () -> T): T { /* ... */ }

inline 修饰符影响函数和传给它的 lambda 表达式:所有这些都被内联到调用的地方。也就是 lambda 表达式成为函数调用者定义的一部分,而不是保存在匿名类中。

内联虽然有可能会导致生成的方法数增加,但是只要内联的方法体不是太大就可以节省性能开销。

禁用内联

如果希望只内联一部分函数,我们可以在内联函数上使用 noinline 关键字:

1
inline fun foo(inlined: () -> Unit, noinline notInlined: () -> Unit) { /* ... */ }

非局部返回

在 Kotlin 中,我们只能对具名或匿名函数使用正常的、非限定的 return 来退出。 但是,如果一个 lambda 表达式是内联的,那么就可以使用非局部返回

1
2
3
4
5
fun foo(s: String) {
s.let {
return // OK:该 lambda 表达式是内联的
}
}

不仅仅是 let,所有的 scope functions 都是内联的。

一些内联函数可能调用传给它们的不是直接来自函数体、而是来自另一个执行上下文的 lambda 表达式参数,例如来自局部对象或嵌套函数。在这种情况下,该 lambda 表达式中也不允许非局部控制流。为了标识这种情况,该 lambda 表达式参数需要用 crossinline 修饰符标记:

1
2
3
4
5
6
// 使用 crossinline 修饰,不允许局部返回
inline fun f(crossinline body: () -> Unit) {
val f = object: Runnable {
override fun run() = body() // 比如这段代码执行的 context 和当前 context 不同
}
}

具体化的类型参数

有时候,我们需要得到类型参数的具体信息,这个时候可以在内联函数中使用 reified 关键字:

1
2
3
4
5
6
7
inline fun <reified T : Number> TreeNode.findParentOfType(): T? {
var p = parent
while (p != null && p !is T) {
p = p.parent
}
return p as T?
}

例子中,由于函数是内联的,不需要反射,所以 !isas 都可以使用了。

内联属性

inline 修饰符可用于没有幕后字段的属性的访问器。既可以单独标注某个属性访问器,也可以标注整个属性使得两个访问器都是内联的:

1
2
3
4
5
6
7
8
9
10
val foo: Foo
inline get() = Foo()

var bar: Bar
get() = // ...
inline set(v) { /* ... */ }

inline var bar: Bar
get() = // ...
set(v) { /* ... */ }

协程

异步编程技术

在学习协程之前,让我们先回顾一下已有的异步编程方案。

线程

线程可能是目前为止最著名的防止程序造成阻塞的方案。

1
2
3
4
5
6
7
8
9
10
fun postItem(item: Item) {
val token = preparePost()
val post = submitPost(token, item)
processPost(post)
}

fun preparePost(): Token {
// makes a request and consequently blocks the main thread
return token
}

比如上面这段代码中,我们需要在 preparePost() 方法中做网络请求获取数据,我们可以把它放在子线程中来防止 UI 被阻塞,但是这样做有一些缺陷:

  • 创建线程所需的性能开销并不低。线程造成的上下文切换非常昂贵。
  • 线程不是无限制的。可创建的线程数量受限于当前操作系统,如果是服务端的应用程序,这会造成主要的瓶颈。
  • 线程不一定总是可用。在一些平台,比如在 JavaScript 中就不支持线程。
  • 线程的使用并不简单。在多线程编程中,多线程应用的调试和避免出现竞争状况是常见的问题。

回调

另一种思路是使用回调,也就是将目标方法作为参数传递到另一个函数中,在任务结束时再对目标函数进行调用。

1
2
3
4
5
6
7
8
9
10
11
12
fun postItem(item: Item) {
preparePostAsync { token ->
submitPostAsync(token, item) { post ->
processPost(post)
}
}
}

fun preparePostAsync(callback: (Token) -> Unit) {
// make request and return immediately
// arrange callback to be invoked later
}

使用回调看起来优雅了许多,但是依旧存在一些问题:

  • 多层嵌套导致代码变复杂。
  • 错误处理变得异常困难。

Futures / Promises 及其它

Future / Promises 背后的思想是当我们调用了异步请求之后,我们会得到一个 Promise 对象,其中包含了异步请求成功或失败的结果,然后我们可以对它进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun postItem(item: Item) {
preparePostAsync()
.thenCompose { token ->
submitPostAsync(token, item)
}
.thenAccept { post ->
processPost(post)
}
}

fun preparePostAsync(): Promise<Token> {
// makes request and returns a promise that is completed later
return promise
}

这种解决方式需要我们改变编程方式,具体而言:

  • 不同的编程模型。从自上而下的命令式编程到通过链式调用的组合式编程。
  • 需要学习如何使用一套全新的 API。
  • 指定返回值类型。返回值从原始的真实数据到 Promise 对象。
  • 错误处理变得异常复杂。

响应式插件

响应式插件 (Reactive Extensions, Rx) 最初是在 C# 中被 Erik Meijer 提出的,后来 Netflix 将它移植到了 Java 中创造了 RxJava,于是慢慢受到了越来越多人的青睐。其背后的思想是 observable streams,数据以可被观察的流的形式存在。与 Future 返回具体的对象不同,Rx 返回的是数据流,并且使用观察者模式。

如果你接受并理解了 Rx 的核心理念,那么这种编程习惯的确可以很快被应用到其它平台上,而且其错误处理也比前面提到的几种更好一些。

协程

Kotlin 处理异步代码的方式是通过协程,其思想核心是可挂起的运算:函数可以将它的执行挂起,并在稍后继续执行。协程最大的优势是开发者可以像写阻塞式代码一样写非阻塞式代码(写出的异步代码和顺序执行的代码一样):

1
2
3
4
5
6
7
8
9
10
11
12
fun postItem(item: Item) {
launch {
val token = preparePost()
val post = submitPost(token, item)
processPost(post)
}
}

suspend fun preparePost(): Token {
// makes a request and suspends the coroutine
return suspendCoroutine { /* ... */ }
}

在上面这个例子中,postItem 中会执行一些耗时操作,但是它不会阻塞主线程,preparePost)() 就是一个可挂起的函数,它会在执行并返回结果之后,再继续往下执行其它代码。

相比前面的一些方案,协程具有以下优势:

  • 方法签名不需要改变;
  • 代码结构也不需要改变,我们可以像写同步代码一样编写异步代码;
  • 编程模型和 API 保持可用,比如使用循环、异常处理等保持一致;
  • 平台独立性,无论是针对 JVM、JavaScript 或者其它平台,代码始终保持一致,编译器会为我们适配到各自的平台。

Kotlin 并不是唯一采用这种异步编程思想的语言,比如 C# Go 等语言很早就开始使用了。比较特殊的是,除了 suspend 关键字之外,Kotlin 中协程功能全都是以库的形式提供的,我们需要导入 kotlinx.coroutines 包才能使用协程。

协程基础

一个协程是一个可终止运算的实例。从概念上看,它与线程相似,因为它需要运行一块与其余代码块同时工作的代码块。但是,协程不绑定到任何特定线程,它可以在一个线程中暂停执行,然后在另一个线程中恢复执行。因此,协程可以被看作是轻量级的线程。

1
2
3
4
5
6
7
8
fun main() = runBlocking { // runBlocking 用于连接阻塞式代码和协程代码
val job = GlobalScope.launch { // 启动一个新协程并保持对这个 job 的引用
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // 等待直到子协程执行结束
}

结构化并发

以上示例中,我们创建了一个顶层协程,保持对其的引用并将协程挂起直至 job 执行结束。这存在一些问题,比如创建顶层协程需要消耗更多的资源,手动保持对协程的引用容易出错等等。

更好的做法是使用结构化并发。我们可以在 runBlocking 所在的 CoroutineScope 中直接启动一个新协程,这样就毋需显式 join 它了。正因为在同一个作用域中,所以会等待所有启动的协程都执行完毕后才会退出:

1
2
3
4
5
6
7
fun main() = runBlocking { // this: CoroutineScope
launch { // 在 runBlocking 作用域中启动一个新协程
delay(1000L)
println("World!")
}
print("Hello ")
}

依次介绍下这里涉及到的几个协程函数:

  • launch 是一个协程构造器,它会相对其余代码并发地启动一个新的协程,并且与它们保持相互独立。这也是为什么 “Hello “ 会被先打印出来。
  • delay 是一个特殊的挂起函数,它会将当前协程挂起指定的一段时间。挂起一个协程不会阻塞当前所在的线程,但是会允许其它协程运行并且使用当前线程运行它们的代码。
  • runBlocking 也是一个协程构造器,它桥接起了外部非协程代码和方法体中的协程代码。

所谓的结构化并发指的是新的协程只能在 CoroutineScope 中才能启动,这样就限定了该协程的生命周期。在真实的使用场景中,我们通常会启动非常多的协程,结构化并发保证了这些协程不会被丢失或者泄露。外部的协程只有等待内部的子协程全都执行完毕才会退出。另外,结构化并发也保证了当代码出错的时候,错误能够被正确地抛出而且不会被丢失。

CoroutineScope vs CoroutineContext

CoroutineScope 除了提供一个 CoroutineContext,还负责保存并发结构以及多个协程间的父子关系,我们只能在 CoroutineScope 中启动一个新协程。CoroutineContext 则保存了协程相关的信息。

挂起函数

我们可以将上面打印 “World!” 的部分代码提取到一个 printWorld() 方法中:

1
2
3
4
5
// 挂起函数
suspend fun doWorld() {
delay(1000L)
println("World!")
}

可以看到提取该方法时,IDE 为我们自动添加了 suspend 修饰符,这样的函数被称为挂起函数 (suspending function)。挂起函数的作用是可以让我们在协程中调用其它挂起函数,并且暂停执行。

作用域构建器

我们还可以使用 coroutineScope 创建自己的协程作用域,它和 runBlocking 一样会等待代码主体和子协程执行完毕,唯一的不同之处是 runBlocking 会阻塞当前线程,但 coroutineScope 只是挂起,它会释放占有的当前线程下的资源。正因如此,coroutineScope 是一个挂起函数,而 runBlocking 只是普通的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun main() = runBlocking {
doWorld()
println("Done")
}

suspend fun doWorld() = coroutineScope {
launch {
delay(2000L)
println("World 2")
}
launch {
delay(1000L)
println("World 1")
}
println("Hello")
}

// 输出:
Hello
World 1
World 2
Done

在上面这个例子中,由于 coroutineScope 不会阻塞当前线程,所以当它在内部启动两个协程之后,这些代码会被同步执行。而 runBlocking 是阻塞式运行的,所以它会等待 doWorld() 执行完毕。所以,最先被打印的是 “Hello”,然后是 delay 了 1 秒的 “World 1”,再然后是 delay 了 2 秒的 “World 2”,最后才是 “Done”。

全局协程像守护线程

看个例子:

1
2
3
4
5
6
7
GlobalScope.launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L)

这里打印的结果是:

1
2
3
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...

原因是在 GlobalScope 中启动的活动协程并不会使进程保活,它们就像守护线程。

取消与超时

取消

1
2
3
job.cancel()
// or
job.cancelAndJoin()

如果协程没有检查取消状态,那么仅仅调用 cancel() 是无法被取消的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fun cancelIsCooperative() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) {
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消 job 并且等待它结束
println("main: Now I can quit.")
}

// 输出
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.

为了使取消起作用,我们需要对子协程做以下修改:

1
2
3
4
// isActive 是 CoroutineScope 的扩展属性
while (isActive) {
// ...
}

我们还可以在 finally 代码块中释放资源:

1
2
3
4
5
6
7
8
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
println("job: I'm running finally")
}

对于不可取消的协程还可以在 finally 中判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
// NonCancellable 是一个对象,且需要结合 withContext 方法使用
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}

超时

由于协程很有可能会超时,所以协程库为我们提供了 withTimeout() 函数:

1
2
3
4
5
6
7
8
fun runWithTimeout() = runBlocking {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}

运行结果会抛出 TimeoutCancellationException 异常,它是 CancellationException 的子类,我们之前没有看到这个异常是因为被取消的协程中,即使抛出 CancellationExcetption 也被认为是正确退出的。

如果不想看到异常,我们可以使用 withTimeoutOrNull() 方法,该方法中对 TimeoutCancellationException 进行了捕捉:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? {
if (timeMillis <= 0L) return null

var coroutine: TimeoutCoroutine<T?, T?>? = null
try {
return suspendCoroutineUninterceptedOrReturn { uCont ->
val timeoutCoroutine = TimeoutCoroutine(timeMillis, uCont)
coroutine = timeoutCoroutine
setupTimeout<T?, T?>(timeoutCoroutine, block)
}
} catch (e: TimeoutCancellationException) {
if (e.coroutine === coroutine) {
return null
}
throw e
}
}

withTimeout 中的 timeout 事件是异步的,并且有可能在任何时间点发生,甚至在内部代码块返回值被返回之前。来看个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
var acquired = 0
var released = 0

class Resource {
init {
acquired++
}

fun close() {
released++
}
}

/**
* 由于 timeout 事件是异步的,所以最终,acquired 事件触发的此时有可能多于 released 事件
* */
fun runTimeoutWithCloseResource() {
runBlocking {
repeat(100_000) {
launch {
val resource = withTimeout(60) {
delay(50)
Resource() // 由于 timeout 事件随时都有可能被触发,所以这里有可能会被调用多次
}
resource.close()
}
}
}
println(acquired)
println(released)
}

为了解决这个问题,我们可以使用 try..finally 语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 使用 try..finally 保证 Resource 的 acquire 和 release 都是成对的
* */
fun runTimeoutWithCloseResourceSafely() {
runBlocking {
repeat(100_000) {
launch {
// 不依赖 withTimeout 的返回值
var resource: Resource? = null
try {
withTimeout(60) {
delay(50)
resource = Resource()
}
} finally {
resource?.close()
}
}
}
}
println(acquired)
println(released)
}

组合挂起函数

首先看个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun main() {
sequentialInvocation()
}

fun sequentialInvocation() = runBlocking {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
println("Calculating one...")
delay(1000L) // 假设我们在这里做了一些有用的事
return 13
}

suspend fun doSomethingUsefulTwo(): Int {
println("Calculating two...")
delay(1000L) // 假设我们在这里也做了一些有用的事
return 29
}

由于它们都是运行在协程中的,所以如果按顺序调用,则它们也会像常规方法一样,按顺序被执行。

使用 async 并发

上面的例子中,doSomethingUsefulOnedoSomethingUsefulTwo 之间并没有依赖,为了更快地得到结果,我们可以对它们使用并发,这需要用到 async 关键字:

1
2
3
4
5
6
7
8
fun concurrentAsync() = runBlocking {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}

asyncCoroutineScope 的扩展方法,默认会在调用后立即执行,并且返回一个 Deferred 作为结果,我们可以在 Deferred 上调用 await() 获取结果值。

惰性启动的 async

async 可以通过将 start 参数设置为 CoroutineStart.LAZY 而变为惰性的。在这个模式下,只有结果通过 await 获取的时候协程才会启动,或者在 Jobstart 函数调用的时候。

1
2
3
4
5
6
7
8
9
10
fun concurrentAsyncLazy() = runBlocking {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
one.start()
two.start()
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}

async 风格的函数

我们可以定义异步风格的函数来异步 地调用 doSomethingUsefulOnedoSomethingUsefulTwo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun main() {
val time = measureTimeMillis {
// 我们可以在协程外面启动异步执行
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// 但是等待结果必须调用其它的挂起或者阻塞
// 当我们等待结果的时候,这里我们使用 `runBlocking { …… }` 来阻塞主线程
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}

fun somethingUsefulOneAsync() = GlobalScope.async {
doSomethingUsefulOne()
}

fun somethingUsefulTwoAsync() = GlobalScope.async {
doSomethingUsefulTwo()
}

这里我们通过 GlobalScope 对象创建出来的 CoroutineScope 创建出一个 async 协程,并在其中调用我们的挂起函数。这样,我们就可以在协程外调用该方法了,因为该方法不依赖外界是否是 CoroutineScope。而且这些方法总是异步且并发被执行的(在顶层协程中)。不过,在获取运行结果 (Deferred.await()) 的时候,我们需要等待挂起函数执行的结果,这里的例子里我们使用了 runBlocking 阻塞主线程并创建了一个 CoroutineScope 来等待执行结束。

使用 async 的结构化并发

上面的例子中,虽然我们可以这么做,但是 Kotlin 中并不推荐这种异步编程的风格。考虑一下,如果程序在 somethingUsefulOneAsync 或者在 one.await() 中发生错误抛出了异常,那么,somethingUsefulTwoAsync 依旧会被执行,这明显破坏了结构化并发的原则。所以我们可以对代码做以下修改:

1
2
3
4
5
suspend fun concurrentSum(): Int = coroutineScope {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
one.await() + two.await()
}

我们把 doSomethingUsefulOnedoSomethingUsefulTwo 放到了同一个 CoroutineScope 中,这样,当程序抛出异常的时候,所有在当前作用域内启动的协程都会被取消:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun runFailedConcurrentSum() = runBlocking {
try {
failedConcurrentSum()
} catch (e: ArithmeticException) {
println("Computation failed with ArithmeticException")
}
}

suspend fun failedConcurrentSum(): Int = coroutineScope {
val one = async {
try {
delay(Long.MAX_VALUE) // 模拟一个长时间的运算
42
} finally {
// 结束或者取消时会被打印
println("First child was cancelled")
}
}
val two = async<Int> {
println("Second child throws an exception")
throw ArithmeticException()
}
one.await() + two.await()
}

协程上下文与调度器

协程总是运行在一些以 CoroutineContext 类型为代表的上下文中。协程上下文是各种不同元素的集合,其中主元素是协程中的 Job

调度器与线程

协程上下文包含一个协程调度器 CoroutineDispatcher,它确定了相关的协程在哪个线程或哪些线程上执行。协程调度器可以将协程限制在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。

所有的协程构建器诸如 launchasync 接收一个可选的 CoroutineContext 参数,它可以被用来显式的为一个新协程或其它上下文元素指定一个调度器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun main() = runBlocking<Unit> {
launch { // 运行在父协程的上下文中,即 runBlocking 主协程
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // 将会获取默认调度器
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得一个新的线程
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
}

异步流

挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢?这正是 Kotlin 流(Flow)的用武之地。

序列与流

如果使用一些消耗 CPU 资源的阻塞代码计算数字(每次计算需要 100 毫秒)那么我们可以使用 Sequence 来表示数字:

1
2
3
4
5
6
7
8
9
10
private fun printSequenceList() {
val sequence: Sequence<Int> = sequence { // 序列构建器
for (i in 1..3) {
Thread.sleep(300) // 假装我们正在计算
yield(i) // 产生下一个值
}
}

sequence.forEach { println(it) }
}

我们可以使用挂起函数,在不阻塞的情况下执行其工作并将结果作为列表返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private fun printDelayedList() = runBlocking {
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(300)
}
}

val list = mutableListOf<Int>()
for (i in 1..3) {
delay(300) // delay 函数不是阻塞式的
list.add(i)
}

list.forEach { println(it) }
}

使用 List 结果类型,意味着我们只能一次返回所有值。 为了表示异步计算的值流(stream),我们可以使用 Flow 类型(正如同步计算值会使用 Sequence 类型):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private fun flowList() = flow { // 流构建器
for (i in 1..3) {
delay(100) // 假装我们在这里做了一些有用的事情
println("Emitting $i")
emit(i) // 发送下一个值
}
}

private fun printFlowList() = runBlocking {
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(300)
}
}
flowList().collect { println(it) }
}

使用 flow 具有以下特点:

  • 名为 flow 的构建器函数
  • flow { ... } 构建块中的代码可以挂起
  • flow 不是 suspend 函数
  • flow 使用 emit 函数发射
  • flow 使用 collect 函数收集

流的取消

流采用与协程同样的协作取消。

1
2
3
4
5
6
private fun runFlowWithTimeOut() = runBlocking {
withTimeoutOrNull(800) { // 超时后 flow 不再继续执行
flowList().collect { value -> println(value) }
}
println("Done")
}

流是冷的

Flow 是一种类似于序列的冷流——只有在被 collect 的时候才会运行。

流是连续的

  • 流的每次单独收集都是按顺序执行的,除非进行特殊操作的操作符使用多个流。
  • 收集过程直接在当前协程中运行,默认情况下不启动新协程。
  • 从上游到下游每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。

流上下文

流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存 。默认情况下,flow { ... } 构建器中的代码总是运行在相应流的收集器提供的上下文中。

withContext 发出错误

然而,长时间运行的消耗 CPU 的代码也许需要在 Dispatchers.Default 上下文中执行,并且更新 UI 的代码也许需要在 Dispatchers.Main 中执行。通常,withContext 用于在 Kotlin 协程中改变代码的上下文,但是 flow {...} 构建器中的代码必须遵循上下文一致原则,并且不允许从其他上下文中发射(emit)。

1
2
3
4
5
6
7
8
9
10
11
12
private fun runFlowWithContextDispatchers() = runBlocking {
flow {
// 在流构建器中更改上下文,会抛出异常:
// java.lang.IllegalStateException: Flow invariant is violated
withContext(Dispatchers.Default) {
for (i in 1..3) {
delay(300)
emit(i)
}
}
}.collect { println(it) }
}

流构建器

除了使用 flow { ... } 构建 Flow 之外,我们还可以使用:

  • flowOf:定义了一个发射固定值集的流
  • .asFlow():扩展函数,可以将各种集合与序列转换为流

比如可以将一个整数区间转换为流:(1..3).asFlow().collect { value -> println(value) }

流操作符

过渡操作符 map

我们可以像使用集合与序列一样,使用操作符对流进行转换,比如 filtermap。过渡操作符应用于上游流,并返回下游流,而且像流一样也是冷操作符。

1
2
3
4
5
6
7
8
9
10
11
suspend fun performRequest(request: Int): String {
delay(1000) // 模仿长时间运行的异步工作
return "response $request"
}

private fun runFlowWithFilterAndMap() = runBlocking {
(1..3).asFlow() // 一个请求流
.filter { request -> request > 1 }
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
转换操作符 transform

我们还可以使用 转换操作符 实现更为复杂的转换,使用形式 transformXxx

1
2
3
4
5
6
7
8
fun runFlowWithTransform() = runBlocking {
(1..3).asFlow() // 一个请求流
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
限长操作符 take

我们可以使用限长操作符在流触及相应限制的时候,将它的执行取消。形式如 takeXxx

1
2
3
4
5
fun runFlowWithTake() = runBlocking {
(1..3).asFlow()
.take(2)
.collect { response -> println(response) }
}
末端操作符

末端操作符是在流上用于启动流收集的挂起函数collect 是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符:

  • 转化为各种集合,例如 toListtoSet
  • 获取第一个 first 值与确保流发射单个 single 值的操作符
  • 使用 reducefold 将流规约到单个值
1
2
3
4
5
6
7
private fun runFlowWithTerminalOperators() = runBlocking {
(1..5).asFlow()
.map { it * it } // 数字 1 至 5 的平方
// .toList()
.reduce { a, b -> a + b } // 求和
.let { println(it) }
}
flowOn 操作符

前面说过不允许直接使用 withContext 修改上下文,所以正确更改流发射的上下文需要通过 flowOn 操作符:

1
2
3
4
5
private fun runFlowWithFlowOn() = runBlocking {
flowList()
.flowOn(Dispatchers.Default)
.collect { println(it) }
}
缓冲操作符

我们可以使用 buffer 操作符来并发运行上流中发射元素的代码以及 collect 中的代码,而不是顺序运行它们:

1
2
3
4
5
6
7
8
9
10
11
private fun runFlowWithBuffer() = runBlocking {
val time = measureTimeMillis {
flowList()
.buffer() // 缓冲发射项,无需等待
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
conflate

当流只代表部分操作结果或操作状态更新时,可能没有必要处理每个值,而是只处理最新的值。当收集器处理它们太慢的时候,我们可以使用 conflate 操作符,用于跳过中间值:

1
2
3
4
5
6
7
8
9
10
11
private fun runFlowWithConflate() = runBlocking {
val time = measureTimeMillis {
flowList()
.conflate() // 合并发射项,不对每个值进行处理
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}

运行结果:

1
2
3
4
5
6
Emitting 1
Emitting 2
Emitting 3
1
3
Collected in 797 ms

可以看到,虽然第一个数字仍在处理中,但第二个和第三个数字已经产生,因此第二个被 conflated,只有最新的(第三个)被交付给收集器。

collectLatest

当发射器和收集器都很慢的时候,合并是加快处理速度的一种方式,它通过删除发射值来实现。另一种方式是取消缓慢的收集器,并在每次发射新值的时候再收集:

1
2
3
4
5
6
7
8
9
10
11
private fun runFlowWithCollectLatest() = runBlocking {
val time = measureTimeMillis {
flowList()
.collectLatest { value -> // 只收集最新的值
println("Collecting $value")
delay(300)
println("Done with $value")
}
}
println("Collected in $time ms")
}

在这个例子中,由于 flowList 每个 100ms 发射一个新值,但是收集的时候会被 delay 300ms,所以只有最后一个值才会被收集。

合并操作符
Zip

与标准库中的 Sequence.zip 扩展函数一样,流拥有一个 zip 操作符用于组合两个流:

1
2
3
4
5
6
private fun runFlowWithZip() = runBlocking {
val nums = (1..3).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" } // 组合成新的字符串
.collect { println(it) } // 收集并打印
}
combine

当流表示一个变量或操作的最新值时,可能需要执行计算,我们可以使用 combine 来对上游流进行重新计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 各个时间节点下产生的单个和合并后的事件
*
* Time : 300 400 600 800 900 1200
* Single Event : 1 one 2 two 3 three
* Combined Event: 1->one 2->one 2->two 3->two 3->three
* */
private fun runFlowWithCombine() = runBlocking {
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) { a, b -> "$a->$b" } // 使用“zip”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
展平操作符

流表示异步接收的值的序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求。所有当我们对流进行操作的时候会出现包含流的流,这个时候我们就需要对流进行展平 (flatten) 然后再进行其它操作。

flatMapConcat

展平连接主要由 flatMapConcatflattenConcat 操作符实现。

flatMapMerge

另一种展平模式是并发收集所有传入的流,并将它们的值合并到一个单独的流,以便尽快的发射值。 它由 flatMapMergeflattenMerge 操作符实现。他们都接收可选的用于限制并发收集的流的个数的 concurrency 参数(默认情况下,它等于 DEFAULT_CONCURRENCY)。

flatMapLatest

collectLatest 操作符类似,在发出新流后立即取消先前流的收集,这由 flatMapLatest 操作符来实现。

异常操作符

当运算符中的发射器或代码抛出异常时,我们可以使用异常操作符对异常进行处理。

使用 try..catch 捕获异常
1
2
3
4
5
6
7
8
9
10
private fun runFlowWithTryCatch() = runBlocking {
try {
flowList().collect { value ->
println(value)
check(value <= 1) { "Collected $value" } // 值 > 1 的时候抛出一个 IllegalStateException
}
} catch (e: Throwable) {
println("Caught $e")
}
}

但是,上面这个例子中实际上捕获了任何在发射器或者过渡操作符、末端操作符中抛出的异常。

catch 操作符

发射器可以使用 catch 操作符来保留此异常的透明性并允许封装它的异常处理。catch 操作符的代码块可以分析异常并根据捕获到的异常以不同的方式对其做出反应,它的特点是:

  • 可以使用 throw 重新抛出异常
  • 可以使用 emit 将异常转换为值发射出去
  • 可以将异常忽略,或用日志打印,或使用一些其他代码处理它
  • 仅捕获上游异常
1
2
3
4
5
6
7
8
9
10
11
12
13
private fun catchFlowException() = runBlocking {
try {
flowListWithException()
.catch { e -> emit("Caught $e") } // 发射一个异常
.collect { value ->
check(value.length < 20) { "Collected $value" } // 仅捕获上游异常
println("Collect $value")
}
} catch (e: Exception) {
// 捕获下游的异常
println("Collect Exception: $e")
}
}
声明式捕获 onEach

我们可以将 catch 操作符的声明性与处理所有异常的期望相结合,将 collect 操作符的代码块移动到 onEach 中,并将其放到 catch 操作符之前。收集该流必须由调用无参的 collect() 来触发:

1
2
3
4
5
6
7
8
9
private fun catchFlowExceptionOnEach() = runBlocking {
flowListWithException()
.onEach {
check(it.length > 10) { "Collected $it" }
println(it)
}
.catch { println("Caught $it") }
.collect()
}
完成操作符

当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。我们可以使用命令式或声明式在流完成时做一些操作。

命令式 finally 块
1
2
3
4
5
6
7
private fun flowWithFinally() = runBlocking {
try {
flowList().collect { println("Collected $it") }
} finally {
println("Done")
}
}
声明式处理 onCompletion

我们可以使用 onCompletion 操作符在流完成收集时进行调用:

1
2
3
4
5
private fun flowOnCompletion() = runBlocking {
flowList()
.onCompletion { println("Done") }
.collect { println("Collected $it") }
}

上面的输出和使用 finally 代码块的输出一致,除此之外,如果流完成时抛出了异常我们还可以通过 onCompletion 中的可空参数进行捕捉:

1
2
3
4
5
6
private fun flowOnCompletionWithUpStreamException() = runBlocking {
flowListWithException()
.onCompletion { cause -> cause?.let { println("Flow completed but has exceptions: $it") } }
.catch { println("Exception: $it") }
.collect { println("Collected $it") }
}

不过,onCompletion 不会对异常进行处理,而是交由后面的 catch 操作符进行处理。并且与 catch 操作符不同,收集时抛出的异常在 onCompletion 也会感知到:

1
2
3
4
5
6
7
8
9
10
private fun flowOnCompletionWithDownStreamException() = runBlocking {
flowList()
// onCompletion 能观察到所有的异常,包括下游收集时抛出的异常
.onCompletion { cause -> cause?.let { println("Flow completed but has exceptions: $it") } }
.catch { println("Exception: $it") }
.collect {
check(it > 1) { "Illegal value: $it" }
println("Collected $it")
}
}

选择命令式或者声明式对异常和流完成进行处理,取决于我们的需求和喜好,两种方式都是有效的。

启动操作符

我们可以使用 launchIn 替换 collect,在单独的协程中启动流的收集。我们需要通过指定参数 CoroutineScope 用以确定哪一个协程来启动流的收集。

1
2
3
4
5
6
private fun flowWithLaunchIn() = runBlocking {
flowList()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- 在单独的协程中执行流
println("Done")
}

在实际的应用中,作用域来自于一个寿命有限的实体。在该实体的寿命终止后,相应的作用域就会被取消,即取消相应流的收集。这种成对的 onEach { ... }.launchIn(scope) 工作方式就像 addEventListener 一样。而且,这不需要相应的 removeEventListener 函数, 因为取消与结构化并发可以达成这个目的。

另外,launchIn 也会返回一个 Job,可以在不取消整个作用域的情况下仅取消相应的流收集或对其进行 join

可取消的流

使用 flow { ... } 创建的流会对每个发射值执行附加的 ensureActive 检测以进行取消,但是大多数其它流(比如 asFlow())都不会自行执行取消检测,不过我们可以在 onEach 中对添加 currentCoroutineContext().ensureActive() 或者使用 cancellable 操作符:

1
2
3
4
5
6
private fun makeFlowCancellable() = runBlocking {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}

Flow 与 Rx

熟悉 RxJava 的人会觉得 Flow 非常相似,这是因为 Flow 的设计灵感正是来源于响应式流及其各种实现。虽然略有不同,但从概念上讲,Flow 依然是 响应式流。

通道

通道提供了一种在流中传输值的方法。

通道基础

一个 Channel 是一个和 BlockingQueue 非常相似的概念。其中一个不同是它代替了阻塞的 put 操作并提供了挂起的 send,还替代了阻塞的 take 操作并提供了挂起的 receive

1
2
3
4
5
6
7
8
9
10
private fun headFirstChannel() = runBlocking {
val channel = Channel<Int>() // 实现了 SendChannel 和 ReceiveChannel
launch {
// 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
for (x in 1..5) channel.send(x * x)
}
// 这里我们打印了 5 次被接收的整数:
repeat(5) { println(channel.receive()) }
println("Done!")
}
通道的关闭与迭代

和队列不同,一个通道可以通过被关闭来表明没有更多的元素将会进入通道。在接收者中可以使用 for 循环来从通道中接收元素:

1
2
3
4
5
6
7
8
9
private fun closeAndIterateChannel() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close()
}
for (c in channel) println(c)
println("Done!")
}
构建通道生产者

使用生产者-消费者模式对通道进行创建和使用,需要注意这些接口是实验性的:

1
2
3
4
5
6
private fun produceAndConsumeChannel() = runBlocking {
produce {
for (x in 1..5) send(x * x)
}.consumeEach { println(it) }
println("Done!")
}

管道

管道是指在一个协程中创建拥有无穷多个值的流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private fun infiniteNumberPipeline() = runBlocking {
val numbers = produce {
var x = 1
while (true) send(x++)
}
val squares = produce {
for (x in numbers) send(x * x)
}
repeat(10) {
print("${squares.receive()}, ")
}
println("Done!")

// 获取 coroutineContext 取消所有后续的 job
coroutineContext.cancelChildren()
}

扇出

多个协程也许会接收相同的管道,它们之间可以进行分布式的工作。

1
2
3
4
5
6
private fun panOut() = runBlocking {
val producer = infiniteDelayedNumbers(1)
repeat(5) { launchProcessors(it, producer) }
delay(950) // 等待一会儿
producer.cancel() // 取消 channel
}

扇入

多个协程可以发送到同一个通道。 比如说,让我们创建一个字符串的通道,并且在这个通道中以指定的延迟反复发送一个字符串:

1
2
3
4
5
6
7
private fun panIn() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "Foo", 200) }
launch { sendString(channel, "Bar", 500) }
repeat(6) { println(channel.receive()) }
coroutineContext.cancelChildren()
}

带缓冲的通道

到目前为止展示的通道都是没有缓冲区的。无缓冲的通道在发送者和接收者相遇时才传输元素,也叫对接。如果发送先被调用,则它将被挂起直到接收被调用;如果接收先被调用,则它将被挂起直到发送被调用。

Channel 构造器与 produce 建造器通过一个可选的参数 capacity 来指定缓冲区的大小。缓冲允许发送者在被挂起前发送多个元素, 就像 BlockingQueue 有指定的容量一样,当缓冲区被占满的时候将会引起阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
private fun bufferedChannels() = runBlocking {
val channel = Channel<Int>(4)
val sender = launch {
repeat(10) {
println("Sending $it")
channel.send(it)
}
}
// 不对其进行接收,只是等待
delay(1000)
sender.cancel()
}

通道是公平的

发送和接收操作是公平的 并且尊重调用它们的多个协程。它们遵守先进先出原则,可以看到第一个协程调用 receive 并得到了元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private fun channelIsSequential() = runBlocking {
data class Ball(var hits: Int)

suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // 在循环中接收球
ball.hits++
println("$name $ball")
delay(300) // 等待一段时间
table.send(ball) // 将球发送回去
}
}

val table = Channel<Ball>()
// 先启动的协程先接收到事件
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0))
delay(2000)
coroutineContext.cancelChildren()
}

计时器通道

计时器通道是一种特别的会合通道 (ReceiveChannel),每次经过特定的延迟都会从该通道进行消费并产生 Unit。虽然它看起来似乎没用,它被用来构建分段来创建复杂的基于时间的 produce 管道和进行窗口化操作以及其它时间相关的处理。可以在 select 中使用计时器通道来进行“打勾”操作。

使用工厂方法 ticker 来创建这些通道,使用 ReceiveChannel.cancel 方法关闭通道。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private fun tickerChannel() = runBlocking {
val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement")

nextElement = withTimeoutOrNull(500) { tickerChannel.receive() }
println("Next element is not available in 500ms: $nextElement")

nextElement = withTimeoutOrNull(600) { tickerChannel.receive() }
println("Next element is available in 1100ms: $nextElement")

println("Consumer pauses for 1500ms")
delay(1500)

nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available after delay: $nextElement")

nextElement = withTimeoutOrNull(600) { tickerChannel.receive() }
println("Next element is available sooner because of the previous delay: $nextElement")

tickerChannel.cancel()
}

异常处理与监督

我们已经知道被取消的协程会在挂起点抛出 CancellationException 并且它会被协程的机制所忽略。在这里我们会看看在取消过程中抛出异常或同一个协程的多个子协程抛出异常时会发生什么。

异常的传播

协程构建器有两种形式:自动传播异常(launchactor)或向用户暴露异常(asyncproduce)。当这些构建器用于创建一个 协程时,即该协程不是另一个协程的 协程,前者这类构建器将异常视为未捕获异常,类似 Java 的 Thread.uncaughtExceptionHandler,而后者则依赖用户来最终消费异常,例如通过 awaitreceive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private fun exceptionPropagation() = runBlocking {
val job = GlobalScope.launch { // launch 自动传播异常
println("Throwing exception from launch")
throw IndexOutOfBoundsException()
}
job.join()
println("Joined failed job")
val deferred = GlobalScope.async { // async 用户调用处消费异常
println("Throwing exception from async")
throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
}
try {
deferred.await()
println("Unreached")
} catch (e: ArithmeticException) {
println("Caught ArithmeticException")
}
}

CoroutineExceptionHandler

未捕获异常打印到控制台的默认行为是可自定义的。 协程中的 CoroutineExceptionHandler 上下文元素可以被用于这个根协程通用的 catch 块,及其所有可能自定义了异常处理的子协程。

它类似于 Thread.uncaughtExceptionHandler 。你无法从 CoroutineExceptionHandler 的异常中恢复。当调用处理者的时候,协程已经完成并带有相应的异常。通常,该处理者用于记录异常,显示某种错误消息,终止和(或)重新启动应用程序。

取消与异常

取消与异常紧密相关。协程内部使用 CancellationException 来进行取消,这个异常会被所有的处理者忽略,所以那些可以被 catch 代码块捕获的异常仅仅应该被用来作为额外调试信息的资源。当一个协程使用 Job.cancel 取消的时候,它会被终止,但是它不会取消它的父协程。

异常聚合

当协程的多个子协程因异常而失败时,一般规则是“取第一个异常”,因此将处理第一个异常。在第一个异常之后发生的所有其他异常都作为被抑制的异常绑定至第一个异常。

监督

取消是在协程的整个层次结构中传播的双向关系。让我们看一下需要单向取消的情况。

此类需求的一个良好示例是在其作用域内定义作业的 UI 组件。如果任何一个 UI 的子作业执行失败了,它并不总是有必要取消(有效地杀死)整个 UI 组件, 但是如果 UI 组件被销毁了(并且它的作业也被取消了),由于它的结果不再被需要了,它有必要使所有的子作业执行失败。

另一个例子是服务进程孵化了一些子作业并且需要监督 它们的执行,追踪它们的故障并在这些子作业执行失败的时候重启。

监督作业

SupervisorJob 可以用于这些目的。它类似于常规的 Job,唯一的不同是:SupervisorJob 的取消只会向下传播:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private fun supervisorJobExample() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// 启动第一个子作业——这个示例将会忽略它的异常(不要在实践中这么做!)
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
println("The first child is failing")
throw AssertionError("The first child is cancelled")
}
// 启动第二个子作业
val secondChild = launch {
firstChild.join()
// 取消了第一个子作业且没有传播给第二个子作业
println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
// 但是取消了监督的传播
println("The second child is cancelled because the supervisor was cancelled")
}
}
// 等待直到第一个子作业失败且执行完成
firstChild.join()
println("Cancelling the supervisor")
supervisor.cancel()
secondChild.join()
}
}
监督作用域

对于作用域 的并发,可以用 supervisorScope 来替代 coroutineScope 来实现相同的目的。它只会单向的传播并且当作业自身执行失败的时候将所有子作业全部取消。作业自身也会在所有的子作业执行结束前等待,就像 coroutineScope 所做的那样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private fun supervisorScopeExample() = runBlocking {
try {
supervisorScope {
val child = launch {
try {
println("The child is sleeping")
delay(Long.MAX_VALUE)
} finally {
println("The child is cancelled")
}
}
// 使用 yield 来给我们的子作业一个机会来执行打印
yield()
println("Throwing an exception from the scope")
throw AssertionError()
}
} catch (e: AssertionError) {
println("Caught an assertion error")
}
}
监督协程中的异常

常规的作业和监督作业之间的另一个重要区别是异常处理。监督协程中的每一个子作业应该通过异常处理机制处理自身的异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
private fun coroutineExceptionHandler() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
supervisorScope {
val child = launch(handler) {
println("The child throws an exception")
throw AssertionError()
}
println("The scope is completing")
}
println("The scope is completed")
}

共享的可变状态与并发

协程可被多线程调度器并发地执行,这会造成常见的并发问题。主要的问题是同步访问共享的可变状态

线程安全的数据结构

一种对线程、协程都有效的常规解决方法,就是使用线程安全(也称为同步的、 可线性化、原子)的数据结构,它为需要在共享状态上执行的相应操作提供所有必需的同步处理。比如使用 AtomicInteger 类代替 Int 类。

以细粒度限制线程

限制线程 是解决共享可变状态问题的一种方案:对特定共享状态的所有访问权都限制在单个线程中。它通常应用于 UI 程序中:所有 UI 状态都局限于单个事件分发线程或应用主线程中。这在协程中很容易实现,通过使用一个单线程上下文。

以粗粒度限制线程

在实践中,线程限制是在大段代码中执行的,比如在单线程上下文中运行每个协程。

1
2
3
4
5
6
7
8
private fun massiveRunExecutorCoarseGrained() = runBlocking {
withContext(coroutineContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}

互斥

另外我们还可以使用互斥解决方案:使用永远不会同时执行的关键代码块 来保护共享状态的所有修改。

在阻塞的世界中,你通常会为此目的使用 synchronized 或者 ReentrantLock。 在协程中的替代品叫做 Mutex,它具有 lockunlock 方法,可以隔离关键的部分。关键的区别在于 Mutex.lock() 是一个挂起函数,它不会阻塞线程。

还有 withLock 扩展函数,可以方便的替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() } 模式。