RxJava: Observer and Subscriber

Conclusion

Pass a new Subscriber to every subscribe() call, or use an Observer instead. In other words, do not reuse a Subscriber held in a field such as mSubscriber.

Why

In RxJava 1.x, Subscriber is an abstract class that implements Observer, so Observer and Subscriber play the same role. An Observer passed to subscribe() is ultimately converted into a Subscriber object:

public final Subscription subscribe(final Observer<? super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    if (observer == null) {
        throw new NullPointerException("observer is null");
    }
    return subscribe(new ObserverSubscriber<T>(observer));
}
-------------------------------------------------
public final class ObserverSubscriber<T> extends Subscriber<T> {
    final Observer<? super T> observer;

    public ObserverSubscriber(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        observer.onNext(t);
    }

    @Override
    public void onError(Throwable e) {
        observer.onError(e);
    }

    @Override
    public void onCompleted() {
        observer.onCompleted();
    }
}
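
The wrapping step above can be modeled without RxJava on the classpath. The following is only a rough sketch: WrapModel, MiniObserver, MiniSubscriber, MiniObserverSubscriber and toSubscriber are hypothetical stand-ins invented for illustration, not RxJava's real classes. It assumes nothing beyond what the source above shows, namely that a Subscriber is passed through as-is while any other Observer gets a new wrapper on every call.

```java
// Simplified model of the Observer -> Subscriber wrapping.
// All names here are hypothetical stand-ins, not RxJava's real classes.
final class WrapModel {

    /** Stand-in for rx.Observer: callbacks only, no subscription state. */
    interface MiniObserver {
        void onNext(int value);
        void onCompleted();
    }

    /** Stand-in for rx.Subscriber: an observer that also carries an unsubscribed flag. */
    static abstract class MiniSubscriber implements MiniObserver {
        private boolean unsubscribed;

        boolean isUnsubscribed() { return unsubscribed; }
        void unsubscribe() { unsubscribed = true; }
    }

    /** Stand-in for ObserverSubscriber: forwards every call to the wrapped observer. */
    static final class MiniObserverSubscriber extends MiniSubscriber {
        private final MiniObserver observer;

        MiniObserverSubscriber(MiniObserver observer) { this.observer = observer; }

        @Override public void onNext(int value) { observer.onNext(value); }
        @Override public void onCompleted() { observer.onCompleted(); }
    }

    /** Mirrors subscribe(Observer): keep a subscriber as-is, wrap anything else. */
    static MiniSubscriber toSubscriber(MiniObserver observer) {
        if (observer instanceof MiniSubscriber) {
            return (MiniSubscriber) observer;
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return new MiniObserverSubscriber(observer);
    }

    public static void main(String[] args) {
        MiniObserver observer = new MiniObserver() {
            @Override public void onNext(int value) { }
            @Override public void onCompleted() { }
        };
        MiniSubscriber first = toSubscriber(observer);
        MiniSubscriber second = toSubscriber(observer);
        first.unsubscribe();

        System.out.println(first != second);          // true: one wrapper per call
        System.out.println(second.isUnsubscribed());  // false: state is not carried over
    }
}
```

In this model the observer itself holds no subscription state, so ending one subscription cannot affect the next one.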

A Subscriber and an Observer held as fields, say mSubscriber and mObserver, behave differently when passed to subscribe(). When mSubscriber is passed in, it is wrapped in a SafeSubscriber instance:

public class SafeSubscriber<T> extends Subscriber<T> {
    private final Subscriber<? super T> actual;
    boolean done;

    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }

    @Override
    public void onCompleted() {
        if (!done) {
            done = true;
            try {
                actual.onCompleted();
            } catch (Throwable e) {
                // we handle here instead of another method so we don't add stacks to the frame
                // which can prevent it from being able to handle StackOverflow
                Exceptions.throwIfFatal(e);
                RxJavaHooks.onError(e);
                throw new OnCompletedFailedException(e.getMessage(), e);
            } finally { // NOPMD
                try {
                    // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
                    // and we throw an UnsubscribeFailureException.
                    unsubscribe();
                } catch (Throwable e) {
                    RxJavaHooks.onError(e);
                    throw new UnsubscribeFailedException(e.getMessage(), e);
                }
            }
        }
    }

    @Override
    public void onError(Throwable e) {
        // we handle here instead of another method so we don't add stacks to the frame
        // which can prevent it from being able to handle StackOverflow
        Exceptions.throwIfFatal(e);
        if (!done) {
            done = true;
            _onError(e);
        }
    }

    @Override
    public void onNext(T args) {
        try {
            if (!done) {
                actual.onNext(args);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwOrReport(e, this);
        }
    }
    ...
    ...
}

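The SafeSubscriber source shows that once onCompleted() runs, the field done is set to true and unsubscribe() is called automatically. The done flag then decides whether onNext(), onError() and the other callbacks still run. Because the constructor calls super(actual), the SafeSubscriber shares its subscription state with the Subscriber it wraps, so that unsubscribe() leaves mSubscriber itself unsubscribed as well. As a result, passing mSubscriber to subscribe() only works properly the first time. Passing mObserver does not have this problem: every call automatically creates a new Subscriber to wrap it. Subscribing with subscribe(new Subscriber<T>() {}) avoids the problem too.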
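
The effect of reusing a Subscriber can be sketched with a small model that needs no RxJava dependency. Everything here is a hypothetical stand-in made up for illustration (ReuseModel, State, MiniSubscriber, MiniSafeSubscriber and the subscribe helper), not RxJava's real code. It assumes that the wrapper shares subscription state with the wrapped subscriber, as super(actual) suggests, and that the source checks isUnsubscribed() before each signal.

```java
// Simplified model of why a reused Subscriber stops receiving items.
// All names here are hypothetical stand-ins, not RxJava's real classes.
final class ReuseModel {

    /** Stand-in for the subscription state a Subscriber carries. */
    static final class State {
        boolean unsubscribed;
    }

    /** Stand-in for rx.Subscriber: records what it receives in a log. */
    static class MiniSubscriber {
        final State state;
        final StringBuilder log = new StringBuilder();

        MiniSubscriber() { this.state = new State(); }               // fresh state
        MiniSubscriber(MiniSubscriber other) { this.state = other.state; } // shared state (assumed, like super(actual))

        void onNext(int value) { log.append(value); }
        void onCompleted() { log.append('|'); }
        boolean isUnsubscribed() { return state.unsubscribed; }
        void unsubscribe() { state.unsubscribed = true; }
    }

    /** Stand-in for SafeSubscriber: guards with done and unsubscribes after completion. */
    static final class MiniSafeSubscriber extends MiniSubscriber {
        private final MiniSubscriber actual;
        private boolean done;

        MiniSafeSubscriber(MiniSubscriber actual) {
            super(actual);
            this.actual = actual;
        }

        @Override void onNext(int value) {
            if (!done) {
                actual.onNext(value);
            }
        }

        @Override void onCompleted() {
            if (!done) {
                done = true;
                try {
                    actual.onCompleted();
                } finally {
                    unsubscribe(); // flips the state shared with actual
                }
            }
        }
    }

    /** Stand-in for a source that emits 1, 2 and then completes. */
    static void subscribe(MiniSubscriber subscriber) {
        MiniSafeSubscriber safe = new MiniSafeSubscriber(subscriber);
        for (int i = 1; i <= 2; i++) {
            if (safe.isUnsubscribed()) {
                return; // nothing is delivered to an unsubscribed subscriber
            }
            safe.onNext(i);
        }
        if (!safe.isUnsubscribed()) {
            safe.onCompleted();
        }
    }

    public static void main(String[] args) {
        MiniSubscriber reused = new MiniSubscriber();
        subscribe(reused);
        subscribe(reused); // already unsubscribed: the second run adds nothing
        System.out.println("reused: " + reused.log); // prints "reused: 12|"

        MiniSubscriber fresh = new MiniSubscriber();
        subscribe(fresh);  // a new instance receives the full sequence
        System.out.println("fresh:  " + fresh.log);  // prints "fresh:  12|"
    }
}
```

In this model the second wrapper starts with done set to false, yet the reused subscriber still receives nothing, because the unsubscribed state outlives the first wrapper.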
