深入浅出 RxJava (一、基础篇)

阿里云产品限时红包,最高 ¥1888 元,立即领取

RxJava 正在 Android 开发者中变的越来越流行。唯一的问题就是上手不容易,尤其是大部分人之前都是使用命令式编程语言。但是一旦你弄明白了,你就会发现 RxJava 真是太棒了。

这里仅仅是帮助你了解 RxJava,整个系列共有四篇文章,希望你看完这四篇文章之后能够了解 RxJava 背后的思想,并且喜欢上 RxJava。

基础

RxJava 最核心的两个东西是 Observables (被观察者,事件源)和 Subscribers (观察者)。 Observables 发出一系列事件,Subscribers 处理这些事件。这里的事件可以是任何你感兴趣的东西(触摸事件,web接口调用返回的数据。。。)

一个 Observable 可以发出零个或者多个事件,直到结束或者出错。每发出一个事件,就会调用它的 Subscriber 的 onNext 方法,最后调用 Subscriber.onNext() 或者 Subscriber.onError() 结束。

Rxjava 的看起来很像设计模式中的观察者模式,但是有一点明显不同,那就是如果一个 Observerble 没有任何的 Subscriber,那么这个 Observable 是不会发出任何事件的。

Hello World

创建一个 Observable 对象很简单,直接调用 Observable.create 即可。

1
2
3
4
5
6
7
8
9
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
}
);

这里定义的 Observable 对象仅仅发出一个 Hello World 字符串,然后就结束了。接着我们创建一个 Subscriber 来处理 Observable 对象发出的字符串。

1
2
3
4
5
6
7
8
9
10
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }

@Override
public void onCompleted() { }

@Override
public void onError(Throwable e) { }
};

这里 subscriber 仅仅就是打印 observable 发出的字符串。通过 subscribe 函数就可以将我们定义的 myObservable 对象和 mySubscriber 对象关联起来,这样就完成了 subscriber 对 observable 的订阅。

1
myObservable.subscribe(mySubscriber);

一旦 mySubscriber 订阅了 myObservable, myObservable 就是调用 mySubscriber 对象的 onNext 和 onComplete 方法,mySubscriber 就会打印出 Hello World!

更简洁的代码

是不是觉得仅仅为了打印一个 hello world 要写这么多代码太啰嗦?我这里主要是为了展示 RxJava 背后的原理而采用了这种比较啰嗦的写法,RxJava 其实提供了很多便捷的函数来帮助我们减少代码。

首先来看看如何简化 Observable 对象的创建过程。RxJava 内置了很多简化创建 Observable 对象的函数,比如 Observable.just 就是用来创建只发出一个事件就结束的 Observable 对象,上面创建 Observable 对象的代码可以简化为一行。

1
Observable<String> myObservable = Observable.just("Hello, world!");

接下来看看如何简化 Subscriber,上面的例子中,我们其实并不关心 OnComplete 和 OnError,我们只需要在 onNext 的时候做一些处理,这时候就可以使用 Action1 类。

1
2
3
4
5
6
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
};

subscribe 方法有一个重载版本,接受三个 Action1 类型的参数,分别对应 OnNext, OnComplete, OnError 函数。

1
myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);

这里我们并不关心 onError 和 onComplete,所以只需要第一个参数就可以

1
2
myObservable.subscribe(onNextAction);
// Outputs "Hello, world!"

上面的代码最终可以写成这样:

1
2
3
4
5
6
7
Observable.just("Hello, world!")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});

使用 java8 的 lambda 可以使代码更简洁。

1
2
Observable.just("Hello, world!")
.subscribe(s -> System.out.println(s));

Android 开发中,强烈推荐使用retrolambda这个 gradle 插件,这样你就可以在你的代码中使用 lambda 了。

变换

让我们做一些更有趣的事情吧!
比如我想在 hello world 中加上我的签名,你可能会想到去修改 Observable 对象:

1
2
Observable.just("Hello, world! -Dan")
.subscribe(s -> System.out.println(s));

如果你能够改变 Observable 对象,这当然是可以的,但是如果你不能修改 Observable 对象呢?比如 Observable 对象是第三方库提供的?比如我的 Observable 对象被多个 Subscriber 订阅,但是我只想在对某个订阅者做修改呢?
那么在 Subscriber 中对事件进行修改怎么样呢?比如下面的代码:

1
2
Observable.just("Hello, world!")
.subscribe(s -> System.out.println(s + " -Dan"));

这种方式仍然不能让人满意,因为我希望我的 Subscribers 越轻量越好,因为我有可能会在 mainThread 中运行 subscriber。另外,根据响应式函数编程的概念, Subscribers 更应该做的事情是“响应”,响应 Observable 发出的事件,而不是去修改。如果我能在某些中间步骤中对 “Hello World!” 进行变换是不是很酷?

操作符(Operators)

操作符就是为了解决对 Observable 对象的变换的问题,操作符用于在 Observable 和最终的 Subscriber 之间修改 Observable 发出的事件。RxJava 提供了很多很有用的操作符。

比如 map 操作符,就是用来把把一个事件转换为另一个事件的。

1
2
3
4
5
6
7
8
Observable.just("Hello, world!")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + " -Dan";
}
})
.subscribe(s -> System.out.println(s));

使用 lambda 可以简化为:

1
2
3
Observable.just("Hello, world!")
.map(s -> s + " -Dan")
.subscribe(s -> System.out.println(s));

是不是很酷? map() 操作符就是用于变换 Observable 对象的, map 操作符返回一个 Observable 对象,这样就可以实现链式调用,在一个 Observable 对象上多次使用 map 操作符,最终将最简洁的数据传递给 Subscriber 对象。

map操作符进阶

map 操作符更有趣的一点是它不必返回 Observable 对象返回的类型,你可以使用 map 操作符返回一个发出新的数据类型的 observable 对象。

比如上面的例子中,subscriber 并不关心返回的字符串,而是想要字符串的 hash 值。

1
2
3
4
5
6
7
8
Observable.just("Hello, world!")
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.hashCode();
}
})
.subscribe(i -> System.out.println(Integer.toString(i)));

很有趣吧?我们初始的 Observable 返回的是字符串,最终的 Subscriber 收到的却是 Integer,当然使用 lambda 可以进一步简化代码:

1
2
3
Observable.just("Hello, world!")
.map(s -> s.hashCode())
.subscribe(i -> System.out.println(Integer.toString(i)));

前面说过,Subscriber 做的事情越少越好,我们再增加一个 map 操作符:

1
2
3
4
Observable.just("Hello, world!")
.map(s -> s.hashCode())
.map(i -> Integer.toString(i))
.subscribe(s -> System.out.println(s));

不服?

是不是觉得我们的例子太简单,不足以说服你?你需要明白下面的两点:

1. Observable 和 Subscriber 可以做任何事情。

Observable 可以是一个数据库查询, Subscriber 用来显示查询结果; Observable 可以是屏幕上的点击事件, Subscriber 用来响应点击事件; Observable 可以是一个网络请求, Subscriber 用来显示请求结果。

2. Observable 和 Subscriber 是独立于中间的变换过程的。

在 Observable 和 Subscriber 中间可以增减任何数量的 map。整个系统是高度可组合的,操作数据是一个很简单的过程。

原文链接:Grokking RxJava, Part 1: The Basics
译文来源:深入浅出RxJava(一:基础篇)