结论
建议每次subscribe()都new一个新的Subscriber或者是使用Observer,也就是不要重复使用mSubscriber。
为什么
Subscriber是Observer的实现类,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中最终会被转换成 Subscriber 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber<T>(observer)); } ------------------------------------------------- public final class ObserverSubscriber<T> extends Subscriber<T> { final Observer<? super T> observer; public ObserverSubscriber(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { observer.onNext(t); } @Override public void onError(Throwable e) { observer.onError(e); } @Override public void onCompleted() { observer.onCompleted(); } }
|
Subscriber和Observer作为成员变量,比如 mSubscriber, mObserver传入 subscribe() 会有所区别。mSubscriber 传入之后,会被赋值成SafeSubscriber实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| public class SafeSubscriber<T> extends Subscriber<T> { private final Subscriber<? super T> actual; boolean done; public SafeSubscriber(Subscriber<? super T> actual) { super(actual); this.actual = actual; } @Override public void onCompleted() { if (!done) { done = true; try { actual.onCompleted(); } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaHooks.onError(e); throw new OnCompletedFailedException(e.getMessage(), e); } finally { try { unsubscribe(); } catch (Throwable e) { RxJavaHooks.onError(e); throw new UnsubscribeFailedException(e.getMessage(), e); } } } } @Override public void onError(Throwable e) { Exceptions.throwIfFatal(e); if (!done) { done = true; _onError(e); } } @Override public void onNext(T args) { try { if (!done) { actual.onNext(args); } } catch (Throwable e) { Exceptions.throwOrReport(e, this); } } ... ... }
|
通过SafeSubscriber源码可知 onCompleted 执行后会给成员变量 done 赋值true,并自动unsubscribe(),
通过成员变量 done 判断是否执行 onNext(),onError()等,所以subscribe()传入mSubscriber只有第一次能正常使用。传入mObserver则没有这个问题,它每次都会自动创建新的Subscriber来包装它一次。如果使用subscribe(new Subscriber< T>(){})方式实现订阅,也不会出现上面的问题。