
Retrofit 官方支持返回 Observable 或 Flowable,是处理网络请求的黄金搭档。
public interface ApiService {
@GET("users/{id}")
Observable<User> getUser(@Path("id") int id);
}
// 使用
apiService.getUser(123)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
user -> { /* 处理成功 */ },
error -> { /* 处理错误 */}
);将 RxJava 与 MVP 或 MVVM 架构结合使用,是现代 Android(以及 Java Swing)开发中的最佳实践之一。这种结合能充分发挥各自的优势:架构模式负责清晰的职责分离和可测试性,而 RxJava 则优雅地处理异步、事件流和复杂的线程切换。
MVP 的核心思想是将 Activity/Fragment (View) 从繁重的业务逻辑中解放出来,使其只负责 UI 的展示和用户交互的传递。Presenter 作为中间层,负责处理业务逻辑并与 Model 层交互。
// 1. 定义 View 接口
public interface UserView {
void showLoading();
void hideLoading();
void showUsers(List<User> users);
void showError(String message);
}
// 2. Model 层 (返回 RxJava 数据流)
public class UserModel {
private ApiService apiService; // 假设使用 Retrofit
public Observable<List<User>> getUsers() {
return apiService.getUsers() // 返回 Observable<List<User>>
.subscribeOn(Schedulers.io()); // 在 IO 线程执行网络请求
}
}
// 3. Presenter 层 (核心: 处理 RxJava 链)
public class UserPresenter {
private UserView view;
private UserModel model;
private CompositeDisposable disposables = new CompositeDisposable();
public UserPresenter(UserView view, UserModel model) {
this.view = view;
this.model = model;
}
public void loadUsers() {
view.showLoading();
// 建立 RxJava 订阅
disposables.add(
model.getUsers()
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程更新 UI
.subscribe(
users -> {
view.hideLoading();
view.showUsers(users); // 成功: 更新 UI
},
error -> {
view.hideLoading();
view.showError("加载失败: " + error.getMessage()); // 失败: 显示错误
}
)
);
}
// 在 Activity/Fragment onDestroy 时调用,防止内存泄漏
public void onDestroy() {
disposables.clear();
}
}
// 4. View 层 (Activity/Fragment)
public class UserActivity extends AppCompatActivity implements UserView {
private UserPresenter presenter;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_user);
// 初始化 Presenter
presenter = new UserPresenter(this, new UserModel());
// 触发加载
presenter.loadUsers();
}
@Override
protected void onDestroy() {
super.onDestroy();
presenter.onDestroy(); // 关键: 取消所有订阅
}
// 实现 View 接口方法
@Override
public void showLoading() {
// 显示进度条
}
@Override
public void hideLoading() {
// 隐藏进度条
}
@Override
public void showUsers(List<User> users) {
// 更新 RecyclerView 或 ListView
}
@Override
public void showError(String message) {
// 弹出 Toast 或 Snackbar
}
}MVVM 通过 数据绑定 (Data Binding) 或 LiveData/StateFlow 将 View 与 ViewModel 解耦。View 通过观察 ViewModel 中的数据变化来自动更新 UI,ViewModel 则负责准备和管理这些数据。
RxJava 可以作为 ViewModel 内部处理异步数据流的强大工具,最终将结果暴露给 View。由于 Android 官方推荐在 MVVM 中使用 LiveData,而 LiveData 本身不是响应式流,我们通常使用 LiveDataReactiveStreams 工具类进行桥接。
// 1. Model 层 (同 MVP)
public class UserModel {
public Observable<List<User>> getUsers() {
return apiService.getUsers()
.subscribeOn(Schedulers.io());
}
}
// 2. ViewModel 层 (核心: 使用 RxJava 处理逻辑,输出 LiveData)
public class UserViewModel extends ViewModel {
private UserModel model;
// 暴露给 View 的 LiveData
private MutableLiveData<List<User>> usersLiveData = new MutableLiveData<>();
private MutableLiveData<Boolean> loadingLiveData = new MutableLiveData<>();
private MutableLiveData<String> errorLiveData = new MutableLiveData<>();
// 提供 LiveData 给 View 观察
public LiveData<List<User>> getUsers() { return usersLiveData; }
public LiveData<Boolean> isLoading() { return loadingLiveData; }
public LiveData<String> getError() { return errorLiveData; }
public UserViewModel(UserModel model) {
this.model = model;
}
public void loadUsers() {
loadingLiveData.setValue(true);
// 将 RxJava Observable 转换为 LiveData
LiveData<List<User>> liveData = LiveDataReactiveStreams.fromPublisher(
model.getUsers()
.toFlowable(BackpressureStrategy.LATEST) // 转换为 Flowable 以支持背压
.observeOn(AndroidSchedulers.mainThread()) // 确保在主线程发射
);
// 订阅这个 LiveData
liveData.observeForever(users -> {
loadingLiveData.setValue(false);
usersLiveData.setValue(users);
});
// 你也可以直接订阅 Observable,并手动设置 LiveData
/*
model.getUsers()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
users -> {
loadingLiveData.setValue(false);
usersLiveData.setValue(users);
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue(error.getMessage());
}
);
*/
}
@Override
protected void onCleared() {
super.onCleared();
// ViewModel 被销毁时,LiveDataReactiveStreams 会自动取消订阅
// 如果是手动 subscribe,需要在此处管理 Disposable
}
}
// 3. View 层 (Activity/Fragment)
public class UserActivity extends AppCompatActivity {
private UserViewModel viewModel;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_user);
viewModel = new ViewModelProvider(this).get(UserViewModel.class);
// 观察数据变化
viewModel.getUsers().observe(this, users -> {
// 更新 UI,例如设置 Adapter
});
viewModel.isLoading().observe(this, isLoading -> {
if (isLoading) {
// 显示加载框
} else {
// 隐藏加载框
}
});
viewModel.getError().observe(this, errorMsg -> {
if (errorMsg != null) {
// 显示错误信息
}
});
// 触发加载
viewModel.loadUsers();
}
}利用 PublishSubject 或 BehaviorSubject 实现组件间的解耦通信。
public class RxBus {
private final PublishSubject<Object> bus = PublishSubject.create();
public void post(Object event) {
bus.onNext(event);
}
public <T> Observable<T> toObservable(Class<T> eventType) {
return bus.ofType(eventType);
}
}
// 发送事件
RxBus.getInstance().post(new UserLoginEvent("Alice"));
// 接收事件
RxBus.getInstance().toObservable(UserLoginEvent.class)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(event -> {
// 更新 UI
});• 全局错误处理: 使用 RxJavaPlugins.setErrorHandler(…) 设置全局错误处理器。 • CompositeDisposable: 管理多个 Disposable,在 Activity/Fragment 销毁时统一取消,避免内存泄漏。
private CompositeDisposable disposables = new CompositeDisposable();
// 添加订阅
disposables.add(apiService.getData().subscribe(...));
// 在 onDestroy 中清理
@Override
protected void onDestroy() {
super
.onDestroy();
disposables.clear(); // 取消所有订阅
}RxJava 通过组合操作符(如 zip、merge、concat、combineLatest)实现多任务并行或串行执行,并合并结果。以下是典型场景与实现:
场景:同时发起多个网络请求(如获取用户信息和订单列表),待所有请求完成后统一处理结果。
Observable<User> userObservable = api.getUser();
Observable<OrderList> orderObservable = api.getOrders();
Observable.zip(userObservable, orderObservable,
(user, orders) -> new UserOrderResult(user, orders))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> showUserOrders(result),
error -> handleError(error)
);关键点:
zip 按顺序组合多个 Observable 的数据,生成新的数据项。 发射数据量以最慢的 Observable 为准,超出的数据会被丢弃。
场景:依次执行多个任务(如先登录再获取数据),前一个任务失败则终止后续任务。 实现:
Observable.concat(
api.login(),
api.getData()
).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
data -> processData(data),
error -> handleError(error)
);关键点:
concat 严格按顺序执行,前一个 Observable 完成后才会订阅下一个。 适合需要严格依赖关系的任务链。
场景:合并多个动态数据源(如实时股票价格和用户操作日志),不保证顺序。 实现:
Observable<StockPrice> priceObservable = api.getStockPrices();
Observable<UserAction> actionObservable = api.getUserActions();
Observable.merge(priceObservable, actionObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
event -> logEvent(event),
error -> handleError(error)
);关键点:
merge 合并多个 Observable 的数据,按时间顺序发射,可能交错。 适合无严格顺序要求的实时数据流。
场景:当两个输入框内容变化时,实时验证表单(如密码和确认密码是否一致)。 实现:
Observable<String> passwordObservable = RxTextView.textChanges(passwordEditText)
.skipInitialValue()
.map(CharSequence::toString);
Observable<String> confirmPasswordObservable = RxTextView.textChanges(confirmPasswordEditText)
.skipInitialValue()
.map(CharSequence::toString);
Observable.combineLatest(
passwordObservable,
confirmPasswordObservable,
(password, confirmPassword) -> password.equals(confirmPassword)
).subscribe(
isValid -> showValidationResult(isValid),
error -> handleError(error)
);关键点:
combineLatest 在任意一个源 Observable 发射新数据时,组合所有源的最新数据。 适合需要基于多个输入实时计算的场景。
RxView.clicks(submitButton)
.throttleFirst(1000, TimeUnit.MILLISECONDS) // 1秒内仅允许一次点击
.flatMap(voidEvent -> api.submitData(data))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
success -> showSuccess(),
error -> handleError(error)
);