原文:http://reactivex.io/documentation/observable.html
对译文有建议,请发邮件或下方评论给我,万分感谢。
更新历史:1月13日
在RxJava中一个对象实现了Observer接口则被一个Observable类对象所订阅。订阅者则针对Obserable对象产生的值(包括项及项的序列)做响应。这种模式便利了异步操作,因为不用在等待Obserable产生对象时进行阻塞,它以订阅者的形式创建了一个哨兵,用于在Observable在将来的时间产生任何输出时提供合适的响应。
这篇文章解释什么是响应式模式以及什么是Observable与观察者(观察者怎样对Observable进行订阅)。
文档中的解释将采用”marble diagrams”的形式。下图说明”marble diagrams”怎样展示Observable以及Observable之间的转换。
背景
在许多软件编程任务中,你或多或少都希望自己编写的代码能够逐步的运行完成,因为你是一个一个的写下来的。但是在响应式编程范式中,许多代码并行的执行,其结果将在后来被观察者以任意的顺序捕获下来。在这种情况下,你不是调用一个方法,而是以Observable的形式为获取和转换数据定义一种机制,并将Observable订阅给订阅者。在预置的机制下当Observable产生的值到达时,观察者的哨兵捕获并对此进行响应。
这种解决方案的优点在于,当你有许多没有相互依赖的任务需要运行时,你可以在同一时刻启动它们,而不是需要在一个任务开始之前,等待其中的一个任务的结束。这样,你执行所有任务的所花费的时间只是其中耗时最长的任务的时间。
有许多种形式来描述这种异步编程和设计的模型。这篇文档将使用下面的形式:Subscriber(有时是Observer)订阅Observable类对象。也就是Subscriber对象订阅Observable。Observable产生值,并通过调用Subscriber的方法来发送通知给Subscriber。
在其他文档或者上下文,有时我们也会将Subscriber称为watcher或reactor。这个模型通常被被认为是响应模式。
建立订阅者
这篇文档通常使用Groovy来做代码示例,实际上,你可以在任何基于JVM的语言上使用RxJava,如Clojure,Scala,JRuby或是Java本身。
与典型的响应式编程中无序的异步、并行不同,在传统的方法调用中,流程一般是这样的:
- 调用一个方法。
- 将方法的返回值保存在一个变量中。
- 使用这个变量以及它的新值做一些有用的事情。
或是用代码表示:
1 | // make the call, assign its return value to `returnVal` |
在异步模型中,流程则更像是这样:
- 定义一个方法来使用异步调用的返回值进行处理,这个方法是Subscriber的一部分。
- 用一个Observable类对象来定义异步调用本身。
- 通过订阅来将Subscriber关联Observable(这也同时初始化方法调用)。
- 继续你的业务逻辑;无论方法调用何时返回,Subscriber的方法开始对Observable所产生的项(返回值)进行操作。
用代码表示是这样:
1 | // defines, but does not invoke, the Subscriber's onNext handler |
onNext,onCompleted,以及onError
subscribe()
方法可以接受1-3个方法,或者是一个Subscriber
对象,或是任何实现了Observer
接口(包含了这3个方法)的对象:
onNext:当Observable产生了一个值时,Observable将调用它的Subscriber上的这个方法。这个方法将Observable所产生的值作为它的参数。
onError:当Observable无法产生所预期的数据或是遇到了其他一些错误,Observable将调用它的Subscriber上的这个方法。这会使Observable停止,且不再掉调用onNext
和onCompleted
。onError
将产生错误的误差指示作为它的参数。
onCompleted:在没有发生任何错误的情形下,Observable将在最后一次调用onNext
之后调用其观察者的这个方法。
一个更加完整的subscribe()
示例如下所示:
1 | def myOnNext = { item -> /* do something useful with item */ }; |
Unsubscribing(退订)
在一些响应式扩展实现中,有专门的观察者接口,Subscriber
来实现unsubscribe()
方法。你可以调用这个方法来表明Subscriber不再对当前订阅的任何Observable感兴趣了。当不再有其他对其感兴趣的Observer,这些Observable可以选择停止产生新的值。
这个退订的结果会级联的影响到Observer所订阅的Observable上的操作链,这将导致操作链上的每个链接都停止产生项目。但这个并无法保证立即发生,当不再有Subscriber关注这些产生的值时,在短时间内,Observable可能仍然会产生新的值。
命名规范的注意事项
每一种特定语言的响应式扩展的实现都有自己的命名怪癖。这之间没有统一的命名标准,但在每个实现之间有许多的共性。
此外,在某些上下文中这些名称具有不同的含义,或者在一些特定的语言实现中显得尴尬。
举个例子:存在onEvent
的命名模式(如:onNext
, onCompleted
, onError
)。在许多上下文中,这些命名表明的是被注册的事件处理器上的方法。在ReactiveX中,这些命名则表示事件处理器本身。
“热”和”冷”Observable
Observable什么时候开始产生它的值序列呢?这个依赖于Observable。一个”热”的Observable将在它被新建之后便开始产生值,因此,任何订阅这种Observable的Observer将从这些序列的中间开始观察。另一方面,一个”冷”的Observable在开发产生值之前,会一直等待Observer订阅它。这样可以保证一个Observer看到从头开始的完整序列。
在响应式扩展的某些实现中,存在称为Connectable的Observable。这样的Observable直到它的connect
方法被调用才开始产生值,不能是否有Observer已经订阅它。
组装Observable Operator
Observable和Observer仅仅是响应式扩展的开始。它是标准观察者模式的轻量扩展,比起一个简单的回调,它更适合与处理事件序列。
响应式扩展的真正力量在于,所有的Operator可以被变换,组合,操作,并和Observable所产生的值序列一起工作。
这些响应式扩展操作能让你通过利用所有回调的优点,以声明的方式将异步序列组织在一起。同时避免传统异步编程系统中的嵌套回调处理。
该文章将不同的操作及他们的示例用户组织在如下的页面中:
- Creating创建
- Transforming变换
- Filtering过滤
- Combining组合
- Error Handling错误处理
- Utility工具
- Conditional and Boolean条件与布尔
- Mathematical and Aggregate
- Backpressure
- Connectable Observables
- Blocking Observables
这些页面所包含的信息一些操作并不是响应式扩展的核心组成部分,但却是一些特定语言的实现或是可选模块。