我正在尝试使用Rx异步处理一些任务。
var list = Enumerable.Range(0, 100)
.ToObservable()
.SelectMany(x => Observable.Start(() => {
Console.WriteLine("Processing {0} ...", x);
Thread.Sleep(100 * x % 3);
if (x > 90) {
Console.WriteLine("Procesing exception {0} > 90", x);
throw new Exception("Value too large");
}
Console.WriteLine("Processing {0} completed.", x);
return x;
}))
.Subscribe(
x => { Console.WriteLine("Next [{0}]", x); },
e => {
Console.WriteLine("Exception:");
Console.WriteLine(e.Message);
},
() => { Console.WriteLine("Complete"); }
);
这个代码的问题是,异常不会传递给订阅服务器。所以,经过多次尝试,我放弃了,决定问一个简单的问题:
如何处理在SelectMany
语句中的异步方法中引发的异常?
为了说明这一点,最后的实现是一个同步函数调用,它可能抛出异常,也可能不会抛出异常。目标是将其传递给订阅服务器,以便进一步处理它(在特定情况下,将向用户显示消息)。
编辑
我把我的发现变成了一个答案,这样我就可以把这个问题记下来。就我个人而言,我不同意自我回答。但有时候没有别的办法,所以很抱歉。
发布于 2012-03-11 03:16:11
答案
实际上,代码是正确工作的。但是,当异步操作仍然在后台执行时,调试器会中断异常--至少是在第一个异常发生时已经启动的那些操作。把我扔了!如果您在没有调试器的情况下运行代码,例外是swallowed.So,我想问题确实发生在计算机面前:-)
还有一些关于Observable.Start
的澄清--这是正确的--它应该有一些实际实现的错误处理.见背景。
背景
Observable.Start
是一种方便的方法,它使用Observable.ToAsync
方法将函数/acion转换为异步操作。如果您查看该方法的实现,您将看到它已经执行异常处理/转发。
public static Func<IObservable<TResult>> ToAsync<TResult>(this Func<TResult> function, IScheduler scheduler) {
if (function != null) {
if (scheduler != null) {
return () => {
AsyncSubject<TResult> asyncSubject = new AsyncSubject<TResult>();
scheduler.Schedule(() => {
TResult result = default(TResult);
try {
result = function();
} catch (Exception exception1) {
Exception exception = exception1;
asyncSubject.OnError(exception);
return;
}
asyncSubject.OnNext(result);
asyncSubject.OnCompleted();
});
return asyncSubject.AsObservable<TResult>();
};
} else {
throw new ArgumentNullException("scheduler");
}
} else {
throw new ArgumentNullException("function");
}
}
发布于 2012-03-06 12:11:00
使用Materialize
将OnError / OnCompleted消息转换为通知。
例如,
observable.SelectMany(x => Observable.Start(fn).Materialize())
将使您的错误/完成包装在通知中,以便在实际订阅点中处理,而不是在SelectMany中终止错误。
这对于大多数异步调用操作非常有用,因为该方法要么失败要么完成。
https://stackoverflow.com/questions/9574996
复制