首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >RxJava在Android中的应用

RxJava在Android中的应用

作者头像
木易士心
发布2025-11-29 11:18:22
发布2025-11-29 11:18:22
1690
举报
将 RxJava 融入到实际项目架构中,解决复杂问题。

1.1 与 Retrofit 结合

Retrofit 官方支持返回 Observable 或 Flowable,是处理网络请求的黄金搭档。

代码语言:javascript
复制
public interface ApiService {
    @GET("users/{id}")
    Observable<User> getUser(@Path("id") int id);
}

// 使用
apiService.getUser(123)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        user -> { /* 处理成功 */ },
        error -> { /* 处理错误 */}
    );

1.2 与 MVP/MVVM 架构结合

将 RxJava 与 MVP 或 MVVM 架构结合使用,是现代 Android(以及 Java Swing)开发中的最佳实践之一。这种结合能充分发挥各自的优势:架构模式负责清晰的职责分离和可测试性,而 RxJava 则优雅地处理异步、事件流和复杂的线程切换。

1.2.1 与 MVP (Model-View-Presenter) 结合

MVP 的核心思想是将 Activity/Fragment (View) 从繁重的业务逻辑中解放出来,使其只负责 UI 的展示和用户交互的传递。Presenter 作为中间层,负责处理业务逻辑并与 Model 层交互。

代码语言:javascript
复制
// 1. 定义 View 接口
public interface UserView {
    void showLoading();
    void hideLoading();
    void showUsers(List<User> users);
    void showError(String message);
}

// 2. Model 层 (返回 RxJava 数据流)
public class UserModel {
    private ApiService apiService; // 假设使用 Retrofit

    public Observable<List<User>> getUsers() {
        return apiService.getUsers() // 返回 Observable<List<User>>
            .subscribeOn(Schedulers.io()); // 在 IO 线程执行网络请求
    }
}

// 3. Presenter 层 (核心: 处理 RxJava 链)
public class UserPresenter {
    private UserView view;
    private UserModel model;
    private CompositeDisposable disposables = new CompositeDisposable();

    public UserPresenter(UserView view, UserModel model) {
        this.view = view;
        this.model = model;
    }

    public void loadUsers() {
        view.showLoading();

        // 建立 RxJava 订阅
        disposables.add(
            model.getUsers()
                .observeOn(AndroidSchedulers.mainThread()) // 切换到主线程更新 UI
                .subscribe(
                    users -> {
                        view.hideLoading();
                        view.showUsers(users); // 成功: 更新 UI
                    },
                    error -> {
                        view.hideLoading();
                        view.showError("加载失败: " + error.getMessage()); // 失败: 显示错误
                    }
                )
        );
    }

    // 在 Activity/Fragment onDestroy 时调用,防止内存泄漏
    public void onDestroy() {
        disposables.clear();
    }
}

// 4. View 层 (Activity/Fragment)
public class UserActivity extends AppCompatActivity implements UserView {
    private UserPresenter presenter;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_user);

        // 初始化 Presenter
        presenter = new UserPresenter(this, new UserModel());

        // 触发加载
        presenter.loadUsers();
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        presenter.onDestroy(); // 关键: 取消所有订阅
    }

    // 实现 View 接口方法
    @Override
    public void showLoading() {
        // 显示进度条
    }

    @Override
    public void hideLoading() {
        // 隐藏进度条
    }

    @Override
    public void showUsers(List<User> users) {
        // 更新 RecyclerView 或 ListView
    }

    @Override
    public void showError(String message) {
        // 弹出 Toast 或 Snackbar
    }
}
  • 优势: 职责清晰: View 只管展示,Presenter 处理逻辑和数据流。 易于测试: Presenter 不依赖 Android 组件,可以方便地进行单元测试。 异步处理优雅: RxJava 完美解决了网络请求、数据库操作等异步问题。
  • 挑战: 内存泄漏: 必须妥善管理 Disposable,在生命周期结束时取消订阅。 Presenter 膨胀: 如果业务逻辑复杂,Presenter 可能会变得非常庞大。可以通过引入 Interactor (或称 Use Case) 层来分担业务逻辑。
1.2.2 RxJava 与 MVVM (Model-View-ViewModel) 结合

MVVM 通过 数据绑定 (Data Binding) 或 LiveData/StateFlow 将 View 与 ViewModel 解耦。View 通过观察 ViewModel 中的数据变化来自动更新 UI,ViewModel 则负责准备和管理这些数据。

RxJava 可以作为 ViewModel 内部处理异步数据流的强大工具,最终将结果暴露给 View。由于 Android 官方推荐在 MVVM 中使用 LiveData,而 LiveData 本身不是响应式流,我们通常使用 LiveDataReactiveStreams 工具类进行桥接。

代码语言:javascript
复制
// 1. Model 层 (同 MVP)
public class UserModel {
    public Observable<List<User>> getUsers() {
        return apiService.getUsers()
            .subscribeOn(Schedulers.io());
    }
}

// 2. ViewModel 层 (核心: 使用 RxJava 处理逻辑,输出 LiveData)
public class UserViewModel extends ViewModel {
    private UserModel model;
    // 暴露给 View 的 LiveData
    private MutableLiveData<List<User>> usersLiveData = new MutableLiveData<>();
    private MutableLiveData<Boolean> loadingLiveData = new MutableLiveData<>();
    private MutableLiveData<String> errorLiveData = new MutableLiveData<>();

    // 提供 LiveData 给 View 观察
    public LiveData<List<User>> getUsers() { return usersLiveData; }
    public LiveData<Boolean> isLoading() { return loadingLiveData; }
    public LiveData<String> getError() { return errorLiveData; }

    public UserViewModel(UserModel model) {
        this.model = model;
    }

    public void loadUsers() {
        loadingLiveData.setValue(true);

        // 将 RxJava Observable 转换为 LiveData
        LiveData<List<User>> liveData = LiveDataReactiveStreams.fromPublisher(
            model.getUsers()
                .toFlowable(BackpressureStrategy.LATEST) // 转换为 Flowable 以支持背压
                .observeOn(AndroidSchedulers.mainThread()) // 确保在主线程发射
        );

        // 订阅这个 LiveData
        liveData.observeForever(users -> {
            loadingLiveData.setValue(false);
            usersLiveData.setValue(users);
        });

        // 你也可以直接订阅 Observable,并手动设置 LiveData
        /*
        model.getUsers()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                users -> {
                    loadingLiveData.setValue(false);
                    usersLiveData.setValue(users);
                },
                error -> {
                    loadingLiveData.setValue(false);
                    errorLiveData.setValue(error.getMessage());
                }
            );
        */
    }

    @Override
    protected void onCleared() {
        super.onCleared();
        // ViewModel 被销毁时,LiveDataReactiveStreams 会自动取消订阅
        // 如果是手动 subscribe,需要在此处管理 Disposable
    }
}

// 3. View 层 (Activity/Fragment)
public class UserActivity extends AppCompatActivity {
    private UserViewModel viewModel;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_user);

        viewModel = new ViewModelProvider(this).get(UserViewModel.class);

        // 观察数据变化
        viewModel.getUsers().observe(this, users -> {
            // 更新 UI,例如设置 Adapter
        });

        viewModel.isLoading().observe(this, isLoading -> {
            if (isLoading) {
                // 显示加载框
            } else {
                // 隐藏加载框
            }
        });

        viewModel.getError().observe(this, errorMsg -> {
            if (errorMsg != null) {
                // 显示错误信息
            }
        });

        // 触发加载
        viewModel.loadUsers();
    }
}
  • 优势: 解耦更彻底: View 通过观察数据变化来更新,无需主动调用方法。 生命周期感知: LiveData 和 StateFlow 能自动感知 Activity/Fragment 的生命周期,避免在非活跃状态下更新 UI。 数据驱动 UI: UI 的状态完全由数据决定,逻辑更清晰。
  • 挑战: 桥接成本: 需要将 RxJava 流转换为 LiveData 或 StateFlow,增加了复杂性。 学习曲线: 需要同时理解 MVVM、数据绑定和 RxJava。

1.3. RxBus (事件总线)

利用 PublishSubject 或 BehaviorSubject 实现组件间的解耦通信。

代码语言:javascript
复制
public class RxBus {
    private final PublishSubject<Object> bus = PublishSubject.create();

    public void post(Object event) {
        bus.onNext(event);
    }

    public <T> Observable<T> toObservable(Class<T> eventType) {
        return bus.ofType(eventType);
    }
}

// 发送事件
RxBus.getInstance().post(new UserLoginEvent("Alice"));

// 接收事件
RxBus.getInstance().toObservable(UserLoginEvent.class)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(event -> {
        // 更新 UI
    });

1.4. 错误处理与资源管理

• 全局错误处理: 使用 RxJavaPlugins.setErrorHandler(…) 设置全局错误处理器。 • CompositeDisposable: 管理多个 Disposable,在 Activity/Fragment 销毁时统一取消,避免内存泄漏。

代码语言:javascript
复制
private CompositeDisposable disposables = new CompositeDisposable();

// 添加订阅
disposables.add(apiService.getData().subscribe(...));

// 在 onDestroy 中清理
@Override
protected void onDestroy() {
    super
.onDestroy();
    disposables.clear(); // 取消所有订阅
}

1.5 复杂任务编排

RxJava 通过组合操作符(如 zip、merge、concat、combineLatest)实现多任务并行或串行执行,并合并结果。以下是典型场景与实现:

1.5.1 并行任务合并(zip 操作符)

场景:同时发起多个网络请求(如获取用户信息和订单列表),待所有请求完成后统一处理结果。

代码语言:javascript
复制
Observable<User> userObservable = api.getUser();
Observable<OrderList> orderObservable = api.getOrders();
 
Observable.zip(userObservable, orderObservable, 
    (user, orders) -> new UserOrderResult(user, orders))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        result -> showUserOrders(result),
        error -> handleError(error)
    );

关键点:

zip 按顺序组合多个 Observable 的数据,生成新的数据项。 发射数据量以最慢的 Observable 为准,超出的数据会被丢弃。

1.5.2. 顺序任务合并(concat 操作符)

场景:依次执行多个任务(如先登录再获取数据),前一个任务失败则终止后续任务。 实现:

代码语言:javascript
复制
Observable.concat(
    api.login(),
    api.getData()
).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(
      data -> processData(data),
      error -> handleError(error)
  );

关键点:

concat 严格按顺序执行,前一个 Observable 完成后才会订阅下一个。 适合需要严格依赖关系的任务链。

1.5.3. 动态数据流合并(merge 操作符)

场景:合并多个动态数据源(如实时股票价格和用户操作日志),不保证顺序。 实现:

代码语言:javascript
复制
Observable<StockPrice> priceObservable = api.getStockPrices();
Observable<UserAction> actionObservable = api.getUserActions();
 
Observable.merge(priceObservable, actionObservable)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        event -> logEvent(event),
        error -> handleError(error)
    );

关键点:

merge 合并多个 Observable 的数据,按时间顺序发射,可能交错。 适合无严格顺序要求的实时数据流。

1.5.4. 最新数据合并(combineLatest 操作符)

场景:当两个输入框内容变化时,实时验证表单(如密码和确认密码是否一致)。 实现:

代码语言:javascript
复制
Observable<String> passwordObservable = RxTextView.textChanges(passwordEditText)
    .skipInitialValue()
    .map(CharSequence::toString);
 
Observable<String> confirmPasswordObservable = RxTextView.textChanges(confirmPasswordEditText)
    .skipInitialValue()
    .map(CharSequence::toString);
 
Observable.combineLatest(
    passwordObservable,
    confirmPasswordObservable,
    (password, confirmPassword) -> password.equals(confirmPassword)
).subscribe(
    isValid -> showValidationResult(isValid),
    error -> handleError(error)
);

关键点

combineLatest 在任意一个源 Observable 发射新数据时,组合所有源的最新数据。 适合需要基于多个输入实时计算的场景。

1.6 UI 事件处理

1.6.1 防抖优化搜索输入:
代码语言:javascript
复制
RxView.clicks(submitButton)
    .throttleFirst(1000, TimeUnit.MILLISECONDS) // 1秒内仅允许一次点击
    .flatMap(voidEvent -> api.submitData(data))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        success -> showSuccess(),
        error -> handleError(error)
    );
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-11-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 将 RxJava 融入到实际项目架构中,解决复杂问题。
  • 1.1 与 Retrofit 结合
  • 1.2 与 MVP/MVVM 架构结合
    • 1.2.1 与 MVP (Model-View-Presenter) 结合
    • 1.2.2 RxJava 与 MVVM (Model-View-ViewModel) 结合
  • 1.3. RxBus (事件总线)
  • 1.4. 错误处理与资源管理
  • 1.5 复杂任务编排
    • 1.5.1 并行任务合并(zip 操作符)
    • 1.5.2. 顺序任务合并(concat 操作符)
    • 1.5.3. 动态数据流合并(merge 操作符)
    • 1.5.4. 最新数据合并(combineLatest 操作符)
  • 1.6 UI 事件处理
    • 1.6.1 防抖优化搜索输入:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档