我有一个来自API的GET请求。使用Retrofit
和RxJava
,我得到了一个响应。现在我想将我的响应添加到Room
数据库。但是我不想用AsyncTask
,我想用RxJava
。我在Dao insertAll中准备了一个方法。如何使用RxJava
将我的列表异步添加到数据库?我的get请求:
@GET("contacts")
fun getContactModel(): Single<List<Contact>>
我的Dao insertAll
方法:
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insertAll(contact: List<Contact>?) : Completable
我使用RxJava的请求:
val disposable = CompositeDisposable()
disposable.add(contactsRepository.modelSingle()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableSingleObserver<List<Contact>>() {
override fun onSuccess(t: List<Contact>) {
// t - My List from Api
}
override fun onError(e: Throwable) {
}
})
)
发布于 2020-01-14 22:42:34
如果contactsRepository.modelSingle()
返回一个Single
Observable,那么您可以这样做
disposable.add(contactsRepository.modelSingle()
.doOnSuccess { data -> saveToDb(data) } // this will be called on Schedulers.io
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableSingleObserver<List<Contact>>() {
override fun onSuccess(t: List<Contact>) {
// t - My List from Api
}
override fun onError(e: Throwable) {
}
})
)
如果它是另一个可观察对象,则可以使用flatMap
disposable.add(contactsRepository.modelSingle()
.flatMap { data ->
saveToDb(data)
Observable.just(data)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe()
)
发布于 2020-01-14 21:23:29
在java中,这是我将首先使用RxJava的Retrofit,并能够从API检索可观察到的响应,如下所示。
在Retrofit Interface服务中,使用以下代码
@GET("contacts")
Observable<Contacts> getContactsFromAPI
在@Dao Room类实现中
@Insert(onConflict = OnConflictStrategy.REPLACE)
Completable insertContact(List<Contact> contact);
要从API中检索可观察的数据项,请使用以下代码
RetrofitService getContactService = RetroInstance.getService();
Observable<Contact> apiData =
getContactService.getContactsFromAPI();
compositeDisposable.add(apiData
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<Contact>() {
@Override
public void onNext(List<Contact> contact) {
newContact(contact)
Log.d(LOG_TAG, "The contacts are : "+ contact);
}
@Override
public void onError(Throwable e) {
Log.d(LOG_TAG, "Error: "+ e.getMessage());
}
@Override
public void onComplete() {
}
}));
要将添加检索从api添加到Room数据库,然后创建一个名为的方法,该方法用于将数据插入到Room数据库中,并在为从api检索数据而编写的代码中调用此方法
public void newContact(List<Contacts> contacts) {
disposable.add(Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
rowIdInserted =
contactAppDatabase.userDao().insertContact(contacts);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onComplete() {
Log.d(TAG, "inserted successful");
}
@Override
public void onError(Throwable e) {
Log.d(LOG_TAG, "tHE new error on create the new user is: " +
e.getMessage());
}
}));
}
我认为这很有帮助
https://stackoverflow.com/questions/59733338
复制相似问题