January 2, 2025
By: Kevin

Rx.NET介绍第三部分: 从理论到实践

  1. 第九章:调度与线程
    1. Rx, 线程与并发
    2. 定时调用
    3. 调度器
    4. 测试调度器
    5. SubscribeOn和ObserveOn
    6. SubscribeOn和ObserveOn在UI应用程序中
    7. 并发陷阱
    8. 调度器的高级特性
  2. 第十章:基于时间的序列
    1. 时间戳和时间间隔
    2. 延迟
    3. 采样
    4. 节流
    5. 超时
  3. 第十一章:离开Rx的世界
    1. asyncawait 集成
    2. ForEachAsync
    3. ToEnumerable
    4. 转换为单个集合
    5. ToArray和ToList
    6. ToDictionary和ToLookup
    7. ToTask
    8. ToEvent
    9. Do
    10. 用AsObservable封装
  4. 第十二章:错误处理操作符
    1. Catch
    2. Finally
    3. Using
    4. OnErrorResumeNext
    5. Retry

本书的第一部分着重介绍了响应式扩展(Rx)的基本理念和类型. 在第二部分, 展示了 Rx 所提供的操作符, 这些操作符使能够定义想要应用于源数据的转换和计算. 这第二部分本质上属于函数式编程.

Rx 的操作符大多类似于数学函数, 也就是说, 对于特定的输入, 它们的行为方式总是保持一致.

它们不受周围世界状态的影响, 也不会对其状态做出任何改变. 在函数式编程中, 这类机制有时被描述为"纯"的.

这种纯粹性有助于理解代码的功能. 这意味着, 为了理解某个特定部分的运作方式, 无需了解程序其余部分的状态.

然而, 完全与外界脱节的代码不太可能实现任何有用的功能. 在实际应用中, 需要将这些纯粹的计算与更务实的考量联系起来.

"创建可观察序列"一章已经展示了如何定义可观察流, 所以已经了解了如何将现实世界的输入接入到 Rx 的世界中.

但另一端的情况又如何呢? 如何利用处理结果来做一些有用的事情呢?

在某些情况下, 在 IObserver 实现内部开展工作, 使用前文已经了解过的基于回调的订阅机制, 可能就足够了.

有些情况则需要更复杂的处理方式.

在本书的第三部分, 将探讨 Rx 所提供的一些特性, 这些特性有助于将在第二部分所研究的那种处理过程与外界的其他部分联系起来.

第九章:调度与线程

Rx主要是一个用于异步处理"流动数据"的系统. 如果要处理多个信息源, 它们很可能会并发地生成数据.

在处理数据时, 可能希望实现一定程度的并行性以达到可伸缩性目标, 因此需要对系统的这些方面进行控制.

到目前为止, 一直避免显式使用线程或并发. 已经看到一些方法必须处理定时来执行其任务(例如, Buffer, DelaySample 必须安排在特定的时间表上进行工作).

然而, 依赖于默认行为, 虽然默认行为通常能满足需求, 但有时需要更多的控制. 本章将介绍Rx的调度系统, 它提供了一种优雅的方式来管理这些问题.

Rx, 线程与并发

Rx对使用的线程没有限制. IObservable<T> 可以在任何线程上调用其订阅者的 OnNext/Completed/Error 方法, 每次调用可能在不同的线程上.

尽管如此自由, 但Rx有一个方面防止了混乱: 可观察源在任何情况下都必须遵守Rx序列的基本规则.

当首次探讨这些规则时, 重点关注的是它们如何确定对任何单个观察者的调用顺序.

可以有任意数量的 OnNext 调用, 但一旦调用了 OnErrorOnCompleted, 就不能再有进一步的调用.

但现在考虑并发情况时, 这些规则的另一个方面变得更加重要: 对于任何单个订阅, 可观察源不能对该订阅的观察者进行并发调用.

因此, 如果源调用 OnNext, 它必须等到该调用返回后, 才能再次调用 OnNext, 或者调用 OnErrorOnComplete.

这对观察者意味着, 只要观察者只涉及一个订阅, 它每次只会被要求处理一件事情.

无论它所订阅的源是一个涉及许多不同操作符的长而复杂的处理链, 即使通过组合多个输入来构建该源(例如, 使用 Merge). 基本规则要求, 如果对单个 IObservable<T> 只调用一次 Subscribe, 该源永远不允许对 IObserver<T> 方法进行多次并发调用.

因此, 尽管每次调用可能在不同的线程上进行, 但调用是严格顺序执行的(除非单个观察者涉及多个订阅).

Rx操作符在接收传入通知并产生通知时, 会在传入通知到达的线程上通知其观察者. 假设有这样一系列操作符:

source
   .Where(x => x.MessageType == 3)
   .Buffer(10)
   .Take(20)
   .Subscribe(x => Console.WriteLine(x));

当调用 Subscribe 时, 最终会得到一个观察者链.

Rx提供的将调用回调的观察者被传递给 Take 返回的可观察对象, Take 又会创建一个观察者, 该观察者订阅 Buffer 返回的可观察对象, Buffer 又会创建一个观察者订阅 Where 可观察对象, Where 则会创建另一个观察者订阅 source.

因此, 当 source 生成一个元素时, 它将调用 Where 操作符的观察者的 OnNext.

这将调用过滤函数, 如果 MessageType 确实为3, Where 观察者将调用 Buffer 观察者的 OnNext, 并且会在同一线程上进行.

Where 观察者的 OnNextBuffer 观察者的 OnNext 返回之前不会返回.

现在, 如果 Buffer 观察者确定它已经完全填满了一个缓冲区(例如, 它刚刚收到了第10个项目), 那么它也不会立即返回, 它将调用 Take 观察者的 OnNext, 并且只要 Take 尚未收到20个缓冲区, 它就会调用将调用回调的Rx提供的观察者的 OnNext.

因此, 对于传递给 Subscribe 的回调中的 Console.WriteLine 的源通知, 最终会在堆栈上有很多嵌套调用:

source调用:
  Where观察者, 它调用:
    Buffer观察者, 它调用:
      Take观察者, 它调用:
        Subscribe观察者, 它调用lambda

这一切都在一个线程上发生. 大多数Rx操作符没有特定的线程可称为"主场". 它们只是在调用传入的任何线程上完成工作.

这使得Rx非常高效. 将数据从一个操作符传递到下一个操作符仅仅涉及一个方法调用, 而这些调用速度相当快. (实际上, 通常还有更多层. Rx倾向于添加一些包装器来处理错误和提前取消订阅. 因此, 调用堆栈看起来会比刚刚展示的更复杂一些. 但通常仍然只是方法调用. )

有时Rx被描述为具有"自由线程" 模型. 这仅仅意味着操作符通常不关心它们使用的线程.

也有例外情况, 但一个操作符直接调用下一个操作符是常态.

这样的结果是, 通常是原始源决定使用哪个线程. 下一个示例通过创建一个主题, 然后在不同线程上调用 OnNext 并报告线程ID来说明这一点.

Console.WriteLine($"Main thread: {Environment.CurrentManagedThreadId}");
var subject = new Subject<string>();
subject.Subscribe(
    m => Console.WriteLine($"Received {m} on thread: {Environment.CurrentManagedThreadId}"));
object sync = new();
ParameterizedThreadStart notify = arg =>
{
    string message = arg?.ToString()?? "null";
    Console.WriteLine(
        $"OnNext({message}) on thread: {Environment.CurrentManagedThreadId}");
    lock (sync)
    {
        subject.OnNext(message);
    }
};
notify("Main");
new Thread(notify).Start("First worker thread");
new Thread(notify).Start("Second worker thread");

输出:

Main thread: 1
OnNext(Main) on thread: 1
Received Main on thread: 1
OnNext(First worker thread) on thread: 10
Received First worker thread on thread: 10
OnNext(Second worker thread) on thread: 11
Received Second worker thread on thread: 11

在每种情况下, 传递给 Subscribe 的处理程序都在调用 subject.OnNext 的同一线程上被回调. 这很直接且高效. 然而, 事情并不总是这么简单.

定时调用

有些通知不是源提供项目的直接结果. 例如, Rx提供了 Delay 操作符, 可以对元素的传递加入一些延时.

下一个示例基于前面的示例, 主要区别在于不再直接订阅源, 而是通过 Delay:

Console.WriteLine($"Main thread: {Environment.CurrentManagedThreadId}");
var subject = new Subject<string>();
subject
   .Delay(TimeSpan.FromSeconds(0.25))
   .Subscribe(
        m => Console.WriteLine($"Received {m} on thread: {Environment.CurrentManagedThreadId}"));
object sync = new();
ParameterizedThreadStart notify = arg =>
{
    string message = arg?.ToString()?? "null";
    Console.WriteLine(
        $"OnNext({message}) on thread: {Environment.CurrentManagedThreadId}");
    lock (sync)
    {
        subject.OnNext(message);
    }
};
notify("Main 1");
Thread.Sleep(TimeSpan.FromSeconds(0.1));
notify("Main 2");
Thread.Sleep(TimeSpan.FromSeconds(0.3));
notify("Main 3");
new Thread(notify).Start("First worker thread");
Thread.Sleep(TimeSpan.FromSeconds(0.1));
new Thread(notify).Start("Second worker thread");
Thread.Sleep(TimeSpan.FromSeconds(2));

这个示例在发送源之间也会等待一段时间, 这样就可以看到 Delay 的效果. 以下是输出:

Main thread: 1
OnNext(Main 1) on thread: 1
OnNext(Main 2) on thread: 1
Received Main 1 on thread: 12
Received Main 2 on thread: 12
OnNext(Main 3) on thread: 1
OnNext(First worker thread) on thread: 13
OnNext(Second worker thread) on thread: 14
Received Main 3 on thread: 12
Received First worker thread on thread: 12
Received Second worker thread on thread: 12

每条 Received 消息都在线程ID为12的线程上, 有别于引发通知的三个线程.

这并不完全令人惊讶. Rx在这里使用原始线程的唯一方法是让 Delay 在指定的时间(四分之一秒)内阻塞线程, 然后再转发调用.

对于大多数场景来说, 这是不可接受的, 因此 Delay 操作符会安排在适当的延迟后进行回调.

从输出中可以看出, 这些似乎都发生在一个特定的线程上. 无论哪个线程调用 OnNext, 延迟的通知都会到达线程ID为12的线程上.

不是 Delay 操作符创建的线程. 而是 Delay 引发了"调度器"的干预.

调度器

调度器做三件事:

  • 确定执行工作的上下文(在哪个线程是执行)
  • 决定何时执行工作(什么时间执行, 立即执行或延迟执行)
  • 跟踪时间(计时工作)

下面示例展现前两件事:

Console.WriteLine($"Main thread: {Environment.CurrentManagedThreadId}");
Observable
   .Range(1, 5)
   .Subscribe(m =>
      Console.WriteLine(
        $"Received {m} on thread: {Environment.CurrentManagedThreadId}"));
Console.WriteLine("Subscribe returned");
Console.ReadLine();

可能不太明显这与调度有什么关系, 但实际上, Range 总是使用一个调度器来完成其工作. 只是让它使用默认的调度器. 以下是输出:

Main thread: 1
Received 1 on thread: 1
Received 2 on thread: 1
Received 3 on thread: 1
Received 4 on thread: 1
Received 5 on thread: 1
Subscribe returned

看看调度器所做的事情列表中的前两项, 可以看到执行工作的上下文是调用 Subscribe 的线程. 至于它何时决定执行工作, 它决定在 Subscribe 返回之前完成所有工作.

直觉可能认为 Range 立即产生所要求的所有元素, 然后返回. 事情并没有那么简单.

让看看如果同时运行多个 Range 实例会发生什么. 这引入了一个额外的操作符: 一个再次调用 RangeSelectMany:

Observable
   .Range(1, 5)
   .SelectMany(i => Observable.Range(i * 10, 5))
   .Subscribe(m =>
      Console.WriteLine(
        $"Received {m} on thread: {Environment.CurrentManagedThreadId}"));

输出显示 Range 实际上并不一定立即产生所有项目:

Received 10 on thread: 1
Received 11 on thread: 1
Received 20 on thread: 1
Received 12 on thread: 1
Received 21 on thread: 1
Received 30 on thread: 1
Received 13 on thread: 1
Received 22 on thread: 1
Received 31 on thread: 1
Received 40 on thread: 1
Received 14 on thread: 1
Received 23 on thread: 1
Received 32 on thread: 1
Received 41 on thread: 1
Received 50 on thread: 1
Received 24 on thread: 1
Received 33 on thread: 1
Received 42 on thread: 1
Received 51 on thread: 1
Received 34 on thread: 1
Received 43 on thread: 1
Received 52 on thread: 1
Received 44 on thread: 1
Received 53 on thread: 1
Received 54 on thread: 1
Subscribe returned

SelectMany 回调产生的第一个嵌套 Range 产生了几个值(10和11), 但随后第二个 Range 在第一个产生第三个值(12)之前设法产生了它的第一个值(20).

可以看到这里有一些进展的交错. 所以尽管执行工作的上下文仍然是调用 Subscribe 的线程, 但调度器必须做出的第二个选择–何时执行工作–比乍看起来更微妙. 这告诉 Range 并不像这个简单的实现那样简单:

public static IObservable<int> NaiveRange(int start, int count)
{
    return System.Reactive.Linq.Observable.Create<int>(obs =>
    {
        for (int i = 0; i < count; i++)
        {
            obs.OnNext(start + i);
        }
        return Disposable.Empty;
    });
}

如果 Range 像那样工作, 这段代码将在继续处理下一个 Range 之前产生 SelectMany 回调返回的第一个 Range 的所有项目. 实际上, Rx确实提供了一个调度器, 如果想要那种行为, 它会给. 这个示例将 ImmediateScheduler.Instance 传递给嵌套的 Observable.Range 调用:

Observable
   .Range(1, 5)
   .SelectMany(i => Observable.Range(i * 10, 5, ImmediateScheduler.Instance))
   .Subscribe(
        m => Console.WriteLine($"Received {m} on thread: {Environment.CurrentManagedThreadId}"));

以下是结果:

Received 10 on thread: 1
Received 11 on thread: 1
Received 12 on thread: 1
Received 13 on thread: 1
Received 14 on thread: 1
Received 20 on thread: 1
Received 21 on thread: 1
Received 22 on thread: 1
Received 23 on thread: 1
Received 24 on thread: 1
Received 30 on thread: 1
Received 31 on thread: 1
Received 32 on thread: 1
Received 33 on thread: 1
Received 34 on thread: 1
Received 40 on thread: 1
Received 41 on thread: 1
Received 42 on thread: 1
Received 43 on thread: 1
Received 44 on thread: 1
Received 50 on thread: 1
Received 51 on thread: 1
Received 52 on thread: 1
Received 53 on thread: 1
Received 54 on thread: 1
Subscribe returned

通过在最内层的 Observable.Range 调用中指定 ImmediateScheduler.Instance, 要求了一种特定的策略:

这会在调用者的线程上立即调用所有工作. Range 不默认使用它有几个原因. (它的默认是 Scheduler.CurrentThread, 它总是返回 CurrentThreadScheduler 的一个实例. )

首先, ImmediateScheduler.Instance 可能会导致相当深的调用堆栈.

大多数其他调度器维护工作队列, 所以如果一个操作符决定有新工作要做, 而另一个操作符正在进行中 (例如, 一个嵌套的 Range 操作符决定开始发出它的值), 新工作可以被放入队列, 而不是立即开始(这将涉及调用将执行工作的方法), 这样可以让正在进行的工作在开始下一件事情之前完成.

在查询变得复杂时, 到处使用即时调度器可能会导致堆栈溢出.

Range 不默认使用即时调度器的第二个原因是, 当多个可观察对象同时处于活动状态时, 它们都可以取得一些进展– Range 尽快产生它的所有项目, 所以如果它不使用一个允许操作符轮流进行的调度器, 它可能会耗尽其他操作符的CPU时间.

注意, 在这两个示例中, Subscribe returned 消息都最后出现. 所以尽管 CurrentThreadScheduler 不像即时调度器那样急切, 但它仍然不会在完成所有未完成的工作之前返回给调用者.

它维护一个工作队列, 实现了稍微更公平的执行, 并避免了堆栈溢出, 但是一旦有任何东西要求 CurrentThreadScheduler 做某事, 它在排空队列之前不会返回.

并非所有调度器都有这个特性. 这里是前面示例的一个变体, 只有一个对 Range 的调用, 没有任何嵌套的可观察对象.

这次要求它使用 TaskPoolScheduler.

Observable
   .Range(1, 5, TaskPoolScheduler.Default)
   .Subscribe(
        m => Console.WriteLine($"Received {m} on thread: {Environment.CurrentManagedThreadId}"));

与即时调度器和当前线程调度器相比, 这对运行工作的上下文做出了不同的决定, 从输出中可以看出:

Main thread: 1
Subscribe returned
Received 1 on thread: 12
Received 2 on thread: 12
Received 3 on thread: 12
Received 4 on thread: 12
Received 5 on thread: 12

注意, 通知都发生在与调用 Subscribe 的线程(ID为1)不同的线程(ID为12)上.

这是因为 TaskPoolScheduler 的定义特征是它通过任务并行库(TPL)的任务池调用所有工作.

这就是为什么看到不同的线程ID: 任务池不拥有应用程序的主线程.

在这种情况下, 它认为没有必要启动多个线程.

这是合理的, 这里只有一个源, 一次提供一个项目.

在这种情况下没有更多线程是好的–当单个线程顺序处理工作项时, 线程池效率最高, 因为它避免了上下文切换开销, 而且由于这里实际上没有并发工作的范围, 如果它在这种情况下创建了多个线程, 也不会得到任何好处.

这个调度器还有一个非常重要的区别: 注意, 在任何通知到达观察者之前, 对 Subscribe 的调用就返回了.

这是因为这是看到的第一个会引入真正并行性的调度器. ImmediateSchedulerCurrentThreadScheduler 永远不会自己启动新线程, 无论执行的操作符多么希望执行并发操作.

虽然 TaskPoolScheduler 确定不需要创建多个线程, 但它创建的一个线程与应用程序的主线程不同, 这意味着主线程可以与这个订阅并行运行.

由于 TaskPoolScheduler 不会在发起工作的线程上执行任何工作, 所以它可以在将工作排队后立即返回, 使 Subscribe 方法立即返回.

如果在嵌套可观察对象的示例中使用 TaskPoolScheduler 会怎样呢? 这里只在内部对 Range 的调用中使用它, 所以外部的 Range 仍将使用默认的 CurrentThreadScheduler:

Observable
  .Range(1, 5)
  .SelectMany(i => Observable.Range(i * 10, 5, TaskPoolScheduler.Default))
  .Subscribe(
        m => Console.WriteLine($"Received {m} on thread: {Environment.CurrentManagedThreadId}"));

现在可以看到更多的线程参与进来:

#+end_src

Received 10 on thread: 13 Received 11 on thread: 13 Received 12 on thread: 13 Received 13 on thread: 13 Received 40 on thread: 16 Received 41 on thread: 16 Received 42 on thread: 16 Received 43 on thread: 16 Received 44 on thread: 16 Received 50 on thread: 17 Received 51 on thread: 17 Received 52 on thread: 17 Received 53 on thread: 17 Received 54 on thread: 17 Subscribe returned Received 14 on thread: 13 Received 20 on thread: 14 Received 21 on thread: 14 Received 22 on thread: 14 Received 23 on thread: 14 Received 24 on thread: 14 Received 30 on thread: 15 Received 31 on thread: 15 Received 32 on thread: 15 Received 33 on thread: 15 Received 34 on thread: 15

#+end_src

由于在这个示例中只有一个观察者, Rx的规则要求一次只给它一个项目, 所以实际上这里并没有真正的并行性范围, 但更复杂的结构最初会导致比前面的示例更多的工作项进入调度器的队列, 这可能就是这次工作被多个线程处理的原因.

实际上, 这些线程中的大多数可能大部分时间都在 SelectMany 内部的代码中阻塞, 该代码确保一次将一个项目传递给目标观察者.

项目的顺序有点令人惊讶, 子范围本身似乎是随机出现的, 但在每个子范围内几乎是顺序产生项目的(项目14是唯一的例外).

这是 RangeTaskPoolScheduler 交互方式的一个怪癖.

还没有谈到调度器的第三个工作: 跟踪时间.

这在 Range 中不会出现, 因为它试图尽快产生所有项目. 但对于在[定时调用](#timed-invocation)部分展示的 Delay 操作符, 时间显然是一个关键因素.

实际上, 现在是展示调度器提供的API的好时机:

public interface IScheduler
{
    DateTimeOffset Now { get; }

    IDisposable Schedule<TState>(TState state,
                                 Func<IScheduler, TState, IDisposable> action);

    IDisposable Schedule<TState>(TState state,
                                 TimeSpan dueTime,
                                 Func<IScheduler, TState, IDisposable> action);

    IDisposable Schedule<TState>(TState state,
                                 DateTimeOffset dueTime,
                                 Func<IScheduler, TState, IDisposable> action);
}

可以看到, 除了一个之外, 所有这些都与时间有关. 只有第一个 Schedule 重载与时间无关, 操作符在希望调度器尽快运行工作时调用它. 这就是 Range 使用的重载. (严格来说, Range 会询问调度器是否支持长时间运行的操作, 在这种操作中, 一个操作符可以长时间临时控制一个线程. 它在可能的情况下更喜欢使用这种方式, 因为它通常比为它希望产生的每个项目向调度器提交工作更有效. TaskPoolScheduler 支持长时间运行的操作, 这解释了之前看到的有点令人惊讶的输出, 但 CurrentThreadScheduler, Range 的默认选择, 不支持. 所以默认情况下, Range 会为它希望产生的每个元素调用一次第一个 Schedule 重载. )

Delay 使用第二个重载. 确切的实现相当复杂(主要是因为当繁忙的源导致它落后时如何有效地追赶), 但本质上, 每次有新项目到达 Delay 操作符时, 它会安排一个工作项在配置的延迟后运行, 以便它可以将该项目提供给其订阅者, 并带有预期的时间偏移.

调度器必须负责管理时间, 因为.NET有几种不同的计时器机制, 并且计时器的选择通常由处理计时器回调的上下文决定.

由于调度器确定工作运行的上下文, 这意味着它们还必须选择计时器类型.

例如, UI框架通常提供在适合更新用户界面的上下文中调用其回调的计时器.

Rx提供了一些使用这些计时器的特定于UI框架的调度器, 但对于其他场景, 这些可能是不合适的选择.

所以每个调度器使用适合其运行工作项的上下文的计时器.

这有一个有用的结果: 因为 IScheduler 为与时间相关的细节提供了抽象, 所以可以虚拟化时间.

这对于测试非常有用. 如果查看Rx库中的大量测试套件, 会发现有许多测试验证与时间相关的行为.

如果这些测试实时运行, 测试套件将花费太长时间才能运行, 并且还可能产生奇怪的虚假失败, 因为与测试在同一台机器上运行的后台任务偶尔会以可能使测试混淆的方式改变执行速度.

相反, 这些测试使用一个专门的调度器, 它可以完全控制时间的流逝.

注意, 所有三个 IScheduler.Schedule 方法都需要一个回调.

调度器将在它选择的时间和上下文中调用这个回调.

调度器回调将另一个 IScheduler 作为其第一个参数. 这在需要重复调用的场景中使用, 稍后会看到.

Rx提供了几个调度器. 以下部分描述了最广泛使用的调度器.

  1. 即时调度器

    ImmediateScheduler 是Rx提供的最简单的调度器. 如前面部分所示, 每当被要求调度一些工作时, 它就立即运行它. 它在其 IScheduler.Schedule 方法内部这样做.

    这是一个非常简单的策略, 这使得 ImmediateScheduler 非常高效.

    出于这个原因, 许多操作符默认使用 ImmediateScheduler.

    然而, 对于立即产生多个元素的操作符, 尤其是当项目数量可能很大时, 它可能会有问题.

    例如, Rx为 IEnumerable<T> 定义了[ ToObservable 扩展方法](03~CreatingObservableSequences~.md#from-ienumerablet).

    当订阅由这个方法返回的 IObservable<T> 时, 它将立即开始迭代集合, 如果告诉它使用 ImmediateScheduler, Subscribe 将不会返回, 直到它到达集合的末尾.

    对于无限序列来说, 这显然是一个问题, 这就是为什么这种类型的操作符默认不使用 ImmediateScheduler 的原因.

    当调用接受 TimeSpanSchedule 重载时, ImmediateScheduler 也有潜在的令人惊讶的行为.

    这要求调度器在指定的时间长度后运行一些工作. 它实现这一点的方式是调用 Thread.Sleep.

    对于Rx的大多数调度器, 这个重载将安排某种计时器机制在稍后运行代码, 使当前线程能够继续其业务, 但 ImmediateScheduler 在这里名副其实, 它拒绝进行这种延迟执行.

    它只是阻塞当前线程, 直到该工作的时间到了. 这意味着如果指定这个调度器, 像 Interval 返回的基于时间的可观察对象将工作, 但代价是阻止线程做其他任何事情.

    接受 DateTimeSchedule 重载略有不同. 如果指定一个不到10秒后的时间, 它将像使用 TimeSpan 时一样阻塞调用线程.

    但是如果传递一个更远的未来的 DateTime, 它将放弃立即执行, 并回退到使用计时器.

  2. 当前线程调度器

    CurrentThreadSchedulerImmediateScheduler 非常相似. 区别在于当当前线程上已经在处理一个工作项时, 它如何处理调度工作的请求.

    这可能会在将多个使用调度器工作的操作符链接在一起时发生.

    为了理解发生了什么, 了解如何产生多个项目的源, 如 IEnumerable<T>ToObservable 扩展方法或 Observable.Range, 使用调度器是很有帮助的.

    这些类型的操作符不使用普通的 forforeach 循环.

    它们通常为每次迭代安排一个新的工作项(除非调度器恰好为长时间运行的工作做出特殊规定).

    ImmediateScheduler 会立即运行这样的工作项, 而 CurrentThreadScheduler 会检查它是否已经在处理这个线程上的一个工作项. 从前面的这个示例中看到了这一点:

    Observable
      .Range(1, 5)
      .SelectMany(i => Observable.Range(i * 10, 5))
      .Subscribe(
            m => Console.WriteLine($"Received {m} on thread: {Environment.CurrentManagedThreadId}"));
    

    让准确地跟踪这里发生的事情. 首先, 假设这段代码正常运行, 不在任何异常上下文中, 也许在程序的 Main 入口点内.

    当这段代码对 SelectMany 返回的 IObservable<int> 调用 Subscribe 时, 这将依次对第一个 Observable.Range 返回的 IObservable<int> 调用 Subscribe, 这将依次为生成范围中的第一个值(1)安排一个工作项.

    由于没有显式地将调度器传递给 Range, 它将使用其默认选择, 即 CurrentThreadScheduler, 并且它会问自己"是否已经在这个线程上处理某个工作项的中间?"

    在这种情况下, 答案将是" 否" , 所以它将立即运行工作项(在 Range 操作符调用的 Schedule 调用返回之前).

    Range 操作符然后将产生它的第一个值, 调用 SelectMany 操作符在订阅范围时提供的 IObserver<int>OnNext.

    SelectMany 操作符的 OnNext 方法现在将调用它的lambda, 传入提供的参数(Range 操作符的值 1).

    从上面的示例中可以看到, 这个lambda再次调用 Observable.Range, 返回一个新的 IObservable<int>.

    SelectMany 将立即订阅这个(在从它的 OnNext 返回之前).

    这是这段代码第二次对 Range 返回的 IObservable<int> 调用 Subscribe (但这是一个与上次不同的实例),

    并且 Range 将再次默认使用 CurrentThreadScheduler, 并将再次安排一个工作项来执行第一次迭代.

    所以再次, CurrentThreadScheduler 会问自己"是否已经在这个线程上处理某个工作项的中间?"

    但这次, 答案将是"是" . 这就是行为与 ImmediateScheduler 不同的地方.

    CurrentThreadScheduler 为它被使用的每个线程维护一个工作队列, 在这种情况下, 它只是将新安排的工作添加到队列中, 然后返回给 SelectMany 操作符的 OnNext.

    SelectMany 现在已经完成了对第一个 Range 的这个项目(值 1)的处理, 所以它的 OnNext 返回.

    此时, 这个外部 Range 操作符安排另一个工作项. 同样, CurrentThreadScheduler 将检测到它当前正在运行一个工作项, 所以它只是将这个添加到队列中.

    在安排了将生成其第二个值(2)的工作项后, Range 操作符返回.

    记住, 此时在 Range 操作符中运行的代码是第一个安排的工作项的回调, 所以它正在返回 CurrentThreadScheduler, 回到了它的 Schedule 方法内部(由 Range 操作符的 Subscribe 方法调用).

    此时, CurrentThreadScheduler 不会从 Schedule 返回, 因为它检查它的工作队列, 并且会看到现在队列中有两个项目. (有嵌套的 Range 可观察对象安排生成其第一个值的工作项, 还有顶级 Range 可观察对象刚刚安排生成其第二个值的工作项. )

    CurrentThreadScheduler 现在将执行第一个项目: 嵌套的 Range 操作符现在可以生成它的第一个值(将是 10), 所以它调用 SelectMany x提供的观察者的 OnNext, SelectMany 将然后调用它的观察者, 这是由于示例中的顶级 Subscribe 调用而提供的.

    并且那个观察者将只调用传递给 Subscribe 的lambda, 导致 Console.WriteLine 运行.

    在那返回之后, 嵌套的 Range 操作符将安排另一个工作项来生成它的第二个项目.

    同样, CurrentThreadScheduler 将意识到它已经在这个线程上处理一个工作项的中间, 所以它只是将它放入队列中, 然后立即从 Schedule 返回.

    嵌套的 Range 操作符现在完成了这次迭代, 所以它返回给调度器. 调度器现在将获取队列中的下一个项目, 在这种情况下, 是顶级 Range 添加的生成第二个项目的工作项.

    依此类推. 当工作已经在进行时, 这种工作项的排队使得多个可观察源能够并行取得进展.

    相比之下, ImmediateScheduler 立即运行新的工作项, 这就是为什么看不到这种并行进展. (严格来说, 在某些情况下 ImmediateScheduler 不能立即运行工作. 在这些迭代场景中, 它实际上提供了一个稍微不同的调度器, 操作符在安排第一个项目之后的所有工作时使用它, 并且这个调度器检查它是否被要求同时处理多个工作项. 如果是, 它回退到类似于 CurrentThreadScheduler 的排队策略, 除了它是初始工作项本地的队列, 而不是每个线程的队列. 这防止了由于多线程导致的问题, 并且也避免了在迭代操作符在当前工作项的处理程序中安排新工作项时可能发生的堆栈溢出. 由于队列不是在线程中的所有工作之间共享的, 这仍然具有确保任何由工作项排队的嵌套工作在 Schedule 调用返回之前完成的效果. 所以即使当这种排队生效时, 通常也不会看到像 CurrentThreadScheduler 那样来自不同源的工作交错. 例如, 如果告诉嵌套的 Range 使用 ImmediateScheduler, 这种排队行为将在 Range 开始迭代时生效, 但是因为队列是嵌套 Range 执行的初始工作项本地的, 它最终将在返回之前产生所有嵌套 Range 项目. )

  3. 默认调度器

    DefaultScheduler 用于可能需要随着时间分布的工作, 或者可能希望并发执行的情况.

    这些特征意味着它不能保证在任何特定线程上运行工作, 实际上它通过CLR的线程池调度工作.

    这是Rx所有基于时间的操作符以及 Observable.ToAsync 操作符的默认调度器, Observable.ToAsync 操作符可以将一个.NET方法包装为 IObservable<T>.

    虽然这个调度器在希望工作不在当前线程上发生时很有用, 例如编写一个带有用户界面的应用程序, 并且希望避免在负责更新UI和响应用户输入的线程上做太多工作, 但它可能最终在任何线程上运行工作这一事实可能会使事情变得复杂.

    如果希望所有工作都在一个线程上发生, 只是不是现在所在的线程, 该怎么办? 有另一个调度器用于此.

  4. 事件循环调度器

    EventLoopScheduler 提供一次性调度, 将新安排的工作项排队. 这类似于 CurrentThreadScheduler 在仅从一个线程使用它时的操作方式.

    区别在于 EventLoopScheduler 为此工作创建一个专用线程, 而不是使用安排工作的任何线程.

    与到目前为止检查的调度器不同, 没有用于获取 EventLoopScheduler 的静态属性. 这是因为每个 EventLoopScheduler 都有自己的线程, 所以需要显式创建一个.

    它提供两个构造函数:

    public EventLoopScheduler()
    public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
    

    第一个创建一个线程. 第二个控制线程创建过程. 它调用提供的回调, 并将传递给它自己的回调, 需要在新创建的线程上运行这个回调.

    EventLoopScheduler 实现 IDisposable, 调用 Dispose 将允许线程终止.

    这可以与 Observable.Using 方法很好地配合使用.

    以下示例展示了如何使用 EventLoopScheduler 在专用线程上迭代 IEnumerable<T> 的所有内容, 并确保线程在完成后退出:

    IEnumerable<int> xs = GetNumbers();
    Observable.Using(
        () => new EventLoopScheduler(),
        scheduler => xs.ToObservable(scheduler))
    .Subscribe(...);
    
  5. 新线程调度器

    NewThreadScheduler 为它接收到的每个工作项创建一个新线程来执行.

    在大多数场景中, 这可能不太合理. 然而, 想要执行一些长时间运行的工作并通过 IObservable<T> 表示其完成的情况下, 它可能很有用.

    Observable.ToAsync 正是这样做的, 并且通常会使用 DefaultScheduler, 这意味着它将在一个线程池线程上运行工作.

    但是如果工作可能需要一两秒以上的时间, 线程池可能不是一个好选择, 因为它针对短执行时间进行了优化, 并且其管理线程池大小的启发式方法并不是为长时间运行的操作而设计的.

    在这种情况下, NewThreadScheduler 可能是更好的选择.

    虽然每次调用 Schedule 都会创建一个新线程, 但 NewThreadScheduler 会将不同的调度器传递到工作项回调中, 这意味着任何尝试执行迭代工作的操作都不会为每次迭代创建一个新线程.

    例如, 如果将 NewThreadSchedulerObservable.Range 一起使用, 每次订阅生成的 IObservable<int> 时, 将得到一个新线程, 但即使 Range 为它产生的每个值安排一个新的工作项, 也不会为每个项目得到一个新线程.

    它通过传递给工作项回调的嵌套调度器来安排这些按值的工作项, 并且 NewThreadScheduler 在这些情况下提供的嵌套调度器会在同一线程上调用所有这样的嵌套工作项.

  6. 同步上下文调度器

    这个调度器通过SynchronizationContext调用所有工作. 这在用户界面场景中很有用.

    大多数.NET客户端用户界面框架都提供一个 SynchronizationContext, 可用于在适合更新用户界面的上下文中调用回调. (通常这涉及在正确的线程上调用它们, 但各个实现可以决定什么构成适当的上下文. )

  7. 任务池调度器

    使用任务并行库(TPL)任务通过线程池调用所有工作.

    TPL是在CLR线程池多年后引入的, 现在是通过线程池启动工作的推荐方式. 在添加TPL时, 当通过任务调度工作时, 线程池会使用与依赖旧的线程池API时略有不同的算法.

    这种新算法在某些场景中使它更高效. 现在文档对此相当模糊, 所以不清楚在现代.NET上这些差异是否仍然存在, 但任务仍然是使用线程池的推荐机制.

    Rx的默认调度器出于向后兼容的原因使用旧的CLR线程池API.

    在性能关键代码中, 如果有大量工作在螺纹池线程上运行, 可以尝试使用 TaskPoolScheduler, 看看它是否对工作负载提供任何性能优势.

  8. 线程池调度器

    使用旧的TPL之前的API通过线程池调用所有工作. 这个类型是一个历史遗留物, 可追溯到并非所有平台都提供相同类型的线程池的时候.

    在几乎所有情况下, 如果想要这个类型设计用于的行为, 应该使用 DefaultScheduler (尽管 TaskPoolScheduler 提供了不同的行为).

    使用 ThreadPoolScheduler 有区别的唯一场景是编写通用Windows平台(UWP)应用程序时.

    System.Reactive v6.0的UWP目标为此类提供了与所有其他目标不同的实现.

    它使用 Windows.System.Threading.ThreadPool, 而所有其他目标使用 System.Threading.ThreadPool.

    UWP版本提供了属性, 允许配置特定于UWP线程池的一些功能.

    在实践中, 最好在新代码中避免使用这个类. UWP目标有不同实现的唯一原因是UWP过去不提供 System.Threading.ThreadPool.

    但当UWP在Windows 10.0.19041版本中添加对.NET Standard 2.0的支持时, 情况发生了变化.

    不再有任何好的理由存在特定于UWP的 ThreadPoolScheduler, 并且这个类型在UWP目标中非常不同但为了向后兼容必须保留, 这是一个混淆的来源. (它很可能被弃用, 因为Rx 7将解决由于 System.Reactive 组件当前直接依赖UI框架而产生的一些问题. )

    如果使用 DefaultScheduler, 无论在哪个平台上运行, 都将使用 System.Threading.ThreadPool.

  9. UI框架调度器: ControlScheduler, DispatcherScheduler和CoreDispatcherScheduler

    虽然 SynchronizationContextScheduler 适用于.NET中所有广泛使用的客户端UI框架, 但Rx提供了更专门的调度器. ControlScheduler 用于Windows Forms应用程序, DispatcherScheduler 用于WPF, CoreDispatcherScheduler 用于UWP.

    这些更专门的类型提供了两个好处. 首先, 不一定必须在目标UI线程上获取这些调度器的实例.

    而对于 SynchronizationContextScheduler, 通常获取它所需的 SynchronizationContext 的唯一方法是在UI线程上运行时检索 SynchronizationContext.Current.

    但是这些其他特定于UI框架的调度器可以传递一个合适的 Control, DispatcherCoreDispatcher, 可以从非UI线程获取这些.

    其次, DispatcherSchedulerCoreDispatcherScheduler 提供了一种使用 DispatcherCoreDispatcher 类型支持的优先级机制的方法.

测试调度器

Rx库定义了几个虚拟化时间的调度器, 包括 HistoricalScheduler, TestScheduler, VirtualTimeSchedulerVirtualTimeSchedulerBase.

将在第十六章测试中查看这种类型的调度器.

SubscribeOn和ObserveOn

到目前为止, 已经讨论了为什么一些Rx源需要访问调度器.

这对于与时间相关的行为以及尽可能快地产生项目的源是必要的. 但请记住, 调度器控制三件事:

  • 确定执行工作的上下文(例如, 某个线程)
  • 决定何时执行工作(例如, 立即执行或延迟执行)
  • 跟踪时间

到目前为止的讨论主要集中在第二和第三个功能上.

当涉及到自己的应用程序代码时, 最有可能使用调度器来控制第一个方面.

Rx为此定义了两个 IObservable<T> 的扩展方法: SubscribeOnObserveOn.

这两个方法都接受一个 IScheduler 并返回一个 IObservable<T>, 因此可以在这些方法的下游链接更多操作符.

这些方法的作用正如其名. 如果使用 SubscribeOn, 那么当对返回的 IObservable<T> 调用 Subscribe 时, 它会安排通过指定的调度器调用原始 IObservable<T>Subscribe 方法. 以下是一个示例:

Console.WriteLine($"[T:{Environment.CurrentManagedThreadId}] Main thread");
Observable
 .Interval(TimeSpan.FromSeconds(1))
 .SubscribeOn(new EventLoopScheduler((start) =>
  {
      Thread t = new(start) { IsBackground = false };
      Console.WriteLine($"[T:{t.ManagedThreadId}] Created thread for EventLoopScheduler");
      return t;
  }))
 .Subscribe(tick =>
          Console.WriteLine(
            $"[T:{Environment.CurrentManagedThreadId}] {DateTime.Now}: Tick {tick}"));
Console.WriteLine($"[T:{Environment.CurrentManagedThreadId}] {DateTime.Now}: Main thread exiting");

这个示例调用 Observable.Interval (默认使用 DefaultScheduler), 但不是直接订阅它, 而是首先获取 Interval 返回的 IObservable<T> 并调用 SubscribeOn.

使用了一个 EventLoopScheduler, 并传递了一个它将用于创建线程的工厂回调, 以确保它是一个非后台线程. (默认情况下, EventLoopScheduler 会为自己创建一个后台线程, 这意味着该线程不会强制进程保持活动. 通常这是想要的, 但在这个示例中正在改变它以展示发生的事情. )

当对 SubscribeOn 返回的 IObservable<long> 调用 Subscribe 时, 它会在提供的 EventLoopScheduler 上调用 Schedule, 并且在该工作项的回调中, 它然后对原始 Interval 源调用 Subscribe.

所以效果是对底层源的订阅不在主线程上发生, 而是在为 EventLoopScheduler 创建的线程上发生. 运行程序会产生以下输出:

[T:1] Main thread
[T:12] Created thread for EventLoopScheduler
[T:1] 21/07/2023 14:57:21: Main thread exiting
[T:6] 21/07/2023 14:57:22: Tick 0
[T:6] 21/07/2023 14:57:23: Tick 1
[T:6] 21/07/2023 14:57:24: Tick 2
...

注意, 应用程序的主线程在源开始产生通知之前退出.

但也要注意, 新创建的线程的线程ID是12, 而通知却在一个不同的线程上, ID为6! 发生了什么?

这经常让人困惑. 订阅可观察源的调度器不一定对源启动并运行后的行为有任何影响.

记得之前说过 Observable.Interval 默认使用 DefaultScheduler 吗? 嗯, 在这里没有为 Interval 指定调度器, 所以它将使用默认的.

它不在乎从什么上下文调用它的 Subscribe 方法.

所以实际上, 在这里引入 EventLoopScheduler 的唯一效果是即使在主线程退出后也保持进程活着.

在它对 Observable.Interval 返回的 IObservable<long> 进行初始 Subscribe 调用后, 该调度器线程实际上再也没有被使用过.

它只是耐心地等待永远不会到来的进一步 Schedule 调用.

并非所有源都完全不受其 Subscribe 被调用的上下文的影响, 不过. 如果将这一行:

.Interval(TimeSpan.FromSeconds(1))

替换为:

.Range(1, 5)

那么得到以下输出:

[T:1] Main thread
[T:12] Created thread for EventLoopScheduler
[T:12] 21/07/2023 15:02:09: Tick 1
[T:1] 21/07/2023 15:02:09: Main thread exiting
[T:12] 21/07/2023 15:02:09: Tick 2
[T:12] 21/07/2023 15:02:09: Tick 3
[T:12] 21/07/2023 15:02:09: Tick 4
[T:12] 21/07/2023 15:02:09: Tick 5

现在所有通知都在为 EventLoopScheduler 创建的线程12上进入.

注意, 即使在这里, Range 也没有使用那个调度器.

区别在于 Range 默认使用 CurrentThreadScheduler, 所以它将从调用它的任何线程生成输出.

所以即使它实际上没有使用 EventLoopScheduler, 它最终还是使用了那个调度器的线程, 因为使用那个调度器订阅了 Range.

所以这说明了 SubscribeOn 确实做到了它所承诺的: 它确实确定了 Subscribe 被调用的上下文.

只是这个上下文并不总是重要的. 如果 Subscribe 做了重要的工作, 它可能就重要了.

例如, 如果使用 Observable.Create 创建一个自定义序列, SubscribeOn 确定了传递给 Create 的回调被调用的上下文.

但是Rx没有 当前 调度器的概念, 没有办法问"是从哪个调度器被调用的?"

所以Rx操作符不会仅仅从它们被订阅的上下文继承它们的调度器.

当涉及到发出项目时, Rx提供的大多数源属于以下三类之一.

首先, 响应来自上游源的输入而产生输出的操作符(例如, Where, SelectGroupBy)通常在它们自己的 OnNext 内部调用它们的观察者方法.

所以无论它们的源可观察对象在调用 OnNext 时正在哪个上下文中运行, 那就是操作符将用于调用其观察者的上下文.

其次, 迭代地或基于定时产生项目的操作符将使用一个调度器(要么显式提供, 要么在未指定时使用默认类型).

第三, 一些源只是从它们喜欢的任何上下文中产生项目. 例如, 如果一个 async 方法使用 await 并指定 ConfigureAwait(false), 那么在 await 完成后, 它可能在或多或少的任何线程和任何上下文中, 并且它可能然后继续调用观察者的 OnNext.

只要一个源遵循Rx序列的基本规则, 它就可以从任何它喜欢的上下文中调用其观察者的方法.

它可以选择接受一个调度器作为输入并使用它, 但它没有义务这样做. 如果有一个这样难以控制的源, 这就是 ObserveOn 扩展方法的用武之地. 考虑以下这个相当愚蠢的示例:

Observable
 .Interval(TimeSpan.FromSeconds(1))
 .SelectMany(tick => Observable.Return(tick, NewThreadScheduler.Default))
 .Subscribe(tick =>
      Console.WriteLine($"{DateTime.Now}-{Environment.CurrentManagedThreadId}: Tick {tick}"));

这故意使每个通知都在不同的线程上到达, 如下输出所示:

Main thread: 1
21/07/2023 15:19:56-12: Tick 0
21/07/2023 15:19:57-13: Tick 1
21/07/2023 15:19:58-14: Tick 2
21/07/2023 15:19:59-15: Tick 3
...

(它通过为从 Interval 出现的每个Tick调用 Observable.Return 并告诉 Return 使用 NewThreadScheduler 来实现这一点.

每次这样调用 Return 都会创建一个新线程.

这是一个糟糕的主意, 但这是一种容易获得每次从不同上下文调用的源的方法. )如果想强加一些顺序, 可以添加一个对 ObserveOn 的调用:

Observable
.Interval(TimeSpan.FromSeconds(1))
.SelectMany(tick => Observable.Return(tick, NewThreadScheduler.Default))
.ObserveOn(new EventLoopScheduler())
.Subscribe(tick =>
      Console.WriteLine($"{DateTime.Now}-{Environment.CurrentManagedThreadId}: Tick {tick}"));

在这里创建了一个 EventLoopScheduler, 因为它创建一个单线程, 并在该线程上运行每个调度的工作项. 输出现在每次都显示相同的线程ID(13):

Main thread: 1
21/07/2023 15:24:23-13: Tick 0
21/07/2023 15:24:24-13: Tick 1
21/07/2023 15:24:25-13: Tick 2
21/07/2023 15:24:26-13: Tick 3
...

所以尽管由 Observable.Return 创建的每个新可观察对象都创建一个全新的线程, 但 ObserveOn 确保的观察者的 OnNext (以及在调用 OnCompletedOnError 的情况下)通过指定的调度器被调用.

SubscribeOn和ObserveOn在UI应用程序中

如果在用户界面中使用Rx, ObserveOn 在处理不在UI线程上提供通知的信息源时很有用.

可以用 ObserveOn 包装任何 IObservable<T>, 传递一个 SynchronizationContextScheduler (或特定于框架的类型, 如 DispatcherScheduler), 以确保观察者在UI线程上接收通知, 从而安全地更新UI.

SubscribeOn 在用户界面中也很有用, 可以确保可观察源为启动所做的任何初始化工作不在UI线程上发生.

大多数UI框架为任何一个窗口指定一个特定的线程用于接收用户通知和更新UI.

避免阻塞这个UI线程至关重要, 因为这样做会导致糟糕的用户体验.

如果UI线程上执行工作, 在该工作完成之前, UI线程将无法响应用户输入.

一般来说, 如果使用户界面无响应超过100毫秒, 用户就会感到烦躁, 所以不应该在UI线程上执行任何耗时超过此的工作.

当微软首次推出其应用商店(随Windows 8一起推出)时, 他们指定了一个更严格的限制: 如果应用程序阻塞UI线程超过50毫秒, 它可能不被允许进入商店.

凭借现代处理器提供的处理能力, 可以在50毫秒内完成大量处理.

即使在移动设备中相对低功率的处理器上, 这也足够执行数百万条指令.

然而, 任何涉及I/O(读取或写入文件, 或等待任何类型的网络服务响应)的操作都不应该在UI线程上完成. 创建响应式UI应用程序的一般模式是:

  • 接收关于某种用户操作的通知
  • 如果需要缓慢的工作, 在后台线程上执行
  • 将结果传递回UI线程
  • 更新UI

Rx非常适合这种模式: 响应事件, 潜在地组合多个事件, 将数据传递给链式方法调用.

通过包含调度, 甚至有能力在需要时离开并回到UI线程, 以获得用户期望的响应式应用程序感觉.

考虑一个使用Rx填充 ObservableCollection<T> 的WPF应用程序. 可以使用 SubscribeOn 确保主要工作不在UI线程上完成, 然后使用 ObserveOn 确保在正确的线程上收到通知.

如果没有使用 ObserveOn 方法, 那么 OnNext 处理程序将在引发通知的同一线程上被调用.

在大多数UI框架中, 这将导致某种不支持/跨线程异常. 在这个示例中, 订阅一个 Customers 序列.

使用 Defer, 以便如果 GetCustomers 在返回其 IObservable<Customer> 之前进行任何缓慢的初始工作, 直到订阅才会发生.

然后使用 SubscribeOn 在任务池线程上调用该方法并执行订阅.

然后确保当收到 Customer 通知时, 在 Dispatcher 上的 Customers 集合中添加它们.

Observable
.Defer(() => _customerService.GetCustomers())
.SubscribeOn(TaskPoolScheduler.Default)
.ObserveOn(DispatcherScheduler.Instance)
.Subscribe(Customers.Add);

Rx还为 IObservable<T> 提供了 SubscribeOnDispatcher()ObserveOnDispatcher() 扩展方法, 它们自动使用当前线程的 Dispatcher (以及 CoreDispatcher 的等效方法).

虽然这些可能稍微更方便, 但它们可能会使测试代码变得更困难. 将在测试Rx章节中解释原因.

并发陷阱

向应用程序引入并发会增加其复杂性. 如果添加一层并发并没有显著改善应用程序, 那么应该避免这样做.

并发应用程序可能会在调试, 测试和重构等方面出现维护问题.

并发引入的常见问题是不可预测的定时.

不可预测的定时可能由系统上的可变负载以及系统配置的变化(例如, 不同的核心时钟速度和处理器可用性)引起.

这些最终可能导致死锁, 活锁和损坏的状态.

向应用程序引入并发的一个特别重要的危险是可能会悄悄引入错误.

由不可预测的定时引起的错误是出了名的难以检测, 这使得这些类型的缺陷很容易在开发, 质量保证和用户接受测试中溜走, 只在生产环境中才显现出来.

然而, Rx在简化可观察序列的并发处理方面做得非常好, 以至于许多这些问题都可以得到缓解.

仍然可能会创建问题, 但如果遵循指南, 可以放心, 已经大大降低了不必要的竞争条件的可能性.

在后面的测试Rx章节中, 将研究Rx如何提高测试并发工作流的能力.

  1. 死锁

    Rx可以简化并发处理, 但它并不能免疫死锁.

    一些调用(如 First, Last, SingleForEach)是阻塞的, 它们在等待的事情发生之前不会返回. 以下示例显示这很容易导致死锁:

    var sequence = new Subject<int>();
    Console.WriteLine("Next line should lock the system.");
    IEnumerable<int> value = sequence.First();
    sequence.OnNext(1);
    Console.WriteLine("I can never execute....");
    

    First 方法在其源发出一个序列之前不会返回. 但导致这个源发出序列的代码在调用 First 之后的行.

    所以源在 First 返回之前不能发出序列. 这种有两个参与方, 每个方都不能继续直到另一方继续的死锁风格通常被称为"致命拥抱".

    如这个代码所示, 即使在单线程代码中也完全可能发生致命拥抱.

    实际上, 这段代码的单线程性质就是导致死锁的原因: 有两个操作(等待第一个通知, 和发送第一个通知), 但只有一个线程.

    这不一定是个问题. 如果使用 FirstAsync 并附加一个观察者, FirstAsync 将在源 Subject<int> 调用其 OnNext 时执行其逻辑.

    但这比仅仅调用 First 并将结果分配给一个变量更复杂.

    这是一个过于简化的示例, 用于说明行为, 在生产中永远不会编写这样的代码. (即使这样做了, 它也会很快且一致地失败, 会立即意识到问题. )但在实际应用程序代码中, 这些类型的问题可能更难发现.

    竞争条件经常在集成点潜入系统, 所以问题不一定在任何一段代码中明显: 定时问题可能由于将多个代码片段组合在一起的方式而出现.

    下一个示例可能更难检测, 但与第一个不现实的示例只有一小步之遥.

    基本思想是有一个表示用户界面中按钮点击的主题.

    表示用户输入的事件处理程序由UI框架调用.

    只提供框架事件处理程序方法, 每当感兴趣的事件发生时, 例如按钮被点击, 它就会为调用这些方法.

    这段代码在表示点击的主题上调用 First, 但在这里这可能导致问题并不像前面的示例那么明显:

    public Window1()
    {
        InitializeComponent();
        DataContext = this;
        Value = "Default value";
    
        // 死锁! 需要调度程序继续允许点击按钮以产生一个值
        Value = _subject.First();
    
        // 这将产生预期的效果, 但因为它不阻塞,
        // 可以在UI线程上调用它而不会死锁.
        //_subject.FirstAsync(1).Subscribe(value => Value = value);
    }
    private void MyButton_Click(object sender, RoutedEventArgs e)
    {
        _subject.OnNext("New Value");
    }
    public string Value
    {
        get { return _value; }
        set
        {
            _value = value;
            PropertyChanged?.Invoke(this, new PropertyChangedEventArgs("Value"));
        }
    }
    

    前面的示例在 First 返回后调用主题的 OnNext, 这使得相对容易看出如果 First 不返回, 那么主题就不会发出通知.

    但在这里不是那么明显. MyButton_Click 事件处理程序将在 InitializeComponent 调用内部设置(这在WPF代码中是正常的), 所以显然已经做了必要的设置来使事件能够流动.

    当到达对 First 的这个调用时, UI框架已经知道如果用户点击 MyButton, 它应该调用 MyButton_Click, 并且那个方法将导致主题发出一个值.

    在那里使用 First 本身并没有本质上的错误. (有风险, 是的, 但在某些场景中, 那段确切的代码完全没问题. )问题是使用它的上下文.

    这段代码在UI元素的构造函数中, 并且这些总是在与那个窗口的UI元素相关联的特定线程上运行. (这是一个WPF示例, 但其他UI框架工作方式相同. )并且那是UI框架将用于传递用户输入通知的相同线程.

    如果阻塞这个UI线程, 就阻止了UI框架调用按钮点击事件处理程序.

    所以这个阻塞调用正在等待一个只能从它正在阻塞的线程上引发的事件, 从而创建了一个死锁.

    应该尽量避免在Rx中使用阻塞调用. 这是一个很好的经验法则. 可以通过注释掉使用 First 的行并取消注释下面包含这段代码的行来修复上面的代码:

    _subject.FirstAsync(1).Subscribe(value => Value = value);
    

    这使用 FirstAsync, 它做同样的工作, 但采用不同的方法. 它实现相同的逻辑, 但它返回一个 IObservable<T>, 如果想在第一个值最终出现时接收它, 必须订阅这个 IObservable<T>.

    它比仅仅将 First 的结果分配给 Value 属性更复杂, 但它更适合不知道那个源何时会产生一个值的事实.

    在有前端开发经验的工程师看来, 最后那个示例显然是错误的: 在窗口的构造函数中有代码, 它不会允许构造函数完成, 直到用户点击那个窗口中的按钮.

    在构造完成之前, 窗口甚至不会出现, 所以等待用户点击按钮是没有意义的. 在构造函数完成之前, 那个按钮甚至不会在屏幕上可见.

    此外, 经验丰富的UI开发人员知道不应停止主线程去等待用户的特定操作. (即使是模态对话框, 实际上确实要求在继续之前得到响应, 也不会阻塞UI线程. )但如下面的示例所示, 问题可能更难发现.

    在这个示例中, 按钮的点击处理程序将尝试从通过接口公开的可观察序列中获取第一个值.

    public partial class Window1 : INotifyPropertyChanged
    {
        // 想象这里有依赖注入.
        private readonly IMyService _service = new MyService();
        private int _value2;
        public Window1()
        {
            InitializeComponent();
            DataContext = this;
        }
        public int Value2
        {
            get { return _value2; }
            set
            {
                _value2 = value;
                var handler = PropertyChanged;
                if (handler!= null) handler(this, new PropertyChangedEventArgs(nameof(Value2)));
            }
        }
        #region INotifyPropertyChanged Members
        public event PropertyChangedEventHandler PropertyChanged;
        #endregion
        private void MyButton2_Click(object sender, RoutedEventArgs e)
        {
            Value2 = _service.GetTemperature().First();
        }
    }
    

    与前面的示例不同, 这个示例没有尝试在构造函数中阻塞进展. 阻塞调用 First 发生在这里的按钮点击处理程序(靠近末尾的 MyButton2_Click 方法)中.

    这个示例更有趣, 因为这种事情不一定是错误的. 应用程序经常在点击处理程序中执行阻塞操作.

    当点击一个按钮来保存文档的副本时, 期望应用程序执行所有必要的I/O工作, 将数据写入存储.

    在现代固态存储设备上, 这通常发生得非常快, 几乎是瞬间的, 但在机械硬盘是常态的过去, 应用程序在保存文档时短暂无响应并不罕见.

    即使在今天, 如果存储是远程的, 并且网络问题导致延迟, 也可能会发生这种情况.

    所以, 即使已经学会对像 First 这样的阻塞操作持怀疑态度, 在这个例子中它也可能是可以的.

    仅从这段代码来看, 无法确定. 这完全取决于 GetTemperature 返回的是哪种可观察对象, 以及它产生项目的方式.

    First 的调用将在UI线程上阻塞, 直到第一个项目可用, 所以如果产生第一个项目需要访问UI线程, 这将产生死锁. 以下是一种稍微人为的创建该问题的方式:

    class MyService : IMyService
    {
        public IObservable<int> GetTemperature()
        {
            return Observable.Create<int>(
                o =>
                {
                    o.OnNext(27);
                    o.OnNext(26);
                    o.OnNext(24);
                    return () => { };
                })
              .SubscribeOnDispatcher();
        }
    }
    

    这通过一系列对 OnNext 的调用来模拟实际温度传感器的行为. 但它做了一些奇怪的显式调度: 它调用 SubscribeOnDispatcher.

    这是一个扩展方法, 实际上调用 SubscribeOn(DispatcherScheduler.Current.Dispatcher).

    这实际上告诉Rx, 当尝试订阅 GetTemperature 返回的 IObservable<int> 时, 该订阅调用应该通过一个特定于WPF的调度器来完成, 该调度器在UI线程上运行其工作项. (严格来说, WPF允许多个UI线程, 所以更准确地说, 这段代码只有在UI线程上调用它时才有效, 并且如果这样做, 调度器将确保工作项被调度到同一个UI线程上. )

    结果是, 当点击处理程序调用 First 时, 这将依次订阅 GetTemperature 返回的 IObservable<int>, 并且因为它使用了 SubscribeOnDispatcher, 它不会立即调用传递给 Observable.Create 的回调.

    相反, 它安排一个工作项, 当UI线程(即正在运行的线程)空闲时将执行该工作项.

    现在它不被认为是空闲的, 因为它正在处理按钮点击. 将这个工作项交给调度器后, Subscribe 调用返回给 First 方法. 然后 First 方法现在等待第一个项目出现.

    由于它在那之前不会返回, UI线程在那之前不会被认为是可用的, 这意味着应该产生第一个项目的调度工作项永远无法运行, 就有了死锁.

    这归结为与第一个 First 相关的死锁示例相同的基本问题.

    有两个过程: 元素的生成, 和等待元素的出现. 这些需要同时进行, 需要在源发出第一个元素时" 等待第一个元素" 的逻辑已经在运行.

    这些示例都只使用一个线程, 这使得使用单个阻塞调用(First)来同时设置观察第一个项目的过程和等待它发生是一个坏主意.

    但即使在所有三种情况下都是相同的基本问题, 随着代码变得更复杂, 它也变得更难看到. 在实际应用程序代码中, 通常比这更难看到死锁的根本原因.

    在实际中开多线程并不可避免, 而且以上提及的诸多潜在bug难以发现, 主要意图是提醒读者的重视, 并且尽可能避免手动管理.

    虽然采用Rx不能神奇地避免经典的并发问题, 但Rx可以使正确处理并发更容易, 只要遵循以下两条规则:

    • 只有顶级订阅者应该做出调度决策
    • 避免使用阻塞调用, 例如 First, LastSingle

    最后一个示例出问题的原因很简单; GetTemperature 服务在决定调度模型时, 实际上它没有权力这样做.

    代表温度传感器的代码不应该需要知道正在使用特定的UI框架, 当然也不应该单方面决定它将在WPF用户界面线程上运行某些工作.

    在开始使用Rx时, 很容易说服自己将调度决策嵌入到较低层中在某种程度上是"有帮助的" .

    新手何容易弄出一个操作符, 它"不仅提供了温度读数, 还自动在UI线程上发出通知, 订阅者都不必费心使用 ObserveOn."

    意图可能是好的, 但太容易创建一个线程噩梦.

    只有设置订阅并消费其结果的代码才能对并发要求有完整的概述, 所以这是选择使用哪些调度器的正确级别.

    较低层的代码不应该试图参与; 它们应该只做被告知的事情. (Rx可以说自己稍微违反了这条规则, 在需要的地方选择默认调度器. 但它做出非常保守的选择, 旨在最大限度地减少死锁的机会, 并且总是允许应用程序通过指定调度器来控制. )

    请注意, 遵循上述两条规则中的任何一条都足以防止这个示例中的死锁. 但最好同时遵循两条规则.

    这确实留下了一个未回答的问题: 顶级订阅者应该如何做出调度决策?

    已经确定了需要做出决策的代码区域, 但决策应该是什么? 这将取决于正在编写的应用程序类型. 对于UI代码, 这种模式通常效果很好: "在后台线程上订阅; 在UI线程上观察".

    对于UI代码, 死锁的风险出现是因为UI线程实际上是一个共享资源, 对该资源的争用可能会产生死锁.

    所以策略是尽可能避免需要该资源: 不需要在该线程上的工作就不应该在该线程上, 这就是为什么通过使用 TaskPoolScheduler 等在工作线程上执行订阅可以降低死锁风险.

    因此, 如果有可观察源决定何时产生事件(例如, 定时器, 或代表来自外部信息源或设备的输入的源), 也希望这些源在工作线程上调度工作.

    只有当需要更新用户界面时, 才需要代码在UI线程上运行, 所以通过在最后一刻使用 ObserveOn 与合适的UI感知调度器(如WPF的 DispatcherScheduler)结合来延迟这一点.

    如果有一个由多个操作符组成的复杂Rx查询, 这个 ObserveOn 应该在最后, 就在调用 Subscribe 来附加将更新UI的处理程序之前.

    这样, 只有最后一步, 即更新UI, 才需要访问UI线程. 到这个运行时, 所有复杂的处理都将完成, 所以它应该能够非常快地运行, 几乎立即释放对UI线程的控制, 提高应用程序的响应性, 并降低死锁的风险.

    其他场景将需要其他策略, 但处理死锁的一般原则始终相同: 了解哪些共享资源需要独占访问.

    例如, 如果有一个传感器库, 它可能创建一个专用线程来监控设备并报告新的测量值, 如果它规定某些工作必须在该线程上完成, 这将与UI场景非常相似:

    有一个特定的线程需要避免阻塞. 相同的方法可能在这里适用. 但这不是唯一的场景类型.

    可以想象一个数据处理应用程序, 其中某些数据结构是共享的.

    在这些情况下, 通常允许从任何线程访问这些数据结构, 但要求一次只能有一个线程访问.

    通常会使用线程同步原语来防止对这些关键数据结构的并发使用.

    在这些情况下, 死锁的风险不是来自于使用特定的线程, 而是来自于一个线程可能因为另一个线程正在使用共享数据结构而无法进展, 但那个其他线程正在等待第一个线程做某事, 并且在那之前不会释放对该数据结构的锁.

    避免问题的最简单方法是尽可能避免阻塞. 避免像 First 这样的方法, 更喜欢它们的非阻塞等效方法, 如 FirstAsync. (如果有无法避免阻塞的情况, 尽量避免在持有保护对共享数据访问的锁时这样做. 如果真的也无法避免, 那么就没有简单的答案了. 不得不开始考虑锁层次结构来系统地避免死锁, 就像不使用Rx时一样.) 非阻塞风格是使用Rx的自然方式, 这是Rx在这些情况下帮助用户避免并发相关问题的主要方式.

调度器的高级特性

调度器提供了一些特性, 这些特性在编写需要与调度器交互的可观察源时主要有用.

使用调度器的最常见方式是在设置订阅时, 要么在创建可观察源时将它们作为参数提供, 要么将它们传递给 SubscribeOnObserveOn.

但是, 如果需要编写一个在自己选择的时间表上产生项目的可观察源(例如, 假设正在编写一个表示某种外部数据源的库, 并且想将其呈现为 IObservable<T>), 就可能需要使用这些更高级的特性.

  1. 传递状态

    IScheduler 定义的所有方法都接受一个 state 参数. 这是接口定义再次:

    public interface IScheduler
    {
        DateTimeOffset Now { get; }
        IDisposable Schedule<TState>(TState state,
                                     Func<IScheduler, TState, IDisposable> action);
    
        IDisposable Schedule<TState>(TState state,
                                     TimeSpan dueTime,
                                     Func<IScheduler, TState, IDisposable> action);
    
        IDisposable Schedule<TState>(TState state,
                                     DateTimeOffset dueTime,
                                     Func<IScheduler, TState, IDisposable> action);
    }
    

    调度器不关心这个 state 参数中的内容. 它只是在执行工作项时将其未修改地传递给回调.

    这为该回调提供了一种提供上下文的方式. 这并不是严格必需的: 作为 action 传递的委托可以包含需要的任何状态.

    最简单的方法是在lambda中捕获变量. 然而, 如果查看 Rx源代码, 会发现它通常不这样做.

    例如, Range 操作符的核心是一个名为 LoopRec 的方法, 如果查看[LoopRec 的源代码](https://github.com/dotnet/reactive/blob/95d9ea9d2786f6ec49a051c5cff47dc42591e54f/Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs#L55 - L73), 会看到它包含这一行:

    var next = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
    

    从逻辑上讲, Range 只是一个循环, 为它产生的每个项目执行一次. 但是为了实现并发执行并避免堆栈溢出, 它通过将循环的每次迭代安排为单独的工作项来实现这一点. (该方法名为 LoopRec, 因为它在逻辑上是一个递归循环: 它通过调用 Schedule 启动, 并且每次调度器调用这个方法时, 它会再次调用 Schedule 以请求运行下一个项目. 这实际上不会在任何Rx的内置调度器中导致真正的递归, 即使是 ImmediateScheduler, 因为它们都会检测到这一点并安排在下一个项目在当前项目返回后运行. 但是如果编写了可能的最天真的调度器, 这实际上会在运行时导致递归, 如果尝试创建一个大序列, 可能会导致堆栈溢出. )

    请注意, 传递给 Schedule 的lambda被标记为 static. 这告诉C#编译器意图是不捕获任何变量, 并且任何尝试这样做都应该导致编译器错误.

    这样做的好处是编译器能够生成代码, 为每次调用重用相同的委托实例. 第一次运行时, 它将创建一个委托并将其存储在一个隐藏字段中.

    在后续的每次执行(无论是在同一范围的未来迭代中, 还是对于全新的范围实例)中, 它都可以一次又一次地使用相同的委托.

    这是可能的, 因为委托不捕获状态. 这避免了在每次循环时分配一个新对象.

    Rx库难道不能使用更直接的方法吗? 可以选择不使用状态, 将 null 状态传递给调度器, 然后在回调中丢弃传递给状态参数:

    // 不那么奇怪, 但效率较低:
    var next = scheduler.Schedule<object?>(null, (innerScheduler, _) => LoopRec(innerScheduler));
    

    这避免了前面示例中传递自己的 this 参数的奇怪之处: 现在以普通方式调用 LoopRec 实例成员: 隐式地使用了作用域中的 this 引用.

    所以这将创建一个捕获该隐式 this 引用的委托.

    这可行, 但效率低下: 它将迫使编译器生成分配几个对象的代码.

    它创建一个对象, 该对象有一个字段持有捕获的 this, 然后它需要创建一个不同的委托实例, 该实例有一个对该捕获对象的引用.

    Range 实现中实际的更复杂代码避免了这种情况. 它通过将lambda标记为 static 来禁用捕获. 这防止代码依赖隐式的 this 引用. 所以它必须安排 this 引用在回调中可用.

    而这正是 state 参数的用途. 它提供了一种传递每个工作项状态的方式, 以便在每次迭代时避免捕获变量的开销.

  2. 未来调度

    之前谈到了基于时间的操作符, 以及 ISchedule 中启用此功能的两个基于时间的成员, 但还没有展示如何使用它.

    这些允许安排一个动作在未来执行.

    (这依赖于进程在必要时长内继续运行. 如前面章节所述, 对于 System.Reactive 来说, 订阅基本上要立马处理的. 所以如果想安排某事在几天后执行, 可能需要Reaqtor这个库.

    可以通过调用接受 DateTimeOffsetSchedule 重载来指定动作应该被调用的确切时间点, 或者可以使用基于 TimeSpan 的重载指定等待动作被调用的时间段.

    可以像这样使用 TimeSpan 重载:

    var delay = TimeSpan.FromSeconds(1);
    Console.WriteLine("Before schedule at {0:o}", DateTime.Now);
    scheduler.Schedule(delay, () => Console.WriteLine("Inside schedule at {0:o}", DateTime.Now));
    Console.WriteLine("After schedule at  {0:o}", DateTime.Now);
    

    输出:

    Before schedule at 2012-01-01T12:00:00.000000+00:00
    After schedule at 2012-01-01T12:00:00.058000+00:00
    Inside schedule at 2012-01-01T12:00:01.044000+00:00
    

    这说明这里的调度是非阻塞的, 因为"之前" 和"之后" 的调用在时间上非常接近. (对于大多数调度器来说是这样, 但如前所述, ImmediateScheduler 工作方式不同. 在这种情况下, 会在"内部"消息之后看到"之后" 消息. 这就是为什么没有定时操作符默认使用 ImmediateScheduler 的原因.)

    还可以看到, 大约在动作被安排一秒后, 它被调用了.

    可以使用 DateTimeOffset 重载指定特定的时间点来安排任务.

    如果由于某种原因, 指定的时间点在过去, 那么动作将尽快被安排.

    请注意, 系统时钟的变化会使事情变得复杂. Rx的调度器确实做了一些调整来处理时钟漂移, 但系统时钟的突然大变化可能会导致一些短期混乱.

  3. 取消

    Schedule 的每个重载都返回一个 IDisposable, 调用 IDisposableDispose 方法将取消已安排的工作. 在前面的示例中, 安排工作在一秒后被调用. 可以通过释放返回值来取消工作.

    var delay = TimeSpan.FromSeconds(1);
    Console.WriteLine("Before schedule at {0:o}", DateTime.Now);
    var workItem = scheduler.Schedule(delay,
       () => Console.WriteLine("Inside schedule at {0:o}", DateTime.Now));
    Console.WriteLine("After schedule at  {0:o}", DateTime.Now);
    workItem.Dispose();
    

    输出:

    #+end_src
    

    Before schedule at 2012-01-01T12:00:00.000000+00:00 After schedule at 2012-01-01T12:00:00.058000+00:00

    #+end_src
    

    注意, 已安排的动作从未发生, 因为几乎立即取消了它.

    当用户在调度器能够调用已安排的动作方法之前取消它时, 该动作将从工作队列中移除.

    这就是在上面的示例中看到的情况. 也可以取消已经在运行的已安排工作, 这就是为什么工作项回调需要返回 IDisposable 的原因:

    如果在尝试取消工作项时工作已经开始, Rx会调用工作项回调返回的 IDisposableDispose 方法.

    这提供了一种让用户取消可能已经在运行的工作的方法. 这个工作可能是某种I/O, 繁重的计算, 或者也许是使用 Task 来执行一些工作.

    这个机制的作用是: 工作项回调需要已经返回, Rx才能调用它返回的 IDisposable.

    这个机制在实践中只有在工作在返回调度器后继续时才能使用.

    可以启动另一个线程, 使工作并发进行, 尽管通常尽量避免在Rx中创建线程.

    另一种可能性是, 如果已安排的工作项调用了某个异步API并在不等待其完成的情况下返回.

    如果那个API提供取消功能, 可以返回一个取消它的 IDisposable.

    为了说明取消操作的实际情况, 这个稍微不现实的示例将一些工作作为 Task 运行, 以便它能够在回调返回后继续.

    它只是通过执行自旋等待并将值添加到 list 参数来模拟一些工作.

    关键是创建一个 CancellationToken, 以便能够告诉任务希望它停止, 并且返回一个将这个令牌置于取消状态的 IDisposable.

    public IDisposable Work(IScheduler scheduler, List<int> list)
    {
        CancellationTokenSource tokenSource = new();
        CancellationToken cancelToken = tokenSource.Token;
        Task task = new(() =>
        {
            Console.WriteLine();
    
            for (int i = 0; i < 1000; i++)
            {
                SpinWait sw = new();
    
                for (int j = 0; j < 3000; j++) sw.SpinOnce();
    
                Console.Write(".");
    
                list.Add(i);
    
                if (cancelToken.IsCancellationRequested)
                {
                    Console.WriteLine("Cancellation requested");
    
                    // cancelToken.ThrowIfCancellationRequested();
    
                    return;
                }
            }
        }, cancelToken);
    
        task.Start();
    
        return Disposable.Create(tokenSource.Cancel);
    }
    

    这段代码安排上述代码并允许用户通过按Enter键取消处理工作.

    List<int> list = new();
    Console.WriteLine("Enter to quit:");
    IDisposable token = scheduler.Schedule(list, Work);
    Console.ReadLine();
    Console.WriteLine("Cancelling...");
    token.Dispose();
    Console.WriteLine("Cancelled");
    

    输出:

    Enter to quit:
    ........
    Cancelling...
    Cancelled
    Cancellation requested
    

    这里的问题是引入了对 Task 的显式使用, 所以以一种不受调度器控制的方式增加了并发性.

    Rx库通常通过接受调度器参数来允许控制引入并发性的方式. 如果目标是启用长时间运行的迭代工作, 可以避免启动新线程或任务, 而是使用Rx的递归调度器特性.

    在传递状态部分已经稍微谈到了这一点, 但有几种方法可以做到.

  4. 递归

    除了 IScheduler 方法, Rx还以扩展方法的形式定义了各种 Schedule 重载.

    其中一些接受一些看起来奇怪的委托作为参数. 特别注意每个 Schedule 扩展方法重载中的最后一个参数.

    public static IDisposable Schedule(
        this IScheduler scheduler,
        Action<Action> action)
    {...}
    public static IDisposable Schedule<TState>(
        this IScheduler scheduler,
        TState state,
        Action<TState, Action<TState>> action)
    {...}
    public static IDisposable Schedule(
        this IScheduler scheduler,
        TimeSpan dueTime,
        Action<Action<TimeSpan>> action)
    {...}
    public static IDisposable Schedule<TState>(
        this IScheduler scheduler,
        TState state,
        TimeSpan dueTime,
        Action<TState, Action<TState, TimeSpan>> action)
    {...}
    public static IDisposable Schedule(
        this IScheduler scheduler,
        DateTimeOffset dueTime,
        Action<Action<DateTimeOffset>> action)
    {...}
    public static IDisposable Schedule<TState>(
        this IScheduler scheduler,
        TState state, DateTimeOffset dueTime,
        Action<TState, Action<TState, DateTimeOffset>> action)
    {...}
    

    每个这些重载都接受一个委托" action" , 它允许递归地调用"action".

    这可能看起来是一个非常奇怪的签名, 但它允许以一种可能更简单的方式实现与在传递状态部分看到的类似的逻辑递归迭代方法.

    这个示例使用最简单的递归重载. 有一个可以递归调用的 Action.

    Action<Action> work = (Action self) =>
    {
        Console.WriteLine("Running");
        self();
    };
    var token = s.Schedule(work);
    
    Console.ReadLine();
    Console.WriteLine("Cancelling");
    token.Dispose();
    Console.WriteLine("Cancelled");
    

    输出:

    Enter to quit:
    Running
    Running
    Running
    Running
    Cancelling
    Cancelled
    Running
    

    请注意, 不必在委托中编写任何取消代码. Rx为处理了循环并检查取消. 由于每个单独的迭代都被安排为单独的工作项, 所以没有长时间运行的工作, 所以让调度器完全处理取消就足够了.

    这些重载与直接使用 IScheduler 方法的主要区别在于, 不需要直接将另一个回调传递给调度器.

    只需调用提供的 Action, 它就会安排再次调用方法. 它们还使在没有使用状态的情况下不必传递状态参数.

    如前面部分所述, 虽然这在逻辑上表示递归, 但Rx保护免受堆栈溢出. 调度器通过等待方法返回后再执行递归调用来实现这种递归风格.

    这就结束了对调度和线程的介绍. 接下来, 将研究相关的定时主题.

第十章:基于时间的序列

对于事件源, 时间通常很重要. 在某些情况下, 某些事件唯一令人感兴趣的信息可能是它发生的时间.

核心的 IObservable<T>IObserver<T> 接口在其方法签名中根本没有提到时间, 但它们不需要提到, 因为源可以决定何时调用观察者的 OnNext 方法.

订阅者知道事件何时发生, 因为它正在发生. 这并不总是处理时间最方便的方式, 因此Rx库提供了一些与时间相关的操作符.

已经看到了几个提供可选基于时间操作的操作符: BufferWindow. 本章将介绍各种与时间相关的操作符.

时间戳和时间间隔

由于可观察序列是异步的, 知道元素何时被接收可能会很方便.

显然, 订阅者总是可以使用 DateTimeOffset.Now, 但如果想在更大的查询中引用到达时间, Timestamp 扩展方法是一个方便的方法, 它会为每个元素附加一个时间戳.

它将来自源序列的元素包装在轻量级的 Timestamped<T> 结构中. Timestamped<T> 类型是一个结构体, 它公开了它包装的元素的值, 以及一个 DateTimeOffset, 表示 Timestamp 操作符何时接收到它.

在这个示例中, 创建了一个包含三个值的序列, 每个值相隔一秒, 然后将其转换为带时间戳的序列.

Observable.Interval(TimeSpan.FromSeconds(1))
         .Take(3)
         .Timestamp()
         .Dump("Timestamp");

Timestamped<T>ToString() 实现给了一个可读的输出.

Timestamp-->0@07/08/2023 10:03:58 +00:00
Timestamp-->1@07/08/2023 10:03:59 +00:00
Timestamp-->2@07/08/2023 10:04:00 +00:00
TimeStamp completed

可以看到值0, 1和2每个都相隔一秒产生.

Rx还提供了 TimeInterval.

它不是报告项目到达的时间, 而是报告项目之间的间隔(或者, 对于第一个元素, 是在订阅后它出现所花费的时间).

Timestamp 方法类似, 元素被包装在轻量级结构中.

但是, Timestamped<T> 用到达时间装饰每个元素, 而 TimeIntervalTimeInterval<T> 类型包装每个元素, 该类型添加了一个 TimeSpan.

可以修改前面的示例以使用 TimeInterval:

Observable.Interval(TimeSpan.FromSeconds(1))
         .Take(3)
         .TimeInterval()
         .Dump("TimeInterval");

输出现在报告元素之间的时间, 而不是它们被接收的时间:

#+end_src

Timestamp–>0@00:00:01.0183771 Timestamp–>1@00:00:00.9965679 Timestamp–>2@00:00:00.9908958 Timestamp completed

#+end_src

从输出中可以看出, 时间并不完全是一秒, 但非常接近.

其中一些将是 TimeInterval 操作符中的测量噪声, 但这种可变性的大部分可能来自 Observable.Interval 类.

调度程序能够满足其定时请求的精度总是有限的. 一些调度程序比其他调度程序引入更多的变化.

通过UI线程传递工作的调度程序最终受到该线程的消息循环响应速度的限制.

但即使在最有利的条件下, 调度程序也受到.NET不是为在实时系统中使用而构建的(Rx可以使用的大多数操作系统也不是)这一事实的限制.

对于本节中的所有操作符, 应该意识到在Rx中定时始终是尽力而为(best effort)的事情.

实际上, 定时的固有变化可以使 Timestamp 特别有用.

仅仅查看 DateTimeOffset.Now 的问题在于处理一个事件需要花费非零的时间, 所以在处理一个事件期间每次尝试读取当前时间时, 可能会看到略有不同的时间.

通过附加一次时间戳, 捕获了观察到事件的时间, 然后下游处理添加多少延迟就无关紧要了. 事件将被注释一个单一的, 固定的时间, 指示它何时通过 Timestamp.

延迟

Delay 扩展方法对整个序列进行时间偏移.

Delay 试图保持值之间的相对时间间隔.

它在这方面的精度不可避免地有限 – 它不会将定时精确到最近的纳秒.

确切的精度由使用的调度程序决定, 并且在高负载下通常会变得更差, 但通常会将定时精确到几毫秒内.

Delay 有多种重载, 提供了各种不同的指定时间偏移的方式. (对于所有选项, 可以可选地传递一个调度程序, 但如果调用不接受调度程序的重载, 它将默认使用 DefaultScheduler).

最直接的是传递一个 TimeSpan, 这将使序列延迟指定的时间量.

还有接受 DateTimeOffset 的延迟, 它将等待直到指定时间发生, 然后开始重放输入. (第二种基于绝对时间的方法本质上等同于 TimeSpan 重载. 可以通过从目标时间减去当前时间来获得 TimeSpan, 从而得到大致相同的效果, 除了 DateTimeOffset 版本试图处理在调用 Delay 和指定时间到达之间系统时钟的变化. )

为了展示 Delay 方法的实际效果, 这个示例创建了一个值相隔一秒的序列并给它们加上时间戳. 这将表明被延迟的不是订阅, 而是通知实际转发到最终订阅者.

IObservable<Timestamped<long>> source = Observable
   .Interval(TimeSpan.FromSeconds(1))
   .Take(5)
   .Timestamp();
IObservable<Timestamped<long>> delay = source.Delay(TimeSpan.FromSeconds(2));
delay.Subscribe(value =>
   Console.WriteLine(
     $"Item {value.Value} with timestamp {value.Timestamp} received at {DateTimeOffset.Now}"),
   () => Console.WriteLine("delay Completed"));

查看输出中的时间戳, 可以看到 Timestamp 捕获的时间都比订阅报告的时间早两秒:

Item 0 with timestamp 09/11/2023 17:32:20 +00:00 received at 09/11/2023 17:32:22 +00:00
Item 1 with timestamp 09/11/2023 17:32:21 +00:00 received at 09/11/2023 17:32:23 +00:00
Item 2 with timestamp 09/11/2023 17:32:22 +00:00 received at 09/11/2023 17:32:24 +00:00
Item 3 with timestamp 09/11/2023 17:32:23 +00:00 received at 09/11/2023 17:32:25 +00:00
Item 4 with timestamp 09/11/2023 17:32:24 +00:00 received at 09/11/2023 17:32:26 +00:00
delay Completed

请注意, Delay 不会对 OnError 通知进行时间偏移. 这些通知将立即传播.

采样

Sample 方法按照要求的任何间隔产生项目. 每次它产生一个值时, 它报告从的源中出现的最后一个值. 如果有一个产生数据的速率比需要的更高的源(假设有一个加速度计, 每秒报告100次测量, 但每秒只需要读取10次), Sample 提供了一种简单的方法来降低数据速率. 这个示例展示了 Sample 的实际应用.

IObservable<long> interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
interval.Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);

输出:

5
12
18

如果仔细查看这些数字, 可能已经注意到值之间的间隔每次都不相同.

选择了150毫秒的源间隔和1秒的采样间隔, 以突出采样中可能需要仔细处理的一个方面:

如果源产生项目的速率与采样率不完全对齐, 这可能意味着 Sample 引入了源中不存在的不规则性.

如果列出基础序列产生值的时间, 以及 Sample 取每个值的时间, 可以看到, 对于这些特定的时间安排, 采样间隔每3秒才与源时间安排对齐一次.

相对时间(毫秒)源值采样值
0
50
100
1500
200
250
3001
350
400
4502
500
550
6003
650
700
7504
800
850
9005
950
1000 5
10506
1100
1150
12007
1250
1300
13508
1400
1450
15009
1550
1600
165010
1700
1750
180011
1850
1900
195012
2000 12
2050
210013
2150
2200
225014
2300
2350
240015
2450
2500
255016
2600
2650
270017
2750
2800
285018
2900
2950
30001919

由于第一次采样是在源发出5之后, 并且在它将产生6之前的间隙的三分之二处进行的, 所以在某种意义上"正确" 的当前值类似于5.67, 但 Sample 不会尝试任何这样的插值.

它只是报告从源中出现的最后一个值. 一个相关的结果是, 如果采样间隔足够短, 以至于要求 Sample 报告值的速度比它们从源中出现的速度快, 它将只是重复值.

节流

Throttle 扩展方法提供了一种针对以可变速率产生值并且有时太快的序列的保护.

Sample 方法类似, Throttle 将在一段时间内返回最后采样的值. 然而, 与 Sample 不同的是, Throttle 的周期是一个滑动窗口.

每次 Throttle 接收到一个值时, 窗口就会重置. 只有在经过一段时间后, 最后一个值才会被传播.

这意味着 Throttle 方法仅对以可变速率产生值的序列有用.

以恒定速率产生值的序列(如 IntervalTimer)如果产生值的速度比节流周期快, 它们的所有值都将被抑制, 而如果产生值的速度比节流周期慢, 它们的所有值都将被传播.

// 忽略来自可观察序列的值, 如果在dueTime之前有另一个值跟随.
public static IObservable<TSource> Throttle<TSource>(
    this IObservable<TSource> source,
    TimeSpan dueTime)
{...}
public static IObservable<TSource> Throttle<TSource>(
    this IObservable<TSource> source,
    TimeSpan dueTime,
    IScheduler scheduler)
{...}

可以将 Throttle 应用于使用实时搜索功能, 该功能在输入时提供建议.

通常希望等到用户停止输入一会儿后再搜索建议, 因为否则, 可能会连续启动几次搜索, 每次用户按下另一个键时就取消上一次搜索.

只有在有一个停顿之后, 才能用他们到目前为止输入的内容执行搜索. Throttle 非常适合这种情况, 因为如果源产生值的速度比指定速率快, 它根本不会允许任何事件通过.

请注意, RxJS库决定使其版本的节流工作方式不同, 所以如果同时使用Rx.NET和RxJS, 请注意它们的工作方式不同.

在RxJS中, 当源超过指定速率时, 节流不会完全关闭: 它只是丢弃足够的项目, 以使输出永远不超过指定速率.

所以RxJS的节流实现是一种速率限制器, 而Rx.NET的 Throttle 更像是一个自重置断路器, 在过载期间完全关闭.

超时

Timeout 操作符方法允许在源在给定时间段内没有产生任何通知时用错误终止一个序列. 可以将时间段指定为一个滑动窗口(使用 TimeSpan), 或者指定序列必须完成的绝对时间(通过提供 DateTimeOffset).

// 如果值之间的最大持续时间过去, 则返回可观察序列或TimeoutException.
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source,
    TimeSpan dueTime)
{...}
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source,
    TimeSpan dueTime,
    IScheduler scheduler)
{...}
// 如果dueTime过去, 则返回可观察序列或TimeoutException.
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source,
    DateTimeOffset dueTime)
{...}
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source,
    DateTimeOffset dueTime,
    IScheduler scheduler)
{...}

如果提供一个 TimeSpan 并且在该时间段内没有产生值, 那么序列将因 TimeoutException 而失败.

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
                      .Take(5)
                      .Concat(Observable.Interval(TimeSpan.FromSeconds(2)));
var timeout = source.Timeout(TimeSpan.FromSeconds(1));
timeout.Subscribe(
    Console.WriteLine,
    Console.WriteLine,
    () => Console.WriteLine("Completed"));

最初, 这个序列产生值的频率足以满足 Timeout, 所以 Timeout 返回的可观察对象只是转发来自源的项目. 但是一旦源停止产生项目, 就会得到一个 OnError:

0
1
2
3
4
System.TimeoutException: The operation has timed out.

或者, 可以给 Timeout 传递一个绝对时间; 如果序列在该时间之前没有完成, 它将产生一个错误.

var dueDate = DateTimeOffset.UtcNow.AddSeconds(4);
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var timeout = source.Timeout
#+begin_src csharp :results pp :exports both
var dueDate = DateTimeOffset.UtcNow.AddSeconds(4);
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var timeout = source.Timeout(dueDate);
timeout.Subscribe(
    Console.WriteLine,
    Console.WriteLine,
    () => Console.WriteLine("Completed"));

输出:

0
1
2
System.TimeoutException: The operation has timed out.

还有其他 Timeout 重载, 使能够在超时时替换为另一个序列.

// 如果值之间的最大持续时间过去, 则返回源可观察序列或另一个可观察序列.
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source,
    TimeSpan dueTime,
    IObservable<TSource> other)
{...}
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source,
    TimeSpan dueTime,
    IObservable<TSource> other,
    IScheduler scheduler)
{...}
// 如果dueTime过去, 则返回源可观察序列或另一个可观察序列.
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source,
    DateTimeOffset dueTime,
    IObservable<TSource> other)
{...}
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source,
    DateTimeOffset dueTime,
    IOobservable<TSource> other,
    IScheduler scheduler)
{...}

正如现在所看到的, Rx提供了在响应式范式中管理时间的功能.

数据可以被定时, 节流或采样以满足特定需求. 整个序列可以使用延迟功能在时间上进行偏移, 并且可以使用 Timeout 操作符断言数据的及时性.

接下来将看看Rx与外部世界的边界.

第十一章:离开Rx的世界

可观察序列是一种有用的结构, 特别是当可以使用LINQ对其编写复杂查询时.

尽管可观察序列有诸多好处, 但有时必须离开 IObservable<T> 范式.

当需要与现有的非基于Rx的API(例如使用事件或 Task<T> 的API)集成时, 这是必要的.

如果发现这样更容易进行测试, 或者通过在可观察范式和更熟悉的范式之间切换来学习Rx更容易, 那么不使用Rx也是完全可行的.

Rx的组合性质是其强大功能的关键, 但当需要与不理解Rx的组件集成时, 这可能看起来像是一个问题.

到目前为止, 所看到的大多数Rx库功能都将其输入和输出表示为可观察对象.

应该如何获取现实世界中的事件源并将其转换为可观察对象呢? 应该如何对可观察对象的输出进行有意义的操作呢?

已经看到了这些问题的一些答案. ["创建可观察序列"章节](03~CreatingObservableSequences~.md)展示了创建可观察源的各种方法.

但是, 当涉及到处理从 IObservable<T> 中产生的元素时, 真正看到的只是如何实现 IObserver<T>, 以及如何使用基于回调的 Subscribe 扩展方法来订阅 IObservable<T>.

在本章中, 将研究Rx中允许离开 IObservable<T> 世界的方法, 以便可以根据Rx源发出的通知采取行动.

asyncawait 集成

可以对任何 IObservable<T> 使用C#的 await 关键字.

之前在 FirstAsync 中看到过这种用法:

long v = await Observable.Timer(TimeSpan.FromSeconds(2)).FirstAsync();
Console.WriteLine(v);

虽然 await 最常用于 Task, Task<T>ValueTask<T>, 但它实际上是一种可扩展的语言特性.

通过提供一个名为 GetAwaiter 的方法(通常作为扩展方法)以及 GetAwaiter 要返回的合适类型, 为C#提供 await 所需的功能, 就可以使 await 或多或少地适用于任何类型.

这正是Rx所做的. 如果源文件包含 using System.Reactive.Linq; 指令, 将提供一个合适的扩展方法, 这样就可以 await 任何任务.

其实际工作方式是, 相关的 GetAwaiter 扩展方法将 IObservable<T> 包装在一个 AsyncSubject<T> 中, 它提供了C#支持 await 所需的一切.

这些包装器的工作方式是, 每次针对 IObservable<T> 执行 await 时, 都会调用 Subscribe.

如果源通过调用其观察者的 OnError 报告错误, Rx的 await 集成会将任务置于错误状态, 以便 await 重新抛出异常.

序列可能为空. 它们可能在从未调用过 OnNext 的情况下调用 OnCompleted.

然而, 由于无法从源的类型判断它是否为空, 这与 await 范式不太匹配.

对于任务, 可以在编译时通过查看是在等待 Task 还是 Task<T> 来知道是否会得到结果, 因此编译器能够知道特定的 await 表达式是否会产生值.

但是当 await 一个 IObservable<T> 时, 在编译时没有区别, 所以当 await 时, Rx报告序列为空的唯一方法是抛出一个 InvalidOperationException, 报告序列不包含任何元素.

在第3章的 AsyncSubject<T> 部分中, AsyncSubject<T> 只报告从其源中出现的最终值.

所以如果 await 一个报告多个元素的序列, 除了最后一个项目之外的所有项目都将被忽略.

如果想看到所有项目, 但仍然想使用 await 来处理完成和错误怎么办?

ForEachAsync

ForEachAsync 方法支持 await, 但它提供了一种处理每个元素的方法.

可以将其视为上一节中描述的 await 行为和基于回调的 Subscribe 的混合体.

仍然可以使用 await 来检测完成和错误, 但提供一个回调, 使能够处理每个项目:

IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
await source.ForEachAsync(i => Console.WriteLine($"received {i} @ {DateTime.Now}"));
Console.WriteLine($"finished @ {DateTime.Now}");

输出:

received 0 @ 02/08/2023 07:53:46
received 1 @ 02/08/2023 07:53:47
received 2 @ 02/08/2023 07:53:48
received 3 @ 02/08/2023 07:53:49
received 4 @ 02/08/2023 07:53:50
finished @ 02/08/2023 07:53:50

finished 行在最后, 正如期望的那样.

让将其与 Subscribe 扩展方法进行比较, Subscribe 扩展方法也允许提供一个用于处理项目的单个回调:

IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
source.Subscribe(i => Console.WriteLine($"received {i} @ {DateTime.Now}"));
Console.WriteLine($"finished @ {DateTime.Now}");

如输出所示, Subscribe 立即返回. 每个项目回调像以前一样被调用, 但这一切都在稍后发生:

finished @ 02/08/2023 07:55:42
received 0 @ 02/08/2023 07:55:43
received 1 @ 02/08/2023 07:55:44
received 2 @ 02/08/2023 07:55:45
received 3 @ 02/08/2023 07:55:46
received 4 @ 02/08/2023 07:55:47

这在执行一些工作然后退出的批处理式程序中很有用.

在这种情况下使用 Subscribe 的问题是, 程序可能很容易在未完成开始的工作之前退出.

使用 ForEachAsync 很容易避免这种情况, 因为只需使用 await 来确保方法在工作完成之前不会完成.

当直接对 IObservable<T> 使用 await, 或者通过 ForEachAsync 使用时, 本质上是选择以常规方式处理序列完成, 而不是响应式方式.

错误和完成处理不再是回调驱动的–Rx为提供 OnCompletedOnError 处理程序, 而是通过C#的等待机制来表示这些. (具体来说, 当直接 await 一个源时, Rx提供一个自定义等待器, 当使用 ForEachAsync 时, 它只返回一个 Task. )

在某些情况下, Subscribe 会阻塞直到其源完成. Observable.Return 默认会这样做, Observable.Range 也是如此.

可以尝试通过指定不同的调度程序来使最后一个示例这样做:

// 不要这样做!
IObservable<long> source =
   Observable.Interval(TimeSpan.FromSeconds(1), ImmediateScheduler.Instance)
            .Take(5);
source.Subscribe(i => Console.WriteLine($"received {i} @ {DateTime.Now}"));
Console.WriteLine($"finished @ {DateTime.Now}");

然而, 这凸显了非异步阻塞调用的危险: 尽管看起来应该可行, 但在当前版本的Rx中, 实际上会发生死锁.

Rx认为 ImmediateScheduler 不适合基于定时器的操作, 这就是为什么它不是默认的, 这个场景很好地说明了为什么是这样. (根本问题是取消计划工作项的唯一方法是对 Schedule 调用返回的对象调用 Dispose. 根据定义, ImmediateScheduler 在完成工作之前不会返回, 这意味着它实际上无法支持取消. 所以对 Interval 的调用实际上创建了一个定期计划的工作项, 无法取消, 因此注定要永远运行. )

这就是需要 ForEachAsync 的原因.

看起来可以通过巧妙地使用调度程序来获得相同的效果, 但实际上, 如果需要等待异步事件发生, 使用 await 总是比使用阻塞调用线程的方法更好.

ToEnumerable

到目前为止探索的两种机制将Rx的回调机制转换为 await 启用的更常规方法来处理完成和错误, 但仍然需要提供一个回调才能处理每个单独的元素.

但是 ToEnumerable 扩展方法更进一步: 它允许使用常规的 foreach 循环消费整个序列:

var period = TimeSpan.FromMilliseconds(200);
IObservable<long> source = Observable.Timer(TimeSpan.Zero, period).Take(5);
IEnumerable<long> result = source.ToEnumerable();
foreach (long value in result)
{
    Console.WriteLine(value);
}
Console.WriteLine("done");

输出:

0
1
2
3
4
done

源可观察序列将在开始枚举序列时(即延迟地)被订阅.

如果还没有可用的元素, 或者已经消费了到目前为止产生的所有元素, foreach 对枚举器的 MoveNext 的调用将阻塞, 直到源产生一个元素.

所以这种方法依赖于源能够从其他线程生成元素.

在这个例子中, Timer 默认使用 DefaultScheduler , 它在线程池上运行定时工作. 如果序列产生值的速度比消费它们的速度快, 它会排队. 这意味着在使用 ToEnumerable 时, 从技术上讲可以在同一个线程上消费和生成项目, 但这将依赖于生产者始终保持领先. 这将是一种危险的方法, 因为如果 foreach 循环赶上了, 就会发生死锁.

awaitForEachAsync 一样, 如果源报告错误, 将抛出异常, 所以可以像这个例子所示的那样使用普通的C#异常处理:

try
{
    foreach (long value in result)
    {
        Console.WriteLine(value);
    }
}
catch (Exception e)
{
    Console.WriteLine(e.Message);
}

转换为单个集合

有时会希望将源产生的所有项目作为单个列表. 例如, 也许不能只单独处理元素, 因为有时需要引用之前收到的元素.

以下各节中描述的四个操作将所有项目收集到单个集合中. 它们仍然都产生一个 IObservable<T> (例如, 一个 IObservable<int[]> 或一个 IObservable<Dictionary<string, long>>), 但这些都是单元素可观察对象, 可以使用 await 关键字获取这个单个输出.

ToArray和ToList

ToArrayToList 接受一个可观察序列, 并分别将其打包成一个数组或 List<T> 的实例.

与所有单个集合操作一样, 这些方法返回一个可观察源, 该源等待其输入序列完成, 然后将数组或列表作为单个值产生, 之后立即完成.

这个例子使用 ToArray 将源序列中的所有5个元素收集到一个数组中, 并使用 awaitToArray 返回的序列中提取该数组:

TimeSpan period = TimeSpan.FromMilliseconds(200);
IObservable<long> source = Observable.Timer(TimeSpan.Zero, period).Take(5);
IObservable<long[]> resultSource = source.ToArray();
long[] result = await resultSource;
foreach (long value in result)
{
    Console.WriteLine(value);
}

输出:

0
1
2
3
4

由于这些方法仍然返回可观察序列, 也可以使用正常的Rx Subscribe 机制, 或者将它们用作其他操作符的输入.

如果源产生值然后出错, 将不会收到任何这些值.

到那时为止收到的所有值都将被丢弃, 并且操作符将调用其观察者的 OnError (在上面的例子中, 这将导致 await 抛出异常).

所有四个操作符(ToArray, ToList, ToDictionaryToLookup)都这样处理错误.

ToDictionary和ToLookup

Rx可以使用 ToDictionaryToLookup 方法将可观察序列打包成字典或查找表. 这两个方法都采用与 ToArrayToList 方法相同的基本方法: 它们返回一个单元素序列, 该序列在输入源完成后产生集合.

ToDictionary 提供了四个重载, 它们直接对应于LINQ to Objects为 IEnumerable<T> 定义的 ToDictionary 扩展方法:

// 根据指定的键选择器函数, 比较器和元素选择器函数从可观察序列创建字典.
public static IObservable<IDictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    Func<TSource, TElement> elementSelector,
    IEqualityComparer<TKey> comparer)
{...}
// 根据指定的键选择器函数和元素选择器函数从可观察序列创建字典.
public static IObservable<IDictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    Func<TSource, TElement> elementSelector)
{...}
// 根据指定的键选择器函数和比较器从可观察序列创建字典.
public static IObservable<IDictionary<TKey, TSource>> ToDictionary<TSource, TKey>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> comparer)
{...}
// 根据指定的键选择器函数从可观察序列创建字典.
public static IObservable<IDictionary<TKey, TSource>> ToDictionary<TSource, TKey>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector)
{...}

ToLookup 扩展提供了几乎相同的重载, 区别在于返回类型(当然还有名称).

它们都返回一个 IObservable<ILookup<TKey, TElement>>. 与LINQ to Objects一样, 字典和查找表之间的区别在于 ILookup<TKey, TElement>> 接口允许每个键有任意数量的值, 而字典将每个键映射到一个值.

ToTask

虽然Rx直接支持对 IObservable<T> 使用 await, 但有时获取表示 IObservable<T>Task<T> 会很有用.

这很有用, 因为一些API期望一个 Task<T>. 可以对任何 IObservable<T> 调用 ToTask(), 这将订阅该可观察对象, 返回一个 Task<T>, 该任务将在任务完成时完成, 将序列的最终输出作为任务的结果.

如果源完成时没有产生元素, 任务将进入错误状态, 并带有一个 InvalidOperation 异常, 抱怨输入序列不包含任何元素.

可以可选地传递一个取消令牌. 如果在可观察序列完成之前取消它, Rx将取消对源的订阅, 并将任务置于取消状态.

这是一个使用 ToTask 操作符的简单示例. 请注意, ToTask 方法位于 System.Reactive.Threading.Tasks 命名空间中.

IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
Task<long> resultTask = source.ToTask();
long result = await resultTask; // 将花费5秒.
Console.WriteLine(result);

输出:

#+end_src

4

#+end_src

如果源序列调用 OnError, Rx会使用提供的异常将任务置于错误状态.

一旦有了任务, 当然可以使用TPL的所有功能, 如延续.

ToEvent

正如可以使用 FromEventPattern 将事件作为可观察序列的源一样,

也可以使用 ToEvent 扩展方法使可观察序列看起来像一个标准的.NET事件.

// 将可观察序列公开为具有.NET事件的对象.
public static IEventSource<unit> ToEvent(this IObservable<Unit> source)
{...}
// 将可观察序列公开为具有.NET事件的对象.
public static IEventSource<TSource> ToEvent<TSource>(this IObservable<TSource> source)
{...}

ToEvent 方法返回一个 IEventSource<T>, 它有一个成员: OnNext 事件.

public interface IEventSource<T>
{
    event Action<T> OnNext;
}

当使用 ToEvent 方法转换可观察序列时, 可以通过提供一个 Action<T> 来订阅, 这里使用lambda表达式.

var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
var result = source.ToEvent();
result.OnNext += val => Console.WriteLine(val);

输出:

0
1
2
3
4

虽然这是将Rx通知转换为事件的最简单方法, 但它不遵循标准的.NET事件模式.

如果想要遵循标准模式, 需要使用稍微不同的方法.

  1. ToEventPattern

    通常, .NET事件会向其处理程序提供 senderEventArgs 参数.

    在上面的例子中, 只得到值. 想将序列公开为遵循标准模式的事件, 将需要使用 ToEventPattern.

    // 将可观察序列公开为具有.NET事件的对象.
    public static IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(
        this IObservable<EventPattern<TEventArgs>> source)
        where TEventArgs : EventArgs
    

    ToEventPattern 将接受一个 IObservable<EventPattern<TEventArgs>> 并将其转换为一个 IEventPatternSource<TEventArgs>. 这些类型的公共接口非常简单.

    public class EventPattern<TEventArgs> : IEquatable<EventPattern<TEventArgs>>
        where TEventArgs : EventArgs
    {
        public EventPattern(object sender, TEventArgs e)
        {
            this.Sender = sender;
            this.EventArgs = e;
        }
        public object Sender { get; private set; }
        public TEventArgs EventArgs { get; private set; }
        //...相等性重载
    }
    public interface IEventPatternSource<TEventArgs> where TEventArgs : EventArgs
    {
        event EventHandler<TEventArgs> OnNext;
    }
    

    要使用这个, 需要一个合适的 EventArgs 类型. 可以使用.NET运行时库提供的一个, 但如果没有,可以编写自己的: EventArgs 类型:

    public class MyEventArgs : EventArgs
    {
        private readonly long _value;
    
        public MyEventArgs(long value)
        {
            _value = value;
        }
        public long Value
        {
            get { return _value; }
        }
    }
    

    然后, 可以通过使用 Select 进行简单转换从Rx中使用它:

    IObservable<EventPattern<MyEventArgs>> source =
       Observable.Interval(TimeSpan.FromSeconds(1))
                .Select(i => new EventPattern<MyEventArgs>(this, new MyEventArgs(i)));
    

    现在有了一个兼容的序列, 可以使用 ToEventPattern, 进而使用标准事件处理程序.

    IEventPatternSource<MyEventArgs> result = source.ToEventPattern();
    result.OnNext += (sender, eventArgs) => Console.WriteLine(eventArgs.Value);
    

    现在知道了如何回到.NET事件, 让休息一下, 记住为什么Rx是一个更好的模型.

    • 事件难以组合
    • 事件不能作为参数传递或存储在字段中
    • 事件不能随着时间的推移轻松查询
    • 事件没有报告错误的标准模式
    • 事件没有指示值序列结束的标准模式
    • 事件对管理并发或多线程应用程序几乎没有帮助

Do

生产系统的非功能性需求通常要求高可用性, 质量监控功能和低缺陷解决前置时间.

日志记录, 调试, 检测和日志记录是实现非功能性需求的常见实现选择.

为了实现这些, 通常可以 插入 到Rx查询中, 使它们在正常操作的副作用中提供监控和诊断信息.

Do 扩展方法允许注入副作用行为. 从Rx的角度来看, Do 似乎什么都不做: 可以将它应用于任何 IObservable<T>, 它返回另一个 IObservable<T>, 该对象报告与其源完全相同的元素以及错误或完成情况.

然而, 它的各种重载接受看起来与 Subscribe 相同的回调参数: 可以为单个项目, 完成和错误提供回调.

Subscribe 不同, Do 不是最终目的地–Do 回调看到的所有内容也将转发给 Do 的订阅者.

这使得它对于日志记录和类似的检测很有用, 因为可以使用它来报告信息如何在Rx查询中流动, 而不改变该查询的行为.

使用 Do 会有性能影响. 如果提供给 Do 的回调执行任何可能改变它所属的Rx查询的输入的操作, 将创建一个反馈循环, 使行为更难理解.

让首先定义一些可以在示例中使用的日志记录方法:

private static void Log(object onNextValue)
{
    Console.WriteLine($"Logging OnNext({onNextValue}) @ {DateTime.Now}");
}
private static void Log(Exception error)
{
    Console.WriteLine($"Logging OnError({error}) @ {DateTime.Now}");
}
private static void Log()
{
    Console.WriteLine($"Logging OnCompleted()@ {DateTime.Now}");
}

这段代码使用 Do 通过上面的方法引入一些日志记录.

IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
IObservable<long> loggedSource = source.Do(
    i => Log(i),
    ex => Log(ex),
    () => Log());
loggedSource.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));

输出:

Logging OnNext(0) @ 01/01/2012 12:00:00
0
Logging OnNext(1) @ 01/01/2012 12:00:01
1
Logging OnNext(2) @ 01/01/2012 12:00:02
2
Logging OnCompleted() @ 01/01/2012 12:00:02
completed

因为 Do 是查询的一部分, 它必然比 Subscribe 更早看到值, Subscribe 是链中的最后一环.

这就是为什么日志消息出现在 Subscribe 回调产生的行之前.

喜欢把 Do 方法想象成对一个序列的监听. 它有能力监听序列而不修改它.

Subscribe 一样, 除了传递回调之外, 还有重载允许为想要的 OnNext, OnErrorOnCompleted 通知中的任何一个提供回调, 或者可以传递一个 IObserver<T>Do.

用AsObservable封装

糟糕的封装是开发人员为错误留下隐患的一种方式. 以下是一些粗心导致抽象泄漏的场景.

第一个例子乍一看可能无害, 但有很多问题.

public class UltraLeakyLetterRepo
{
    public ReplaySubject<string> Letters { get; }
    public UltraLeakyLetterRepo()
    {
        Letters = new ReplaySubject<string>();
        Letters.OnNext("A");
        Letters.OnNext("B");
        Letters.OnNext("C");
    }
}

在这个例子中, 将可观察序列公开为一个属性.

使用了 ReplaySubject<string>, 以便每个订阅者在订阅时都会收到所有值.

然而, 在 Letters 属性的公共类型中暴露这个实现选择是糟糕的封装, 因为使用者可以调用 OnNext / OnError / OnCompleted.

为了堵住这个漏洞, 可以简单地将公开可见的属性类型设为 IObservable<string>.

public class ObscuredLeakinessLetterRepo
{
    public IObservable<string> Letters { get; }
    public ObscuredLeakinessLetterRepo()
    {
        var letters = new ReplaySubject<string>();
        letters.OnNext("A");
        letters.OnNext("B");
        letters.OnNext("C");
        this.Letters = letters;
    }
}

这是一个重大改进: 编译器不会让使用这个源的实例的人编写 source.Letters.OnNext("1").

所以API表面区域正确地封装了实现细节, 但可以注意到没有什么可以阻止使用者将结果强制转换回 ISubject<string>, 然后调用他们喜欢的任何方法.

在这个例子中, 看到外部代码将他们的值推送到序列中.

var repo = new ObscuredLeakinessLetterRepo();
IObservable<string> good = repo.GetLetters();

good.Subscribe(Console.WriteLine);
// 调皮一下
if (good is ISubject<string> evil)
{
    // 太调皮了, 1不是一个字母!
    evil.OnNext("1");
}
else
{
    Console.WriteLine("could not sabotage");
}

输出:

A
B
C
1

如果想积极地防止它, 解决这个问题很简单.

通过应用 AsObservable 扩展方法, 可以修改构造函数中设置 this.Letters 的行, 将主题包装在一个只实现 IObservable<T> 的类型中.

this.Letters = letters.AsObservable();

输出:

#+end_src

A B C could not sabotage

#+end_src

虽然在这些例子中使用了" 邪恶" 和" 破坏" 这样的词, 但导致问题的 往往是疏忽而不是恶意.

首先出错的是设计有漏洞类的程序员. 设计接口很难, 但应该尽力通过给使用者提供可发现和一致的类型来引导他们进入 成功的坑.

如果减少类型的表面区域以仅暴露希望使用者使用的功能, 类型就会变得更容易发现.

在这个例子中, 减少了类型的表面区域. 通过为属性选择合适的面向公众的类型, 然后使用 AsObservable 方法防止对底层类型的访问来做到这一点.

在本章中看到的这组方法完成了在[" 创建序列" 章节](03~CreatingObservableSequences~.md)中开始的循环.

现在有了进入和离开Rx世界的方法. 在选择进入和离开 IObservable<T> 时要小心.

最好不要来回转换–有一些基于Rx的处理, 然后是一些更常规的代码, 然后将结果重新连接回Rx, 这会很快使代码库变得混乱, 并且可能表明存在设计缺陷.

通常最好将所有Rx逻辑放在一起, 这样只需要与外部世界集成两次: 一次用于输入, 一次用于输出.

第十二章:错误处理操作符

异常时有发生. 有些异常本质上是可以避免的, 它们仅们代码中的错误而出现.

例如, CLR进入 DivideByZeroException 的情况, 那就是做错了.

但也有许多异常无法通过防御性编码来防止. 例如, 与I/O或网络故障相关的异常, 如 FileNotFoundExceptionTimeoutException, 可能是由代码控制范围之外的环境因素引起的.

在这些情况下, 需要妥善处理异常. 处理方式将取决于上下文.

向用户提供某种错误消息可能是合适的; 在某些场景中, 记录错误可能是更合适的响应.

如果失败可能是暂时的, 可以通过重试失败的操作来尝试恢复.

IObserver<T> 接口定义了 OnError 方法, 以便源可以报告错误, 但由于这会终止序列, 它没有提供直接的方法来确定下一步该怎么做.

然而, Rx提供了一些操作符, 它们提供了各种错误处理机制.

Catch

Rx定义了一个 Catch 操作符. 这个名字故意让人想起C# 的 try / catch 语法, 因为它允许以与从代码正常执行中出现的异常类似的方式处理来自Rx源的错误.

它可以通过两种不同的方式工作. 以只提供一个函数, Rx会将错误传递给这个函数, 这个函数可以返回一个 IObservable<T>, Catch 将转发来自该函数的元素, 而不是原始源的元素.

或者, 可以不传递函数, 而是提供一个或多个额外的序列, 每次当前序列失败时, Catch 将移动到下一个序列.

  1. 检查异常

    Catch 有一个重载, 能够在源产生错误时提供一个要调用的处理程序:

    public static IObservable<TSource> Catch<TSource, TException>(
        this IObservable<TSource> source,
        Func<TException, IObservable<TSource>> handler)
        where TException : Exception
    

    这在概念上与C# 的 catch 块非常相似: 可以编写代码来查看异常, 然后决定如何继续.

    catch 块一样, 可以决定对哪些类型的异常感兴趣.

    例如, 可能知道源有时会产生 TimeoutException, 在这种情况下, 可能只想返回一个空序列, 而不是错误:

    IObservable<int> result = source.Catch<int, TimeoutException>(_ => Observable.Empty<int>());
    

    只有当异常是指定类型(或派生自该类型)时, Catch 才会调用函数.

    如果序列以不能转换为 TimeoutExceptionException 终止, 那么错误将不会被捕获, 并将传递给订阅者.

    这个例子返回 Observable.Empty<int>(). 这在概念上类似于在C# 中" 吞下" 一个异常, 即选择不采取任何行动.

    对于预期的异常, 这可能是一个合理的响应, 但通常对基本的 Exception 类型这样做是个坏主意.

    最后这个例子忽略了它的输入, 因为它只对异常类型感兴趣.

    可以自由地检查异常, 并对 Catch 应该产生什么做出更精细的决定:

    IObservable<string> result = source.Catch(
        (FileNotFoundException x) => x.FileName == "settings.txt"
           ? Observable.Return(DefaultSettings) : Observable.Throw<string>(x));
    

    这为一个特定的文件提供了特殊处理, 否则将重新抛出异常.

    在这里返回 Observable.Throw<T>(x) (其中 x 是原始异常)在概念上类似于在 catch 块中编写 throw. (在C# 中, throw;throw x; 之间有一个重要的区别, 因为它改变了异常上下文的捕获方式, 但在Rx中, OnError 不捕获堆栈跟踪, 所以没有等效的区别. )

    当然, 也可以抛出一个完全不同的异常. 可以返回任何 IObservable<T>, 只要它的元素类型与源的相同.

  2. 回退

    Catch 的其他重载提供的区分行为较少: 可以提供一个或多个额外的序列, 每当当前源失败时, 异常将被忽略, Catch 将简单地移动到下一个序列.

    由于永远不会知道异常是什么, 这种机制无法知道发生的异常是预期的还是完全意外的, 所以通常要避免这种形式.

    但为了完整起见, 下面是如何使用它:

    IObservable<string> settings = settingsSource1.Catch(settingsSource2);
    

    这种形式只提供了一个回退. 还有一个静态的 Observable.Catch 方法, 它接受一个 params 数组, 所以可以传递任意数量的源. 这与前面的例子完全等效:

    IObservable<string> settings = Observable.Catch(settingsSource1, settingsSource2);
    

    还有一个重载接受一个 IEnumerable<IObservable<T>>.

    如果任何一个源在没有报告异常的情况下到达末尾, Catch 也会立即报告完成, 并且不会订阅任何后续的源.

    如果最后一个源报告了一个异常, Catch 将没有更多的源可以回退, 所以在这种情况下它不会捕获异常. 它将把最后的异常转发给它的订阅者.

Finally

与C# 中的 finally 块类似, Rx能够在序列完成时执行一些代码, 无论它是自然完成还是失败.

Rx添加了第三种完成模式, 在 catch / finally 中没有完全等效的模式: 订阅者可能在源有机会完成之前取消订阅. (这在概念上类似于使用 break 提前终止 foreach. ) Finally 扩展方法接受一个 Action 作为参数.

这个 Action 将在序列终止时被调用, 无论调用的是 OnCompleted 还是 OnError. 如果在完成之前处置了订阅, 它也会调用该操作.

public static IObservable<TSource> Finally<TSource>(
    this IObservable<TSource> source,
    Action finallyAction)
{
   ...
}

在这个例子中, 有一个完成的序列. 提供一个操作, 并看到它在 OnCompleted 处理程序之后被调用.

var source = new Subject<int>();
IObservable<int> result = source.Finally(() => Console.WriteLine("Finally action ran"));
result.Dump("Finally");
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
source.OnCompleted();

输出:

Finally-->1
Finally-->2
Finally-->3
Finally completed
Finally action ran

源序列也可能以异常终止. 在这种情况下, 异常将被发送到订阅者的 OnError (会在控制台输出中看到), 然后提供给 Finally 的委托将被执行.

或者, 可以处置订阅. 在下一个例子中, 看到即使序列没有完成, Finally 操作也会被调用.

var source = new Subject<int>();
var result = source.Finally(() => Console.WriteLine("Finally"));
var subscription = result.Subscribe(
    Console.WriteLine,
    Console.WriteLine,
    () => Console.WriteLine("Completed"));
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
subscription.Dispose();

输出:

1
2
3
Finally

如果订阅者的 OnError 抛出异常, 并且如果源在没有 try / catch 块的情况下调用 OnNext, CLR的未处理异常报告机制就会启动, 在某些情况下, 这可能导致应用程序在 Finally 操作符有机会调用回调之前关闭.

可以用以下代码创建这种情况:

var source = new Subject<int>();
var result = source.Finally(() => Console.WriteLine("Finally"));
result.Subscribe(
    Console.WriteLine,
    // Console.WriteLine,
    () => Console.WriteLine("Completed"));
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
// 使应用程序崩溃. Finally操作可能不会被调用.
source.OnError(new Exception("Fail"));

如果直接从程序的入口点运行这个代码, 而不将其包装在 try / catch 中, 可能会或可能不会看到 Finally 消息显示, 因为当异常一直到达堆栈顶部而没有被捕获时, 异常处理的工作方式略有不同. (奇怪的是, 它通常会运行, 但如果附加了调试器, 程序通常会在不运行 Finally 回调的情况下退出. )

这主要是一个奇特的现象: 像ASP.NET Core或WPF这样的应用程序框架通常会安装它们自己的堆栈顶部异常处理程序, 而且在任何情况下, 都不应该订阅一个知道会调用 OnError 而不提供错误回调的源.

这个问题只出现在这里使用的基于委托的 Subscribe 重载提供了一个在 OnError 中抛出异常的 IObserver<T> 实现的情况下.

构建控制台应用程序来试验Rx的行为, 很可能会遇到这个问题.

在实践中, Finally 在更正常的情况下会做正确的事情. (但在任何情况下, 都不应该从 OnError 处理程序中抛出异常. )

Using

Using 工厂方法允许资源的生命周期绑定到可观察序列的生命周期.

该方法接受两个回调: 一个用于创建可处置资源, 一个用于提供序列.

这允许所有内容都被惰性求值. 当代码调用此方法返回的 IObservable<T> 上的 Subscribe 时, 这些回调将被调用.

public static IObservable<TSource> Using<TSource, TResource>(
    Func<TResource> resourceFactory,
    Func<TResource, IObservable<TSource>> observableFactory)
    where TResource : IDisposable
{
   ...
}

当序列以 OnCompletedOnError 终止, 或者当订阅被处置时, 资源将被处置.

OnErrorResumeNext

本节的标题会让老VB开发者不寒而栗(或者喜出望外???)! (对于不熟悉这个晦涩语言特性的人来说, VB语言允许指示它忽略执行期间发生的任何错误, 并在任何失败后继续执行下一条语句. )

在Rx中, 有一个名为 OnErrorResumeNext 的扩展方法, 它与同名的VB关键字/语句具有相似的语义.

这个扩展方法允许序列在第一个序列正常完成或因错误完成的情况下继续使用另一个序列.

这与 Catch 的第二种形式(回退)非常相似.

区别在于, 使用 Catch 时, 如果任何源序列在没有报告错误的情况下到达末尾, Catch 将不会移动到下一个序列.

OnErrorResumeNext 将转发其所有输入产生的所有元素, 所以它类似于 Concat, 只是它忽略所有错误.

就像在VB中, 除了一次性代码外, OnErrorResumeNext 关键字最好避免使用一样, 在Rx中也应该谨慎使用它.

它会悄悄地吞下异常, 并可能使程序处于未知状态. 一般来说, 这会使代码更难维护和调试. (Catch 的回退形式也是如此. )

Retry

如果序列遇到可预测的失败, 重试是个好选项.

例如, 如果在云环境中运行, 操作偶尔因不明原因失败是很常见的.

云平台经常出于运营原因定期重新定位服务, 这意味着操作失败并不罕见–可能在云提供商决定将服务移动到不同的计算节点之前向该服务发出请求–但如果立即重试相同的操作, 它可能会成功(因为重试的请求会被路由到新节点). Rx的 Retry 扩展方法提供了在失败时重试指定次数或直到成功的能力. 这是通过在源报告错误时重新订阅源来实现的.

这个例子使用了简单的重载, 它将在任何异常时总是重试.

public static void RetrySample<T>(IObservable<T> source)
{
    source.Retry().Subscribe(t => Console.WriteLine(t)); // 将总是重试
    Console.ReadKey();
}

给定一个产生值0, 1和2, 然后调用 OnError 的源, 输出将是数字0, 1, 2不断重复.

这个输出将永远继续, 因为这个例子永远不会取消订阅, 并且如果不另外指定, Retry 将永远重试.

可以指定最大重试次数. 在接下来的例子中, 只重试一次, 因此在第二次订阅时发布的错误将被传递到最终订阅.

告诉 Retry 最大尝试次数, 所以如果想重试一次, 传递一个值为2–那是初始尝试加上一次重试.

source.Retry(2).Dump("Retry(2)");

输出:

Retry(2)-->0
Retry(2)-->1
Retry(2)-->2
Retry(2)-->0
Retry(2)-->1
Retry(2)-->2
Retry(2) failed-->测试异常

使用无限重复重载时应格外小心. 显然, 如果基础序列存在持续问题, 可能会陷入无限循环.

还要注意, 没有允许指定要重试的异常类型的重载.

Rx还提供了一个 RetryWhen 方法. 这与看的第一个 Catch 重载类似: 它不是不加区分地处理所有异常, 而是提供可以决定做什么的代码.

它的工作方式略有不同: 它不是每次错误时调用这个回调一次, 而是一次传入一个 IObservable<Exception>, 通过它将提供所有异常, 回调返回一个被称为 信号 可观察对象的 IObservable<T>.

T 可以是任何东西, 因为这个可观察对象可能返回的值将被忽略: 重要的是调用了三个 IObserver<T> 方法中的哪一个.

如果在收到异常时, 信号可观察对象调用 OnError, RetryWhen 将不会重试, 并将向其订阅者报告相同的错误.

另一方面, 如果信号可观察对象调用 OnCompleted, RetryWhen 同样不会重试, 并将在不报告错误的情况下完成. 但是如果信号可观察对象调用 OnNext, 这将导致 RetryWhen 通过重新订阅源来重试.

应用程序通常需要超出简单 OnError 处理程序的异常管理逻辑.

Rx提供了与在C# 中习惯的类似的异常处理操作符, 可以使用它们来组合复杂而健壮的查询.

在本章中, 介绍了Rx中的高级错误处理和一些更多的资源管理特性.

Tags: c#