Rx.NET介绍第一部分:踏上旅程
第一章: Rx.NET简介
为什么选择Rx?
我们都需要及时的信息, 有些应用程序离开实时信息无法工作.
饿着肚子等外卖等时候, 实时更新的骑手轨迹比"大概40分钟送"到更让人安心.
炒股的时候也必须能看到实时涨跌, 在线协作工具和多人游戏同样依赖于数据的快速分发和传递.
大量应用程序依赖于连续的最新数据流, 系统需要在有趣的事情发生时做出反应.
实时信息流是计算机系统的基本, 普遍元素. 但多年以来, 实时信息流在编程语言中往往是二等公民.
大多数语言通过类似数组的东西支持数据序列, 这假定数据闲坐在于内存当中, 准备好让代码在闲暇时和他打招呼.
数组可能适用于历史数据, 但它们不是表示应用程序运行时发生的事件的好方法.
尽管流式数据在计算中是一个相当古老的概念, 但它往往流于底层抽象, 通常于高级的类型系统结成不佳.
这很糟糕. 实时数据对广泛的应用程序至关重要. 它应该像列表, 字典和其他集合一样易于使用.
.NET的 Reactive Extensions (简称 Rx)将实时数据源提升格为一等公民.
Rx不需要特殊的语法支持. 只是利用.NET的类型系统以一种.NET语言(如C#, F#和VB.NET)都能像使用集合类型一样自然地使用的方式表示数据流.
C#提供了集成查询功能(Linq), 可以用它来查找列表中满足某些条件的所有条目.
对于 List<Trade> 类型的 trades 变量, 可以这样写:
var bigTrades =
from trade in trades
where trade.Volume > 1_000_000;
使用 Rx, 可以用完全相同的代码处理实时数据. 只不过 trades 变量是
IObservable<Trade>, 而不是 List<Trade>.
IObservable<T> 是Rx中的基本抽象.它本质上是 IEnumerable<T>
的实时版本.
在这种情况下, bigTrades 是一个 IObservable<Trade>, 一个实时数据源,
能够在交易量超过一百万时立即通知.
最妙的是, 它可以立即报告每一笔这样的交易–这就是所说的 实时 数据源的含义.
Rx 是一个功能强大的开发工具.
让.NET开发人员用熟悉的语言功能处理实时事件流. 能够更简洁,
更优雅的方式表达复杂行为.
Rx 建立在 LINQ(语言集成查询) 之上.
这使能够使用上面所示的查询语法(或者可以使用显式函数调用方法).
LINQ在.NET中广泛用于数据访问(例如, 在 Entity Framework Core 中),
也用于处理内存中的集合(使用LINQ to Objects),
有经验的.NET开发人员会感觉在Rx中如鱼得水.
LINQ是一种高度可组合的设计: 开发者可以按任何组合连接运算符, 以直接的方式表达潜在的复杂处理.
这种可组合性源于其设计的数学理论基础, 深入理解有帮助的但不是必须的.
对其数学不感兴趣的开发人员可以受益于这一事实, 即 Rx 等 LINQ
提供程序提供了一组可以以无数不同方式组合在一起的构建块,
而且一切都能正常工作.
LINQ在处理大量数据方面有着良好的记录. 微软在其一些内部系统中广泛使用它, 包括支持数千万活跃用户的服务.
何时适合使用Rx?
Rx 专为处理事件序列而设计.
接下来的部分将描述 合适的场景, 以及一些 值得考虑的情况. 最后, 将描述一些可以使用Rx但 有更好替代方案的情况.
天然适合Rx的场景
Rx非常适合表示源自代码外部且应用程序需要响应的事件, 例如:
- 集成事件, 如来自消息总线的广播, 来自WebSocket API的推送事件, 通过MQTT或其他低延迟中间件(如Azure Event Grid, Azure Event Hubs和Azure Service Bus)接收到的消息.
- 来自设备的数据, 如供水设施基础设施中的流量传感器, 或宽带提供商网络设备中的监控和诊断功能;
- 来自移动系统的位置数据, 如船舶的AIS消息或汽车遥测数据;
- 操作系统事件, 如文件系统活动或WMI事件;
- 道路交通信息, 如事故通知或平均速度变化;
- 与复杂事件处理(CEP)引擎的集成;
- UI事件, 如鼠标移动或按钮点击.
Rx也是对领域事件进行建模的好方法. 这些事件可能是上述某些事件的结果, 但在对它们进行处理后, 会产生更直接代表应用程序概念的事件. 这些可能包括:
- 领域对象的属性或状态变化, 如"订单状态更新" 或"注册已接受" ;
- 领域对象集合的变化, 如" 新注册创建" .
事件也可能表示从传入事件(或稍后分析的历史数据)中得出的见解, 例如:
- 宽带客户可能不知不觉成为DDoS攻击的参与者;
- 预设的传感器阈值达成;
- 两艘远洋船舶进行了一种通常与非法活动相关的移动模式(例如, 长时间并排航行, 足以在远海转移货物或人员);
- CNC铣床MFZH12的4号轴轴承磨损速度明显高于标称轮廓;
- 如果用户想按时到达城市另一端的会议, 当前交通状况表明他们应在接下来的10分钟内出发.
这三组示例展示了应用程序如何在处理事件时逐步增加信息的价值. 从原始事件开始, 然后对其进行增强以产生特定于领域的事件, 最后进行分析以产生应用程序用户真正关心的通知.
处理的每个阶段都增加了消息的价值. 每个阶段通常也会减少消息的数量. 如果将第一类中的原始事件直接呈现给用户, 他们可能会被大量消息淹没, 无法发现重要事件.
但是, 如果仅在处理检测到重要事件时向他们呈现通知, 这将使他们能够更高效, 准确地工作, 因为大大提高了信噪比.
System.Reactive库提供了构建这种增值过程的工具, 可以通过它驯服高容量原始事件源, 以产生高价值, 实时, 可操作的见解.它提供了一组运算符, 使代码能够以声明式方式表达这种处理, 正如你将在后续章节中看到的那样.
Rx也非常适合引入和管理并发, 以实现任务派发的目的.
也就是说, 并行执行一组给定的工作, 使得检测到事件的线程不必也是处理该事件的线程.
一个非常流行的用例是维护响应式UI.
UI事件处理在Rx中非常流行–不仅在.NET中, 在RxJS中也是如此, RxJS起源于Rx.NET的一个分支–很容易让人认为这就是它的用途. 但其在那里的成功不应使忽视其更广泛的适用性.
对一个
IEnumerable<T>实时事件进行建模, 应该优先考虑使用Rx. 虽然IEnumerable<T>可以通过惰性求值(如yield return)的模型也试用, 但存在一个问题.即如果消费需要下一个元素(例如, 因为
foreach循环刚刚完成一次迭代)但当前并无可用时,IEnumerable<T>只能在其MoveNext中阻塞调用线程, 直到有数据, 这在某些应用程序中可能会导致可伸缩性问题.即使在某些情况下线程阻塞是可以接受的(或利用新的
IAsyncEnumerable<T>, 它可以借助C#的await foreach功能在这些情况下避免阻塞线程),IEnumerable<T>和IAsyncEnumerable<T>对于表示实时信息源来说是容易产生误导.这些接口表示一种 拉 编程模型: 代码要求序列中的下一个项目. Rx是对自然按自己的时间表产生信息的信息源进行建模的更自然选择.
可能适合Rx的情况
Rx可以用于表示异步操作. .NET的
Task或Task<T>实际上表示单个事件, 而IObservable<T>可以被认为是对一系列事件的概括.例如,
Task<int>和IObservable<int>之间的关系类似于int和IEnumerable<int>之间的关系.这意味着存在一些可以使用
Task和async关键字或通过Rx处理的场景.如果在处理过程中的任何时候你需要处理多个值以及单个值, Rx可以做到这两者;
而
Task不太擅长处理多个项目.Task<IEnumerable<int>>, 能够等待构建一个集合, 如果集合中的所有元素可以在一次性收集, 就非常适用.这种方法的局限性在于, 一旦任务产生了它的
IEnumerable<int>结果,await就完成了, 又回到了对那个IEnumerable<int>的非异步迭代.如果数据不能一次性获取–比如
IEnumerable<int>表示从一个API获取的数据, 其中结果是以每次100个项目的批次获取的–它的MoveNext将不得不在每次需要等待时阻塞调用线程.在Rx以前, 这种方式是表示事件上不连续事件的集合(流)的最佳方式.
随着.NET Core 3.0中引入的
IAsyncEnumerable<T>和C# 8提供了一种在保持在async/await世界中的同时处理序列的方法,Microsoft.Bcl.AsyncInterfacesNuGet包使这在.NET Framework和.NET Standard 2.0上可用.现在选择使用Rx往往归结为 拉 模型(以
foreach或await foreach为例)还是 推 模型(其中代码提供回调, 当项目可用时由事件源调用)更适合正业务场景.另一个相关的功能是自从Rx首次出现后.NET添加的通道(Channel). 这些允许一个源产生对象, 一个消费者处理它们, 所以与Rx有明显的表面相似性.
然而, Rx的一个显著特点是它支持通过广泛的运算符进行组合, 这在通道中没有直接等效的功能.
另一方面, 通道为适应生产和消费速率的变化提供了更多选项.
前文提到的任务分发: 使用Rx将工作推到其他线程上. 这种技术可以使Rx引入和管理并发, 以实现扩展或执行并行计算,
但其他专用框架, 如TPL(任务并行库)Dataflow或PLINQ, 更适合执行并行计算密集型工作.
当然, TPL Dataflow通过其
AsObserver和AsObservable扩展方法提供了与Rx的集成. 所以通常使用Rx将TPL Dataflow与应用程序的其他部分集成.不适合使用Rx的情况
Rx的
IObservable<T>不是IEnumerable<T>或IAsyncEnumerable<T>的替代品. 将自然基于拉取的东西强制转换为基于推送的是错误的.此外, 在某些情况下, Rx编程模型可能太过简单. 一些消息队列技术(如MSMQ)从定义上就是顺序的, 因此可能看起来很适合Rx.
然而, 它们通常因其事务处理支持而被选择. Rx没有直接的方法来呈现事务语义, 所以在需要这种语义的场景中, 可能最好直接使用相关技术的API.
也就是说, Reaqtor为Rx添加了持久性和持久化, 能够把事务队列系统与Rx集成.
始终要为具体工作而选择最佳工具, 编程模型只有在贴近所业务抽象的时候, 才会更易于维护, 才可能提供更好的性能, 万勿勉强.
Rx 实战
通过一个简单的Rx示例快速上手. 安装.NET SDK之后, 在命令行中运行以下命令:
mkdir TryRx
cd TryRx
dotnet new console
dotnet add package System.Reactive
或者, 如果安装了Visual Studio, 可以创建一个新的.NET控制台项目, 然后使用NuGet包管理器添加对System.Reactive的引用.
以下代码创建一个可观察源(ticks), 它每秒产生一个事件.
代码还将一个处理程序传递给该源,
该处理程序为每个事件向控制台写入一条消息:
using System.Reactive.Linq;
IObservable<long> ticks = Observable.Timer(
dueTime: TimeSpan.Zero,
period: TimeSpan.FromSeconds(1));
ticks.Subscribe(
tick => Console.WriteLine($"Tick {tick}"));
Thread.Sleep(5000);
确实不太令人兴奋, 它是一个基本的示例, Rx有一个非常简单的编程模型.
其强大之处来自于组合–可以使用 System.Reactive
库中的构建块来描述将从原始, 低级事件带到高价值见解的处理过程.
但要做到这一点, 必须首先理解Rx的关键类型,
IObservable<T> 和 IObserver<T>.
第二章: 关键类型
Rx可以极大地简化响应事件的代码. 但是要编写良好的响应式代码, 必须理解基本概念.
Rx的基本构建块是一个名为 IObservable<T> 的接口.
理解这个接口及其对应接口 IObserver<T> 是使用Rx成功的关键.
上一章展示了这个LINQ查询表达式作为第一个示例:
var bigTrades =
from trade in trades
where trade.Volume > 1_000_000;
大多数.NET开发人员至少会熟悉LINQ的一种流行形式, 如LINQ对象查询 (LINQ to Objects)或EF查询(Entity Framework Core).
大多数LINQ实现查询静态数据. LINQ to Objects在数组或其他集合上工作, Entity Framework Core中的LINQ查询在数据库中的数据上运行, 但Rx不同: 它提供了对实时事件流定义查询的能力–称之为动态数据.
不喜欢查询表达式语法, 可以通过直接调用LINQ运算符来编写完全等效的代码:
var bigTrades = trades.Where(trade => trade.Volume > 1_000_000);
无论使用哪种风格, 这都是LINQ的方式, 表示 bigTrades 只包含 trades 中
Volume 属性大于一百万的项目.
无法确切知道这些示例的作用, 因为看不到 trades 或 bigTrades
变量的类型. 这段代码的含义将根据这些类型而有很大差异.
如果使用LINQ to Objects, 这些可能都是 IEnumerable<Trade>.
这意味着这些变量都引用表示集合的对象, 可以使用 foreach 循环枚举其内容.
这将表示静态数据, 代码可以直接检查的数据.
通过明确类型来清楚代码的含义:
IObservable<Trade> bigTrades = trades.Where(trade => trade.Volume > 1_000_000);
这消除了歧义. 现在很清楚, 不是在处理静态数据. 正在使用一个
IObservable<Trade>. 但这到底是什么呢?
IObservable<T>
IObservable<T> 接口表示Rx的基本抽象: 某种类型 T 的值序列.
在更上层的抽象意义上, 它表示与 IEnumerable<T> 相同的东西.
区别在于代码如何消费这些值. IEnumerable<T> 使代码能够检索值(通常使用
foreach 循环), 而 IObservable<T> 在值可用时提供它们.
这种区别有时被描述为推送与拉取. 可以通过执行 foreach 循环从
IEnumerable<T> 中拉取值, 但 IObservable<T> 会将值推送给的代码.
IObservable<T> 如何将其值推送给代码呢? 如果想要这些值, 代码必须订阅
IObservable<T>, 意味着提供它可以调用的方法. 实际上, 订阅是
IObservable<T> 直接支持的唯一操作. 这是接口的完整定义:
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
可以在GitHub上查看 IObservable<T> 的源代码. 它是.NET运行时库的一部分,
而不是System.Reactive NuGet包.
IObservable<T> 表示一个非常重要的抽象, 它被内置到.NET中.
.NET运行时库仅定义了 IObservable<T> 和 IObserver<T> 接口,
而没有定义LINQ实现. System.Reactive NuGet包为提供了LINQ支持,
并处理线程相关的事情.
这个接口的唯一方法明确了可以对 IObservable<T> 做什么:
如果想要接收它提供的事件, 可以订阅它.
也可以取消订阅: Subscribe 方法返回一个 IDisposable, 如果调用该对象的
Dispose 方法, 它将取消订阅.
Subscribe 方法要求传入一个 IObserver<T> 的实现, 很快就会讲到.
读到此处, 细心的读者可能已经注意到, 上一章中的一个示例看起来似乎有问题.
该代码创建了一个每秒产生一个事件的 IObservable<long>,
然后使用以下代码订阅它:
ticks.Subscribe(
tick => Console.WriteLine($"Tick {tick}"));
这里传递的是一个委托, 而不是 IObservable<T>.Subscribe 所要求的
IObserver<T>.
很快就会讲到 IObserver<T>, 以上示例使用了System.Reactive
NuGet包中的一个扩展方法:
// 来自System.Reactive库的ObservableExtensions类
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
这是一个辅助方法, 它将一个委托包装在一个 IObserver<T> 的实现中,
然后将其传递给 IObservable<T>.Subscribe.
这样做的效果是, 可以只编写一个简单的方法(而不是一个完整的 IObserver<T>
实现), 并且可观察源将在每次想要提供一个值时调用回调.
通常使用这种辅助方法比重新实现Rx的接口更方便.
热数据源和冷数据源
由于 IObservable<T> 在订阅之前不能提供值, 所以订阅的时间很重要.
想象一个提供的信息是实时的 IObservable<Trade>,
描述了某个市场中发生的交易. 那么在订阅之前发生的任何交易都不会被获知.
在Rx中, 这种类型的源被称为热数据源.
与之相对的是冷数据源. 一个 IObservable<T>
可以始终向任何订阅者提供完全相同的事件序列, 无论 Subscribe 何时发生.
如果一个 IObservable<Trade>, 不报告实时信息,
而是基于记录的历史交易数据生成通知.
无论何时订阅都可拿到全部历史数据的的源被称为冷数据源.
热可观察源的示例:
- 传感器的测量值;
- 交易交易所的价格变动;
- 立即分发事件的事件源, 如Azure Event Grid;
- 鼠标移动;
- 定时器事件;
- 广播, 如ESB通道或UDP网络数据包.
冷的可观察源的示例:
- 集合的内容(如
IEnumerable<T>的ToObservable扩展方法返回的内容); - 固定范围的值, 如
Observable.Range产生的内容; - 基于算法生成的事件, 如
Observable.Generate产生的内容; - 异步操作的工厂, 如
FromAsync返回的内容; - 由运行常规代码(如循环)产生的事件; 可以使用
Observable.Create创建这样的源; - 流式事件提供程序, 如Azure Event Hub或Kafka(或任何其他流式源, 它保留过去的事件, 以便能够从流中的特定时刻传递事件; 所以不是Azure Event Grid风格的事件源).
并非所有源都严格地完全是热的或冷的(暖数据源😂). 例如, 可以想象一个对实时
IObservable<Trade> 的轻微变体, 其中源总是向新订阅者报告最近的交易.
订阅者可以立即收到某些内容, 然后随着新信息的到来保持最新状态.
新订阅者将始终收到(可能相当旧的)信息这一事实是冷数据源的特征, 但只有第一个事件是冷的.
这种情况, 订阅者依旧会错过许多早期订阅者可见的信息, 这使得这个源更倾向于热而不是冷.
有一种有趣的特殊情况, 其中事件源被设计为使应用程序能够按顺序接收每个事件, 且恰好一次.
事件流系统如Kafka或Azure Event Hub具有这种特性–它们会保留事件一段时间, 以确保即使消费者偶尔落后, 也不会错过任何事件.
进程的标准输入(stdin)也具有这种特性:
如果在命令行工具准备好处理输入之前开始输入,
操作系统将把输入保存在缓冲区中, 以确保不会丢失任何内容.
Windows为桌面应用程序做了类似的事情: 每个应用程序线程都有一个消息队列, 这样如果在它无法响应时点击或输入, 输入也最终将被处理.
可以将这些源视为冷 -> 热. 它们像冷源一样,
不会因为订阅的晚而错过任何东西, 但一旦"追上"最新的数据,
通常就不能再倒回开始.
所以一旦开始运行, 它们更像是热事件.
这种冷 - 然后 - 热的源可能会带来一个问题, 如果想要附加多个订阅者.
比入说老师讲课希望所有学生进入课堂才开始, 而不是又一个学生就开讲.
如果源在订阅发生时立即开始提供事件(一个学生在课堂), 那么对于第一个订阅者来说完全没问题(第一个学生可以听到全部内容).
Observable将收到在开始之前缓存所有事件. 但是如果想要附加多个订阅者, 就有问题了: 第一个订阅者可能会收到全部通知, 而第二个(及以后的)订阅者将错过这些通知.
在这类情况下, 真的希望在启动之前以某种方式配置好所有订阅者.
希望订阅与启动操作分开. 默认情况下, 订阅一个源意味着希望它开始,
但Rx定义了一个专门的接口, 可以给更多的控制: IConnectableObservable<T>.
它派生自 IObservable<T>, 并只添加了一个方法, Connect:
public interface IConnectableObservable<out T> : IObservable<T>
{
IDisposable Connect();
}
这在需要获取或生成事件的过程中很有用, 并且需要确保在该过程开始之前做好准备.
因为 IConnectableObservable<T> 在调用 Connect 之前不会启动,
所以它为你提供了一种在事件开始流动之前附加所需数量订阅者的方法.
源的"温度" 并不一定从其类型中明显看出. 即使底层源是一个
IConnectableObservable<T>, 它也可能经常隐藏在多层代码后面.
所以大多数时候, 只看到一个 IObservable<T>. 由于 IObservable<T>
只定义了一个方法 Subscribe, 你可能想知道如何用它做任何有趣的事情.
其强大之处来自于System.Reactive NuGet库提供的LINQ运算符.
LINQ运算符和组合
到目前为止, 只展示了一个非常简单的LINQ示例, 使用 Where
运算符将事件过滤为仅满足特定条件的事件.
要了解如何通过组合构建更高级的功能, 接下来将介绍一个示例场景.
假设想编写一个程序, 监视文件系统上的某个文件夹, 并且在该文件夹中的内容发生任何变化时自动执行处理.
例如, Web开发人员经常希望在编辑器中保存更改时触发客户端代码的自动重新构建, 以便他们可以快速看到更改的效果.
文件系统更改通常会成批出现. 文本编辑器在保存文件时可能会执行几个不同的操作.
有些会将修改保存到一个新文件, 然后在完成后执行几次重命名, 因为这样可以避免在保存文件时发生电源故障或系统崩溃导致的数据丢失.
所以你通常不希望在检测到文件活动时立即采取行动. 最好等一会儿, 看看是否有更多活动发生, 并且仅在一切平静下来后才采取行动.
所以不应该直接对文件系统活动做出反应. 希望在活动平息后的安静时刻采取行动.
Rx没有直接提供此功能, 但可以通过组合一些内置运算符来创建一个自定义运算符. 以下代码定义了一个Rx运算符来检测和报告此类情况.
间歇期操作符
static class RxExt
{
public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src, TimeSpan minimumInactivityPeriod, IScheduler scheduler)
{
IObservable<int> onoffs =
from _ in src
from delta in
Observable.Return(1, scheduler).Concat(Observable.Return(-1, scheduler).Delay(minimumInactivityPeriod, scheduler))
select delta;
IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);
return src.Buffer(zeroCrossings);
}
}
Rx 新手很难立即明白它是如何工作的. 这比展示过的示例复杂得多,
因为它来自一个实际应用. 接下来将逐步解释, 一切都会变得清晰.
关于这个代码首先要说明的是, 实际上是在定义一个自定义的LINQ风格的运算符:
一个扩展方法, 像Rx提供的所有LINQ运算符一样, 它以一个 IObservable<T>
作为其隐式参数, 并产生另一个可观察源作为其结果. 返回类型略有不同: 它是
IObservable<IList<T>>.
这是因为一旦回到不活动状态, 将希望处理刚刚发生的所有事情, 所以这个运算符将产生一个列表, 其中包含源在其最近的活动爆发中报告的每个值.
当想要展示一个Rx运算符的行为时, 通常会绘制一个"弹珠图" .
这是一个显示一个或多个 IObservable<T> 事件源的图,
每个源由一条水平线表示.
源产生的每个事件由该线上的一个圆圈(或"弹珠")表示, 水平位置表示时间.
通常, 线的左侧有一个垂直条, 表示应用程序 订阅源的瞬间, 除非它恰好立即产生事件, 在这种情况下它将从一个弹珠开始.
如果线的右侧有一个箭头, 则表示可观察对象的生命周期超出了图的范围.
以下是一个显示上述 Quiescent 运算符如何响应特定输入的图:
此图显示源(顶行)产生了几个事件(在此示例中为值1和2), 然后停止了一会儿.
在它停止后不久, Quiescent
运算符返回的可观察对象(下行)产生了一个包含这两个事件的单个事件([1,2]).
然后源再次启动, 快速产生值3, 4和5, 然后再次安静下来.
同样, 一旦安静期持续足够长的时间, Quiescent
源就会产生一个包含此第二次活动爆发中源的所有事件的单个事件([3,4,5]).
然后在此图中显示的源的最后一点活动由单个事件6组成, 随后是更多的不活动,
并且一旦不活动持续足够长的时间, Quiescent
源就会产生一个报告此的单个事件.
由于源的最后一次 爆发 活动只包含一个事件, 所以 Quiescent
可观察对象报告的列表是一个包含单个值的列表: [ 6 ].
那么上面的代码是如何实现这一点的呢? 关于 Quiescent 方法首先要注意的是,
它使用其他Rx提供的LINQ运算符, 比如, Return, Scan, Where 和
Buffer.
此外, 当C#查询表达式连续包含两个 from 子句时, C#编译器会将其转换为对
SelectMany, 在操作符的组合之后,最终产生 IObservable<IList<T>>.
和它完全等价的函数式写法:
static class RxExt
{
public static IObservable<IList<T>> Quiescent<T>(
this IObservable<T> src,
TimeSpan minimumInactivityPeriod,
IScheduler scheduler)
{
// 使用函数调用的LINQ方式重写
IObservable<int> onoffs = src
.SelectMany(_ => Observable.Return(1, scheduler)
.Concat(Observable.Return(-1, scheduler)
.Delay(minimumInactivityPeriod, scheduler)));
IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);
return src.Buffer(zeroCrossings);
}
}
这是最通常使用Rx的方式. 使用各种运算符的组合, 以产生想要的效果.
但是这种特定的组合如运作呢? 有几种方法可以从 Quiescent
运算符中获得正在寻找的行为,基本思想是它跟踪最近发生了多少事件,
在该数字降回零时产生一个结果.
outstanding 变量引用的 IObservable<int> 跟踪最近事件的数量,
以下弹珠图显示了它对与上一图相同源事件的响应:
对事件进行了颜色编码, 以便可以显示源事件和 outstanding
产生的相应事件之间的关系.
每次源产生一个事件, outstanding 会同时产生一个事件, 其值比
outstanding 之前产生的值高1.
但是每个这样的源事件也会导致 outstanding 在两秒后产生另一个事件.
(这是两秒, 因为在这些示例中, 假设 Quiescent 的第一个参数是
TimeSpan.FromSeconds(2), 如第一个弹珠图所示. )
那个第二个事件总是产生一个比之前的值低1的值.
这意味着 outstanding
产生的每个事件都告诉源在过去两秒内产生了多少个事件.
此图以略微不同的形式显示了相同的信息: 它显示了 outstanding
产生的最新值作为一个图形. 你可以看到值每次源产生一个新值时就增加1.
并且在源产生每个值后的两秒, 它会降回1.
在简单的情况下, 比如最后一个事件6, 它是在那个时间左右发生的唯一事件,
outstanding 的值在事件发生时增加1, 然后在两秒后降回.
在图的左侧有点复杂: 得到两个事件相继快速发生, 所以 outstanding
的值先增加到1, 然后增加到2, 然后在降回1之前又降回0.
中间部分看起来有点乱–计数在源产生事件3时增加到1, 然后在事件4发生时增加到2.
然后在事件3发生两秒后它降回1, 但随后另一个事件5发生, 使总数回到2. 不久之后, 在事件4发生两秒后它又降回1. 然后在事件5发生两秒后它再次降回0.
中间部分是最混乱的, 但它也是这个运算符旨在处理的那种活动的最具代表性的部分.
这里的整个要点是期望看到活动的爆发, 如果这些代表文件系统活动, 复杂是正常的!
因为存储设备的行为并不总是可预期的, 特别介质是机械硬盘(机械移动), 或者是远程存储(网络抖动)的情况下.
有了对最近活动的这种度量, 可以通过观察 outstanding
何时降回零来检测活动爆发的结束, 这就是上面代码中 zeroCrossing
所引用的可观察对象所做的事情.
本质上是使用 Where 运算符过滤掉所有 outstanding
的当前值不返回零的事件.
outstanding 本身呢? 每次源产生一个值时, 实际上创建一个全新的
IObservable<int>, 它恰好产生两个值.
它立即产生值1, 然后在指定的时间跨度(在这些示例中为2秒, 但一般是
minimumActivityPeriod 指定的任何时间)后产生值 -1.
这就是在查询表达式的这个子句中发生的事情:
from delta in Observable
.Return(1, scheduler)
.Concat(Observable
.Return(-1, scheduler)
.Delay(minimumInactivityPeriod, scheduler))
Rx的使用就是算子组合的使用, Return 运算符创建一个 IObservable<int>,
它立即产生一个单一值然后终止.
这段代码调用它两次, 一次产生值1, 一次产生值 -1. 它使用 Delay 运算符,
不是立即得到 -1值, 而是得到一个可观察对象,
它会等待指定的时间周期(在这些示例中为2秒, 但一般是
minimumActivityPeriod 指定的任何时间)后产生值.
然后使用 Concat 将这两个组合成一个单一的 IObservable<int>,
它先产生值1, 然后两秒后产生值 -1.
虽然这为每个源事件创建了一个全新的 IObservable<int>, 但上面显示的
from 子句是 from... from.. select 形式的查询表达式的一部分.
C#编译器会将其转换为对 SelectMany 的调用,
这具有将所有这些都扁平化为一个单一可观察对象的效果, 这就是 onoffs
变量所引用的内容. 以下弹珠图说明了这一点:
from _ in src
from delta in Observable.Return(1,scheduler).Concat(0bservable.Return(-1,scheduler).Delay(minimumInactivityPeriod, scheduler))
此图还再次显示了 outstanding 可观察对象, 现在可以看到它来自哪里:
它只是 onoffs 产生的所有值的运行总和.
这个运行总和可观察对象是通过以下代码创建的:
IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
Rx的 Scan 运算符的工作方式与标准LINQ的 Aggregate 运算符非常相似,
因为它累积地将一个操作(在这种情况下是加法)应用于序列中的每个项目.
不同之处在于, Aggregate 在到达序列末尾时只产生最终结果, 而 Scan
显示其所有工作, 在每次输入后产生到目前为止的累积值.
所以这意味着 outstanding 将在 onoffs 产生一个事件时产生一个事件,
并且该事件的值将是到目前为止的运行总和–所有来自 onoffs 的值的总和.
所以这就是 outstanding 源在过去两秒内产生了多少个事件(或者一般来说, 是
minimumActivityPeriod 指定的任何时间段内).
最后一部分是如何从 zeroCrossings
(它在源进入静止状态时产生一个事件)到输出 IObservable<IList<T>>,
后者提供源在最近一次活动爆发中发生的所有事件.
这里只是使用Rx的 Buffer 运算符, 它正是为此场景设计的:
它将其输入切片为块, 为每个块产生一个事件, 该事件的值是一个 IList<T>,
其中包含该块的项目.
Buffer 可以通过几种方式切片, 在这种情况下, 使用的形式是每当某个
IObservable<T> 产生一个项目时就开始一个新的切片.
具体来说, 告诉 Buffer 通过在 zeroCrossings
产生一个新事件时创建一个新块来切片源.
还有一个最后的细节, 就是这个方法需要一个 IScheduler.
这是Rx用于处理定时和并发的抽象.
需要它是因为需要能够在延迟一秒后产生事件,
而这种基于时间的活动需要一个调度程序.
在后面的章节中更详细地介绍所有这些运算符和调度程序的工作原理.
通常通过创建LINQ运算符的组合来使用Rx, 这些运算符处理和组合
IObservable<T> 源以定义所需的逻辑.
虽然示例中没有调用 IObservable<T> 定义的方法(Subscribe).
总会有某个地方最终会消费这些事件,
使用Rx的大部分工作在于声明式地定义需要的 IObservable<T>.
.NET不是有Event吗?
.NET从第一个版本发布以来就有内置的事件支持–事件是.NET类型系统的一部分.
C#语言通过 event
关键字以及用于订阅事件的专门语法提供了对事件的内在支持.
那么, 当Rx在大约10年后出现时, 为什么需要发明Rx呢? .NET事件有什么问题吗?
.NET事件的基本问题是它们在.NET类型系统中得到了特殊处理. 而具有讽刺意味的是, 这搞的它们非常灵活.
如果没有.NET事件, 将需要某种基于对象的事件表示, 此时可以对事件做所有与其他对象相同的事情: 可以将它们存储在字段中, 将它们作为参数传递给方法, 在它们上面定义方法等等.
大可不必苛责古人, .NET版本1在没有泛型的情况下很难定义一个好的基于对象的事件表示, 而.NET直到版本2(在.NET 1.0发布三年半后)才有泛型.
不同的事件源需要能够报告不同的数据, 而.NET事件提供了一种通过类型参数化事件的方法.
但是一旦有了泛型, 就有可能定义像 IObservable<T> 这样的类型,
并且事件提供的主要优势就消失了.
另一个好处是对实现和订阅事件的一些语言支持, 但原则上, 如果微软选择这样做, 也可以为Rx提供这种支持. 这不是一个需要事件与类型系统的其他功能有根本不同的功能.
回顾下上一章的例子中自定义的LINQ运算符 Quiescent, 因为
IObservable<T> 和千千万万个普通接口一样, 可以自由地为它编写扩展方法.
而没人能能为一个事件编写扩展方法.
此外, 能够包装或适配 IObservable<T> 源. Quiescent 接受一个
IObservable<T> 作为输入,
并组合各种Rx运算符来产生另一个可观察对象作为输出.
它的输入是一个可以订阅的事件源, 它的输出也是一个可以订阅的事件源. 无法用.NET事件做到这一点–不能编写一个接受事件作为参数或返回事件的方法.
这些限制有时被描述为.NET事件不是一等公民. 在.NET中有一些你可以对值或引用做的事情, 但不能对事件做.
如果将事件源表示为一个普通的接口, 那么它就是一等公民: 它可以使用所有期望与其他对象和值一起使用的功能, 正是因为它不是什么特殊的东西.
为什么不使用流(Streams)呢?
前文经常将 IObservable<T> 描述为表示事件流. 这就引出了一个明显的问题:
.NET已经有了 System.IO.Stream, 那么为什么不直接使用它?
简短的答案是, 抽象不对, 因为它们代表了计算中一个非常古老的概念, 早在第一个Windows操作系统发布之前就存在了, 它们有很多历史包袱.
这意味着即使是像"暴露一些数据, 希望使所有感兴趣的各方都能使用它"
这样简单的场景, 用 Stream 类型实现起来非常复杂.
Stream 没有提供任何方法来指示将出现什么类型的数据–它只知道字节.
由于.NET的类型系统支持泛型,
自然希望表示事件流的类型通过类型参数指示事件类型.
所以即使把 IO 流与Rx一起使用, 它们也不是正确的主要抽象.
(可以查看附录A: 经典IO流有什么问题? , 以获得更详细的解释,
确切地说明为什么 Stream 不适合这项任务.)
已经看到了为什么 IObservable<T> 存在, 接下来看看硬币的另一面
IObserver<T>.
IObserver<T>
前文展示了 IObservable<T> 的定义. 它只有一个方法 Subscribe.
并且这个方法只接受一个类型为 IObserver<T> 的参数.
所以如果想观察 IObservable<T> 提供的事件, 你必须为它提供一个
IObserver<T>. 在前面的示例中,只是提供了一个简单的回调,
Rx会为将其包装在一个 IObserver<T> 的实现中, 需要理解 IObserver<T>
才能有效地使用Rx. 它不是一个复杂的接口:
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
和 IObservable<T> 一样, 可以在.NET运行时GitHub库中找到 IObserver<T>
的源代码, 这两个接口都内置在dotnet运行时库中.
如果想创建一个将值打印到控制台的 IObserver<T>, 非常简单:
public class MyConsoleObserver<T> : IObserver<T>
{
public void OnNext(T value)
{
Console.WriteLine($"Received value {value}");
}
public void OnError(Exception error)
{
Console.WriteLine($"Sequence faulted with {error}");
}
public void OnCompleted()
{
Console.WriteLine("Sequence terminated");
}
}
在上一章中, 使用了一个 Subscribe 扩展方法, 它接受一个委托,
每当源产生一个项目时就会调用该委托.
这个方法是由Rx的 ObservableExtensions 类定义的, 它还定义了
IObservable<T> 的各种其他扩展方法.
包括 Subscribe 的重载, 无需提供自己的 IObserver<T> 实现,
能够编写具有相同效果的代码:
source.Subscribe(
value => Console.WriteLine($"Received value {value}"),
error => Console.WriteLine($"Sequence faulted with {error}"),
() => Console.WriteLine("Sequence terminated"));
Subscribe 的重载中, 没有传递所有三个方法(例如,
之前的示例只提供了一个对应于 OnNext 的单个回调)等同于编写一个
IObserver<T> 实现, 其中一个或多个方法只是有一个空体.
无论是发现提供自己的 IObserver<T> 实现更方便, 还是只提供部分或全部
OnNext, OnError 和 OnCompleted 方法的回调, 基本行为都是相同的:
一个 IObservable<T> 源通过调用 OnNext 报告每个事件, 并通过调用
OnError 或 OnCompleted 告诉事件何时结束.
IObservable<T> 和 IObserver<T> 之间的关系是否与 IEnumerable<T> 和
IEnumerator<T> 之间的关系是对称的.
IObservable<T> 和 IEnumerable<T> 都代表潜在的序列. 这两个接口,
只有在请求时才会提供数据.
要从 IEnumerable<T> 获取值, 需要 IEnumerator<T>, 类似地, 要从
IObservable<T> 获取值, 需要 IObserver<T>.
IEnumerable<T> 和 IObservable<T> 之间基本区别在于 推 和 拉.
IEnumerable<T> 可以创建一个 IEnumerator<T>,
然后使用它来检索项目(这就是C#的 foreach 循环), 而 IObservable<T>
并不提供 IObserver<T>: 它期望使用者提供一个 IObserver<T>,
它会将值推送给该观察者.
为什么 IObserver<T> 有这三个方法呢? 因为 IObserver<T> 代表与
IEnumerable<T> 有类同的抽象?
IObservable<T> 和 IObserver<T> 被设计为保留 IEnumerable<T> 和
IEnumerator<T> 的确切含义, 只改变消费的细节.
想想当遍历一个 IEnumerable<T> (例如使用 foreach 循环)时会发生什么.
在每次迭代(更准确地说, 在每次调用枚举器的 MoveNext 方法时),
有三种可能的情况:
MoveNext可以返回true, 表示在枚举器的Current属性中有一个可用的值;MoveNext可以抛出一个异常;MoveNext可以返回false, 表示你已经到达了集合的末尾.
这三种结果与 IObserver<T> 定义的三个方法精确对应.
可以用更抽象的术语来描述这些:
- 这里有另一个元素;
- 出问题了;
- 没有更多元素了.
IObserver<T> | Iemnumerator<T> |
|---|---|
| OnNext | MoveNext => ture |
| OnError | MoveNext => Exception |
| OnComplete | MoveNext => false |
消费 IEnumerable<T> 或 IObservable<T> 时只可能发生三件事.
唯一的区别是消费者发现这一点的方式.
对于 IEnumerable<T>, 每次调用 MoveNext 都是这三种情况中之一. 而对于
IObservable<T> 源,
它会通过调用观察者的相应成员方法之一来通知这三种可能的结果.
Rx序列的基本规则
上面列出的三种结果中的两种是终止性的. 遍历一个 IEnumerable<T>,
并且它抛出了一个异常, foreach 循环将终止.
C#编译器理解如果 MoveNext 抛出异常, IEnumerator<T> 现在就完成了,
所以它会释放该枚举器, 然后允许异常传播.
同样, 如果你到达了序列的末尾, 同样也完成了, 编译器理解这一点: foreach
循环的代码在 MoveNext 返回 false 时检测到这一点,
然后它会释放枚举器并继续执行循环后的代码.
这些规则太过明显, 以至于们遍历 IEnumerable<T> 序列时, 对之视而不见.
可能没想到的是, 完全相同的规则适用于 IObservable<T> 序列.
如果一个可观察源要么告诉观察者序列已经完成, 要么报告一个错误, 那么在任何一种情况下, 这都是源被允许对观察者做的最后一件事.
以下示例都违反规则:
public static void WrongOnError(IObserver<int> obs)
{
obs.OnNext(1);
obs.OnError(new ArgumentException("This isn't an argument!"));
obs.OnNext(2); // 违反规则! 已经报告了失败, 所以迭代必须停止
}
public static void WrongOnCompleted(IObserver<int> obs)
{
obs.OnNext
obs.OnCompleted();
obs.OnNext(2); // 违反规则! 已经说完成了, 所以迭代必须停止
}
public static void WrongOnErrorAndOnCompleted(IObserver<int> obs)
{
obs.OnNext(1);
obs.OnError(new ArgumentException("A connected series of statements was not supplied"));
// 此下一个调用违反规则, 因为已经报告了错误, 并且在报告错误后不允许再进行任何进一步的调用
obs.OnCompleted();
}
public static void WrongOnCompletedAndOnError(IObserver<int> obs)
{
obs.OnNext(1);
obs.OnCompleted();
// 此下一个调用违反规则, 因为已经说完成了
obs.OnError(new ArgumentException("Definite proposition not established"));
}
这些与已经知道的关于 IEnumerable<T> 的情况有相当直接的对应关系:
WrongOnError: 枚举器从MoveNext抛出异常, 它就完成了, 并且不能再调用MoveNext, 所以将不会从它那里获取任何更多的元素;WrongOnCompleted: 如果一个枚举器从MoveNext返回false, 它就完成了, 并且不能再调用MoveNext, 所以将不会再获取任何更多的元素;WrongOnErrorAndOnCompleted: 如果一个枚举器从MoveNext抛出异常, 这意味着它完成了, 并且不能再调用MoveNext, 这意味着没有机会通过返回false从MoveNext来表示完成;WrongOnCompletedAndOnError: 如果一个枚举器返回从MoveNext中返回false, 它就完成了, 不能再调用MoveNext, 这意味着没有机会也抛出一个异常.
因为 IObservable<T> 是基于推送的,
所以遵守所有这些规则的责任落在可观察源上. 对于 IEnumerable<T>,
它是基于拉取的, 所以使用 IEnumerator<T> 的代码(例如 foreach
循环)有责任遵守这些规则. 本质上是相同的规则.
还有一个额外的规则 IObserver<T>: 如果调用 OnNext, 必须等待它返回,
然后才能在同一个 IObserver<T> 上进行任何更多的方法调用.
以下代码违反了规则:
public static void EverythingEverywhereAllAtOnce(IEnumerable<int> obs)
{
Random r = new();
for (int i = 0; i < 10000; ++i)
{
int v = r.Next();
Task.Run(() => obs.OnNext(v)); // 违反规则!
}
}
这段代码调用 obs.OnNext 10,000次, 但它将这些调用作为单个任务在
ThreadPool 上运行.
ThreadPool 旨在能够并行执行工作, 这在这里是一个问题,
因为没有任何东西确保一个对 OnNext 的调用在下次调用开始之前完成.
违反了规则, 即必须等待每个对 OnNext 的调用返回,
然后才能在同一个观察者上调用 OnNext, OnError 或 OnComplete.
这假设调用者不会将同一个观察者订阅到多个不同的源. 如果这样做,
不能假设所有对其 OnNext 的调用都将遵守规则,
因为不同的源将无法知道它们正在与同一个观察者交谈.
这个规则是Rx.NET中内置的唯一一种背压形式: 由于规则禁止在先前对 OnNext
的调用仍在进行时调用 OnNext, 这使 IObserver<T>
能够限制项目到达的速率.
如果在处理完当前元素前, OnNext 不返回, 源有义务等待. 然而, 有一些问题.
一旦涉及调度程序, IObserver<T>
订阅的源可能只是将项目放在队列中并立即返回,
然后这些项目将在不同的线程上传递给实际的观察者.
在这些情况下, 由长时间不返回 OnNext 引起的"背压"
仅传播到从队列中拉取项目的代码.
可能可以使用某些Rx运算符(如 Buffer 或 Sample)来缓解这个问题,
但没有内置的机制用于跨线程传播背压.
其他平台上的一些Rx实现试图提供集成的解决方案; 在过去, 当Rx.NET开发社区研究这个问题时, 一些人认为这些解决方案有问题, 并且对解决方案没有共识.
所以对于Rx.NET, 如果需要控制源在难以跟上时减慢速度, 你将需要引入自己的某种机制.
即使在提供内置背压的Rx平台上, 也没有统一的答案: 如何使这个源更慢地提供事件?
如何(甚至是否)做到这一点将取决于源的性质. 所以定制明确措施是必要的.
即必须等待 OnNext 返回这个规则是很微妙的. 它并不那么明显, 因为没有与
IEnumerable<T> 类似的规则–只有当源将数据推送到应用程序中时,
才有机会违反这个规则.
事实上是这个情况并不罕见. 不光见于多线程程序, 单线程重入也会导致这个问题. 参见如下代码:
public class GoUntilStopped
{
private readonly IObserver<int> observer;
private bool running;
public GoUntilStopped(IObserver<int> observer)
{
this.observer = observer;
}
public void Go()
{
this.running = true;
for (int i = 0; this.running; ++i)
{
this.observer.OnNext(i);
}
}
public void Stop()
{
this.running = false;
this.observer.OnCompleted();
}
}
这个类接受一个 IObserver<int> 作为构造函数参数. 当调用它的 Go
方法时, 它会反复调用观察者的 OnNext, 直到调用 Stop 方法.
这里有个错误!
可以通过提供一个 IObserver<int> 实现来看看会发生什么:
public class MyObserver : IObserver<int>
{
private GoUntilStopped? runner;
public void Run()
{
this.runner = new(this);
Console.WriteLine("Starting...");
this.runner.Go();
Console.WriteLine("Finished");
}
public void OnCompleted()
{
Console.WriteLine("OnCompleted");
}
public void OnError(Exception error) { }
public void OnNext(int value)
{
Console.WriteLine($"OnNext {value}");
if (value > 3)
{
Console.WriteLine($"OnNext calling Stop");
this.runner?.Stop();
}
Console.WriteLine($"OnNext returning");
}
}
注意 OnNext 方法会查看它的输入, 如果它大于3, 它会告诉 GoUntilStopped
对象停止.
让看看输出:
Starting...
OnNext 0
OnNext returning
OnNext 1
OnNext returning
OnNext 2
OnNext returning
OnNext 3
OnNext returning
OnNext 4
OnNext calling Stop
OnCompleted
OnNext returning
Finished
问题就在接近结尾的地方. 这两行:
OnCompleted
OnNext returning
观察者的 OnCompleted 调用发生在对 OnNext 的调用返回之前!
不需要多个线程就能发生这种情况. 因为 OnNext
中的代码决定它是否想要继续接收事件, 并且当它想要停止时, 它立即调用
GoUntilStopped 对象的 Stop 方法.
这没有错. 观察者被允许在 OnNext 中对其他对象进行出站调用,
并且实际上观察者经常会检查传入的事件并决定它想要停止.
问题在于 GoUntilStopped.Stop 方法. 这个方法调用 OnCompleted,
但没有尝试确定对 OnNext 的调用是否正在进行.
这可是一个非常棘手的问题. 假设 GoUntilStopped 确实检测到对 OnNext
的调用正在进行.
那又怎样? 在多线程情况下, 可以通过使用 lock
或其他同步原语来确保对观察者的调用互斥, 但在这里这不起作用: 对 Stop
的调用发生在与调用 OnNext 相同的线程上. 在 Stop 被调用并且想要调用
OnCompleted 的时刻, 调用堆栈将看起来像这样:
GoUntilStopped.Go
MyObserver.OnNext
GoUntilStopped.Stop
GoUntilStopped.Stop 方法需要等待 OnNext 返回才能调用 OnCompleted.
但 OnNext 方法在 Stop 方法返回之前无法返回. 创建了一个死锁,
并且是在单线程代码中!
修复并不太难: 可以修改 Stop, 使其只设置 running 字段为 false,
然后将 OnComplete 调用移动到 Go 方法中, 在 for 循环之后.
但是更普遍的情况下, 这是一个很难修复的问题,
这是使用System.Reactive库而不是让每个人去尝试实现 IObservable<T> 和
IObserver<T> 最直接原因.
Rx有通用的机制来解决这种类型的问题. (将在查看调度时看到这些机制. )
如果使用Rx通过组合其内置运算符以声明方式进行, 永远不必考虑这些规则. 可以在接收事件的回调中依赖这些规则, Rx设计上遵守这些规则.
这样接收事件的代码会变的简单.
这些规则可以被被表述为一种语法. 例如, 考虑这个正则表达式:
(OnNext)*(OnError|OnComplete)
表达约束的最基本思想: 可以有任意数量的 OnNext 调用(零个调用也成),
它们按顺序发生, 然后是 OnError 或 OnComplete 中的一个,
但不能同时有这两个, 并且在这之后不能有任何其他调用.
最后: 序列可能是无限的. 对于 IEnumerable<T> 也是如此.
完全有可能一个枚举器每次 MoveNext 被调用时都返回 true, 一个
foreach 循环遍历它将永远不会到达末尾.
可能选择停止(通过 break 或 return),
或者一些不是由枚举器引发的异常可能导致循环终止, 但完全可以接受一个
IEnumerable<T> 在继续询问时产生项目.
IObservable<T> 也一样. 如果订阅一个可观察源,
并且在程序退出时还没有收到对 OnComplete 或 OnError 的调用, 完全合理.
再改进下正则描述:
(OnNext)*(OnError|OnComplete)?
甚至说, 可观察源可以什么都不做.
Rx提供了一个永远不做任何事情的操作符(Never): 如果调用
Observable.Never<int>(), 将返回一个 IObservable<int>,
它永远不会调用你的观察者的任何方法.
虽然看起来不太有用–逻辑上等同于一个 IEnumerable<T>, 其中枚举器的
MoveNext 方法永远不返回, 这似乎与崩溃没有明显区别😱.
Rx有点不同, 因为当模拟这种 没有项目永远出现 的行为时, 不需要阻塞一个线程来永远这样做.
可以只是决定永远不调用观察者的任何方法. 如前文中 Quiescent
示例中的表述: IObservable<T> 不是因为它的元素而存在,
而是因为对它元素的兴趣而存在😉.
模拟"无事发生"的情况是有用的.
编写了一些代码来检测意外的不活动(例如, 一个传感器停坏了), 想测试该代码,
测试可以使用一个 Never 源而不是一个真实的源.
订阅生命周期
还有一个关于观察者和可观察对象之间关系的更多方面需要理解: 订阅的生命周期.
前文提及, IObserver<T> 对 OnComplete 或 OnError
的调用表示序列的结束. 但是如果想更早地停止订阅呢?
之前提到过, Subscribe 方法返回一个 IDisposable, 能够取消订阅.
比如应用程序打开了某个数据窗口, 为了显示数据订阅了一个源, 并且只在窗口打开时对这个数据源感兴趣.
关闭该窗口时, 不再关心该源的事件, 希望停止继续接收.
可以通过调用 Subscribe 返回的 IDisposable 的 Dispose
方法来取消订阅.
一旦对 Dispose 的调用返回, 源将不再对相关观察者进行任何进一步的调用.
如果调用 Dispose 在观察者上, 那么一旦该调用返回,
可以确定该观察者将不再收到对其三个方法(OnNext, OnError 或
OnComplete)的任何进一步调用.
这看起来很清晰明了, 但它留下了一个灰色区域: 当调用 Dispose
但它尚未返回时会发生什么呢? 规则允许源在这种情况下继续发出事件!
因为本质上无法做更进一步的约束了: 在 Dispose 开始产生任何影响之前,
必然需要非零的时间, 所以在多线程世界中, 总是有可能在 Dispose
调用开始和它产生任何效果之间, 一个事件被传递. 在这种情况下,
唯一可以依赖的是, 如果你的 Dispose 调用发生在 OnNext 处理程序内部,
那么在 Dispose 开始之前, 源已经注意到 OnNext 调用正在进行, 所以在
Dispose 调用返回之前, 进一步的调用已经被阻止.
但是假设你的观察者在 OnNext 调用中间没有被占用,
以下任何一种情况都是合法的:
- 几乎立即停止对
IObserver<T>的调用, 即使在将底层进程停止需要相对较长时间的情况下, 观察者将永远不会收到OnCompleted或OnError; - 产生反映关闭过程的通知(包括在尝试干净关闭时出现错误时调用
OnError, 或者如果关闭顺利则调用OnCompleted); - 在
Dispose调用开始后的一段时间内继续产生一些更多的通知, 但在某个任意点切断它们, 可能会丢失重要信息, 比如在尝试关闭时发生的错误.
Rx倾向于第一种选择. 如果正在使用由System.Reactive库实现的
IObservable<T> (例如, 由LINQ运算符之一返回的), 它很可能具有这种特性.
这是为了避免观察者在其通知回调中尝试对其源进行操作时出现无法处理的情况.
重入很难处理, Rx通过确保在开始关闭订阅之前已经停止向观察者传递通知来避免这种特定形式的重入.
如果需要能够取消你正在观察的某个进程, 但需要能够观察到它在停止之前所做的一切, 那么不能使用取消订阅作为关闭机制!
一旦调用了 Dispose, IObservable<T> 就不再有义务告知任何进一步的信息.
确实出人意料, 毕竟 Subscribe 返回的 IDisposable 有时看起来那么自然.
但基本事实就是: 一旦启动了取消订阅, 就不能 依赖于 从该订阅中获取任何进一步的通知.
还是可能会收到一些–源被允许继续提供元素, 直到 Dispose 调用返回.
但不能依赖于它–源也被允许立即沉默自己, 并且这是大多数Rx实现的源会做的事情.
一个微妙的后果是, 如果一个可观察源在订阅者取消订阅后报告错误, 该错误可能会丢失.
源可能会在其观察者上调用 OnError,
但是如果那是一个由Rx提供的与已经被处置的订阅相关的包装器,
它只会忽略该异常.
所以最好将取消订阅视为本质上混乱的, 有点像中止一个线程: 可以做到, 但信息可能会丢失, 并且存在可能破坏正常异常处理的竞争条件.
简而言之, 如果取消订阅, 那么源没有义务告知事情何时停止.
订阅生命周期和组合
通常通过组合多个LINQ运算符来在Rx中表达处理需求. 这对订阅生命周期意味着什么呢?
例如, 考虑以下代码:
IObservable<int> source = GetSource();
IObservable<int> filtered = source.Where(i => i % 2 == 0);
IDisposable subscription = filtered.Subscribe(
i => Console.WriteLine(i),
error => Console.WriteLine($"OnError: {error}"),
() => Console.WriteLine("OnCompleted"));
正在对 Where 运算符返回的可观察对象调用 Subscribe.
当这样做时, 它将依次调用 GetSource 返回的 IObservable<int> (存储在
source 变量中)的 Subscribe.
所以实际上有一个订阅链. (只能访问 filtered.Subscribe 返回的
IDisposable, 但是返回该对象的对象将存储它在调用 source.Subscribe
时收到的 IDisposable. )
如果源自行结束(通过调用 OnCompleted 或 OnError), 这将级联通过链.
所以 source 将在 Where 运算符提供的 IObserver<int> 上调用
OnCompleted.
然后 Where 将在传递给 filtered.Subscribe 的 IObserver<int> 上调用
OnCompleted, 并且该 IObserver<int> 将有对传递的三个方法的引用,
所以它将调用完成处理程序.
所以你可以这样看, 源完成后, 它告诉 filtered 它已经完成, filtered
调用完成处理程序. (实际上, 这是一个非常轻微的简化, 因为源实际上不是在和
filtered 交谈; 它实际上是在和 filtered 提供的 IObserver<int> 交谈.
这个区别在你有多个订阅同时对同一个可观察对象链处于活动状态时很重要.
但在这种情况下, 更简单的描述方式就足够了, 即使它不是绝对精确的. )
简而言之, 完成事件从源向上冒泡, 通过所有运算符, 到达处理程序.
如果通过调用 subscription.Dispose() 提前取消订阅呢?
在那种情况下, 情况正好相反. filtered.Subscribe
返回的订阅是第一个知道正在取消订阅的, 但它将然后调用 source.Subscribe
返回的对象的 Dispose.
无论哪种方式, 从源到观察者的一切, 包括链中的任何运算符, 都将被关闭.
第三章: 创建可观察序列
在上一章中, 了解了两个基本的Rx接口 IObservable<T> 和 IObserver<T>.
还学习了如何通过实现 IObserver<T> 以及使用 System.Reactive
提供的实现来接收事件.
在本章中, 将学习如何创建 IObservable<T> ,
以表示应用程序中感兴趣的源事件.
将首先直接实现 IObservable<T>. 这样做相对不常见, 因此将接着研究各种让
System.Reactive 提供实现的方法, 它可完成大部分工作.
最基础的 IObservable<T> 实现
以下是一个 IObservable<int> 的实现, 它会生成一个数字序列:
public class MySequenceOfNumbers : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty; // 方便的空操作IDisposable
}
}
可以通过构造其实例并订阅它来进行测试:
var numbers = new MySequenceOfNumbers();
numbers.Subscribe(
number => Console.WriteLine($"Received value: {number}"),
() => Console.WriteLine("Sequence terminated"));
这将产生以下输出:
Received value 1
Received value 2
Received value 3
Sequence terminated
MySequenceOfNumbers 是 IObservable<int> 的正确实现, 但过于简单,
不甚太实用.
通常在有感兴趣的事件时使用Rx, 但以上示例都不是响应式的, 功能是立即生成一组固定的数字.
此外, 该实现是阻塞的, 在生成所有值之前, 甚至不会从 Subscribe 返回.
这个示例说明了源如何向订阅者提供事件的基本原理,
如果单纯表示一个预定的数字序列, 应该直接使用 IEnumerable<T> , 如
List<T> 或数组.
在Rx中表示文件系统事件
接下来是一个更实际的例子. FileSystemWatcher 的包装器,
将文件系统更改通知表示为 IObservable<FileSystemEventArgs>.
这不是Rx FileSystemWatcher 包装器的最佳设计.
该监视器为几种不同类型的更改提供事件, 其中一个是改名事件 Renamed,
它将详细信息作为 RenamedEventArgs 提供.
这个类派生自 FileSystemEventArgs, 所有内容合并到一个事件流中是可行的,
但对于想要访问重命名事件详细信息的应用程序来说, 并不不方便.
一个更严重的设计问题是它无法报告来自 FileSystemWatcher.Error
的多个事件.
这类错误大多是暂时的并且可以恢复, 通常用户希望应用程序继续运行,
但由于这个实现选择用单个 IObservable<T> 表示所有内容,
它只能通过调用观察者的 OnError 来报告错误, 所以整个流程的会停下来.
使用Rx的 Retry 操作符可以解决这个问题, 在错误后自动重新订阅,
但最好还是提供一个单独的 IObservable<ErrorEventArgs>,
以便以非终止的方式报告错误.
这种额外的复杂性并不总是必要的. 简单的设计意味着它适用于某些应用程序. 软件设计中, 也确实没有万能的方法.
// 将文件系统更改表示为Rx可观察序列.
// 注意: 这是一个为了说明目的而过度简化的示例.
// 它不能高效地处理多个订阅者, 它不使用IScheduler, 并且在第一个错误后立即停止.
public class RxFsEvents : IObservable<FileSystemEventArgs>
{
private readonly string folder;
public RxFsEvents(string folder)
{
this.folder = folder;
}
public IDisposable Subscribe(IObserver<FileSystemEventArgs> observer)
{
// 如果有多个订阅者, 效率低下, 每个订阅都会产生新的实例
FileSystemWatcher watcher = new(this.folder);
// FileSystemWatcher的文档没有说明它在哪个线程上引发事件(除非使用它的SynchronizationObject,
// 它与Windows Forms集成良好, 但在这里使用不方便), 也没有承诺在处理完一个事件之前不会传递下一个事件. Mac,
// Windows和Linux的实现都有很大不同, 所以依赖文档中未保证的任何东西都是不明智的. (碰巧的是, .NET 7上的Win32实现
// 似乎确实会等待每个事件处理程序返回后再传递下一个事件, 所以现在可能可以忽略这个问题. 在Windows上. 实际上, Linux
// 实现为这项工作专门分配了一个线程, 但源代码中有一条注释说这可能应该改变, 这是只依赖文档行为的另一个原因. )
// 所以确保遵守IObserver<T>规则是问题.
// 首先, 需要确保一次只对观察者进行一次调用. 一个更实际的示例将使用Rx的IScheduler, 但由于还没有解释它们是什么,
// 将只使用这个对象进行锁定.
object sync = new();
// 更微妙的是, FileSystemWatcher文档没有明确说明在它报告错误后是否可能继续收到一些更改事件. 由于没有关于线程的承诺,
// 可能存在竞争条件, 导致在FileSystemWatcher报告错误后尝试处理它的事件. 所以需要记住是否已经调用了OnError,
// 以确保在那种情况下不违反IObserver<T>规则.
bool onErrorAlreadyCalled = false;
void SendToObserver(object _, FileSystemEventArgs e)
{
lock (sync)
{
if (!onErrorAlreadyCalled)
{
observer.OnNext(e);
}
}
}
watcher.Created += SendToObserver;
watcher.Changed += SendToObserver;
watcher.Renamed += SendToObserver;
watcher.Deleted += SendToObserver;
watcher.Error += (_, e) =>
{
lock (sync)
{
// FileSystemWatcher可能报告多个错误, 但向IObservable<T>报告一个后就结束了, 除非retry.
if (!onErrorAlreadyCalled)
{
observer.OnError(e.GetException());
onErrorAlreadyCalled = true;
watcher.Dispose();
}
}
};
watcher.EnableRaisingEvents = true;
return watcher;
}
}
以上 IObservable<T> 的实现严格遵守了 IObserver<T> 的规则, 主动加锁,
保证竞争条件下的正确性.
任何订阅这个 RxFsEvents 的 IObserver<FileSystemEventArgs>
都不必担心并发问题, 因为它可以依赖 IObserver<T> 规则,
该规则保证它一次只需要处理一件事情.
如果不在源中强制实施这些规则, 的 RxFsEvents 类无疑会更简单,
但处理重叠事件的所有复杂性都会扩散到处理事件的代码中.
并发的影响被控制在一定范围内时, 处理并发已经挺复杂了.
一旦它开始扩散到多个类中, 就更加难以推理和追溯. Rx的 IObserver<T>
规则就是为了防止这种情况的发生.
规则是Rx的一个重要特性. 这些规则使观察者侧的处理变得简单. 随着事件源或事件处理的复杂性增加, 这一点变得越来越重要.
这段代码有几个问题(除了已经提到的API设计问题). 首先, 当 IObservable<T>
实现生成模拟现实异步活动(如文件系统更改)的事件时,
应用程序通常希望有办法控制通知到达的线程.
最典型的, UI框架通常有线程亲和性(thread affinity)要求. 需要在特定线程上才能更新用户界面.
Rx提供了将通知重定向到不同调度程序的机制, 以解决这个问题,
但通常为这样的观察者提供一个 IScheduler, 并通过它传递通知.
将在后面的章节中讨论调度程序.
其次, 这个实现不能高效地处理多个订阅者. 多次调用
IObservable<T>.Subscribe, 每次都会创建一个新的 FileSystemWatcher.
这很常见. 对于监视器实例 fs, 想要以不同方式处理不同事件.
订阅者可以使用 Where 操作符来生成需要的 IObservable<T>,
以达到拆分事件的目的:
IObservable<FileSystemEventArgs> configChanges =
fs.Where(e => Path.GetExtension(e.Name) == ".config");
IObservable<FileSystemEventArgs> deletions =
fs.Where(e => e.ChangeType == WatcherChangeTypes.Deleted);
当对 Where 操作符返回的 IObservable<T> 调用 Subscribe 时,
它会对其输入调用 Subscribe.
所以在这种情况下, 如果对 configChanges 和 deletions 都调用
Subscribe, 将导致对 fs 调用两次 Subscribe.
所以如果 fs 是上面的 RxFsEvents 类的实例, 每个都会构造自己的
FileSystemEventWatcher, 这是低效的.
Rx提供了几种处理这个问题的方法. 甚至有专门的操作符,
用于获取一个不支持多个订阅者的 IObservable<T>,
并将其包装在一个适配器中:
IObservable<FileSystemEventArgs> fs =
new RxFsEvents(@"c:\temp")
.Publish()
.RefCount();
这有点超前了. (这些操作符在第十五章:发布操作符章中有描述. )如果想构建一个本质上支持多订阅者的类型, 做的就是跟踪所有订阅者, 并在循环中通知每个订阅者.
这是文件系统监视器的修改版本:
public class RxFsEventsMultiSubscriber : IObservable<FileSystemEventArgs>
{
private readonly object sync = new();
private readonly List<Subscription> subscribers = new();
private readonly FileSystemWatcher watcher;
public RxFsEventsMultiSubscriber(string folder)
{
this.watcher = new FileSystemWatcher(folder);
watcher.Created += SendEventToObservers;
watcher.Changed += SendEventToObservers;
watcher.Renamed += SendEventToObservers;
watcher.Deleted += SendEventToObservers;
watcher.Error += SendErrorToObservers;
}
public IDisposable Subscribe(IObserver<FileSystemEventArgs> observer)
{
Subscription sub = new(this, observer);
lock (this.sync)
{
this.subscribers.Add(sub);
if (this.subscribers.Count == 1)
{
// 之前没有订阅者, 但现在有了一个, 所以需要启动FileSystemWatcher.
watcher.EnableRaisingEvents = true;
}
}
return sub;
}
private void Unsubscribe(Subscription sub)
{
lock (this.sync)
{
this.subscribers.Remove(sub);
if (this.subscribers.Count == 0)
{
watcher.EnableRaisingEvents = false;
}
}
}
void SendEventToObservers(object _, FileSystemEventArgs e)
{
lock (this.sync)
{
foreach (var subscription in this.subscribers)
{
subscription.Observer.OnNext(e);
}
}
}
void SendErrorToObservers(object _, ErrorEventArgs e)
{
Exception x = e.GetException();
lock (this.sync)
{
foreach (var subscription in this.subscribers)
{
subscription.Observer.OnError(x);
}
this.subscribers.Clear();
}
}
private class Subscription : IDisposable
{
private RxFsEventsMultiSubscriber? parent;
public Subscription(
RxFsEventsMultiSubscriber rxFsEventsMultiSubscriber,
IObserver<FileSystemEventArgs> observer)
{
this.parent = rxFsEventsMultiSubscriber;
this.Observer = observer;
}
public IObserver<FileSystemEventArgs> Observer { get; }
public void Dispose()
{
this.parent?.Unsubscribe(this);
this.parent = null;
}
}
}
无论 Subscribe 被调用多少次, 这个类都只创建一个 FileSystemWatcher
实例. 引入了一个嵌套类来提供 Subscribe 返回的 IDisposable.
在本章的第一个 IObservable<T> 实现中不需要这样做,
因为它在返回之前已经完成了序列, 所以它能够方便地返回Rx提供的
Disposable.Empty 属性.
没有什么资源要释放的情况下, 为了符合函数签名而返回 Disposable.Empty
是很方便的.
在第一个 FileSystemWatcher 包装器 RxFsEvents 中, 返回
FileSystemWatcher 以供 Dispose 之用是非常合适的.
FileSystemWatcher.Dispose 会关闭监视器,每个订阅者都有自己的
FileSystemWatcher.
但新版 FileSystemWatcher 支持多个观察者, 当一个观察者取消订阅时,
需要额外做些工作.
当从 Subscribe 返回的 Subscription 实例被释放时,
它会从订阅者列表中删除自己, 确保不再接收任何通知.
如果没有更多的订阅者, 它还会将 FileSystemWatcher 的
EnableRaisingEvents 设置为 false, 确保如果现在不需要通知,
这个源不会做多余的工作.
这个示例比第一个更实用. 是一个可能随时发生事件的源(这正是Rx非常适合的那种事情), 并且它现在可以智能地处理多个订阅者.
以上我们完成了一个典型 IObservable<T> 的 低层实现,
这段代码甚至不需要引用 System.Reactive 包, 因为它引用的唯一Rx类型是
IObservable<T> 和 IObserver<T>, 这两个类型都内置于.NET运行时库中.
在实际使用中, 通常依赖 System.Reactive 中的各类运算符组合,
可以节省大量工作, 更重要的保障正确.
如只关心 Changed 事件. 可以这样写:
FileSystemWatcher watcher = new(@"c:\temp");
IObservable<FileSystemEventArgs> changes = Observable
.FromEventPattern<FileSystemEventArgs>(watcher, nameof(watcher.Changed))
.Select(ep => ep.EventArgs);
watcher.EnableRaisingEvents = true;
使用了 System.Reactive 库的 Observable 类中的 FromEventPattern,
它可以用于从任何.NET事件构建 IObservable<T>, 在这种模式中,
事件处理程序接受两个参数: 一个 object 类型的发送者, 然后是一些派生自
EventArgs 的类型, 包含关于事件的信息.
这个实现不如前面的示例灵活. 只报告其中一个事件,
并且必须手动启动(或者停止) FileSystemWatcher. 但对于某些应用程序来说,
这已经足够. 而且这需要编写的代码少得多.
如果目标是编写一个适用于许多不同场景的功能齐全的 FileSystemWatcher
包装器, 那么编写一个专门的 IObservable<T> 实现可能是值得的.
可以很容易地扩展最后一个示例来监视所有事件. 只需要为每个事件使用一次
FromEventPattern, 然后使用 Observable.Merge
将四个结果可观察对象合并为一个.
从完全自定义实现中获得的唯一真正好处是,
可以根据当前是否有观察者自动启动和停止 FileSystemWatcher.
只需要将一些事件表示为 IObservable<T>, 以便在应用程序中使用它们,
显然简单的方法更有吸引力.
在实践中, 用 System.Reactive 的函数来是现实
IObservable<T> 的情况是绝大多数. 即使要做精细控制(示例中自动启动和关闭
FileSystemWatcher), 总可以找到一组操作符来用.
以下代码使用了 System.Reactive 中的方法来返回一个
IObservable<FileSystemEventArgs>, 具有与上面完全自定义的手写
RxFsEventsMultiSubscriber 相同的功能, 但代码量要少得多.
IObservable<FileSystemEventArgs> ObserveFileSystem(string folder)
{
return
// Observable.Defer使能够避免在有订阅者之前做任何工作.
Observable.Defer(() =>
{
FileSystemWatcher fsw = new(folder);
fsw.EnableRaisingEvents = true;
return Observable.Return(fsw);
})
// 当前面的部分发出FileSystemWatcher(当有人首次订阅时会发生这种情况)时,
// 想要将所有事件包装为IObservable<T>, 为此将使用投影. 为了避免最终得到一个
// IObservable<IObservable<FileSystemEventArgs>>, 使用SelectMany, 它有效地将其扁平化一层.
.SelectMany(fsw =>
Observable.Merge(new[]
{
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Created += h, h => fsw.Created -= h),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Changed += h, h => fsw.Changed -= h),
Observable.FromEventPattern<RenamedEventHandler, FileSystemEventArgs>(
h => fsw.Renamed += h, h => fsw.Renamed -= h),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Deleted += h, h => fsw.Deleted -= h)
})
// FromEventPattern提供发送者和事件参数. 只提取后者.
.Select(ep => ep.EventArgs)
// Finally确保一旦没有订阅者, 监视器就会关闭.
.Finally(() => fsw.Dispose()))
// Publish和RefCount的这种组合意味着多个订阅者将共享一个FileSystemWatcher,
// 但如果所有订阅者都取消订阅, 它将关闭.
.Publish()
.RefCount();
}
大多数之前都没有讲到. 为了让这个示例更有意义, 接下来深入了解
System.Reactive 已经实现的 IObservable<T> 的众多方法.
简单工厂方法
创建可观察序列的方法数量众多, 本章把它们分类. 的第一类方法创建的
IObservable<T> 序列最多产生一个结果.
Observable.Return
最简单的工厂方法之一是
Observable.Return<T>(T value), 在上一章的Quiescent示例中已经见过. 这个方法接受一个类型为T的值, 并返回一个IObservable<T>, 它将产生这个单个值, 然后完成.这是将一个值包装在
IObservable<T>中; 它在概念上类似于编写new T[] { value }, 因为它是一个只包含一个元素的序列.也可以将其视为Rx中与
Task.FromResult等效的方法, 当你有一个类型为T的值, 并且需要将其传递给需要Task<T>.IObservable<string> singleValue = Observable.Return<string>("Value");为了清晰指定了类型参数, 但这不是必需的, 因为编译器可以从提供的参数推断类型:
IObservable<string> singleValue = Observable.Return("Value");Return产生一个冷可观察对象: 每个订阅者在订阅时将立即收到该值.Observable.Empty
空序列是有用的. .NET的
Enumerable.Empty<T>()为IEnumerable<T>提供了这种功能, Rx也有一个直接等效的方法, 形式为Observable.Empty<T>(), 它返回一个空的IObservable<T>. 需要提供类型参数, 因为没有值可供编译器推断类型.IObservable<string> empty = Observable.Empty<string>();一个空序列是立即对任何订阅者调用
OnCompleted的序列.与
IEnumerable<T>相比, 这就像是Rx中的空列表, 还有另一种看待它的方式.Rx是一种强大的异步过程建模方式, 所以你可以认为这类似于一个立即完成而不产生任何结果的任务, 因此它在概念上类似于
Task.CompletedTask.这个类比不像
Observable.Return和Task.FromResult之间的类比那么紧密, 因为在那种情况下, 是在比较IObservable<T>和Task<T>, 而在这里是在比较IObservable<T>和Task, 任务不产生任何结果而完成的唯一方式是如果使用非泛型版本的Task.Observable.Never
Observable.Never<T>()方法返回一个序列, 与Empty类似, 它不产生任何值, 但与Empty不同的是, 它永远不会结束.这意味着它永远不会对订阅者调用任何方法(既不调用
OnNext,OnCompleted, 也不调用OnError). 而Observable.Empty<T>()立即完成,Observable.Never<T>具有无限持续时间.IObservable<string> never = Observable.Never<string>();用处不太明显. 在上一章给出了一个可能的用途: 可以在测试中使用它来模拟一个不产生任何值的源, 也许是为了使测试能够验证超时逻辑.
它也可以用于使用可观察对象表示基于时间的信息的地方. 有时实际上并不关心从可观察对象中出现什么;
可能只关心某事(任何事)何时发生.
在上一章看到了这种 "仅用于计时目的的可观察序列" 概念的一个示例, 尽管在那个特定场景中
Never没有意义.Quiescent示例使用了Buffer操作符, 它在两个可观察序列上工作: 第一个包含感兴趣的项目, 第二个仅用于确定如何将第一个切成块.Buffer不处理第二个可观察对象产生的值: 它只关注值何时出现, 每次第二个可观察对象产生一个值时完成前一个块.作为一个出于计时目的使用
Never的示例, 假设正在使用一些基于Rx的库, 该库提供了一个超时机制, 当某个超时发生时, 操作将被取消, 并且超时本身被建模为一个可观察序列.如果由于某种原因不想要超时, 并且只想无限期等待, 可以指定一个超时为
Observable.Never.Observable.Throw
Observable.Throw<T>(Exception)返回一个序列, 该序列立即向任何订阅者报告错误. 与Empty和Never一样, 不向这个方法提供值(只提供一个异常), 所以需要提供一个类型参数, 以便它知道在返回的IObservable<T>中使用什么T. 它实际上永远不会产生T, 但不能有一个IObservable<T>的实例而不为T选择某个特定类型.IObservable<string> throws = Observable.Throw<string>(new Exception());Observable.Create
Create工厂方法比其他创建方法更强大, 可以用于创建任何类型的序列. 可以使用Observable.Create实现前面四个方法中的任何一个.该方法签名本身起初看复杂, 但一旦习惯了就会觉得很自然.
// 从指定的Subscribe方法实现创建一个可观察序列. public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) {...} public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe) {...}你向它提供一个委托, 该委托将在每次订阅时执行. 委托将被传递一个
IObserver<T>.从逻辑上讲, 这代表传递给
Subscribe方法的观察者, 尽管在实践中Rx出于各种原因会对它进行包装.可以根据需要调用
OnNext/OnError/OnCompleted方法. 这是少数几个将直接使用IObserver<T>接口的场景. 下面是产生三个元素的示例:private IObservable<int> SomeNumbers() { return Observable.Create<int>( (IObserver<int> observer) => { observer.OnNext(1); observer.OnNext(2); observer.OnNext(3); observer.OnCompleted(); return Disposable.Empty; }); }委托必须返回一个
IDisposable或一个Action以支持取消订阅.当订阅者为了取消订阅而释放订阅时, Rx将调用返回的
IDisposable的Dispose()方法, 或者在返回一个Action的情况下, 它将调用那个Action.这个示例让人想起本章开头的
MySequenceOfNumbers示例, 它立即产生一些固定值.在这种情况下的主要区别是Rx添加了一些包装器, 可以处理诸如重入之类的复杂情况.
Rx有时会自动推迟工作以防止死锁, 所以使用这个方法返回的
IObservable<int>的代码可能会看到对Subscribe的调用在上面代码中的回调运行之前返回, 在这种情况下, 有可能在OnNext处理程序中取消订阅.下面的序列图展示了这在实践中是如何发生的. 假设
SomeNumbers返回的IObservable<int>已经被Rx包装, 以确保订阅发生在某个不同的执行上下文中.通常会通过使用合适的调度器来确定上下文. (
SubscribeOn操作符创建这样的包装器.) 可能使用TaskPoolScheduler, 以确保订阅发生在某个任务池线程上.所以当应用程序代码调用
Subscribe时, 包装的IObservable<int>不会立即订阅底层可观察对象.相反, 它将一个工作项排队到调度程序以执行该操作, 然后立即返回而不等待该工作运行.
这就是订阅者如何在
Observable.Create调用回调之前拥有代表订阅的IDisposable. 该图显示订阅者然后将其提供给观察者.该图显示调度程序在此之后调用底层可观察对象的
Subscribe, 这意味着传递给Observable.Create<int>的回调现在将运行.回调调用
OnNext, 但它没有被传递真实的观察者: 相反, 它被传递另一个Rx生成的包装器.包装器最初直接将调用转发到真实的观察者, 但图显示当真实的观察者(在最右边)收到它的第二个调用(
OnNext(2))时, 它通过调用订阅RxIObservable包装器时返回的IDisposable的Dispose方法取消订阅.这里的两个包装器,
IObservable和IObserver包装器, 是连接的, 所以当从IObservable包装器取消订阅时, 它会告诉IObserver包装器订阅正在关闭.这意味着当
Observable.Create<int>回调在IObserver包装器上调用OnNext(3)时, 那个包装器不会将其转发给真实的观察者, 因为它知道那个观察者已经取消订阅了. (出于同样的原因, 它也不会转发OnCompleted. )你可能想知道返回给
Observable.Create的IDisposable怎么可能做任何有用的事情.它是回调的返回值, 所以只能在回调完成时将其返回给Rx.
难道在返回时总是已经完成了工作, 意味着没有什么可取消的吗?
不一定, 可能会启动一些在返回后继续运行的工作.
下一个示例就是这样做的, 这意味着它返回的取消订阅操作能够做一些有用的事情: 它设置一个取消令牌, 该令牌被生成可观察对象输出的循环所观察. (这个示例返回一个回调而不是一个
IDisposable,Observable.Create提供了让你可以选择的重载. 在这种情况下, Rx将在订阅提前终止时调用回调. )IObservable<char> KeyPresses() => Observable.Create<char>(observer => { CancellationTokenSource cts = new(); Task.Run(() => { while (!cts.IsCancellationRequested) { ConsoleKeyInfo ki = Console.ReadKey(); observer.OnNext(ki.KeyChar); } }); return () => cts.Cancel(); });这说明了取消不一定会立即生效.
Console.ReadKeyAPI没有提供接受CancellationToken的重载, 所以这个可观察对象在用户下次按下键导致ReadKey返回之前无法检测到取消请求.考虑到在等待
ReadKey返回时可能已经请求了取消, 你可能认为应该在ReadKey返回后且在调用OnNext之前检查这一点. 实际上, 如果不这样做也没关系.Rx有一个规则, 即一个可观察源在对该观察者的订阅的
Dispose调用返回后, 不得再调用该观察者.为了执行该规则, 如果传递给
Observable.Create的回调在请求取消订阅后继续调用其IObserver<T>上的方法, Rx将忽略该调用.这就是为什么它传递给你的
IObserver<T>是一个包装器的原因之一: 它可以在调用传递到底层观察者之前拦截这些调用. 然而, 这种便利性意味着有两件重要的事情需要注意:- 如果你忽略取消订阅的尝试并继续工作以产生项目, 你只是在浪费时间, 因为没有东西会接收那些项目.
- 如果你调用
OnError, 可能没有东西在监听, 并且错误将被完全忽略.
Create有一些重载旨在支持async方法. 下一个方法利用这一点, 能够使用异步ReadLineAsync方法将文件中的文本行作为可观察源呈现.IObservable<string> ReadFileLines(string path) => Observable.Create<string>(async (observer, cancellationToken) => { using (StreamReader reader = File.OpenText(path)) { while (cancellationToken.IsCancellationRequested) { string? line = await reader.ReadLineAsync(cancellationToken).ConfigureAwait(false); if (line is null) { break; } observer.OnNext(line); } observer.OnCompleted(); } });从存储设备读取数据通常不会立即发生(除非它恰好已经在文件系统缓存中), 所以这个源将尽快从存储中读取数据并提供数据.
注意, 因为这是一个
async方法, 它通常会在完成之前返回给它的调用者. (第一个实际需要等待的await返回, 并且方法的其余部分在工作完成时通过回调运行. )这意味着订阅者通常会在这个方法完成之前拥有代表他们订阅的
IDisposable, 所以在这里使用不同的机制来处理取消订阅.Create的这个特定重载不仅将一个IObserver<T>传递给它的回调, 还传递一个CancellationToken, 当取消订阅发生时, 它将使用该令牌请求取消.文件IO可能会遇到错误. 正在寻找的文件可能不存在, 或者由于安全限制或其他应用程序正在使用它, 可能无法打开它. 文件可能在远程存储服务器上, 可能会失去网络连接.
出于这个原因, 必须期望从这样的代码中出现异常. 这个示例没有做任何事情来检测异常, 但实际上
ReadFileLines方法返回的IObservable<string>将报告发生的任何异常.这是因为
Create方法将捕获从回调中出现的任何异常, 并使用OnError报告它. (如果代码已经在观察者上调用了OnComplete, Rx不会调用OnError, 因为这将违反规则. 相反, 它将默默地丢弃异常, 所以最好在调用OnCompleted之后不要尝试做任何工作. )这种自动异常传递是
Create工厂方法是实现自定义可观察序列的首选方法的另一个原因. 它几乎总是比创建实现IObservable<T>接口的自定义类型更好的选择.这不仅是因为它为你节省了一些时间. 也是因为Rx处理了你可能没有想到的复杂问题, 例如通知的线程安全性和订阅的处置.
Create方法涉及惰性求值, 这是Rx非常重要的一部分.它为其他强大功能打开了大门, 例如调度和序列组合, 将在后面看到这些功能.
委托只会在订阅发生时被调用. 所以在
ReadFileLines示例中, 它不会尝试打开文件, 直到你订阅返回的IObservable<string>.如果你多次订阅, 它将每次执行回调. (所以如果文件已经更改, 你可以通过再次调用
Subscribe获取最新内容. )作为一个练习, 尝试使用
Create方法自己构建Empty,Return,Never和Throw扩展方法.你在继续阅读之前完成了最后一步, 对吧? 因为现在你可以将你的版本与使用
Observable.Create重新创建的Empty,Return,Never和Throw示例进行比较:public static IObservable<T> Empty<T>() { return Observable.Create<T>(o => { o.OnCompleted(); return Disposable.Empty; }); } public static IObservable<T> Return<T>(T value) { return Observable.Create<T>(o => { o.OnNext(value); o.OnCompleted(); return Disposable.Empty; }); } public static IObservable<T> Never<T>() { return Observable.Create<T>(o => { return Disposable.Empty; }); } public static IObservable<T> Throws<T>(Exception exception) { return Observable.Create<T>(o => { o.OnError(exception); return Disposable.Empty; }); }你可以看到, 如果愿意,
Observable.Create提供了构建自己的工厂方法的能力.Observable.Defer
Observable.Create的一个非常有用的方面是, 它提供了一个放置代码的地方, 这些代码应该只在订阅发生时运行.通常, 库会提供
IObservable<T>属性, 但不一定所有应用程序都会使用它们, 所以将涉及的工作推迟到你确定真正需要它时是很有用的.这种延迟初始化是
Observable.Create工作方式的固有特性, 但是如果源的性质意味着Observable.Create不是一个很好的选择呢?在那种情况下, 如何执行延迟初始化呢? Rx提供了
Observable.Defer用于此目的.已经使用过一次
Defer.ObserveFileSystem方法返回一个IObservable<FileSystemEventArgs>, 报告文件夹中的更改.它不是
Observable.Create的合适候选者, 因为它将想要的所有通知作为.NET事件提供, 所以使用Rx的事件适配功能是有意义的.但是仍然希望将
FileSystemWatcher的创建推迟到订阅时刻, 这就是为什么那个示例使用了Observable.Defer.Observable.Defer接受一个返回IObservable<T>的回调,Defer用一个IObservable<T>包装这个回调, 该IObservable<T>在订阅时调用那个回调.为了展示效果, 将首先展示一个不使用
Defer的示例:static IObservable<int> WithoutDeferal() { Console.WriteLine("Doing some startup work..."); return Observable.Range(1, 3); } Console.WriteLine("Calling factory method"); IObservable<int> s = WithoutDeferal(); Console.WriteLine("First subscription"); s.Subscribe(Console.WriteLine); Console.WriteLine("Second subscription"); s.Subscribe(Console.WriteLine);这将产生以下输出:
Calling factory method Doing some startup work... First subscription Second subscription如你所见,
"Doing some startup work..."消息在调用工厂方法时出现, 并且在订阅之前. 所以如果没有人订阅IObservable<int>返回的方法, 工作无论如何都会完成, 浪费时间和精力. 这是Defer版本:static IObservable<int> WithDeferal() { return Observable.Defer(() => { Console.WriteLine("Doing some startup work..."); return Observable.Range(1, 3); }); }如果使用与第一个示例类似的代码, 将看到以下输出:
Calling factory method First subscription Doing some startup work... Second subscription Doing some startup work...有两个重要的区别. 首先,
"Doing some startup work..."消息直到首次订阅才出现, 说明Defer做了想要的事情.然而, 注意消息现在出现了两次: 每次订阅它都会做这项工作. 如果想要这种延迟初始化但也希望只执行一次, 应该查看发布操作符章节中的操作符, 提供了各种方法, 使多个订阅者能够共享对底层源的单个订阅.
序列生成器
到目前为止看到的创建方法都很直接, 它们要么产生非常简单的序列(如单元素或空序列), 要么依赖代码确切告诉它们要产生什么. 现在将看看一些可以产生更长序列的方法.
Observable.Range
Observable.Range(int, int)返回一个IObservable<int>, 它产生一系列整数. 第一个整数是初始值, 第二个是要产生的值的数量. 这个示例将写入值'10'到'24', 然后完成.IObservable<int> range = Observable.Range(10, 15); range.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));Observable.Generate
用
Observable.Create模拟Range工厂方法. 可能会这样做:// 不是最好的方法! IObservable<int> Range(int start, int count) => Observable.Create<int>(observer => { for (int i = 0; i < count; ++i) { observer.OnNext(start + i); } return Disposable.Empty; });这段代码能输出结果, 但它不能取消订阅. 一般情况下无伤大雅, Rx检测到取消订阅, 会忽略后续产生的值.
但是, 毕竟值产生了呀!, 在没有人监听的情况下继续生成数字是浪费CPU时间(浪费能源, 降低电池寿命和/甚至影响环境).
这有多糟糕取决于请求的范围有多长. 如果是个无限序列呢?
产生斐波那契数列或素数的
IObservable<BigInteger>计算成本巨大. 如何用Create编写它呢?这个情况下处理取消订阅的方法是必须的. 需要回调返回, 以便能够收到取消订阅的通知(或者可以提供一个
async方法, 但在这里似乎不太合适).此时
Observable.Generate效果更好: 简单版本接受以下参数:- 一个初始状态
- 一个定义序列何时终止的谓词
- 一个应用于当前状态以产生下一个状态的函数
- 一个将状态转换为期望输出的函数
public static IObservable<TResult> Generate<TState, TResult>( TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)这展示了如何使用
Observable.Generate构造一个Range方法:// 示例代码仅用于说明 public static IObservable<int> Range(int start, int count) { int max = start + count; return Observable.Generate( start, value => value < max, value => value + 1, value => value); }Generate方法会反复回调, 直到condition回调说完成了, 或者观察者取消订阅. 可以简单地通过永远不说完成来定义一个无限序列:IObservable<BigInteger> Fibonacci() { return Observable.Generate( (v1: new BigInteger(1), v2: new BigInteger(1)), value => true, // 它永远不会结束! value => (value.v2, value.v1 + value.v2), value => value.v1); }
定时序列生成器
介绍过的大多数方法返回的序列都会立即产生所有的值. (唯一的例外是当调用
Observable.Create 并在准备好时产生值.) 实际上,
Rx能够按时间计划生成序列.
安排工作的操作符通过调度器来实现.
如果你不特别指定, 将选择一个默认的调度程序, 有时定时器机制很重要.
例如, 有一些定时器与UI框架集成, 在与鼠标点击和其他输入相同的线程上传递通知, 可能希望Rx的基于时间的操作符使用这些.
出于测试目的, 虚拟化时间很有用, 这样就可以在不一定要实时等待测试执行的情况下验证时间敏感代码中发生的事情.
调度器逻辑复杂, 超出了本章的范围, 但在后面关于调度和线程的章节中有详细介绍.
有三种产生定时事件的方法.
Observable.Interval
第一种是
Observable.Interval(TimeSpan), 根据选择的频率发布从0开始的增量值. 这个示例每250毫秒发布一个值.IObservable<long> interval = Observable.Interval(TimeSpan.FromMilliseconds(250)); interval.Subscribe( Console.WriteLine, () => Console.WriteLine("completed"));一旦订阅, 必须dispose订阅以停止序列, 因为
Interval返回一个无限序列.Rx假定用户有足够的耐心,
Interval返回的序列是IObservable<long>类型(long, 而不是int), 这意味着即使产生超过""微不足道"的21.475亿个事件(超过int.MaxValue), 也不会遇到问题.Observable.Timer
第二种产生基于恒定时间序列的工厂方法是
Observable.Timer. 它有几个重载. 最基本的一个像Observable.Interval一样只接受一个TimeSpan.但与
Observable.Interval不同的是,Observable.Timer将在经过指定的时间段后发布恰好一个值(数字0), 然后完成.var timer = Observable.Timer(TimeSpan.FromSeconds(1)); timer.Subscribe( Console.WriteLine, () => Console.WriteLine("completed"));输出:
completed或者可以为
dueTime参数提供一个DateTimeOffset. 这将在指定时间产生值0并完成.另一组重载添加了一个
TimeSpan, 用于指示产生后续值的周期. 这允许产生无限序列.它还展示了
Observable.Interval实际上只是Observable.Timer的一个特殊情况.Interval可以这样实现:public static IObservable<long> Interval(TimeSpan period) { return Observable.Timer(period, period); }虽然
Observable.Interval总是在产生第一个值之前等待给定的时间段, 但这个Observable.Timer重载能够选择何时开始序列.使用
Observable.Timer, 可以编写以下代码来立即开始一个间隔序列.Observable.Timer(TimeSpan.Zero, period);定时Observable.Generate
Observable.Generate有一个更复杂的重载, 提供一个函数, 为该函数指定下一个值的到期时间.public static IObservable<TResult> Generate<TState, TResult>( TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector)额外的
timeSelector参数让告诉Generate何时产生下一个项目. 可以使用这个来编写自己的Observable.Timer实现(如你已经看到的, 这反过来又使能够编写自己的Observable.Interval).public static IObservable<long> Timer(TimeSpan dueTime) { return Observable.Generate( 0l, i => i < 1, i => i + 1, i => i, i => dueTime); } public static IObservable<long> Timer(TimeSpan dueTime, TimeSpan period) { return Observable.Generate( 0l, i => true, i => i + 1, i => i, i => i == 0? dueTime : period); } public static IObservable<long> Interval(TimeSpan period) { return Observable.Generate( 0l, i => true, i => i + 1, i => i, i => period); }这展示了如何使用
Observable.Generate来产生无限序列. 作为一个练习, 留给读者使用Observable.Generate以可变的速率产生值.
可观察序列和状态
以 Observable.Generate 为例, 有些可观察序列需要维护状态.
对于 Generate 来说, 需要有明确的传入初始状态,
并提供一个回调在每次迭代时更新它.
其他操作符也维护内部状态: Timer 记住它的滴答计数,
此外还必须跟踪它上次引发事件的时间以及下一次到期时间.
接下来的章节中会看到许多其他操作符需要保有自己的状态.
进而引发一个有趣的问题: 如果一个进程关闭了怎么办? 如何保存操作符状态, 并在新进程中重新构建它?
Rx.NET本身并不具备这个功能: 所有这样的状态都完全保存在内存中, 没有预设的持计划方法.
这意味着如果正在处理长时间运行的操作, 需要开发者清楚如何重新启动,
并且不能依赖 System.Reactive.
有一组相关的基于Rx的库, Reaqtive. 提供了几乎与
System.Reactive 相同的操作符的实现,
但以一种可以收集当前状态并从先前保存的状态重新创建新订阅的形式.
这些库还包括一个名为Reaqtor的组件, 是一种托管技术, 可以管理自动检查点和崩溃后恢复, 通过使订阅持久和可靠, 使得支持非常长时间运行的Rx逻辑成为可能.
它还没有任何产品化形式, 所以需要做相当多的工作才能使用, 但如果需要一个可持久化的Rx版本, 请注这个选项.
将常见类型适配为 IObservable<T>
虽然现在已经看到了两种非常通用的产生任意序列的方法, Create 和
Generate, 但是如果已经有了其他形式的现有信息源, 想将其作为
IObservable<T> 提供, 该怎么办呢? Rx为常见的源类型提供了一些适配器.
从委托
Observable.Start方法允许将一个长时间运行的Func<T>或Action转换为一个单值可观察序列.该操作通过一个调度器来实现.
如果不显式传递一个调度程序, 将使用
DefaultScheduler, 通过线程池调用回调.如果使用的重载是一个
Func<T>, 那么返回类型将是IObservable<T>. 当函数返回其值时,IObservable<T>将立即向订阅者提供该值, 然后在提供值后立即完成.Observable.Start返回的IObservable<T>基于AsyncSubject, 所以如果你在回调完成后订阅它, 它将立即提供值然后完成. 如果使用接受Action的重载, 那么返回的序列将是IObservable<Unit>类型.Unit类型表示没有信息, 有点类似于void, 可以有一个Unit类型的实例.它在Rx中特别有用, 因为经常只关心某事何时发生, 并且除了时间之外可能没有任何信息. 在这些情况下, 经常使用
IObservable<Unit>, 以便即使在其中没有有意义的数据时也能产生确定的事件.(这个名字来自函数式编程世界, 在那里这种构造被大量使用. )在这种情况下,
Unit用于发布Action完成的确认, 因为Action不返回任何信息.Unit类型本身没有值, 它只是作为OnNext通知的一个空有效负载. 下面是使用这两个重载的示例.static void StartAction() { var start = Observable.Start(() => { Console.Write("Working away"); for (int i = 0; i < 10; i++) { Thread.Sleep(100); Console.Write("."); } }); start.Subscribe( unit => Console.WriteLine("Unit published"), () => Console.WriteLine("Action completed")); } static void StartFunc() { var start = Observable.Start(() => { Console.Write("Working away"); for (int i = 0; i < 10; i++) { Thread.Sleep(100); Console.Write("."); } return "Published value"; }); start.Subscribe( Console.WriteLine, () => Console.WriteLine("Action completed")); }注意
Observable.Start和Observable.Return之间的区别.Return要求预先提供值, 而Start立即返回一个可观察序列, 不需要值立即可用. (虽然Start不等待回调完成, 但它会立即调用它. 所以这不是惰性求值, 如果想提供一个仅在有人订阅源时才被调用的回调, 使用Defer.)Start返回的可观察对象可以被认为代表了与Task或Task<T>(取决于使用Action还是Func<T>重载)相同的基本思想.每个都表示可能需要一些时间才能最终完成的工作, 也许会产生一个结果. 所以如果想要那个基本思想, 但希望它被表示为一个
IObservable<T>而不是一个Task或Task<T>,Start是有用的.从事件
正如在本书前面讨论的, .NET有一个内置于其类型系统的事件模型.
这早于Rx(尤其是因为在.NET 2.0获得泛型之前Rx是不可行的), 所以类型支持事件但不支持Rx是很常见的.
为了能够与现有的事件模型集成, Rx提供了将事件转换为可观察序列的方法.
在前面的文件系统监视器示例中简要展示了这一点, 但让更详细地研究一下.
有几种不同的变体你可以使用. 这展示了最简洁的形式:
FileSystemWatcher watcher = new(@"c:\incoming"); IObservable<EventPattern<FileSystemEventArgs>> changeEvents = Observable .FromEventPattern<FileSystemEventArgs>(watcher, nameof(watcher.Changed));如果有一个提供事件的对象, 可以使用
FromEventPattern的这个重载, 传入对象和想用Rx使用的事件的名称. 虽然这是将事件适配到Rx世界的最简单方法, 但它有几个问题.首先, 为什么需要将事件名称作为字符串传递? 用字符串标识成员很容易出错.
编译器不会注意到第一个和第二个参数之间的不匹配(例如, 如果错误地传递了参数
(somethingElse, nameof(watcher.Changed))).不能只传递
watcher.Changed本身吗? 不幸的是, 不能, 这是在第一章中提到的问题的一个例子: .NET事件不是一等公民.不能像使用其他对象或值那样使用它们. 例如, 不能将事件作为参数传递给方法. 实际上, 可以对.NET事件做的唯一事情是附加和移除事件处理程序.
如果想让其他方法为选择的事件附加处理程序(例如, 在这里想让Rx处理事件), 那么唯一的方法是指定事件的名称, 以便方法(
FromEventPattern)可以然后使用反射来附加它自己的处理程序.这对于一些部署场景是一个问题. 在.NET中, 越来越常见的是在构建时做额外的工作来优化运行时行为, 而对反射的依赖可能会损害这些技术.
例如, 可能使用提前编译(AOT)机制, 而不是依赖即时编译(JIT).
.NET的Ready to Run(R2R)系统使你能够将针对特定CPU类型预编译的代码与普通IL一起包含, 避免了等待.NET将IL编译为可运行代码的时间.
这可以对启动时间产生重大影响. 在客户端应用程序中, 它可以解决应用程序首次启动时运行缓慢的问题.
在服务器端应用程序中, 特别是在代码可能经常从一个计算节点移动到另一个的环境中, 最小化冷启动成本也很重要.
也有一些场景中JIT编译甚至不是一个选项, 在这种情况下AOT编译不仅仅是一种优化: 它是代码能够运行的唯一方法.
反射的问题是它使构建工具难以确定在运行时将执行什么代码. 当他们检查对
FromEventPattern的这个调用时, 他们只会看到类型为object和string的参数.不明显的是, 这将导致在运行时对
FileSystemWatcher.Changed的add和remove方法进行反射驱动的调用.有一些属性可以用来提供提示, 但这些属性的效果有限. 有时构建工具将无法确定需要AOT编译哪些代码才能使这个方法在不依赖运行时JIT的情况下执行.
还有另一个相关的问题. .NET构建工具支持一个称为 修剪 的功能, 在其中他们删除未使用的代码.
System.Reactive.dll文件大小约为1.3MB, 但一个非常不寻常的应用程序才会使用该组件中每个类型的每个成员.Rx的基本使用可能只需要几十KB. 修剪的想法是确定实际使用的部分, 并生成一个只包含该代码的DLL副本.
这可以大大减少可执行文件运行所需部署的代码量.
这在客户端Blazor应用程序中尤其重要, 在那里.NET组件最终会被浏览器下载.
不得不下载整个1.3MB的组件可能会让你对使用它三思而后行.
但是如果修剪意味着基本使用只需要几十KB, 并且只有在你更广泛地使用该组件时大小才会增加, 那么使用一个在不修剪时会带来太大代价而不值得包含的组件就变得合理了.
但是与AOT编译一样, 修剪只有在工具能够确定使用了哪些代码时才能工作. 如果他们做不到这一点, 就不仅仅是退回到较慢的路径, 等待相关代码被JIT编译的问题.
如果代码被修剪, 它在运行时将不可用, 你的应用程序可能会因
MissingMethodException而崩溃.所以如果你使用这些技术中的任何一种, 基于反射的API可能会有问题. 幸运的是, 有一个替代方法. 可以使用一个接受两个委托的重载, Rx将在想要为事件添加或移除处理程序时调用这些委托:
IObservable<EventPattern<FileSystemEventArgs>> changeEvents = Observable .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>( h => watcher.Changed += h, h => watcher.Changed -= h);这是AOT和修剪工具可以轻松理解的代码. 编写了明确为
FileSystemWatcher.Changed事件添加和移除处理程序的方法, 所以AOT工具可以预编译这两个方法, 修剪工具知道他们不能删除这些事件的添加和移除处理程序.缺点是这是一段相当繁琐的代码要写. 如果你还没有接受使用Rx的想法, 这可能足以让你想 "还是坚持使用普通的.NET事件, 谢谢. "
但是繁琐的本质是.NET事件问题的一个症状. 如果事件一开始就是一等公民, 就不必写这么难看的东西了.
不仅那种二等公民的地位意味着不能只传递事件本身作为参数, 它还意味着必须显式声明类型参数.
事件的委托类型(在这个例子中是
FileSystemEventHandler)和它的事件参数类型(这里是FileSystemEventArgs)之间的关系, 一般来说, 不是C#的类型推断可以自动确定的, 这就是为什么必须显式指定这两个类型. (使用泛型EventHandler<T>类型的事件更适合类型推断, 并且可以使用稍微不那么冗长的FromEventPattern版本. 不幸的是, 相对较少的事件实际上使用那个. 一些事件除了某事刚刚发生的事实之外不提供任何信息, 并且使用基本的EventHandler类型, 对于那些类型的事件, 你实际上可以完全省略类型参数, 使代码稍微不那么难看. 你仍然需要提供添加和移除回调, 不过. )注意这个示例中
FromEventPattern的返回类型是:IObservable<EventPattern<FileSystemEventArgs>>.EventPattern<T>类型封装了事件传递给处理程序的信息.大多数.NET事件遵循一个常见模式, 其中处理程序方法接受两个参数: 一个
object sender, 它只是告诉你哪个对象引发了事件(如果你将一个事件处理程序附加到多个对象上很有用), 然后是一个派生自EventArgs的第二个参数, 提供关于事件的信息.EventPattern<T>只是将这两个参数打包成一个对象, 提供Sender和EventArgs属性.在你实际上不想将一个处理程序附加到多个源的情况下, 你实际上只需要那个
EventArgs属性, 这就是为什么前面的FileSystemWatcher示例继续提取它, 以获得一个更简单的类型为IObservable<FileSystemEventArgs>的结果.它使用了
Select操作符来做到这一点, 将在后面更详细地介绍:IObservable<FileSystemEventArgs> changes = changeEvents.Select(ep => ep.EventArgs);将属性更改事件公开为可观察序列是非常常见的.
.NET运行时库定义了一个基于.NET事件的接口用于通告属性更改,
INotifyPropertyChanged, 并且一些用户界面框架有更专门的系统用于此, 例如WPF的DependencyProperty.如果正在考虑编写自己的包装器来做这种事情, 强烈建议首先查看 Reactive UI库.
从Task
Task和Task<T>类型在.NET中被广泛使用. 主流.NET语言对它们有内置支持(例如, C#的async和await关键字). 任务和IObservable<T>之间存在一些概念上的重叠:两者都表示某种可能需要一段时间才能完成的工作. 从某种意义上说,
IObservable<T>是Task<T>的一种泛化:两者都表示可能长时间运行的工作, 但
IObservable<T>可以产生多个结果, 而Task<T>只能产生一个.由于
IObservable<T>是更通用的抽象, 应该能够将Task<T>表示为IObservable<T>.Rx为
Task和Task<T>定义了各种扩展方法来做到这一点. 这些方法都称为ToObservable(), 它提供了各种重载, 在需要时提供对细节的控制, 并为最常见的场景提供简单性.虽然它们在概念上相似, 但
Task<T>在细节上有一些不同之处. 例如, 你可以检索其 Status 属性, 它可能报告它处于已取消或出错状态.IObservable<T>没有提供一种询问源其状态的方法; 它只是告诉你事情. 因此,ToObservable做出了一些关于如何以在Rx世界中有意义的方式呈现状态的决定:- 如果任务是
已取消,
IObservable<T>调用订阅者的OnError并传递一个TaskCanceledException. - 如果任务是
出错,
IObservable<T>调用订阅者的OnError并传递任务的内部异常. - 如果任务尚未处于最终状态(既不是已取消, 也不是出错, 或
已完成),
IObservable<T>在任务进入这些最终状态之一之前不会产生任何通知.
在你调用
ToObservable的那一刻, 任务是否已经处于最终状态并不重要. 如果它已经完成,ToObservable将只返回一个表示该状态的序列. (实际上, 它使用你之前看到的Return或Throw创建方法. )如果任务尚未完成,ToObservable将附加一个延续到任务, 以在它完成时检测结果.任务有两种形式:
Task<T>, 它产生一个结果, 和Task, 它不产生结果. 但在Rx中, 只有IObservable<T>, 没有无结果的形式.之前已经遇到过这个问题一次, 当
Observable.Start方法需要能够将委托适配为IObservable<T>即使委托是一个不产生结果的Action时. 解决方案是返回一个IObservable<Unit>, 这也是当你在普通Task上调用ToObservable时得到的结果.扩展方法很容易使用:
Task<string> t = Task.Run(() => { Console.WriteLine("Task running..."); return "Test"; }); IObservable<string> source = t.ToObservable(); source.Subscribe( Console.WriteLine, () => Console.WriteLine("completed")); source.Subscribe( Console.WriteLine, () => Console.WriteLine("completed"));这是输出:
Task running... Test completed Test completed注意, 即使有两个订阅者, 任务也只运行一次. 这并不奇怪, 因为只创建了一个任务. 如果任务尚未完成, 那么所有订阅者将在它完成时收到结果. 如果任务已经完成,
IObservable<T>实际上变成了一个单值冷可观察对象.每个订阅一个任务
有一种不同的方法来为源获取一个
IObservable<T>. 可以将前面示例中的第一个语句替换为:IObservable<string> source = Observable.FromAsync(() => Task.Run(() => { Console.WriteLine("Task running..."); return "Test"; }));对这个进行两次订阅会产生略有不同的输出:
Task running... Task running... Test Test completed completed注意, 这会执行任务两次, 每次调用
Subscribe都会执行一次.FromAsync可以这样做, 因为传递的不是一个Task<T>, 而是一个返回Task<T>的回调.它在调用
Subscribe时调用那个回调, 所以每个订阅者本质上都得到了自己的任务.如果想使用
async和await来定义的任务, 那么不需要使用Task.Run, 因为一个asynclambda创建一个Func<Task<T>>, 这正是FromAsync想要的类型:IObservable<string> source = Observable.FromAsync(async () => { Console.WriteLine("Task running..."); await Task.Delay(50); return "Test"; });这会产生与之前完全相同的输出. 不过, 这里有一个微妙的区别. 当使用
Task.Run时, lambda从一开始就在一个任务池线程上运行.但是当这样写时, lambda将在调用
Subscribe的任何线程上开始运行. 只有当它遇到第一个await时它才返回(并且对Subscribe的调用也会返回), 方法的其余部分在任务池上运行.
- 如果任务是
已取消,
从
IEnumerable<T>Rx定义了另一个扩展方法, 这次是针对
IEnumerable<T>的ToObservable.在前面的章节中, 描述了
IObservable<T>是如何被设计来表示与IEnumerable<T>相同的基本抽象的, 唯一的区别是获取序列中元素的机制:对于
IEnumerable<T>, 编写代码从集合中" 拉" 出值(例如, 一个foreach循环), 而IObservable<T>通过在IObserver<T>上调用OnNext向" 推" 值.可以编写代码来桥接从" 拉" 到" 推" :
// 示例代码仅用于说明 - 请勿使用! public static IObservable<T> ToObservableOversimplified<T>(this IEnumerable<T> source) { return Observable.Create<T>(o => { foreach (var item in source) { o.OnNext(item); } o.OnComplete(); // 错误地忽略取消订阅. return Disposable.Empty; }); }这个粗略的实现传达了基本思想, 但它很幼稚. 它没有尝试处理取消订阅, 并且在这种特定场景下使用
Observable.Create时, 不容易修复这个问题.正如将在本书后面看到的, 可能尝试快速连续传递大量事件的Rx源应该与Rx的并发模型集成. Rx提供的实现当然会处理所有这些棘手的细节.
这使得它相当复杂, 但那是Rx的问题; 你可以认为它在逻辑上等同于上面显示的代码, 但没有缺点.
实际上, 这是Rx.NET中一个反复出现的主题. 许多内置操作符有用不是因为它们做了特别复杂的事情, 而是因为它们为你处理了许多微妙和棘手的问题.
在考虑自己编写解决方案之前, 你应该总是尝试找到Rx.NET中内置的东西来做你需要的事情.
当从
IEnumerable<T>转换为IObservable<T>时, 你应该仔细考虑你真正想要实现什么.考虑到
IEnumerable<T>的阻塞同步(拉)性质与IObservable<T>的异步(推)性质并不总是很好地混合.一旦有东西订阅了以这种方式创建的
IObservable<T>, 它实际上是在请求迭代IEnumerable<T>, 立即产生所有的值.对
Subscribe的调用可能直到到达IEnumerable<T>的末尾才返回, 这使得它类似于本章开头显示的非常简单的示例. (说"可能" 是因为正如将在讨论调度程序时看到的, 确切的行为取决于上下文. )ToObservable不能创造奇迹, 某个地方必须执行相当于一个foreach循环的操作.所以虽然这可以是将数据序列引入Rx世界的一种方便方法, 但你应该仔细测试并测量性能影响.
从APM
Rx提供了对古老的 .NET异步编程模型(APM) 的支持.
早在.NET 1.0中, 这是表示异步操作的唯一模式. 它在2010年被.NET 4.0引入的 基于任务的异步模式(TAP) 所取代.
旧的APM没有比TAP提供任何优势. 此外, C#的
async和await关键字(以及其他.NET语言中的等效物)仅支持TAP, 这意味着最好避免使用APM. 然而, TAP在2011年Rx 1.0发布时还相当新, 所以它提供了适配器来将APM实现表示为IObservable<T>.如今没有人应该使用APM, 但为了完整性(并且以防你必须使用一个只提供APM的古老库), 将简要解释Rx对它的支持.
对
Observable.FromAsyncPattern的调用结果并不返回一个可观察序列. 它返回一个委托, 该委托返回一个可观察序列. (所以它本质上是一个工厂工厂. )这个委托的签名将与对FromAsyncPattern的调用的泛型参数匹配, 除了返回类型将被包装在一个可观察序列中. 以下示例包装了Stream类的BeginRead/EndRead方法(这是APM的一个实现).注意: 这纯粹是为了说明如何包装APM. 在实践中你永远不会这样做, 因为
Stream多年来一直支持TAP.Stream stream = GetStreamFromSomewhere(); var fileLength = (int)stream.Length; Func<byte[], int, int, IObservable<int>> read = Observable.FromAsyncPattern<byte[], int, int, int>( stream.BeginRead, stream.EndRead); var buffer = new byte[fileLength]; IObservable<int> bytesReadStream = read(buffer, 0, fileLength); bytesReadStream.Subscribe(byteCount => { Console.WriteLine( "Number of bytes read={0}, buffer should be populated with data now.", byteCount); });
主题(Subject)
到目前为止, 本章已经探索了各种返回 IObservable<T> 实现的工厂方法.
还有另一种方法: System.Reactive 定义了各种可以直接实例化的实现
IObservable<T> 的类型.
但是如何确定这些类型产生什么值呢? 能够做到这一点是因为它们也实现了
IObserver<T>, 使能够向它们推送值, 并且推送的那些值将被观察者看到.
在Rx中, 实现了 IObservable<T> 和 IObserver<T> 的类型被称为 主题.
有一个 ISubject<T> 来表示这个. (这个在 System.Reactive NuGet包中,
不像 IObservable<T> 和 IObserver<T>, 它们都内置于.NET运行时库中. )
ISubject<T> 看起来像这样:
public interface ISubject<T> : ISubject<T, T>
所以事实证明还有一个两个参数的 ISubject<TSource, TResult>,
以适应这样一个事实,
即既是观察者又是可观察对象的东西可能会以某种方式转换流经它的数据,
这意味着输入和输出类型不一定相同. 这是两个类型参数的定义:
public interface ISubject<in TSource, out TResult> : IObserver<TSource>, IObservable<TResult>
如你所见, ISubject 接口本身没有定义任何成员. 只是从 IObserver<T> 和
IObservable<T> 继承, 一个主题既是观察者又是可观察者.
这么设计的原因, 在于可以将 IObserver<T> 和 IObservable<T>
分别视为"消费者"和"发布者"接口. 主题既是消费者又是发布者.
数据流入和流出主题.
Rx提供了几个主题实现, 在希望提供 IObservable<T> 的代码中偶尔会有用.
虽然 Observable.Create 通常是首选方法, 但在一种重要情况下,
主题可能更有意义: 如果有一些代码发现感兴趣的事件(例如,
通过使用某些消息传递技术的客户端API), 并希望通过 IObservable<T>
提供它们, 主题有时可以提供比使用 Observable.Create
或自定义实现更方便的方法.
Rx提供了几种主题类型. 将从最容易理解的开始.
Subject<T>Subject<T>类型会立即将对其IObserver<T>方法的任何调用转发给当前订阅它的所有观察者. 以下示例展示了其基本操作:Subject<int> s = new(); s.Subscribe(x => Console.WriteLine($"Sub1: {x}")); s.Subscribe(x => Console.WriteLine($"Sub2: {x}")); s.OnNext(1); s.OnNext(2); s.OnNext(3);创建了一个
Subject<int>. 对其进行了两次订阅, 然后多次调用其OnNext方法. 这将产生以下输出, 说明Subject<int>将每个OnNext调用转发给了两个订阅者:Sub1: 1 Sub2: 1 Sub1: 2 Sub2: 2 Sub1: 3 Sub2: 3Subject<T>可以作为外部API接收数据并将其引入Rx世界的桥梁. 举例来说:public class MessageQueueToRx : IDisposable { private readonly Subject<string> messages = new(); public IObservable<string> Messages => messages; public void Run() { while (true) { // 从某个假设的消息队列服务接收消息 string message = MqLibrary.ReceiveMessage(); messages.OnNext(message); } } public void Dispose() { message.Dispose(); } }要使用
Observable.Create来实现并不困难. 涉及到多个不同的IObservable<T>,Subject<T>更清晰简洁.想象一下, 根据消息内容区分不同的消息类型, 并通过不同的可观察对象发布它们. 如果仍然希望使用单个循环从队列中拉取消息, 使用
Observable.Create来安排这一点会很困难.Subject<T>还会将对OnCompleted或OnError的调用分发给所有订阅者.当然, Rx的规则要求, 一旦在
IObserver<T>上调用了这两个方法中的任何一个(并且任何ISubject<T>都是IObserver<T>, 所以这个规则适用于Subject<T>), 就绝不能再对该观察者调用OnNext,OnError或OnComplete. 实际上,Subject<T>会容忍违反此规则的调用, 它只是忽略它们, 所以即使你的代码内部不完全遵守这些规则, 你向外部世界提供的IObservable<T>也将正确行为, 因为Rx会强制执行此规则.Subject<T>实现了IDisposable.处置
Subject<T>会使其进入一种状态, 如果调用其任何方法, 它将抛出异常. 文档还将其描述为取消所有观察者的订阅, 但由于已处置的Subject<T>在任何情况下都无法产生进一步的通知, 所以这实际上意义不大. (请注意, 在处置它时, 它不会对其观察者调用OnCompleted. )一个实际效果是, 其内部用于跟踪观察者的字段将重置为一个特殊的标记值, 表示它已被处置, 这意味着" 取消订阅" 观察者的一个外部可见效果是, 如果由于某种原因, 你的代码在处置Subject<T>后仍然保留对它的引用, 那么对于垃圾回收目的, 所有订阅者将不再可访问. 如果一个Subject<T>在不再使用后仍然无限期可访问, 这本身实际上就是一个内存泄漏, 但处置至少会限制影响: 只有Subject<T>本身仍然可访问, 而不是所有的订阅者.Subject<T>是最直接的主题, 但还有其他更专门的主题.ReplaySubject<T>Subject<T>不记住任何东西: 它立即将传入的值分发给订阅者. 如果有新的订阅者出现, 他们只会看到在他们订阅之后发生的事件.另一方面,
ReplaySubject<T>可以记住它曾经看到的每个值. 如果有一个新的订阅者, 它将接收到目前为止的完整事件历史.这是前面
Subject<T>中第一个示例的变体. 它创建了一个ReplaySubject<int>而不是Subject<int>. 并且不是立即进行两次订阅, 而是先创建一个初始订阅, 然后在发出几个值之后才进行第二次订阅.ReplaySubject<int> s = new(); s.Subscribe(x => Console.WriteLine($"Sub1: {x}")); s.OnNext(1); s.OnNext(2); s.Subscribe(x => Console.WriteLine($"Sub2: {x}")); s.OnNext(3);这将产生以下输出:
Sub1: 1 Sub1: 2 Sub2: 1 Sub2: 2 Sub1: 3 Sub2: 3如你所料, 最初只看到来自
Sub1的输出. 但是当进行第二次订阅调用时, 可以看到Sub2也收到了前两个值. 然后当报告第三个值时, 两者都能看到. 如果这个示例使用的是Subject<int>, 将看到以下输出:Sub1: 1 Sub1: 2 Sub1: 3 Sub2: 3这里有一个明显的潜在问题: 如果
ReplaySubject<T>记住发布给它的每个值, 绝不能将其用于无尽的事件源, 因为它最终会导致耗尽内存.ReplaySubject<T>提供了接受简单缓存过期设置的构造函数, 可以限制内存消耗. 一种选择是指定要记住的最大项目数. 以下示例创建一个缓冲区大小为2的ReplaySubject<T>:ReplaySubject<int> s = new(2); s.Subscribe(x => Console.WriteLine($"Sub1: {x}")); s.OnNext(1); s.OnNext(2); s.OnNext(3); s.Subscribe(x => Console.WriteLine($"Sub2: {x}")); s.OnNext(4);由于第二次订阅是在已经产生了3个值之后才进行的, 它不再看到所有的值. 它只接收在订阅之前发布的最后两个值(但第一次订阅当然会继续看到所有内容):
Sub1: 1 Sub1: 2 Sub1: 3 Sub2: 2 Sub2: 3 Sub1: 4 Sub2: 4或者, 你可以通过将
TimeSpan传递给ReplaySubject<T>构造函数来指定基于时间的限制.BehaviorSubject<T>与
ReplaySubject<T>类似,BehaviorSubject<T>也有内存, 但它只记住一个值. 然而, 它与缓冲区大小为1的ReplaySubject<T>并不完全相同.ReplaySubject<T>开始时处于没有任何内容在其内存中的状态, 而BehaviorSubject<T>总是记住 恰好 一个元素.在进行第一次
OnNext调用之前, 这是如何工作的呢?BehaviorSubject<T>通过在构造时要求提供初始值来强制执行这一点.所以你可以将
BehaviorSubject<T>视为一个总是有值可用的主题. 如果你订阅一个BehaviorSubject<T>, 它将立即产生一个单个值. (它可能随后会产生更多值, 但它总是立即产生一个. )碰巧的是, 它还通过一个名为
Value的属性提供该值, 所以你不需要订阅一个IObserver<T>来获取该值.BehaviorSubject<T>可以被视为一个可观察属性. 像普通属性一样, 它可以在你询问时立即提供一个值. 不同之处在于, 它可以在每次其值更改时通知你.如果你正在使用 [ReactiveUI框架](https://www.reactiveui.net/)(一个基于Rx的用于构建用户界面的框架),
BehaviourSubject<T>可以作为视图模型(在你的底层域模型和用户界面之间进行调解的类型)中属性的实现类型有意义.它具有属性般的行为, 使你能够随时检索值, 但它也提供更改通知, ReactiveUI可以处理这些通知以保持UI更新.
这个类比在完成方面略有不足. 如果你调用
OnCompleted, 它会立即对所有观察者调用OnCompleted, 并且如果有任何新的观察者订阅, 他们也将立即完成, 它不会首先提供最后一个值. (所以这是它与缓冲区大小为1的ReplaySubject<T>的另一个不同之处. )同样, 如果你调用
OnError, 所有当前观察者将收到一个OnError调用, 并且任何后续订阅者也将只收到一个OnError调用.AsyncSubject<T>AsyncSubject<T>为所有观察者提供它接收到的最终值.由于它在
OnCompleted被调用之前无法知道哪个是最终值, 所以在其OnCompleted或OnError方法被调用之前, 它不会对任何订阅者调用任何方法. (如果调用了OnError, 它只是将其转发给所有当前和未来的订阅者. )你通常会间接地使用这个主题, 因为它是 [Rx与await关键字集成](13~LeavingIObservable~.md#integration-with-async-and-await) 的基础. (当你await一个可观察序列时,await返回源发出的最终值. )如果在
OnCompleted之前没有对OnNext进行调用, 那么就没有最终值, 所以它将只是完成任何观察者而不提供值.在这个示例中, 由于序列从未完成, 所以不会发布任何值. 不会有任何内容写入控制台.
AsyncSubject<string> subject = new(); subject.OnNext("a"); subject.Subscribe(x => Console.WriteLine($"Sub1: {x}")); subject.OnNext("b"); subject.OnNext("c");在这个示例中, 调用了
OnCompleted方法, 所以主题将有一个最终值('c')要产生:AsyncSubject<string> subject = new(); subject.OnNext("a"); subject.Subscribe(x => Console.WriteLine($"Sub1: {x}")); subject.OnNext("b"); subject.OnNext("c"); subject.OnCompleted(); subject.Subscribe(x => Console.WriteLine($"Sub2: {x}"));这将产生以下输出:
Sub1: c Sub2: c如果你有一些在应用程序启动时需要完成的潜在缓慢工作, 并且只需要完成一次, 你可能会选择一个
AsyncSubject<T>来使该工作的结果可用.需要这些结果的代码可以订阅该主题. 如果工作尚未完成, 他们将在结果可用时立即收到结果. 如果工作已经完成, 他们将立即收到结果.
主题工厂
最后, 值得让你知道你也可以通过工厂方法创建主题. 考虑到主题结合了
IObservable<T>和IObserver<T>接口, 似乎应该有一个工厂允许你自己组合它们.Subject.Create(IObserver<TSource>, IObservable<TResult>)工厂方法就提供了这个功能.// 从指定的用于向主题发布消息的观察者和用于订阅从主题发送的消息的可观察对象创建一个主题 public static ISubject<TSource, TResult> Create<TSource, TResult>( IObserver<TSource> observer, IObservable<TResult> observable) {...}请注意, 与刚刚讨论的所有其他主题不同, 这个方法创建的主题在输入和输出之间没有固有的关系.
它只是接受你提供的任何
IObserver<TSource>和IObserver<TResult>实现, 并将它们包装在一个单一对象中.对主题的
IObserver<TSource>方法的所有调用将直接传递给你提供的观察者. 如果你希望值出现在相应的IObservable<TResult>的订阅者中, 这取决于你使其发生. 这实际上是以绝对最少的胶水将你提供的两个对象组合在一起.主题提供了一种方便的方式来探索Rx, 并且在生产场景中偶尔有用, 但在大多数情况下不推荐使用. 在 [使用指南附录](C~UsageGuidelines~.md) 中有解释. 相反, 建议使用本章前面显示的工厂方法, 而不是使用主题.
总结
已经研究了创建序列的各种急切和惰性方式. 已经看到了如何使用各种工厂方法生成基于定时器的序列. 还探讨了从其他同步和异步表示形式转换的方法.
快速回顾一下:
- 工厂方法
Observable.ReturnObservable.EmptyObservable.NeverObservable.ThrowObservable.CreateObservable.Defer
- 生成方法
Observable.RangeObservable.GenerateObservable.IntervalObservable.Timer
- 适配方法
Observable.StartObservable.FromEventPatternTask.ToObservableTask<T>.ToObservableIEnumerable<T>.ToObservableObservable.FromAsyncPattern
创建可观察序列是实际应用Rx的第一步: 创建序列, 然后将其公开以供使用. 现在已经牢固掌握了如何创建可观察序列, 可以更详细地研究允许描述要应用的处理的操作符, 以构建更复杂的可观察序列.