Rx.NET介绍第二部分: 从单一事件,到整体洞察
当今时代, 数据以惊人的速度被创建, 存储和传播, 处理这些数据就像试图直接从高压水枪中喝水一样令人应接不暇.
为了从数据中获得整体洞察, 我们需要实时确定数据的相关性, 对数据进行分组处理, 并将其作为一个整体来分析, 以发现从单个原始输入中可能无法察觉的模式或其他信息.
用户, 分析人员和管理人员需要处理比以往更多的数据, 同时还要提供更高的性能和更有价值的输出.
Rx 提供了一些强大的机制, 用于从原始数据流中提取有意义的见解,
这也是将信息表示为 IObservable<T> 流的主要原因之一.
上一章展示了如何创建一个可观察序列,
所以现在将探讨如何利用通过各种能够处理和转换可观察序列的 Rx
方法所释放出来的这种能力.
Rx 支持大多数标准的 LINQ 运算符. 它还定义了许多其他运算符. 这些运算符大致可分为以下几类, 接下来的各章将分别对每一类进行探讨:
第四章:过滤
过滤就像在淘金时使用筛子, 将泥沙和杂质筛掉, 只留下有价值的金粒.
Rx的filter就像是这个筛子, 帮助我们从大量事件中筛选出真正有用的信息, 减少噪音, 聚焦于关键洞察.
最简单机制就是过滤掉不想要的事件. Rx定义了几个可以做到这一点的操作符.
在介绍新操作符之前, 将快速定义一个扩展方法, 以帮助阐明几个示例.
Dump 扩展方法订阅任何 IObservable<T>, 并带有处理程序,
用于显示源产生的每个通知的消息.
此方法接受一个 name 参数, 该参数将作为每条消息的一部分显示,
使能够在订阅多个源的示例中看到事件来自何处.
public static class SampleExtensions
{
public static void Dump<T>(this IObservable<T> source, string name)
{
source.Subscribe(
value => Console.WriteLine($"{name}-->{value}"),
ex => Console.WriteLine($"{name} failed-->{ex.Message}"),
() => Console.WriteLine($"{name} completed"));
}
}
Where
过滤是一种极其常见的操作, LINQ中最直接的过滤器是 Where 操作符.
与LINQ通常的情况一样, Rx以扩展方法的形式提供其操作符. 如果已经熟悉LINQ,
Rx的 Where 方法的签名会很顺眼.
IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
source 参数的元素类型与返回类型相同. Where 不会修改元素. 只会一些,
通过的元素将原封不动地传递过去.
示例使用 Where 过滤掉从 Range 序列中所有奇数, 只有偶数会通过.
IObservable<int> xs = Observable.Range(0, 10); // 数字0 - 9
IObservable<int> evenNumbers = xs.Where(i => i % 2 == 0);
evenNumbers.Dump("Where");
输出:
Where-->0
Where-->2
Where-->4
Where-->6
Where-->8
Where completed
Where 是众多标准LINQ操作符之一. LINQ to Objects(IEnumerable<T>
的实现)提供了一个等效的方法.
Rx的操作符的行为与它们在 IEnumerable<T> 实现中的行为基本相同,
稍后会看到一些例外情况.
我们将讨论每个实现, 并在进行过程中解释任何差异. 通过实现这些常见操作符, Rx还通过C#查询表达式语法获得了语言支持.
例如, 可以这样编写第一条语句, 并且它将编译为实际上相同的代码:
IObservable<int> evenNumbers =
from i in xs
where i % 2 == 0
select i;
对应的函数式写法
IObservable<int> evenNumbers = xs.Where(i => i % 2 == 0);
本书中的示例大多使用扩展函数方法, 而不是查询表达式, Rx实现了一些没有相应查询语法的操作符, 而且方法调用方式有时更清晰.
与大多数Rx操作符一样, Where 不会立即订阅其源.
Rx LINQ操作符与LINQ to Objects中的操作符非常相似: IEnumerable<T>
版本的 Where 返回时不会开始执行过滤.
只有当真正遍历 Where 返回的 IEnumerable<T> 时,
它才会开始进行过滤操作.
只有当对 Where 返回的 IObservable<T> 调用 Subscribe 时,
它才会对其源调用 Subscribe.
并且对于对 Subscribe 的每次这样的调用, 它都会这样做一次.
更一般地说, 当LINQ操作符链接在一起时, 对结果 IObservable<T> 的每次
Subscribe 调用都会导致一系列级联的 Subscribe 调用一直向下传递到链中.
这种级联 Subscribe 的一个副作用是, Where
(与大多数其他LINQ操作符一样)既不是固有热的也不是冷的:
因为它只是订阅其源, 所以如果其源是热的, 它就是热的, 如果其源是冷的,
它就是冷的.
Where 操作符传递其 predicate 回调返回 true 的所有元素.
更准确地说, 当订阅 Where 时, 它将创建自己的 IObserver<T>,
并将其作为参数传递给 source.Subscribe, 并且这个观察者会在每次调用
OnNext 时调用 predicate.
如果该谓词返回 true, 那么只有在那时, Where
创建的观察者才会在你传递给 Where 的观察者上调用 OnNext.
Where 总是传递对 OnComplete 或 OnError 的最终调用.
这意味着如果你这样写:
IObservable<int> dropEverything = xs.Where(_ => false);
以上操作会过滤掉所有元素(因为谓词忽略其参数并始终返回 false, 指示
Where 丢弃所有内容), 但这不会过滤掉错误或完成通知.
如果丢掉多有数据是需求, 也就谁说要实现一个丢弃所有元素并仅在源完成或失败时发送消息的操作符–后面我们会介绍一种简单的方法.
IgnoreElements
IgnoreElements 扩展方法允许仅接收 OnCompleted 或 OnError 通知.
它等同于使用 Where 操作符并带有一个始终返回 false 的谓词,
如下例所示:
IObservable<int> xs = Observable.Range(1, 3);
IObservable<int> dropEverything = xs.IgnoreElements();
xs.Dump("Unfiltered");
dropEverything.Dump("IgnoreElements");
如输出所示, xs 源产生数字1到3然后完成, 但如果通过 IgnoreElements
运行它, 看到的只是 OnCompleted.
Unfiltered-->1
Unfiltered-->2
Unfiltered-->3
Unfiltered completed
IgnoreElements completed
OfType
一些可观察序列会产生各种类型的项目. 例如, 考虑一个想要跟踪船只移动的应用程序. 这可以通过AIS接收器实现. AIS是自动识别系统, 大多数远洋船只使用它来报告其位置, 航向, 速度和其他信息. 有许多种AIS消息.
一些报告船只的位置和速度, 但它的名字在不同类型的消息中报告. (这是因为大多数船只移动的频率比更改名字的频率高, 所以它们以截然不同的间隔广播这两种类型的信息. )
想象一下这在Rx中可能是什么样子. 实际上你不必想象.
开源的 Ais.Net项目 包括一个ReceiverHost
类,
它通过Rx使AIS消息可用. ReceiverHost 定义了一个类型为
IObservable<IAisMessage> 的 Messages 属性.
由于AIS定义了许多消息类型, 这个可观察源可以产生许多不同种类的对象.
它发出的所有内容都将实现 IAisMessage
接口
, 该接口报告船只的唯一标识符, 但没有太多其他信息.
但是 Ais.Net.Models 库 定义了许多其他接口, 包括 IVesselNavigation接口 , 它报告位置, 速度和航向, 以及 IVesselName接口, 它告诉船只的名字.
假设只对水中船只的位置感兴趣, 而不关心船只的名字.
将希望看到所有实现 IVesselNavigation 接口的消息, 并忽略所有其他消息.
可以尝试使用 Where 操作符来实现这一点:
IObservable<IVesselNavigation> vesselMovements =
receiverHost.Messages.Where(m => m is IVesselNavigation);
无法编译. 将得到这个错误:
无法隐式转换类型
'System.IObservable<Ais.Net.Models.Abstractions.IAisMessage>'
为
'System.IObservable<Ais.Net.Models.Abstractions.IVesselNavigation>'
这是因为 Where 的 返回类型始终与其输入相同.
由于 receiverHost.Messages 的输入类型是 IObservable<IAisMessage>,
所以 Where 也将返回此类型, 这和声明的类型矛盾, 所以编译不过.
Rx提供了一个专门用于这种情况的操作符. OfType
将项目过滤为仅特定类型的项目.
元素必须是指定类型(或者是它的子类型):
IObservable<IVesselNavigation> vesselMovements =
receiverHost.Messages.OfType<IVesselNavigation>();
位置过滤
有时, 不太关心元素是什么, 而更关心它在序列中的位置. Rx定义了一些操作符可以帮助解决这个问题.
FirstAsync和FirstOrDefaultAsync
LINQ提供程序通常实现一个
First操作符, 它提供序列的第一个元素.Rx也不例外, 但Rx的性质意味着通常需要这个操作符的工作方式略有不同.
对于静态数据的提供程序(如LINQ to Objects或Entity Framework Core), 源元素已经存在, 所以检索第一个项目只是读取它的问题.
但对于Rx, 源在它们选择的时候产生数据, 所以无法知道第一个项目何时可用.
所以在Rx中, 通常使用
FirstAsync. 这返回一个IObservable<T>, 它将产生从源序列中出现的第一个值, 然后完成. Rx也提供了一个更传统的First方法, 但它只会在stream complete/exception之后才能拿到第一个元素, 大多数时候这不是我们期待的结果. 请参阅后面的First/Last/Single[OrDefault]的阻塞版本部分.例如, 以下代码使用前面介绍的AIS.NET源来报告特定船只(碰巧名为HMS Example)首次报告它正在移动的时间:
uint exampleMmsi = 235009890; IObservable<IVesselNavigation> moving = receiverHost.Messages .Where(v => v.Mmsi == exampleMmsi) .OfType<IVesselNavigation>() .Where(vn => vn.SpeedOverGround > 1f) .FirstAsync();除了使用
FirstAsync, 这个示例还使用了其他几个已经描述过的过滤元素.它从一个
Where步骤开始, 将消息过滤为仅来自感兴趣的一艘船的消息.具体来说, 根据那艘船的 海上移动业务识别码, 或MMSI 进行过滤, 然后使用
OfType, 以便只查看那些报告船只如何/是否在移动的消息.然后使用另一个
Where子句, 以便可以忽略表示船只实际上没有移动的消息, 最后, 使用FirstAsync, 以便只得到第一个表示移动的消息. 一旦船只移动, 这个moving源将发出一个单个的IVesselNavigation事件, 然后立即完成.可以稍微简化那个查询, 因为
FirstAsync可选地接受一个谓词. 这使能够将最后的Where和FirstAsync合并为一个操作符:IObservable<IVesselNavigation> moving = receiverHost.Messages .Where(v => v.Mmsi == exampleMmsi) .OfType<IVesselNavigation>() .FirstAsync(vn => vn.SpeedOverGround > 1f);如果
FirstAsync的输入为空会怎样? 如果它在从未产生项目的情况下完成,FirstAsync将调用其订阅者的OnError, 传递一个InvalidOperationException, 并带有一个错误消息, 报告序列不包含任何元素.如果使用接受谓词的形式(如在第二个示例中), 并且没有与谓词匹配的元素出现, 情况也是如此.
这与LINQ to Objects的
First操作符一致. (请注意, 不期望这会发生在刚刚显示的示例中, 因为只要应用程序在运行, 源将继续报告AIS消息, 这意味着没有理由它会完成. )有时, 可能希望容忍这种事件缺失的情况. 大多数LINQ提供程序不仅提供
First, 还提供FirstOrDefault. 可以通过修改前面的示例来使用它.这个示例使用
TakeUntil操作符来引入一个截止时间: 这个示例准备等待5分钟, 但在那之后放弃. (所以尽管AIS接收器可以无休止地产生消息, 但这个示例已经决定它不会永远等待. )并且由于这意味着可能在没有看到船只移动的情况下完成, 已经将
FirstAsync替换为FirstOrDefaultAsync:IObservable<IVesselNavigation?> moving = receiverHost.Messages .Where(v => v.Mmsi == exampleMmsi) .OfType<IVesselNavigation>() .TakeUntil(DateTimeOffset.Now.AddMinutes(5)) .FirstOrDefaultAsync(vn => vn.SpeedOverGround > 1f);如果在5分钟后, 没有看到来自船只的消息表明它以1节或更快的速度移动,
TakeUntil将从其上游源取消订阅, 并将在FirstOrDefaultAsync提供的观察者上调用OnCompleted. 而FirstAsync会将此视为错误,FirstOrDefaultAsync将产生其元素类型的默认值(在这种情况下为IVesselNavigation; 接口类型的默认值为null), 将其传递给其订阅者的OnNext, 然后调用OnCompleted.简而言之, 这个
moving可观察对象将始终产生恰好一个元素.它要么产生一个
IVesselNavigation表示船只已经移动, 要么产生null表示在这段代码允许的5分钟内没有发生这种情况.产生
null可能是表示某事没有发生的一种方式, 可用但是丑陋:订阅
moving的后续代码中需要进行null判断, 其实Rx提供了一种更直接的方式来表示事件的缺失: 一个空序列(没有经过OnNext, 直接进入OnComplte).可以构造一个 第一个或空(FirstOrEmpty) 操作符以这种方式工作.
LINQ to Objects的
First返回T, 而不是IEnumerable<T>, 所以它没有办法返回一个空序列.但是因为Rx提供了返回
IObservable<T>的类似First的操作符, 所以从技术上讲, 有可能有一个操作符返回第一个元素或根本不返回元素.Rx中没有内置这样的操作符, 但可以通过使用一个更通用的操作符
Take来获得完全相同的效果.Take
Take是一个标准的LINQ操作符, 它从序列中获取前几个项目, 然后丢弃其余的项目.从不是特别严格的意义上说,
Take是First更通用的形式:Take(1)只返回第一个元素, 可以认为LINQ的First是Take的一个特殊情况.这当然并不完全正确, 因为这些操作符对缺失元素的方式不同:
First(和Rx的FirstAsync)坚持至少接收一个元素, 如果提供一个空序列, 将产生一个InvalidOperationException.即使是更宽松的
FirstOrDefault也需要有个default值. 而Take的工作方式不同.如果
Take的输入在产生指定数量的元素之前完成,Take不会抱怨–它只是转发源提供的任何内容.如果源除了调用
OnCompleted什么都不做, 那么Take就在其观察者上调用OnCompleted.如果使用
Take(5), 但源产生了三个项目然后完成,Take(5)将转发这三个项目给它的订阅者, 然后完成. 这意味着可以使用Take来实现前面讨论的假设的FirstOrEmpty:public static IObservable<T> FirstOrEmpty<T>(this IObservable<T> src) => src.Take(1);这里也看到, 大多数Rx操作符(以及本章中的所有操作符)本质上既不是热的也不是冷的.
操作符返回
IObservable的温度取决于源. 对于某个热的source,source.Take(1)也是热的.在这些示例中一直使用的AIS.NET
receiverHost.Messages源是热的(因为它报告来自船只的实时消息广播), 所以从它派生的可观察序列也是热的.IObservable<IAisMessage> hotTake = receiverHost.Messages.Take(1);FirstAsync和Take操作符从序列的开头开始工作. 如果只对序列的末尾感兴趣呢?LastAsync, LastOrDefaultAsync和PublishLast
LINQ提供程序提供
Last和LastOrDefault. 这些操作符几乎与First或FirstOrDefault完全相同, 只是如名称所示, 它们返回最后一个元素而不是第一个元素.与
First一样, Rx的性质意味着与处理静态数据的LINQ提供程序不同, 最后一个元素可能不是就在那里等待被获取.所以正如Rx提供
FirstAsync和FirstOrDefault一样, 它也提供LastAsync和LastOrDefaultAsync.还有
PublishLast. 它具有与LastAsync相似的语义, 但它对多个订阅的处理方式不同.每次订阅
LastAsync返回的IObservable<T>时, 它将订阅底层源, 但PublishLast对底层源只进行一次Subscribe调用. (为了精确控制这何时发生,PublishLast返回一个IConnectableObservable<T>. 如第2章的热和冷源部分所述, 这提供了一个Connect方法, 当调用此方法时,PublishLast返回的可连接可观察对象将订阅其底层源. )一旦这个单一订阅从源接收到
OnComplete通知, 它将把最后一个值传递给所有订阅者. (它还记住最后一个值, 所以如果在最后一个值产生后有新的观察者订阅, 他们将在订阅时立即收到该值. )最后一个值之后紧接着是一个
OnCompleted通知.这是
Multicast操作符的一系列操作符之一.LastAsync和LastOrDefaultAsync之间的区别与FirstAsync和FirstOrDefaultAsync之间的区别相同.如果源完成时没有产生任何内容,
LastAsync报告错误, 而LastOrDefaultAsync发出其元素类型的默认值然后完成.PublishLast对空源的处理方式又有所不同: 如果源完成时没有产生任何元素,PublishLast返回的可观察对象将执行相同操作: 在这种情况下, 它既不产生错误也不产生默认值.报告序列的最后一个元素面临着
First不会遇到的挑战.很容易知道何时从源接收到第一个元素: 如果源产生一个元素, 并且它之前没有产生过元素, 那么那就是第一个元素.
这意味着像
FirstAsync这样的操作符可以立即报告第一个元素. 但是LastAsync和LastOrDefaultAsync没有这种奢侈.如果从源接收到一个元素, 怎么知道它是最后一个元素呢? 一般来说, 在接收到它的那一刻无法知道.
只有当源继续调用
OnCompleted方法时, 才会知道已经接收到了最后一个元素.这不一定会立即发生.
前面的一个示例使用
TakeUntil(DateTimeOffset.Now.AddMinutes(5))在5分钟后结束一个序列, 如果这样做, 完全有可能在最后一个元素被发出和TakeUntil关闭之间经过相当长的时间.在AIS场景中, 船只可能每隔几分钟才发送一次消息, 所以很有可能最终会遇到
TakeUntil转发一条消息, 然后几分钟后发现截止时间已到, 没有更多消息进来.在最后一个
OnNext和OnComplete之间可能已经过去了几分钟.因此,
LastAsync和LastOrDefaultAsync在它们的源完成之前根本不发出任何内容.这有一个重要的后果: 在
LastAsync从源接收到最后一个元素和它将该元素转发给其订阅者之间可能会有显著的延迟.TakeLast
前面看到Rx实现了标准的
Take操作符, 它从序列的开头转发指定数量的元素然后停止.TakeLast转发序列末尾的元素. 例如,TakeLast(3)请求源序列的最后3个元素. 与Take一样,TakeLast对产生太少项目的源是容忍的.如果一个源产生的项目少于3个,
TakeLast(3)将只转发整个序列.TakeLast面临着与Last相同的挑战: 它不知道何时接近序列的末尾.因此, 它必须保留最近看到的值的副本. 它需要内存来保留指定的任意多个值.
如果写
TakeLast(1_000_000), 它将需要分配一个足够大的缓冲区来存储1,000,000个值.它不知道它接收到的第一个元素是否会是最后一百万个中的一个.
直到源完成, 或者源已经发出超过1,000,000个项目, 它才能知道.
当源最终完成时,
TakeLast现在将知道最后一百万个元素是什么, 并需要将它们一个接一个地传递给其订阅者的OnNext方法.Skip和SkipLast
如果想要与
Take或TakeLast操作符完全相反的操作呢?与其从源获取前5个项目, 也许想丢弃前5个项目呢?
也许有一个
IObservable<float>从传感器获取读数, 并且发现传感器在最初的几次读数中产生错误值, 所以想忽略那些, 只在它稳定下来后开始监听.可以用
Skip(5)来实现这一点.SkipLast在序列的末尾做同样的事情: 它省略指定数量的尾端元素.与刚刚看过的其他一些操作符一样, 这必须处理它无法判断何时接近序列末尾的问题.
它只有在源发出所有元素, 接着是一个
OnComplete之后, 才能发现哪些是最后(比如说)4个元素.所以
SkipLast会引入延迟. 如果使用SkipLast(4), 它不会转发源产生的第一个元素, 直到源产生第5个元素.所以它不需要等待
OnCompleted或OnError才能开始做事, 它只需要等待直到确定一个元素不是想要丢弃的元素之一.其他关键的过滤方法非常相似, 认为可以把它们看作一个大组.
首先将看
Skip和Take. 它们的行为就像它们在IEnumerable<T>实现中的行为一样.这些是Skip/Take方法中最简单且可能最常用的. 这两个方法都只有一个参数; 要跳过或获取的元素数量.
SingleAsync和SingleOrDefaultAsync
LINQ操作符通常提供一个
Single操作符, 用于当源应该恰好提供一个元素, 并且如果它包含更多或为空则是错误的情况.这里Rx的考虑与
First和Last相同, 所以可能不会惊讶地得知Rx提供了一个SingleAsync方法, 它返回一个IObservable<T>, 它将要么恰好一次调用其观察者的OnNext, 要么调用其OnError以指示源报告了错误, 或者源没有恰好产生一个元素.使用
SingleAsync, 如果源为空, 将得到一个错误, 就像使用FirstAsync和LastAsync一样, 但如果源包含多个项目, 也将得到一个错误.有一个
SingleOrDefault, 与它的First/Last对应物一样, 容忍空输入序列, 在那种情况下生成一个具有元素类型默认值的单个元素.Single和SingleAsync与Last和LastAsync共享一个特点, 即它们在最初从源接收到一个项目时不知道它是否应该是输出.这可能看起来很奇怪: 由于
Single要求源流只提供一个项目, 那么它肯定知道它将传递给其订阅者的项目将是它接收到的第一个项目.这是真的, 但当它接收到第一个项目时它还不知道的是源是否会产生第二个项目.
它不能转发第一个项目, 除非并且直到源完成. 可以说
SingleAsync的工作是首先验证源恰好包含一个项目, 然后如果是这样就转发该项目, 但如果不是就报告错误.在错误情况下,
SingleAsync将在接收到第二个项目时知道它出错了, 所以它可以在那时立即在其订阅者上调用OnError.但在成功场景中, 它直到源通过完成确认没有更多内容到来才能知道一切正常. 只有那时
SingleAsync才会发出结果.First/Last/Single[OrDefault]的阻塞版本前面几节中描述的几个操作符名称以
Async结尾.这有点奇怪, 因为通常, .NET中以
Async结尾的方法返回一个Task或Task<T>, 而这些都返回一个IObservable<T>.而且, 如已经讨论的, 这些方法中的每一个都对应一个标准LINQ操作符, 其名称通常不以
Async结尾. (更让人困惑的是, 一些LINQ提供程序, 如Entity Framework Core, 确实包括这些操作符的Async版本, 但它们是不同的.与Rx不同, 这些实际上返回一个
Task<T>, 所以它们仍然产生一个单一值, 而不是一个IQueryable<T>或IEnumerable<T>. )这个命名源于Rx设计早期的一个不幸选择.如果Rx今天从头开始设计, 前面一节中的相关操作符将只使用普通名称:
First,FirstOrDefault等等.它们都以
Async结尾的原因是这些是在Rx 2.0中添加的, 而Rx 1.0已经定义了具有那些名称的操作符. 这个示例使用First操作符:int v = Observable.Range(1, 10).First(); Console.WriteLine(v);这将输出值
1, 这是Range在这里返回的第一个项目. 但是看看那个变量v的类型.它不是一个
IObservable<int>, 它只是一个int. 如果在一个订阅时不立即产生值的Rx操作符上使用这个会发生什么呢? 这是一个例子:long v = Observable.Timer(TimeSpan.FromSeconds(2)).First(); Console.WriteLine(v);执行以上代码, 会发现对
First的调用直到产生一个值才返回.它是一个阻塞操作符. 通常在Rx中避免使用阻塞操作符, 因为很容易用它们创建死锁.
Rx的全部意义在于可以创建对事件做出反应的代码, 所以只是坐着等待直到一个特定的可观察源产生一个值并不真正符合其精神.
如果发现自己想这样做, 通常有更好的方法来实. (或者也许Rx对正在做的事情不是一个好的模型. )
如果真的需要像这样等待一个值, 使用
Async形式结合Rx对C#的async/await的集成支持可能更好:long v = await Observable.Timer(TimeSpan.FromSeconds(2)).FirstAsync(); Console.WriteLine(v);这在逻辑上有相同的效果, 但因为使用了
await, 这在等待可观察源产生一个值时不会阻塞调用线程. 这可能会减少死锁的机会.能够使用
await使得这些方法以Async名字是 异步 了, 但具体发生了什么呢? 已经看到这些方法都返回IObservable<T>, 而不是Task<T>, 那么如何能够使用await呢?当
await一个可观察序列时,await将在源完成时完成, 并且它将返回从源中出现的最终值.这对于像
FirstAsync和LastAsync这样只产生一个元素的操作符工作得很好.请注意, 偶尔会有值立即可用的情况. 例如, 第3章的
BehaviourSubject<T>部分表明,BehaviourSubject<T>的定义特征是它总是有一个当前值. 这意味着Rx的First方法实际上不会阻塞–它将订阅BehaviourSubject<T>, 并且BehaviourSubject<T>.Subscribe在返回之前会在其订阅者的可观察对象上调用OnNext. 这使得First能够立即返回一个值而不阻塞. (当然, 如果使用接受谓词的First重载, 并且如果BehaviourSubject<T>的值不满足谓词,First将然后阻塞. )ElementAt
还有另一个标准LINQ操作符用于从源中选择一个特定元素:
ElementAt.提供一个数字, 指示需要的元素在序列中的位置. 在处理静态数据的LINQ提供程序中, 这在逻辑上等同于通过索引访问数组元素.
Rx实现了这个操作符, 但与大多数LINQ提供程序的
ElementAt<T>实现返回一个T不同, Rx的返回一个IObservable<T>.与
First,Last和Single不同, Rx没有提供ElementAt<T>的阻塞形式. 但由于可以await任何IObservable<T>, 总是可以这样做:IAisMessage fourth = await receiverHost.Message.ElementAt(4);如果源序列只产生五个值, 而请求
ElementAt(5), 当源完成时,ElementAt返回的序列将向其订阅者报告一个ArgumentOutOfRangeException错误. 有三种方法可以处理这个问题:- 优雅地处理
OnError. - 使用
.Skip(5).Take(1);这将忽略前5个值, 只获取第6个值. 如果序列少于6个元素, 将得到一个空序列, 但没有错误. - 使用
ElementAtOrDefault.ElementAtOrDefault扩展方法将在索引超出范围时保护, 通过推送default(T)值. 目前没有提供自己默认值的选项.
- 优雅地处理
时间过滤
Take和TakeLast操作符让过滤掉除了序列开头或结尾的所有元素(Skip和SkipLast让看到除了那些之外的所有元素), 但这些都要求知道确切的元素数量.如果想指定截止不是根据元素数量, 而是根据特定的时间瞬间呢?
实际上已经看到了一个例子: 前面使用
TakeUntil将一个无尽的IObservable<T>转换为一个在五分钟后完成的可观察对象. 这是一系列操作符中的一个.SkipWhile和TakeWhile
在
Skip和SkipLast部分, 描述了一个传感器, 它在最初的几次读数中产生错误值.这很常见. 例如, 气体监测传感器通常需要将某个组件加热到正确的工作温度, 然后才能产生准确的读数.
在那个部分的示例中, 使用
Skip(5)忽略了前几次读数, 但这是一个粗糙的解决方案. 怎么知道5就足够了呢? 或者它可能更快准备好, 在这种情况下5就太多了.真正想做的是丢弃读数, 直到知道读数将是有效的.
而这正是
SkipWhile可以有用的那种场景. 假设有一个气体传感器, 它报告某种特定气体的浓度, 但它也报告执行检测的传感器板的温度.与其希望5次读数是一个合理的跳过数量, 可以表达实际要求:
const int MinimumSensorTemperature = 74; IObservable<SensorReading> readings = sensor.RawReadings .SkipUntil(r => r.SensorTemperature >= MinimumSensorTemperature);这直接表达了需要的逻辑: 这将丢弃读数, 直到设备达到其最低工作温度.
下一组方法允许在谓词计算结果为真时从序列中跳过或获取值. 对于
SkipWhile操作, 这将过滤掉所有值, 直到一个值使谓词失败, 然后可以返回剩余的序列.var subject = new Subject<int>(); subject .SkipWhile(i => i < 4) .Subscribe(Console.WriteLine, () => Console.WriteLine("Completed")); subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); subject.OnNext(4); subject.OnNext(3); subject.OnNext(2); subject.OnNext(1); subject.OnNext(0); subject.OnCompleted();输出:
4 3 2 1 0 CompletedTakeWhile将返回所有使谓词通过的值, 当第一个使谓词失败的值出现时, 序列将完成.var subject = new Subject<int>(); subject .TakeWhile(i => i < 4) .Subscribe(Console.WriteLine, () => Console.WriteLine("Completed")); subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); subject.OnNext(4); subject.OnNext(3); subject.OnNext(2); subject.OnNext(1); subject.OnNext(0); subject.OnCompleted();输出:
1 2 3 CompletedSkipUntil和TakeUntil
除了
SkipWhile和TakeWhile, Rx还定义了SkipUntil和TakeUntil.这些听起来可能只是相同想法的另一种表达方式: 期望
SkipUntil几乎与SkipWhile做完全相同的事情, 唯一的区别是SkipWhile在其谓词返回true时运行, 而SkipUntil在其谓词返回false时运行. 并且SkipUntil有一个重载确实是这样做的(TakeUntil也有相应的重载).如果这就是它们的全部, 那它们就不会很有趣. 然而,
SkipUntil和TakeUntil有一些重载使能够做一些用SkipWhile和TakeWhile无法做到的事情.已经看到了一个例子. 在
FirstAsync和FirstOrDefaultAsync部分有一个例子使用了TakeUntil的一个重载, 它接受一个DateTimeOffset.这个重载包装任何
IObservable<T>, 返回一个IObservable<T>, 它将转发源中的所有内容, 直到指定的时间, 此时它将立即完成(并将从底层源取消订阅).无法用
TakeWhile实现这一点, 因为TakeWhile仅在源产生项目时才查询其谓词.如果希望源在特定时间完成, 用
TakeWhile做到这一点的唯一方法是其源恰好在想要结束的精确时刻产生一个项目.TakeWhile只会因为其源产生项目而完成.TakeUntil可以异步完成.如果指定未来5分钟的时间, 当那个时间到达时, 无论源是否完全空闲都没关系.
TakeUntil无论如何都会完成. 它依赖于调度器来做到这一点.不一定要使用时间,
TakeUntil提供了一个接受第二个IObservable<T>的重载.这使能够告诉它在有趣的事情发生时停止, 而无需提前确切知道何时会发生.
这个
TakeUntil的重载转发源中的项目, 直到第二个IObservable<T>产生一个值.SkipUntil提供了一个类似的重载, 其中第二个IObservable<T>确定它应该何时开始转发源中的项目.注意: 这些重载要求第二个可观察对象产生一个值才能触发开始或结束.
如果第二个可观察对象在没有产生单个通知的情况下完成, 那么它没有效果–
TakeUntil将继续无限期地获取项目;SkipUntil将永远不会产生任何内容. 换句话说, 这些操作符会将Observable.Empty<T>()视为实际上等同于Observable.Never<T>().Distinct和DistinctUntilChanged
Distinct是另一个标准LINQ操作符. 它从序列中删除重复项.为此, 它需要记住其源曾经产生的所有值, 以便它可以过滤掉以前见过的任何项目.
Rx包括
Distinct的一个实现, 这个示例使用它来显示生成AIS消息的船只的唯一标识符, 但确保只在第一次看到每个这样的标识符时显示它:IObservable<uint> newIds = receiverHost.Messages .Select(m => m.Mmsi) .Distinct(); newIds.Subscribe(id => Console.WriteLine($"New vessel: {id}"));(这有点超前–它使用了
Select, 将在第六章序列转换中介绍. 这是一个非常广泛使用的LINQ操作符. 在这里使用它从消息中提取仅MMSI–船只标识符. )只对船只的标识符感兴趣, 这个示例没问题.
但是如果想检查这些消息的详细信息呢? 如何既能保留只看到以前从未听说过的船只的消息的能力, 又能查看那些消息中的信息呢?
使用
Select提取id阻止了这样做. 幸运的是,Distinct提供了一个重载, 使能够改变它确定唯一性的方式.与其让
Distinct查看它正在处理的值, 可以提供一个函数, 让选择任何喜欢的特征.所以, 不是将流过滤为以前从未见过的值, 而是可以将流过滤为具有以前从未见过的某些特定属性或属性组合的值. 这是一个简单的例子:
IObservable<IAisMessage> newVesselMessages = receiverHost.Messages.Distinct(m => m.Mmsi);这里,
Distinct的输入现在是一个IObservable<IAisMessage>. (在前面的例子中, 它实际上是IObservable<uint>, 因为Select子句仅提取了MMSI. ) 所以每次源发出一个IAisMessage,Distinct现在都会收到整个IAisMessage.但是因为提供了一个回调, 它不会尝试将整个
IAisMessage消息相互比较.相反, 每次它收到一个消息, 它会将其传递给回调, 然后查看回调返回的值, 并将其与回调为所有以前见过的消息返回的值进行比较, 只有当它是新的时才让消息通过.
所以效果与以前相似. 只有当消息具有以前未见过的MMSI时才会被允许通过.
但不同之处在于,
Distinct操作符的输出这里是IObservable<IAisMessage>, 所以当Distinct让一个项目通过时, 整个原始消息仍然可用.除了标准LINQ的
Distinct操作符, Rx还提供了DistinctUntilChanged.这个操作符仅在某些东西发生变化时才让通知通过, 它通过仅过滤掉相邻的重复项来实现这一点.
例如, 对于序列
1,2,2,3,4,4,5,4,3,3,2,1,1, 它将产生1,2,3,4,5,4,3,2,1. 与Distinct记住曾经产生的每个值不同,DistinctUntilChanged只记住最近发出的值, 并且仅当新值与最近的值匹配时才过滤掉新值.这个示例使用
DistinctUntilChanged来检测特定船只报告NavigationStatus变化的时间.uint exampleMmsi = 235009890; IObservable<IAisMessageType1to3> statusChanges = receiverHost.Messages .Where(v => v.Mmsi == exampleMmsi) .OfType<IAisMessageType1to3>() .DistinctUntilChanged(m => m.NavigationStatus) .Skip(1);例如, 如果船只一直报告状态为
AtAnchor,DistinctUntilChanged将丢弃每个这样的消息, 因为状态与以前相同.但是如果状态变为
UnderwayUsingEngine, 这将导致DistinctUntilChanged让第一个报告该状态的消息通过.然后它不会让任何进一步的消息通过, 直到值再次发生变化, 要么变回
AtAnchor, 要么变为其他值, 如Moored. (最后的Skip(1)是因为DistinctUntilChanged总是让它看到的第一个消息通过.无法知道这是否实际上代表状态的变化, 但很可能不是: 船只每隔几分钟报告一次状态, 但它们更改状态的频率要低得多, 所以第一次收到船只状态的报告时, 它可能不代表状态的变化.
通过丢弃第一个项目, 确保
statusChanges仅在可以确定状态发生变化时提供通知. )这就是对Rx中可用的过滤方法的快速浏览. 虽然它们相对简单, 但正如已经开始看到的, Rx的强大之处在于其操作符的可组合性.
过滤操作符是在这个信息丰富的时代管理可能面临的数据洪流的第一站. 现在知道如何应用各种标准来删除数据. 接下来, 将转向可以转换数据的操作符.
第五章:序列转换
所使用的序列中的值并不总是符合需要的格式. 有时存在比需要的更多的信息, 需要挑选出感兴趣的值. 有时每个值需要被扩展为一个更丰富的对象或更多的值.
到目前为止, 已经研究了序列的创建, 转换为序列以及通过过滤对序列进行缩减. 在本章中, 将研究序列的 转换 .
Select
最直接的转换方法是 Select. 它允许提供一个函数, 该函数接受一个
TSource 类型的值并返回一个 TResult 类型的值. Select
的签名反映了它将序列元素从一种类型转换为另一种类型的能力, 即从
IObservable<TSource> 转换为 IObservable<TResult>.
IObservable<TResult> Select<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, TResult> selector)
不必更改类型 – 如果愿意, TSource 和 TResult 可以相同.
第一个示例通过给一个整数序列中的每个值加3来转换该序列,
从而得到另一个整数序列.
IObservable<int> source = Observable.Range(0, 5);
source.Select(i => i + 3)
.Dump("+3")
这使用了在开头定义的 Dump 扩展方法. 它产生以下输出:
+3 --> 3
+3 --> 4
+3 --> 5
+3 --> 6
+3 --> 7
+3 completed
下一个示例以更改值类型的方式转换值. 它将整数值转换为字符.
Observable.Range(1, 5);
.Select(i => (char)(i + 64))
.Dump("char");
输出:
char --> A
char --> B
char --> C
char --> D
char --> E
char completed
这个示例将整数序列转换为一个元素具有匿名类型的序列:
Observable.Range(1, 5)
.Select(i => new { Number = i, Character = (char)(i + 64) })
.Dump("anon");
输出:
anon --> { Number = 1, Character = A }
anon --> { Number = 2, Character = B }
anon --> { Number = 3, Character = C }
anon --> { Number = 4, Character = D }
anon --> { Number = 5, Character = E }
anon completed
Select 是C#查询表达式语法支持的标准LINQ运算符之一,
所以可以像这样编写最后一个示例:
var query = from i in Observable.Range(1, 5)
select new {Number = i, Character = (char) (i + 64)};
query.Dump("anon");
在Rx中, Select 有另一个重载, 其中 selector 函数接受两个值.
额外的参数是元素在序列中的索引.
如果元素在序列中的索引对选择器函数很重要, 则使用此方法.
SelectMany
Select 为每个输入产生一个输出, 而 SelectMany
使每个输入元素能够转换为任意数量的输出. 为了了解这是如何工作的,
让先看一个仅使用 Select 的示例:
Observable
.Range(1, 5)
.Select(i => new string((char)(i + 64), i))
.Dump("strings");
它产生以下输出:
strings-->A
strings-->BB
strings-->CCC
strings-->DDDD
strings-->EEEEE
strings completed
对于 Range 产生的每个数字, 输出包含一个字符串, 其长度为该数字的字符数.
如果不是将每个数字转换为一个字符串, 而是将其转换为一个
IObservable<char> 会怎样呢? 只需在构造字符串后添加 .ToObservable()
就可以做到:
Observable
.Range(1, 5)
.Select(i => new string((char)(i + 64), i).ToObservable())
.Dump("sequences");
(或者, 可以将选择表达式替换为
i => Observable.Repeat((char)(i + 64), i). 两者的效果完全相同.
)输出不是很有用:
strings-->System.Reactive.Linq.ObservableImpl.ToObservableRecursive~1[System.Char]
strings-->System.Reactive.Linq.ObservableImpl.ToObservableRecursive~1[System.Char]
strings-->System.Reactive.Linq.ObservableImpl.ToObservableRecursive~1[System.Char]
strings-->System.Reactive.Linq.ObservableImpl.ToObservableRecursive~1[System.Char]
strings-->System.Reactive.Linq.ObservableImpl.ToObservableRecursive~1[System.Char]
strings completed
有一个可观察序列的可观察序列. 但是看看如果现在将那个 Select 替换为
SelectMany 会发生什么:
Observable
.Range(1, 5)
.SelectMany(i => new string((char)(i + 64), i).ToObservable())
.Dump("chars");
这给一个 IObservable<char>, 输出如下:
chars-->A
chars-->B
chars-->B
chars-->C
chars-->C
chars-->D
chars-->C
chars-->D
chars-->E
chars-->D
chars-->E
chars-->D
chars-->E
chars-->E
chars-->E
chars completed
顺序有点混乱, 但如果仔细看, 会发现每个字母出现的次数与输出字符串时相同.
例如, 只有一个 A, 但 C 出现三次, E 出现五次.
SelectMany 期望转换函数为每个输入返回一个 IObservable<T>,
然后它将这些结果组合回一个单一结果. LINQ to Objects
的等效操作则不那么混乱. 如果运行以下代码:
Enumerable
.Range(1, 5)
.SelectMany(i => new string((char)(i + 64), i))
.ToList()
它将产生一个包含以下元素的列表:
[ A, B, B, C, C, C, D, D, D, D, E, E, E, E, E ]
顺序不那么奇怪. 值得更详细地探讨一下原因.
IEnumerable<T> 与 IObservable<T> 的 SelectMany
IEnumerable<T> 是基于拉取的 – 序列只有在被请求时才产生元素.
Enumerable.SelectMany 以非常特定的顺序从其源中拉取项目. 它首先从其源
IEnumerable<int> (前面示例中由 Range 返回的那个)请求第一个值.
SelectMany 然后调用回调函数, 传递这个第一个项目,
然后枚举回调函数返回的 IEnumerable<char> 中的所有内容.
只有当它用尽这个时, 它才会向源(Range)请求第二个项目.
同样, 它将第二个项目传递给回调函数, 然后完全枚举返回的
IEnumerable<char>, 依此类推.
所以首先从第一个嵌套序列中获取所有内容, 然后从第二个中获取, 等等.
Enumerable.SelectMany 能够以这种方式进行有两个原因.
首先, IEnumerable<T> 的基于拉取的性质使它能够决定处理事情的顺序.
其次, 对于 IEnumerable<T>, 操作阻塞是正常的, 即直到它们有东西给才返回.
当前面的示例调用 ToList 时, 它不会返回, 直到它用所有结果完全填充了一个
List<T>.
Rx不是这样的. 首先, 消费者不能告诉源何时产生每个项目 – 源在准备好时发出项目.
其次, Rx通常模拟正在进行的过程, 所以不期望方法调用阻塞直到它们完成.
在某些情况下, Rx序列会自然地非常快速地产生所有项目并尽快完成, 但倾向于用Rx建模的那种信息源通常不会这样表现.
所以Rx中的大多数操作不会阻塞 – 它们立即返回一些东西(例如一个
IObservable<T>, 或一个表示订阅的 IDisposable), 然后稍后产生值.
当前正在研究的Rx版本的示例实际上是这些不寻常的情况之一, 其中每个序列都会尽快发出项目.
从逻辑上讲, 所有嵌套的 IObservable<char> 序列都是并发进行的.
结果是一团糟, 因为这里的每个可观察源都试图尽快产生每个元素, 只要源能够消费它们.
它们最终交错的事实与这些可观察源使用Rx的 Scheduler 系统的方式有关, 将在调度和线程章节中描述.
调度程序确保即使正在建模逻辑上并发的过程, Rx的规则也能得到维护, 并且
SelectMany 输出的观察者一次只会得到一个项目.
以下弹珠图显示了导致看到混乱输出的事件:
可以做一个小调整来防止子序列都同时尝试运行. (这也使用了
Observable.Repeat, 而不是先构造一个字符串然后在其上调用 ToObservable
的相当间接的方法. 在前面的示例中这样做是为了强调与LINQ to
Objects示例的相似性, 但在Rx中实际上不会那样做. )
Observable
.Range(1, 5)
.SelectMany(i =>
Observable.Repeat((char)(i + 64), i)
.Delay(TimeSpan.FromMilliseconds(i * 100)))
.Dump("chars");
现在得到与 IEnumerable<T> 版本一致的输出:
chars-->A
chars-->B
chars-->B
chars-->C
chars-->C
chars-->C
chars-->D
chars-->D
chars-->D
chars-->D
chars-->E
chars-->E
chars-->E
chars-->E
chars-->E
chars completed
这阐明了 SelectMany 允许为源产生的每个项目生成一个序列,
并将所有这些新序列中的所有项目扁平化为一个包含所有内容的单一序列.
虽然这可能更容易理解, 实践中不会纯粹为了使其更容易理解而在现实中引入这种延迟. 这些延迟意味着所有元素大约需要一秒半的时间才能出现.
这个弹珠图显示, 上面的代码通过使每个子可观察对象产生一小批项目来产生一个合理的顺序, 并且刚刚引入了空闲时间来获得分离:
引入这些间隙纯粹是为了提供一个稍微不那么混乱的示例,
但如果真的想要这种严格按顺序的处理, 实践中不会以这种方式使用
SelectMany.
首先, 它不能完全保证有效. (如果尝试这个示例, 但修改它使用越来越短的时间跨度, 最终会达到一个点, 其中项目又开始变得混乱. 而且由于.NET不是一个实时编程系统, 实际上在这里没有一个安全的时间跨度可以保证顺序. )
如果绝对需要在看到第二个子序列中的任何项目之前获取第一个子序列中的所有项目, 实际上有一种可靠的方法来请求:
Observable
.Range(1, 5)
.Select(i => Observable.Repeat((char)(i + 64), i))
.Concat()
.Dump("chars");
然而, 这不是展示 SelectMany 功能的好方法, 因为根本没用到它😂. (它使用
Concat, 将在组合序列章节中讨论.)
在知道正在解包单值序列时,
或者在没有特定顺序要求并且希望在子可观察对象产生元素时获取它们时使用
SelectMany.
SelectMany的重要性
按顺序阅读本书的章节, 在前面的章节中已经看到了两个 SelectMany 的示例.
以下是相关代码:
IObservable<int> onoffs =
from _ in src
from delta in
Observable.Return(1, scheduler)
.Concat(Observable.Return(-1, scheduler)
.Delay(minimumInactivityPeriod, scheduler))
select delta;
(如果想知道 SelectMany 的调用在哪里, 请记住, 如果查询表达式包含两个
from 子句, C#编译器会将其转换为对 SelectMany 的调用.
)这说明了Rx中的一种常见模式, 可以描述为展开然后再合并.
这个示例通过为 src 产生的每个项目创建一个新的, 短暂的
IObservable<int> 来工作. (这些子序列, 在示例中由 delta 范围变量表示,
产生值 1, 然后在指定的 minimumActivityPeriod 之后, 它们产生 -1.
这使能够计算最近发出的事件的数量. )
这是 展开 部分, 其中源序列中的项目产生新的可观察序列. SelectMany
在这些场景中至关重要,
因为它使所有这些新序列能够扁平化为一个单一的输出序列.
使用 SelectMany 的第二个地方略有不同: 它是
第3章的"在Rx中表示文件系统事件"部分的最后一个示例.
虽然那个示例也将多个可观察源组合成一个单一的可观察对象,
但可观察对象的列表是固定的: 对于 FileSystemWatcher
的每个不同事件都有一个.
它使用了不同的运算符 Merge (将在本章"组合序列"中介绍,
在那种场景中使用起来更简单).因为只需将想要组合的所有可观察对象的列表传递给它.
然而, 由于这段代码想要做的其他一些事情(包括延迟启动,
自动处置以及在有多个订阅者活动时共享单个源),
用于实现此目的的特定运算符组合意味着返回
IObservable<FileSystemEventArgs> 的合并代码需要作为转换步骤被调用.
如果只使用了 Select, 结果将是一个
IObservable<IObservable<FileSystemEventArgs>>.
代码的结构意味着它只会产生一个 IObservable<FileSystemEventArgs>,
所以双重包装的类型会非常不方便.
SelectMany 在这些场景中非常有用.
如果运算符的组合引入了不想要的额外一层可观察对象中的可观察对象,
SelectMany 可以解开一层.
这两种情况 – 展开然后合并, 以及移除或避免一层可观察对象中的可观察对象 –
经常出现, 这使得 SelectMany 成为一个重要的方法.
碰巧的是, SelectMany 在Rx所基于的数学理论中也是一个特别重要的运算符.
它是一个基本运算符, 因为可以用它构建许多其他Rx运算符.
Cast
C#的类型系统并非无所不知. 有时可能知道一些关于从可观察源中出现的值的类型信息, 而这些信息并未反映在源的类型中.
这可能基于特定领域的知识. 例如, 对于船舶广播的AIS消息, 可能知道如果消息类型为3, 它将包含导航信息. 这意味着可以这样写:
IObservable<IVesselNavigation> type3 =
receiverHost.Messages.Where(v => v.MessageType == 3)
.Cast<IVesselNavigation>();
这使用了 Cast, 一个标准的LINQ运算符,
可以在知道某些集合中的项目属于比类型系统能够推断出的更具体的类型时使用它.
Cast 和 第5章中显示的 OfType
运算符的区别在于它们处理不属于指定类型的项目的方式. OfType
是一个过滤运算符, 所以它只是过滤掉任何不属于指定类型的项目.
但是对于 Cast (就像普通的C#强制转换表达式一样),
断言期望源项目属于指定类型, 所以如果 Cast
的源产生一个与指定类型不兼容的项目, 它返回的可观察对象将调用其订阅者的
OnError.
如果使用其他更基本的运算符重新创建 Cast 和 OfType 的功能,
这种区别可能更容易理解.
// source.Cast<int>();等同于
source.Select(i => (int)i);
// source.OfType<int>();
source.Where(i => i is int).Select(i => (int)i);
Materialize和Dematerialize
Materialize 运算符将 IObservable<T> 类型的源转换为
IObservable<Notification<T>> 类型的源.
它将为源产生的每个项目提供一个 Notification<T>, 并且如果源终止,
它将产生一个最终的 Notification<T>, 指示它是成功完成还是出错完成.
这可能很有用, 因为它产生描述整个序列的对象.
如果想以一种稍后可以重放的方式记录可观察对象的输出, 使用
ReplaySubject<T>, 因为它正是为此而设计的.
但是如果想能够做的不仅仅是重放序列 – 检查项目或者甚至在重放之前修改它们, 可能想编写自己的代码来存储项目.
Notification<T> 会很有帮助, 因为它能够以统一的方式表示源所做的一切.
不需要单独存储关于序列是否或如何终止的信息 – 这些信息就是最终的
Notification<T>.
可以想象在单元测试中结合 ToArray 使用它. 能够获得一个
Notification<T>[] 类型的数组, 其中包含源所做的一切的完整描述,
从而更容易编写测试, 例如询问序列中出现的第三个项目是什么.
(Rx.NET源代码本身在许多测试中使用 Notification<T>. )
如果具体化一个序列, 可以看到返回的包装值.
Observable.Range(1, 3)
.Materialize()
.Dump("Materialize");
输出:
Materialize --> OnNext(1)
Materialize --> OnNext(2)
Materialize --> OnNext(3)
Materialize --> OnCompleted()
Materialize completed
请注意, 当源序列完成时, 具体化的序列会产生一个 'OnCompleted' 通知值,
然后完成. Notification<T> 是一个抽象类, 有三个实现:
- OnNextNotification
- OnErrorNotification
- OnCompletedNotification
Notification<T> 公开四个公共属性来帮助检查它: Kind, HasValue,
Value 和 Exception.
显然, 只有 OnNextNotification 会为 HasValue 返回 true, 并且有
Value 的有用实现.
类似地, OnErrorNotification 是唯一会有 Exception 值的实现. Kind
属性返回一个枚举, 判断哪种方法适合.
public enum NotificationKind
OnNext,
OnError,
OnCompleted,
在接下来的示例中, 产生一个出错的序列. 请注意, 具体化序列的最终值是一个
OnErrorNotification. 还要注意, 具体化序列不会出错, 它会成功完成.
var source = new Subject<int>();
source.Materialize()
.Dump("Materialize");
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
source.OnError(new Exception("Fail?"));
输出:
Materialize --> OnNext(1)
Materialize --> OnNext(2)
Materialize --> OnNext(3)
Materialize --> OnError(System.Exception)
Materialize completed
具体化一个序列对于执行序列的分析或日志记录非常方便. 可以通过应用
Dematerialize 扩展方法来解包一个具体化的序列. Dematerialize 仅适用于
IObservable<Notification<TSource>>.
这就完成了对转换运算符的介绍.
它们的共同特点是它们为每个输入项目产生一个输出(或者, 在 SelectMany
的情况下, 一组输出). 接下来,
将研究那些可以组合来自其源中多个项目的信息的运算符.
第六章:(Aggregate)聚合
数据以其原始形式并不总是易于处理的. 有时需要整合, 整理, 合并或浓缩接收到的大量数据.
这可能只是将数据量减少到可管理水平的情况.
例如, 考虑来自仪器仪表, 金融, 信号处理和运营智能等领域的快速变化的数据.
对于单个源, 此类数据的变化速度可能超过每秒十个值, 如果观察多个源, 变化速度会更高.
一个人真的能处理这些数据吗? 对于人类消费而言, 像平均值, 最小值和最大值这样的聚合值可能更有用.
通常可以实现更多. 组合和关联数据的方式可能使能够揭示模式, 提供从任何单个消息中无法获得的见解, 或者不仅仅是简单地减少到单个统计度量.
Rx的可组合性使能够对数据流表达复杂而微妙的计算, 这不仅使能够减少用户必须处理的消息量, 而且能够增加人类接收到的每条消息中的价值量.
将从最简单的聚合函数开始, 这些函数以某种特定方式将一个可观察序列简化为一个只有单个值的序列. 然后将转向更通用的操作符, 使您能够定义自己的聚合机制.
简单数值聚合
Rx支持各种标准LINQ操作符, 这些操作符将序列中的所有值简化为单个数值结果.
计数
Count返回一个序列包含多少个元素. 虽然这是一个标准LINQ操作符, 但Rx的版本与IEnumerable<T>版本有所不同, 因为Rx将返回一个可观察序列, 而不是标量值.通常, 这是因为Rx的推送相关性质. Rx的
Count不能要求其源立即提供所有元素, 所以它只能等待源表示它已完成.Count返回的序列始终是IObservable<int>类型, 无论源的元素类型如何.在源完成之前, 它什么都不做, 此时它将发出一个单一值, 报告源产生了多少个元素, 然后它将立即完成. 这个示例使用
Count与Range, 因为Range尽可能快地生成其所有值, 然后完成, 这意味着可以立即从Count得到结果:IObservable<int> numbers = Observable.Range(0, 3); numbers.Count().Dump("count");count-->3 count Completed如果期望序列中的值数量超过32位有符号整数所能计数的范围, 可以使用
LongCount操作符代替. 它与Count相同, 只是返回IObservable<long>.求和
Sum操作符将其源中的所有值相加, 产生总和作为其唯一输出.与
Count一样, Rx的Sum与大多数其他LINQ提供程序的不同之处在于它不产生标量作为其输出.它产生一个可观察序列, 在其源完成之前什么都不做. 当源完成时,
Sum返回的可观察序列产生一个单一值, 然后立即完成. 这个示例展示了它的用法:IObservable<int> numbers = Observable.Range(1, 5); numbers.Sum().Dump("sum");输出显示了
Sum产生的单个结果:sum-->15 sum completedSum只能处理int,long,float,double,decimal或这些类型的可空版本的值. 这意味着有些类型却无法使用Sum.例如,
System.Numerics命名空间中的BigInteger类型表示整数, 其大小仅受可用内存和准备等待它执行计算的时间限制. (即使是基本算术运算在有数百万位数字的数字上也会变得非常慢. )可以使用
+将它们相加, 因为该类型为该操作符定义了重载. 但Sum一直以来都无法找到该重载.C# 11.0中引入的 泛型计算 意味着从技术上讲, 可以引入一个适用于任何实现
IAdditionOperators<T, T, T>的类型T的Sum版本. 然而, 这将意味着依赖于.NET 7.0或更高版本(因为泛型数学在旧版本中不可用), 在撰写本文时, Rx通过其net6.0目标支持.NET 7.0. 它可以引入单独的net7.0和/或net8.0目标来实现这一点, 但尚未这样做. (LINQ to Objects中的Sum目前也不支持这一点.)如果为
Sum提供这些类型的可空版本(例如, 的源是IObservable<int?>), 那么Sum也将返回一个具有可空项类型的序列, 并且如果任何输入值为null, 它将产生null.虽然
Sum只能处理一小部分固定的数值类型, 但的源不一定必须产生这些类型的值.Sum提供了接受lambda的重载, 该lambda从每个输入元素中提取合适的数值.例如, 假设有这么个问题: 如果接下来恰好通过AIS广播自身描述的10艘船并排放置, 它们是否都能放入某个特定宽度的航道?
可以通过将AIS消息过滤为提供船舶尺寸信息的消息, 使用
Take收集接下来的10条这样的消息, 然后使用Sum来处理.Ais.NET库的
IVesselDimensions接口没有实现加法(即使实现了, 刚刚看到Rx也无法利用它), 但没关系: 所需要做的就是提供一个lambda, 它可以接受一个IVesselDimensions并返回一个Sum可以处理的某种数值类型的值:IObservable<IVesselDimensions> vesselDimensions = receiverHost.Messages .OfType<IVesselDimensions>(); IObservable<int> totalVesselWidths = vesselDimensions .Take(10) .Sum(dimensions => checked((int)(dimensions.DimensionToPort + dimensions.DimensionToStarboard)));(如果想知道这里的强制转换和
checked关键字是怎么回事, AIS将这些值定义为无符号整数, 所以Ais.NET库将它们报告为uint, 这不是Rx的Sum支持的类型.实际上, 一艘船宽到足以溢出32位有符号整数的可能性非常小, 所以只是将其强制转换为
int, 并且checked关键字将在遇到宽度超过21亿米的船这种不太可能的情况下抛出异常. )平均值
标准LINQ操作符
Average有效地计算Sum会计算的值, 然后将其除以Count会计算的值. 再一次, 与大多数LINQ实现返回标量不同, Rx的Average产生一个可观察对象.虽然
Average可以处理与Sum相同的数值类型的值, 但在某些情况下输出类型会不同.如果源是
IObservable<int>, 或者如果使用接受从源中提取值的lambda的重载, 并且该lambda返回int, 结果将是double. 这是因为一组整数的平均值不一定是整数.同样, 对
long值求平均会产生double. 然而,decimal类型的输入会产生decimal类型的输出,float类型的输入会产生float类型的输出. 与Sum一样, 如果Average的输入是可空的, 输出也将是可空的.最小值和最大值
Rx实现了标准LINQ的
Min和Max操作符, 它们找到具有最高或最低值的元素.与本节中的所有其他操作符一样, 这些操作符不返回标量, 而是返回一个
IObservable<T>, 它产生一个单一值.Rx为
Sum和Average支持的相同数值类型定义了专门的实现. 然而, 与那些操作符不同的是, 它还定义了一个接受任何类型源项的重载.当在Rx没有定义专门实现的源类型上使用
Min或Max时, 它使用Comparer<T>.Default来比较项. 还有一个重载允许传递一个比较器.与
Sum和Average一样, 有接受回调的重载. 如果使用这些重载,Min和Max将为每个源项调用此回调, 并寻找回调返回的最低或最高值.请注意, 它们最终产生的单个输出将是回调返回的值, 而不是从中派生该值的原始源项. 要了解这意味着什么, 请看这个示例:
IObservable<int> widthOfWidestVessel = vesselDimensions .Take(10) .Max(dimensions => checked((int)(dimensions.DimensionToPort + dimensions.DimensionToStarboard)));Max在这里返回一个IObservable<int>, 它将是接下来报告船舶尺寸的10条消息中最宽船舶的宽度. 但是如果不仅仅想看到宽度, 而是想要整个消息呢?最小/大值元素
Rx提供了
Min和Max的两个微妙变体:MinBy和MaxBy. 这些与刚刚看到的基于回调的Min和Max类似, 使能够处理不是数值类型但可能具有数值属性的元素序列.不同之处在于,
MinBy和MaxBy不是返回最小值或最大值, 而是哪个源元素产生了该值. 例如, 假设不仅仅想发现最宽的船的宽度, 还想知道那实际上是哪艘船:IObservable<IVesselDimensions> widthOfWidestVessel = vesselDimensions .Take(10) .MaxBy(dimensions => checked((int)(dimensions.DimensionToPort + dimensions.DimensionToStarboard)));这与上一节中的示例非常相似. 正在处理一个元素类型为
IVesselDimensions的序列, 所以提供了一个回调, 用于提取想要用于比较目的的值. (实际上, 与上次的回调相同. )就像
Max一样,MaxBy试图找出当传递给这个回调时哪个元素产生最高值. 在源完成之前它无法知道是哪个.如果源尚未完成, 它所能知道的只是到目前为止的最高值, 但未来的值可能会超过它.
所以与在本章中看到的所有其他操作符一样, 在源完成之前它什么都不做, 这就是为什么在那里放了一个
Take(10).然而, 得到的序列类型是不同的.
Max返回一个IObservable<int>, 因为它为源中的每个项目调用回调, 然后产生回调返回的最高值.但是对于
MaxBy, 得到一个IObservable<IVesselDimensions>, 因为MaxBy告诉哪个源元素产生了该值.当然, 可能有不止一个项目具有最高宽度–例如, 可能有三艘同样大的船. 对于
Max来说这并不重要, 因为它只是试图返回实际值: 有多少个源项目具有最大值并不重要, 因为在所有情况下都是相同的值.但是对于
MaxBy, 得到产生最大值的原始项目, 如果有三个都这样, 不希望Rx随意选择一个. 所以与到目前为止看到的其他聚合操作符不同,MinBy或MaxBy返回的可观察对象不一定只产生一个值.它可能产生几个值. 可能会问它是否真的是一个聚合操作符, 因为它没有将输入序列简化为一个输出.
但它确实将其简化为一个单一值: 回调返回的最小值(或最大值). 只是它的呈现结果略有不同. 它根据聚合过程的结果生成一个序列.
可以将其视为聚合和过滤的组合: 它执行聚合以确定最小值或最大值, 然后将源序列过滤为仅包含回调产生该值的元素.
注意: LINQ to Objects也定义了
MinBy和MaxBy方法, 但它们略有不同. 这些LINQ to Objects版本实际上会随意选择一个源元素–如果有多个源值都产生最小值(或最大值)结果, LINQ to Objects只给一个, 而Rx会给所有的.Rx在.NET 6.0添加它们的LINQ to Objects同名方法之前多年就定义了这些操作符的版本, 所以如果想知道为什么Rx的做法不同, 真正的问题是为什么LINQ to Objects不遵循Rx的先例.
简单布尔聚合
LINQ定义了几个标准操作符, 将整个序列简化为单个布尔值.
(Any)任意
Any操作符有两种形式. 无参数重载实际上是在问这个问题: "这个序列中有任何元素吗?"它返回一个可观察序列, 如果源在没有发出任何值的情况下完成, 将产生一个
false值.然而, 如果源确实产生了一个值, 那么当产生第一个值时, 结果序列将立即产生
true, 然后完成. 如果它收到的第一个通知是错误, 那么它将传递该错误.var subject = new Subject<int>(); subject.Subscribe(Console.WriteLine, () => Console.WriteLine("Subject completed")); var any = subject.Any(); any.Subscribe(b => Console.WriteLine("The subject has any values? {0}", b)); subject.OnNext(1); subject.OnCompleted();输出:
1 The subject has any values? True subject completed如果现在删除
OnNext(1), 输出将变为以下内容:subject completed The subject has any values? False在源确实产生值的情况下,
Any会立即取消对源的订阅. 所以如果源想要报告错误,Any只有在这是它收到的第一个通知时才会看到这个错误.var subject = new Subject<int>(); subject.Subscribe(Console.WriteLine, ex => Console.WriteLine("subject OnError : {0}", ex), () => Console.WriteLine("Subject completed")); IObservable<bool> any = subject.Any(); any.Subscribe(b => Console.WriteLine("The subject has any values? {0}", b), ex => Console.WriteLine(".Any() OnError : {0}", ex), () => Console.WriteLine(".Any() completed")); subject.OnError(new Exception());输出:
subject OnError : System.Exception: Exception of type 'System.Exception' was thrown. .Any() OnError : System.Exception: Exception of type 'System.Exception' was thrown.但是如果源在产生异常之前生成了一个值, 例如:
subject.OnNext(42); subject.OnError(new Exception());将看到以下输出:
42 The subject has any values? True .Any() completed subject OnError : System.Exception: Exception of type 'System.Exception' was thrown.虽然直接订阅源主题的处理程序仍然会看到错误, 但
any可观察对象报告了一个True值, 然后完成, 这意味着它没有报告随后的错误.Any方法还有一个接受谓词的重载. 这实际上是在问一个稍微不同的问题: " 这个序列中有任何满足这些条件的元素吗? " 其效果类似于使用Where后跟无参数形式的Any.IObservable<bool> any = subject.Any(i => i > 2); // 功能上等同于 IObservable<bool> longWindedAny = subject.Where(i => i > 2).Any();
(All)全部
All操作符类似于接受谓词的Any方法, 不同之处在于所有值都必须满足谓词. 一旦谓词拒绝一个值,All返回的可观察对象就会产生一个false值, 然后完成.如果源到达末尾而没有产生任何不满足谓词的元素, 那么
All将推送true作为其值. (这样做的一个结果是, 如果在一个空序列上使用All, 结果将是一个产生true的序列. 这与All在其他LINQ提供程序中的工作方式一致, 但对于不熟悉称为空真的形式逻辑约定的人来说可能会感到惊讶. )一旦
All决定产生一个false值, 它就会立即取消对源的订阅(就像Any一旦确定可以产生true时所做的那样). 如果源在这之前产生错误, 错误将传递给All方法的订阅者.var subject = new Subject<int>(); subject.Subscribe(Console.WriteLine, () => Console.WriteLine("Subject completed")); IEnumerable<bool> all = subject.All(i => i < 5); all.Subscribe(b => Console.WriteLine($"All values less than 5? {b}")); subject.OnNext(1); subject.OnNext(2); subject.OnNext(6); subject.OnNext(2); subject.OnNext(1); subject.OnCompleted();输出:
1 2 6 All values less than 5? False all completed 2 1 subject completed(IsEmpty)是否为空
LINQ的
IsEmpty操作符在逻辑上与无参数的Any方法相反. 当且仅当源在没有产生任何元素的情况下完成时, 它返回true. 如果源产生一个项目,IsEmpty产生false并立即取消订阅. 如果源产生错误, 它会转发该错误.(Contains)包含
Contains操作符确定一个特定元素是否存在于一个序列中. 可以使用Any来实现它, 只需提供一个回调, 将每个项目与正在寻找的值进行比较. 然而, 通常写Contains会更简洁, 并且可能更直接地表达意图.var subject = new Subject<int>(); subject.Subscribe( Console.WriteLine, () => Console.WriteLine("Subject completed")); IEnumerable<bool> contains = subject.Contains(2); contains.Subscribe( b => Console.WriteLine("Contains the value 2? {0}", b), () => Console.WriteLine("contains completed")); subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); subject.OnCompleted();输出:
1 2 Contains the value 2? True contains completed 3 Subject completedContains还有一个重载, 允许指定一个不同于类型默认实现的IEqualityComparer<T>. 如果有一个自定义类型的序列, 根据用例可能有一些特殊的相等规则, 这会很有用.构建自己的聚合
如果前面章节中描述的内置聚合不能满足的需求, 可以构建自己的聚合. Rx提供了两种不同的方法来做到这一点.
(Aggregate)聚合
Aggregate方法非常灵活: 可以用它构建本章到目前为止所示的任何操作符. 我们需要提供一个函数, 它会为每个元素调用一次该函数. 但它不仅仅将元素传递给函数: 它还提供一种让函数"聚合"信息的方式. 除了当前元素, 它还传递一个" 累加器" . 累加器可以是任何类型–这将取决于想要累积什么样的信息. 函数返回的任何值都将成为新的累加器值, 并且它会将其与源中的下一个元素一起传递给函数. 有一些变体, 但最简单的重载如下所示:IObservable<TSource> Aggregate<TSource>( this IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)如果想为
int值生成自己版本的Count, 可以通过提供一个函数来做到这一点, 该函数对于源产生的每个值只加1:IObservable<int> sum = source.Aggregate((acc, element) => acc + 1);为了确切理解这是在做什么, 让看看
Aggregate将如何调用这个lambda表达式. 为了更容易看到这一点, 假设将那个lambda表达式放在它自己的变量中:Func<int, int, int> c = (acc, element) => acc + 1;现在假设源产生一个值为100的项目. Aggregate将调用函数:
c(0, 100) // 返回1第一个参数是当前累加器.
Aggregate使用default(int)作为初始累加器值, 即0. 函数返回1, 这成为新的累加器值. 所以如果源产生第二个值, 比如说200,Aggregate将传递新的累加器以及源中的第二个值:c(1, 200) // 返回2这个特定的函数完全忽略了它的第二个参数(来自源的元素). 它每次只是将累加器加1. 所以累加器只不过是函数被调用次数的记录.
现在让看看如何使用
Aggregate实现Sum:Func<int, int, int> s = (acc, element) => acc + element IObservable<int> sum = source.Aggregate(s);对于第一个元素,
Aggregate将再次传递选择的累加器类型int的默认值: 0. 并且它将传递第一个元素值. 所以如果第一个元素是100, 它会这样做:s(0, 100) // 返回100然后如果第二个元素是200,
Aggregate将进行以下调用:s(100, 200) // 返回300注意这次, 第一个参数是100, 因为这是上一次调用
s返回的值. 所以在这种情况下, 在看到元素100和200之后, 累加器的值是300, 这是所有元素的总和.如果希望初始累加器值不是
default(TAccumulator)怎么办? 有一个重载可以做到这一点. 例如, 这是如何使用Aggregate实现类似All的功能:IObservable<bool> all = source.Aggregate(true, (acc, element) => acc && element);顺便说一下, 这与真正的
All并不完全等效: 它处理错误的方式不同.All如果看到一个false元素, 会立即取消对源的订阅, 因为它知道源产生的其他任何东西都不可能改变结果.这意味着如果源即将产生错误, 它将不再有机会这样做, 因为
All取消了订阅. 但是Aggregate无法知道累加器已经进入了一个它永远无法离开的状态, 所以它将一直订阅源, 直到源完成(或者直到订阅Aggregate返回的IObservable<T>的任何代码取消订阅).这意味着如果源产生
true, 然后是false,Aggregate将不像All那样取消对源的订阅, 所以如果源继续调用OnError,Aggregate将收到该错误, 并将其传递给它的订阅者.这里有一种有些人觉得有帮助的思考
Aggregate的方式. 如果源产生值1到5, 并且如果传递给Aggregate的函数被称为f, 那么当源完成时Aggregate产生的值将是这样的:T result = f(f(f(f(f(default(T), 1), 2), 3), 4), 5);所以在重新创建
Count的情况下, 累加器类型是int, 所以这变成:int sum = s(s(s(s(s(0, 1), 2), 3), 4), 5); // 注意: Aggregate不会直接返回这个值 - // 它返回一个IObservable<int>, 该可观察对象产生这个值.Rx的
Aggregate不会一次执行所有这些调用: 它在源产生每个元素时调用函数, 所以计算将在时间上分散进行.如果回调是一个"纯函数" –一个不受全局变量和其他环境因素影响, 并且对于任何特定输入总是返回相同结果的函数–这并不重要.
Aggregate的结果将与如果它在一个像前面例子那样的大表达式中全部发生的结果相同.但是如果回调的行为受到, 比如说, 一个全局变量或当前文件系统内容的影响, 那么它将在源产生每个值时被调用这一事实可能更重要.
顺便说一下,
Aggregate在一些编程系统中有其他名称. 一些系统称它为reduce. 它也经常被称为fold(具体来说是" 左折叠" . 右折叠以相反的顺序进行. 按照惯例, 它的函数以相反的顺序接受参数, 所以它看起来像s(1, s(2, s(3, s(4, s(5, 0)))))).Rx没有提供内置的右折叠. 它不太适合, 因为它必须等到收到最后一个元素才能开始, 这意味着它需要保留整个序列中的每个元素, 然后在序列完成时一次性评估整个折叠).
在重新实现一些内置聚合操作符的过程中, 直接从
Sum跳到了Any. 那Average呢?事实证明, 仅使用到目前为止展示的重载无法做到这一点. 这是因为
Average需要累积两条信息–运行总和和计数–并且它还需要在最后执行一个最终步骤:它需要将总和除以计数. 使用到目前为止展示的重载, 只能做到一部分:
IObservable<int> nums = Observable.Range(1, 5); IObservable<(int Count, int Sum)> avgAcc = nums.Aggregate( (Count: 0, Sum: 0), (acc, element) => (Count: acc.Count + 1, Sum: acc.Sum + element));这使用一个元组作为累加器, 使其能够累积两个值: 计数和总和. 但最终的累加器值成为结果, 而这不是想要的.
缺少通过将总和除以计数来计算平均值的最后一步. 幸运的是,
Aggregate提供了第三个重载, 使能够提供这个最终步骤.传递第二个回调, 该回调将在源完成时仅被调用一次.
Aggregate将最终的累加器值传递给这个lambda表达式, 然后它返回的任何值都成为Aggregate返回的可观察对象产生的单个项目.IObservable<double> avg = nums.Aggregate( (Count: 0, Sum: 0), (acc, element) => (Count: acc.Count + 1, Sum: acc.Sum + element), acc => ((double) acc.Sum) / acc.Count);一直在展示
Aggregate如何重新实现一些内置聚合操作符, 以说明它是一个强大且非常通用的操作符. 然而, 这不是使用它的目的.Aggregate之所以有用, 正是因为它允许定义自定义聚合.例如, 假设想构建一个通过AIS广播其详细信息的所有船舶名称的列表. 这是一种实现方法:
IObservable<IReadOnlySet<string>> allNames = vesselNames .Take(10) .Aggregate( ImmutableHashSet<string>.Empty, (set, name) => set.Add(name.VesselName));在这里使用了
ImmutableHashSet<string>, 因为它的使用模式恰好适合Aggregate. 普通的HashSet<string>也可以工作, 但不太方便, 因为它的Add方法不返回集合, 所以函数需要一个额外的语句来返回累积的集合:IObservable<IReadOnlySet<string>> allNames = vesselNames .Take(10) .Aggregate( new HashSet<string>(), (set, name) => { set.Add(name.VesselName); return set; });使用这些实现中的任何一个,
vesselNames将产生一个单个值, 该值是一个IReadOnlySet<string>, 包含在报告名称的前10条消息中看到的每个船舶名称.在最后两个示例中, 不得不回避一个问题. 让它们仅在出现的前10条合适消息上工作.
想想如果没有在那里使用
Take(10)会发生什么. 代码将编译, 但会有一个问题.在各种示例中使用的AIS消息源是一个无尽的源. 在可预见的未来, 船舶将继续在海洋上航行.
Ais.NET不包含任何旨在检测文明终结或使船舶使用过时的技术发明的代码, 所以它永远不会对其订阅者调用
OnCompleted.Aggregate返回的可观察对象在其源完成或失败之前不会报告任何内容.所以如果删除那个
Take(10), 行为将与Observable.Never<IReadOnlySet<string>>相同.不得不强制
Aggregate的输入结束, 以使它产生一些东西. 但还有另一种方法.(Scan)扫描
虽然
Aggregate允许将完整序列简化为一个单一的最终值, 但有时这不是需要的.如果正在处理一个无尽的源, 可能想要更像一个运行总和的东西, 每次收到一个值时更新.
Scan操作符正是为满足此要求而设计的.Scan和Aggregate的签名相同; 区别在于Scan不会等待其输入结束. 它在每个项目之后产生聚合值.可以像上一节那样使用它来构建一组船舶名称, 但使用
Scan不必等到最后. 每次收到包含名称的消息时, 它都会报告当前列表:IObservable<IReadOnlySet<string>> allNames = vesselNames .Scan( ImmutableHashSet<string>.Empty, (set, name) => set.Add(name.VesselName));请注意, 即使没有任何变化, 这个
allNames可观察对象也会产生元素.如果累积的名称集合已经包含了刚刚从
vesselNames中出现的名称, 对set.Add的调用将什么也不做, 因为该名称已经在集合中.但是
Scan会为每个输入产生一个输出, 并且不在乎累加器是否没有改变.这是否重要将取决于计划如何处理这个
allNames可观察对象, 但如果需要, 可以使用第5章中显示的DistinctUntilChanged操作符轻松修复此问题.可以将
Scan视为一个展示其工作过程的Aggregate版本. 如果想看看计算平均值如何聚合计数和总和, 可以这样写:IObservable<int> nums = Observable.Range(1, 5); IObservable<(int Count, int Sum)> avgAcc = nums.Scan( (Count: 0, Sum: 0), (acc, element) => (Count: acc.Count + 1, Sum: acc.Sum + element)); avgAcc.Dump("acc");这将产生以下输出:
acc-->(1, 1) acc-->(2, 3) acc-->(3, 6) acc-->(4, 10) acc-->(5, 15) acc completed可以清楚地看到,
Scan在源产生每个值时都发出当前累积值.与
Aggregate不同,Scan不提供接受第二个函数将累加器转换为结果的重载.所以可以在这里看到包含计数和总和的元组, 但不是想要的实际平均值. 但是可以通过使用数据转换(transform)中描述的
Select操作符来实现这一点:IObservable<double> avg = nums.Scan( (Count: 0, Sum: 0), (acc, element) => (Count: acc.Count + 1, Sum: acc.Sum + element)) .Select(acc => ((double) acc.Sum) / acc.Count); avg.Dump("avg");现在得到以下输出:
avg-->1 avg-->1.5 avg-->2 avg-->2.5 avg-->3 avg completedScan是一个比Aggregate更通用的操作符. 可以通过将Scan 和TakeLast操作符组合来实现Aggregate.source.Aggregate(0, (acc, current) => acc + current); // 等同于 source.Scan(0, (acc, current) => acc + current).TakeLast();聚合对于减少数据量或组合多个元素以产生平均值或其他包含来自多个元素信息的度量很有用. 但是为了进行某些类型的分析, 在计算聚合值之前还需要对数据进行切片或以其他方式重新组织. 所以在下一章中, 将研究Rx提供的用于分区数据的各种机制.
第七章:(partition)划分
Rx可以将单个序列拆分为多个序列. 这对于将项目分发给多个订阅者非常有用.
在进行分析时, 对分区进行聚合可能会很有用. 您可能已经熟悉标准LINQ运算符
GroupBy. Rx支持此功能, 并且还定义了一些自己的功能.
GroupBy
GroupBy 运算符允许您像 IEnumerable<T> 的 GroupBy
运算符一样对序列进行分区. 开源的
Ais.Net项目 可以提供一个有用的示例.
其
ReceiverHost类
通过Rx提供AIS消息, 定义了一个类型为 IObservable<IAisMessage> 的
Messages 属性. 这是一个非常繁忙的源, 因为它会报告它能够访问的每条消息.
例如, 如果您将接收器连接到挪威政府慷慨提供的AIS消息源,
每次有船在挪威海岸的任何地方广播AIS消息时, 它都会产生一个通知.
有很多船只在挪威周围移动, 所以这有点像消防水管.
如果确切知道对哪些船只感兴趣, 可以在在 本章过滤(filtering) 中看到了如何过滤此流.
但是, 如果不知道, 而仍然希望能够执行与单个船只相关的处理呢?
例如, 也许想发现任何船只更改其 NavigationStatus (报告诸如 AtAnchor
或 Moored 等值)的任何时间. 过滤章节的 Distinct 和
DistinctUntilChanged 部分 展示了如何做到这一点,
但它首先将流过滤为来自单个船只的消息. 如果尝试直接在所有船只的流上使用
DistinctUntilChanged, 它将不会产生有意义的信息. 如果船A停泊,
船B在锚定, 并且如果从船A和船B收到交替的状态消息, DistinctUntilChanged
会将每条消息报告为状态变化, 即使两艘船的状态都没有改变.
可以通过将 " 所有船只" 序列拆分为许多小序列来解决此问题:
IObservable<IGroupedObservable<uint, IAisMessage>> perShipObservables =
receiverHost.Messages.GroupBy(message => message.Mmsi);
这个 perShipObservables 是一个可观察序列的可观察序列. 更具体地说,
它是一个分组可观察序列的可观察序列, 但正如 IGroupedObservable<TKey,
T>
的定义 中看到的那样, 分组可观察序列只是一种特殊类型的可观察序列:
public interface IGroupedObservable<out TKey, out TElement> : IObservable<TElement>
{
TKey Key { get; }
}
每次 receiverHost.Message 报告一条AIS消息时, GroupBy
运算符将调用回调以找出此项目属于哪个组.
将回调返回的值称为键, GroupBy 会记住它已经看到的每个键.
如果这是一个新键, GroupBy 将创建一个新的 IGroupedObservable, 其
Key 属性将是回调刚刚返回的值.
它从外部可观察序列(放入 perShipObservables 的那个)发出这个
IGroupedObservable, 然后立即使那个新的 IGroupedObservable
发出产生该键的元素(在此示例中为 IAisMessage). 但是, 如果回调产生一个
GroupBy 以前见过的键, 它会找到它已经为该键生成的 IGroupedObservable,
并使它发出该值.
因此, 在这个示例中, 效果是每当 receiverHost
报告来自以前从未听说过的船只的消息时, perShipObservables
将发出一个新的可观察序列, 该序列仅报告该船只的消息.
可以使用此功能来报告每次了解到新船只的情况:
perShipObservables.Subscribe(m => Console.WriteLine($"New ship! {m.Key}"));
但这并没有做任何用 Distinct 无法实现的事情. GroupBy
的强大之处在于在这里为每艘船获得一个可观察序列,
因此可以继续设置一些每艘船的处理:
IObservable<IObservable<IAisMessageType1to3>> shipStatusChangeObservables =
perShipObservables.Select(shipMessages => shipMessages
.OfType<IAisMessageType1to3>()
.DistinctUntilChanged(m => m.NavigationStatus)
.Skip(1));
这使用 Select 对从 perShipObservables 出来的每个组应用处理.
每个这样的组代表一艘不同的船, 因此在此传递给 Select
的回调将为每艘船恰好调用一次. 这意味着现在可以使用
DistinctUntilChanged.
此示例提供给 DistinctUntilChanged
的输入是一个表示仅来自一艘船的消息的序列,
因此这将告诉该船何时更改其状态.
现在这能够做到想要的, 因为每艘船都有自己的 DistinctUntilChanged 实例.
DistinctUntilChanged 总是转发它收到的第一个事件 –
只有当项目与前一个项目相同时它才会丢弃项目,
而在这种情况下没有前一个项目. 但这在这里不太可能是正确的行为. 假设从名为
A 的某艘船上看到的第一条消息报告的状态为 Moored.
有可能在开始运行之前, 它处于某种不同的状态,
并且收到的第一条报告恰好代表状态变化. 但更有可能的是,
在开始之前它已经停泊了一段时间. 不能确定, 但大多数状态报告并不代表变化,
因此 DistinctUntilChanged 总是转发第一个事件的行为在这里可能是错误的.
所以使用 Skip(1) 丢弃每艘船的第一条消息.
此时有一个可观察序列的可观察序列.
外部序列为它看到的每艘不同的船生成一个嵌套序列,
并且该嵌套序列将报告该特定船只的 NavigationStatus 变化.
要做一个小调整:
IObservable<IAisMessageType1to3> shipStatusChanges =
perShipObservables.SelectMany(shipMessages => shipMessages
.OfType<IAisMessageType1to3>()
.DistinctUntilChanged(m => m.NavigationStatus)
.Skip(1));
用 SelectMany 替换了 Select.
SelectMany 将嵌套的可观察序列扁平化为单个扁平序列.
可以从返回类型中看到这一点: 现在只有一个
IObservable<IAisMessageType1to3>, 而不是一个序列的序列.
等一下! 不是刚刚撤销了 GroupBy 所做的工作吗?
要求它按船只ID对事件进行分区, 那么为什么现在又将其重新组合回单个扁平流呢? 这不是开始时的样子吗?
确实, 流类型与的原始输入具有相同的形状:
这将是一个AIS消息的单个可观察序列. (它更专业化一点 – 元素类型是
IAisMessageType1to3, 因为可以从那里获取 NavigationStatus,
但这些仍然都实现 IAisMessage. )
并且所有不同的船只将在这个流中混合在一起. 但实际上并没有否定 GroupBy
所做的工作. 这个弹珠图说明了发生了什么:
perShipObservables 部分显示了 GroupBy
如何为每艘不同的船只创建一个单独的可观察序列. (此图显示了三艘船只, 名为
A, B 和 C. 对于实际源, GroupBy 会产生更多的可观察序列,
但原理相同. )
在将这些组流扁平化之前, 对它们进行了一些处理. 如前所述, 使用
DistinctUntilChanged 和 Skip(1)
以确保仅在确定船只状态已更改时才产生事件. (由于只看到 A 报告状态为
Moored, 那么据所知它的状态从未改变, 这就是为什么它的流完全为空.
)只有在那时才将其扁平化回单个可观察序列.
弹珠图需要简单才能适合页面, 所以现在让快速看一些实际输出.
这证实了这与原始的 receiverHost.Messages 非常不同. 首先,
需要附加一个订阅者:
shipStatusChanges.Subscribe(m => Console.WriteLine(
$"Vessel {((IAisMessage)m).Mmsi} changed status to {m.NavigationStatus} at {DateTimeOffset.UtcNow}"));
如果然后让接收器运行大约十分钟, 会看到这样的输出:
Vessel 257076860 changed status to UnderwayUsingEngine at 23/06/2023 06:42:48 +00:00
Vessel 257006640 changed status to UnderwayUsingEngine at 23/06/2023 06:43:08 +00:00
Vessel 259005960 changed status to UnderwayUsingEngine at 23/06/2023 06:44:23 +00:00
Vessel 259112000 changed status to UnderwayUsingEngine at 23/06/2023 06:44:33 +00:00
Vessel 259004130 changed status to Moored at 23/06/2023 06:44:43 +00:00
Vessel 257076860 changed status to NotDefined at 23/06/2023 06:44:53 +00:00
Vessel 258024800 changed status to Moored at 23/06/2023 06:45:24 +00:00
Vessel 258006830 changed status to UnderwayUsingEngine at 23/06/2023 06:46:39 +00:00
Vessel 257428000 changed status to Moored at 23/06/2023 06:46:49 +00:00
Vessel 257812800 changed status to Moored at 23/06/2023 06:46:49 +00:00
Vessel 257805000 changed status to Moored at 23/06/2023 06:47:54 +00:00
Vessel 259366000 changed status to UnderwayUsingEngine at 23/06/2023 06:47:59 +00:00
Vessel 257076860 changed status to UnderwayUsingEngine at 23/06/2023 06:48:59 +00:00
Vessel 257020500 changed status to UnderwayUsingEngine at 23/06/2023 06:50:24 +00:00
Vessel 257737000 changed status to UnderwayUsingEngine at 23/06/2023 06:50:39 +00:00
Vessel 257076860 changed status to NotDefined at 23/06/2023 06:51:04 +00:00
Vessel 259366000 changed status to Moored at 23/06/2023 06:51:54 +00:00
Vessel 232026676 changed status to Moored at 23/06/2023 06:51:54 +00:00
Vessel 259638000 changed status to UnderwayUsingEngine at 23/06/2023 06:52:34 +00:00
这里要理解的关键是, 在十分钟的时间内, receiverHost.Messages
产生了数千条消息. (速率因一天中的时间而异, 但通常每分钟超过一千条消息.
当运行代码以产生该输出时, 代码将处理大约一万条消息. ) 但如您所见,
shipStatusChanges 只产生了19条消息.
这表明Rx可以以比单纯聚合强大得多的方式驯服高容量事件源. 不仅仅将数据减少到一些只能提供概述的统计度量.
诸如平均值或方差之类的统计度量通常非常有用, 但它们并不总是能够为提供想要的特定领域的见解.
例如, 它们无法告诉任何特定船只的任何信息. 但在这里, 每条消息都告诉关于特定船只的一些信息. 尽管在查看每艘船, 但能够保留那种详细程度. 能够指示Rx在任何船只更改其状态时通知.
可能看起来对此大惊小怪, 但实现这个结果所花费的精力如此之少, 以至于很容易忽略Rx在这里为做了多少工作. 这段代码执行了以下所有操作:
- 监控在挪威水域运营的每一艘船
- 提供每艘船的信息
- 以人类可以合理应对的速率报告事件
它可以处理数千条消息, 并执行必要的处理以找到对真正重要的少数消息.
这是在"SelectMany的重要性"中描述的"扇出, 然后再扇入"技术的一个示例.
这段代码使用 GroupBy 从单个可观察序列扇出到多个可观察序列.
此步骤的关键是创建嵌套的可观察序列, 为想要执行的处理提供正确的详细程度.
在此示例中, 该详细程度是 " 一艘特定的船" , 但不一定必须如此.
可以想象按区域对消息进行分组 – 也许对比较不同的港口感兴趣, 所以希望根据船只最接近的港口或其目的港对源进行分区. (AIS为船只提供了一种广播其预期目的地的方式. )
根据需要的任何标准对数据进行分区后, 然后定义要应用于每个组的处理.
在这种情况下, 只是观察 NavigationStatus 的变化.
此步骤通常是数据量减少的地方.
例如, 大多数船只每天最多只会更改几次 NavigationStatus.
在将通知流减少到真正关心的那些事件之后,
可以将其组合回一个提供想要的高价值通知的单个流.
当然, 这种强大功能是有代价的. 让Rx为做这项工作不需要太多代码, 但正在使它相当努力地工作:
它需要记住到目前为止它看到的每艘船, 并为每艘船维护一个可观察源. 如果数据源范围足够广, 可以接收来自数万艘船只的消息, Rx将需要维护数万个可观察源, 每个船只一个.
所示示例没有类似的不活动超时 – 即使船只只广播一条消息, 只要程序运行, 它就会被记住. (恶意行为者伪造每条都带有不同虚构标识符的AIS消息最终会因耗尽内存而导致此代码崩溃. )
根据您的数据源, 您可能需要采取措施避免内存使用无限制增长, 因此实际示例可能比这更复杂, 但基本方法很强大.
现在已经看到了一个示例, 让更详细地看一下 GroupBy. 它有几种不同的形式.
刚刚使用了这个重载:
public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector)
该重载使用您选择的键类型的默认比较行为. 在例子中, 使用了 uint
(在AIS消息中唯一标识船只的 Mmsi 属性的类型), 它只是一个数字,
所以它是一种本质上可比较的类型.
在某些情况下, 可能需要非标准比较. 例如, 如果使用 string 作为键,
可能希望能够指定特定于区域设置的不区分大小写的比较. 对于这些情况,
有一个接受比较器的重载:
public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
IEqualityComparer<TKey> comparer)
还有两个重载扩展了前面两个, 带有一个 elementSelector 参数:
public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
Func<TSource, TElement> elementSelector)
{...}
public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
Func<TSource, TElement> elementSelector,
IEqualityComparer<TKey> comparer)
{...}
这在功能上等同于在 GroupBy 之后使用 Select 运算符.
顺便说一下, 当使用 GroupBy 时, 您可能会想直接订阅嵌套的可观察序列:
// 不要这样做. 使用前面的示例.
perShipObservables.Subscribe(shipMessages =>
shipMessages
.OfType<IAisMessageType1to3>()
.DistinctUntilChanged(m => m.NavigationStatus)
.Skip(1)
.Subscribe(m => Console.WriteLine(
$"Ship {((IAisMessage)m).Mmsi} changed status to {m.NavigationStatus} at {DateTimeOffset.UtcNow}")));
这似乎可能有相同的效果: 这里的 perShipObservables 是 GroupBy
返回的序列, 所以它将为每艘不同的船产生一个可观察流.
这个示例订阅了它, 然后在每个嵌套序列上使用与之前相同的运算符, 但不是用
SelectMany 将结果收集到一个单一的输出可观察序列中,
而是为每个嵌套流显式调用 Subscribe.
如果您不熟悉Rx, 这可能看起来是一种更自然的工作方式. 但是, 尽管这似乎会产生相同的行为, 但它引入了一个问题: Rx不理解这些嵌套订阅与外部订阅相关联.
在这个简单的示例中, 这不一定会导致问题, 但如果开始使用其他运算符, 就可能会出现问题. 考虑这个修改:
IDisposable sub = perShipObservables.Subscribe(shipMessages =>
shipMessages
.OfType<IAisMessageType1to3>()
.DistinctUntilChanged(m => m.NavigationStatus)
.Skip(1)
.Finally(() => Console.WriteLine($"Nested sub for {shipMessages.Key} ending"))
.Subscribe(m => Console.WriteLine(
$"Ship {((IAisMessage)m).Mmsi} changed status to {m.NavigationStatus} at {DateTimeOffset.UtcNow}")));
为嵌套序列添加了一个 Finally 运算符.
这使能够在序列因任何原因结束时调用一个回调.
但是, 即使从外部序列取消订阅(通过调用 sub.Dispose();), 这个 Finally
也永远不会做任何事情.
这是因为Rx无法知道这些内部订阅是外部订阅的一部分.
如果对前面的版本进行相同的修改, 在那个版本中, 这些嵌套序列被
SelectMany 收集到一个输出序列中, Rx理解对内部序列的订阅仅因为对
SelectMany 返回的序列的订阅而存在. (实际上, SelectMany
订阅了那些内部序列. )所以, 如果从那个示例中的输出序列取消订阅,
它将正确地在任何内部序列上运行任何 Finally 回调.
更一般地说, 如果您有许多序列作为单个处理链的一部分而存在, 通常最好让Rx从头到尾管理这个过程.
Buffer
如果您需要批量处理事件, Buffer 运算符很有用. 这对于性能可能很有用,
特别是如果您要存储有关事件的数据. 以AIS示例为例.
如果您想将通知记录到持久存储中, 存储单个记录的成本可能几乎与存储多个记录的成本相同.
大多数存储设备以通常几千字节大小的数据块进行操作, 所以存储单个字节数据所需的工作量通常与存储几千字节数据所需的工作量相同.
在编程中, 缓冲数据直到有一个相当大的工作块的模式一直存在. .NET运行时库的
Stream 类为此原因具有内置缓冲, 所以它被内置到Rx中也就不足为奇了.
效率问题并不是您可能希望一次处理多个事件而不是单个事件的唯一原因.
假设您想生成一个关于某个数据源的不断更新的统计信息流. 通过使用 Buffer
将源分割成块, 您可以计算, 例如, 过去10个事件的平均值.
Buffer 可以对来自源流的元素进行分区, 所以它是一种与 GroupBy
类似的运算符, 但有几个重要的区别.
- 首先,
Buffer不检查元素以确定如何对它们进行分区 – 它纯粹根据元素出现的顺序进行分区. - 其次,
Buffer等待直到它完全填满一个分区, 然后将所有元素作为一个IList<T>呈现. 这可以使某些任务更容易, 因为分区中的所有内容都立即可用 – 值不会隐藏在嵌套的IObservable<T>中. - 第三,
Buffer提供了一些重载, 使得单个元素可以出现在多个 " 分区" 中. (在这种情况下,Buffer不再严格地对数据进行分区, 但正如您将看到的, 这只是其他行为的一个小变化. )
使用 Buffer 的最简单方法是将相邻元素收集成块. (LINQ to
Objects现在有一个等效的运算符, 它称为 Chunk.
Rx没有使用相同名称的原因是Rx在LINQ to Objects之前十多年就引入了这个运算符. 所以真正的问题是为什么LINQ to Objects选择了不同的名称.
可能是因为 Chunk 不支持Rx的 Buffer 所支持的所有变体,
但您需要问.NET运行时库团队. )这个 Buffer 的重载接受一个参数,
指示您想要的块大小:
public static IObservable<IList<TSource>> Buffer<TSource>(
this IObservable<TSource> source,
int count)
{...}
这个示例使用它将导航消息分成大小为4的块, 然后继续计算这4个读数的平均速度:
IObservable<IList<IVesselNavigation>> navigationChunks =
receiverHost.Messages.Where(v => v.Mmsi == 235009890)
.OfType<IVesselNavigation>()
.Where(n => n.SpeedOverGround.HasValue)
.Buffer(4);
IObservable<float> recentAverageSpeed =
navigationChunks.Select(chunk => chunk.Average(n => n.SpeedOverGround.Value));
如果源完成, 并且没有产生块大小的精确倍数, 最后一个块将更小. 可以通过以下更人为的示例看到这一点:
Observable
.Range(1, 5)
.Buffer(2)
.Select(chunk => string.Join(", ", chunk))
.Dump("chunks");
从这个输出中可以看到, 最后一个块只有一个项目, 即使一次要求2个:
chunks-->1, 2
chunks-->3, 4
chunks-->5
chunks completed
Buffer 在这里别无选择, 因为源完成了,
如果它没有产生那个最后大小不足的块, 将永远看不到最后一个项目.
但除了这种源结束的情况, 这个 Buffer
的重载会等待直到它收集到足够的元素来填充指定大小的缓冲区, 然后才传递它.
这意味着 Buffer 引入了延迟. 如果源项目相距很远(例如, 当船不移动时,
它可能每隔几分钟才报告一次AIS导航数据), 这可能会导致长时间的延迟.
在某些情况下, 可能希望在源繁忙时批量处理多个事件, 而在源运行较慢时不必等待很长时间. 这在用户界面中会很有用.
如果您想提供最新信息, 接受一个大小不足的块可能更好,
这样您就可以提供更及时的信息. 对于这些场景, Buffer 提供了接受
TimeSpan 的重载:
public static IObservable<IList<TSource>> Buffer<TSource>(
this IObservable<TSource> source,
TimeSpan timeSpan)
{...}
public static IObservable<IList<TSource>> Buffer<TSource>(
this IObservable<TSource> source,
TimeSpan timeSpan,
int count)
{...}
第一个重载仅根据时间对源进行分区. 无论 source 产生值的速率如何,
这将每秒发出一个块:
IObservable<IList<string>> output = source.Buffer(TimeSpan.FromSeconds(1));
如果 source 在任何特定块的生命周期内没有发出任何值, output
将发出一个空列表.
第二个重载, 接受 timespan 和 count, 本质上施加了两个上限:
您永远不必在块之间等待超过 timespan 的时间,
并且您永远不会收到一个包含超过 count 个元素的块.
与仅接受 timespan 的重载一样,
如果源没有足够快地产生元素以在指定时间内填充缓冲区,
这可能会传递不满甚至空的块.
重叠缓冲区
在上一节中, 展示了一个示例, 该示例收集了特定船只的4个
IVesselNavigation 条目的块, 并计算了平均速度.
这种对多个样本求平均的方法可以是平滑读数中轻微随机变化的有用方法.
所以在这种情况下, 目标不是为了提高效率而批量处理项目, 而是为了实现一种特定的计算.
但是这个示例有一个问题: 因为它是对4个读数求平均, 所以它每4个输入消息才产生一个输出. 而且由于船只如果不移动可能每隔几分钟才报告一次速度, 可能要等很长时间.
Buffer 有一个重载可以让做得更好一点: 可能不想先对前4个读数求平均,
然后对接下来的4个读数求平均, 再对接下来的4个读数求平均, 等等,
而是每次船只报告一个新读数时, 计算最后4个读数的平均值.
这有时被称为滑动窗口. 想要处理读数1, 2, 3, 4, 然后是2, 3, 4, 5, 然后是3,
4, 5, 6, 等等. 有一个 Buffer 的重载可以做到这一点.
这个示例显示了前面平均速度示例中的第一条语句, 但有一个小修改:
IObservable<IList<IVesselNavigation>> navigationChunks = receiverHost.Messages
.Where(v => v.Mmsi == 235009890)
.OfType<IVesselNavigation>()
.Where(n => n.SpeedOverGround.HasValue)
.Buffer(4, 1);
这调用了 Buffer 的一个重载, 它接受两个 int 参数.
第一个参数的作用与之前相同: 它指示希望每个块中有4个项目.
但第二个参数指示多久产生一个缓冲区.
这表示希望源产生的每个 1 个元素(即每个单个元素)都有一个缓冲区. (仅接受
count 的重载等同于将相同的值传递给这个重载的两个参数. )
所以这将等待源产生4个合适的消息(即满足这里的 Where 和 OfType
运算符的消息), 然后将报告从 navigationChunks 中出现的第一个
IList<VesselNavigation> 中的前四个读数.
但是源只需要再产生一个合适的消息, 然后这将发出另一个
IList<VesselNavigation>, 其中包含与第一个块中相同的3个值, 然后是新值.
当下一个合适的消息出现时, 这将发出另一个列表, 其中包含第3, 4, 5和6个消息, 依此类推.
这个弹珠图说明了 Buffer(4, 1) 的行为.
如果将其输入到与前面示例相同的 recentAverageSpeed 表达式中,
仍然要等到源产生第4个合适的消息才会得到输出, 但从那时起,
源产生的每个合适的消息都会发出一个新的平均值.
这些平均值仍然总是报告最近报告的4个速度的平均值, 但现在将得到这些平均值的频率是原来的四倍.
也可以使用这个来改进前面报告船只更改其 NavigationStatus 的示例.
最后一个示例告诉您一艘船刚刚进入的状态, 但这提出了一个明显的问题:
它之前处于什么状态? 可以使用 Buffer(2, 1),
这样每次看到一个指示状态变化的消息时, 也可以访问之前的状态变化:
IObservable<IList<IAisMessageType1to3>> shipStatusChanges =
perShipObservables.SelectMany(shipMessages => shipMessages
.OfType<IAisMessageType1to3>()
.DistinctUntilChanged(m => m.NavigationStatus)
.Buffer(2, 1));
IDisposable sub = shipStatusChanges.Subscribe(m => Console.WriteLine(
$"Ship {((IAisMessage)m[0]).Mmsi} changed status from" +
$" {m[1].NavigationStatus} to {m[1].NavigationStatus}" +
$" at {DateTimeOffset.UtcNow}"));
如输出所示, 现在可以报告之前的状态以及刚刚进入的状态:
Ship 259664000 changed status from UnderwayUsingEngine to Moored at 30/06/2023
13:36:39 +00:00
Ship 257139000 changed status from AtAnchor to UnderwayUsingEngine at 30/06/20
23 13:38:39 +00:00
Ship 257798800 changed status from UnderwayUsingEngine to Moored at 30/06/2023
13:38:39 +00:00
这个改变使能够删除 Skip. 前面的示例有 Skip
是因为无法确定在启动后从任何特定船只收到的第一条消息是否代表变化.
但是由于告诉 Buffer 想要成对的消息,
所以在它看到具有两个不同状态的消息之前, 它不会为任何一艘船提供任何东西.
您也可以使用这个重载要求一个由时间而不是计数定义的滑动窗口:
public static IObservable<IList<TSource>> Buffer<TSource>(
this IObservable<TSource> source,
TimeSpan timeSpan,
TimeSpan timeShift)
{...}
timeSpan 确定每个窗口覆盖的时间长度, timeShift 确定新窗口开始的间隔.
Window
Window 运算符与 Buffer 非常相似.
它可以根据元素计数或时间将输入分割成块, 并且它也支持重叠窗口.
然而, 它有不同的返回类型. 当在 IObservable<T> 上使用 Buffer 时,
它将返回一个 IObservable<IList<T>>, 而 Window 将返回一个
IObservable<IObservable<T>>.
这意味着 Window 不必等到填满整个缓冲区才产生任何东西. 可以说 Window
比 Buffer 更充分地接受了响应式范式. 话虽如此, 经过一些经验后,
您可能会得出结论, Window 比 Buffer 更难使用,
但在实践中很少有更多的用处.
因为 Buffer 返回一个 IObservable<IList<T>>,
所以它在拥有构成该块的所有元素之前无法产生一个块. IList<T>
支持随机访问 – 您可以询问它有多少个元素,
并且可以通过数字索引检索任何元素, 期望这些操作立即完成. (从技术上讲,
可以编写一个表示尚未收到的数据的 IList<T> 实现,
并且如果在数据可用之前尝试使用其 Count 和索引器属性, 使其阻塞,
但这将是一件奇怪的事情. 开发人员期望列表立即返回信息, Rx的 Buffer
产生的列表满足了这一期望. )
所以, 如果您编写, 例如, Buffer(4),
在拥有构成第一个块的所有4个项目之前, 它无法产生任何东西.
但是因为 Window 返回一个可观察序列,
该序列产生一个嵌套的可观察序列来表示每个块,
所以它可以在不一定拥有所有元素之前就发出那个嵌套的可观察序列.
实际上, 它一旦知道需要一个新窗口就会立即发出. 例如, 如果您使用
Window(4, 1), 它返回的可观察序列会立即发出它的第一个嵌套可观察序列.
然后, 一旦源产生它的第一个元素, 那个嵌套的可观察序列就会发出该元素,
然后第二个嵌套可观察序列就会被产生. 将 1 作为第二个参数传递给
Window, 所以源产生的每个元素都会有一个新窗口.
一旦第一个元素被发出, 源发出的下一个项目将出现在第二个窗口中(并且也在第一个窗口中, 因为在这种情况下指定了重叠窗口), 所以第二个窗口实际上在第一个元素出现后立即 " 打开".
所以 Window 返回的 IObservable<IObservable<T>>
会在那个时候产生一个新的 IObservable<T>.
嵌套的可观察序列在元素可用时产生它们的项目. 它们在 Window
知道该窗口中不会再有项目时完成(即在与 Buffer 为该窗口产生完成的
IList<T> 完全相同的点).
Window 看起来可能比 Buffer 更好,
因为它让您在块中的单个项目可用时立即获取它们.
然而, 如果您正在进行需要访问块中每个单个项目的计算, 这并不一定对您有帮助.
在收到块中的每个项目之前, 您无法完成处理, 所以您不会更早地产生最终结果,
并且您的代码可能更复杂, 因为它不能再依赖于有一个 IList<T>
方便地一次性提供所有项目. 然而, 如果您正在计算块中项目的某种聚合,
Window 可能更有效, 因为它使您能够在每个项目出现时处理它, 然后丢弃它.
如果一个块非常大, Buffer 将不得不保留每个项目, 直到块完成,
这可能会使用更多内存. 此外,
在您不一定需要在对块中的项目执行有用操作之前看到每个项目的情况下,
Window 可能使您能够避免引入处理延迟.
Window 在AIS NavigationStatus 示例中对没有帮助,
因为那里的目标是报告每个状态变化的 " 之前" 和 " 之后" 状态. 在知道 "
之后" 的值之前, 无法做到这一点, 所以提前收到 " 之前" 的值对没有好处.
需要第二个值来做试图做的事情, 所以不妨使用 Buffer, 因为它更容易. 但是,
如果您想跟踪到今天为止报告移动的不同船只的数量, Window
将是一个合适的机制: 您可以将其设置为每天产生一个窗口,
并且您将能够在每个窗口内开始看到信息, 而无需等到一天结束.
除了支持简单的基于计数或基于持续时间的拆分之外, 还有更灵活的方式来定义窗口边界, 例如这个重载:
// 将可观察序列的每个元素投影到连续的非重叠窗口中.
// windowClosingSelector: 一个用于定义产生的窗口边界的函数. 当先前的窗口关闭时, 会启动一个新窗口.
public static IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>
(
this IObservable<TSource> source,
Func<IObservable<TWindowClosing>> windowClosingSelector
)
这些复杂重载中的第一个允许控制窗口何时关闭. windowClosingSelector
函数在每次创建窗口时被调用, 并且每个窗口将在 windowClosingSelector
产生的值时关闭. 该值被忽略, 所以序列值的类型无关紧要; 实际上,
您可以直接完成 windowClosingSelector 的序列来关闭窗口.
在这个示例中, 创建一个带有关闭选择器的窗口. 每次都从那个选择器返回相同的主题, 然后每当用户在控制台按下回车键时从主题发出通知.
int windowIdx = 0;
IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
var closer = new Subject<Unit>();
source.Window(() => closer)
.Subscribe(window =>
{
int thisWindowIdx = windowIdx++;
Console.WriteLine("--Starting new window");
string windowName = $"Window{thisWindowIdx}";
window.Subscribe(
value => Console.WriteLine("{0} : {1}", windowName, value),
ex => Console.WriteLine("{0} : {1}", windowName, ex),
() => Console.WriteLine("{0} Completed", windowName));
},
() => Console.WriteLine("Completed"));
string input = "";
while (input!= "exit")
{
input = Console.ReadLine();
closer.OnNext(Unit.Default);
}
输出(当在显示 '1' 和 '5' 后按下回车键时):
#+end_src
–Starting new window window0 : 0 window0 : 1 window0 Completed –Starting new window window1 : 2 window1 : 3 window1 : 4 window1 : 5 window1 Completed –Starting new window window2 : 6 window2 : 7 window2 : 8 window2 : 9 window2 Completed Completed
#+end_src
Window 的最复杂重载允许创建可能重叠的窗口.
// 将可观察序列的每个元素投影到零个或多个窗口中.
// windowOpenings: 一个可观察序列, 其元素表示新窗口的创建.
// windowClosingSelector: 一个用于定义每个产生的窗口关闭的函数.
public static IObservable<IObservable<TSource>> Window
<TSource, TWindowOpening, TWindowClosing>
(
this IObservable<TSource> source,
IObservable<TWindowOpening> windowOpenings,
Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector
)
这个重载接受三个参数
- 源序列
- 一个指示何时应打开新窗口的序列
- 一个接受窗口打开值并返回窗口关闭序列的函数
这个重载在窗口的打开和关闭方式上提供了很大的灵活性. 窗口可以在很大程度上相互独立; 它们可以重叠, 大小不同, 甚至可以跳过源中的值.
为了更容易理解这个更复杂的重载, 让首先尝试使用它来重新创建一个更简单的
Window 版本(接受计数的重载).
为此, 需要在初始订阅时打开一个窗口, 并且每次源产生了指定数量时再次打开一个窗口. 窗口需要在达到该数量时关闭.
为了实现这一点, 只需要源序列. 将多次订阅它, 但对于某些类型的源,
这可能会导致问题, 所以通过 Publish 运算符进行订阅,
该运算符在仅对基础源进行一次订阅的同时允许多个订阅者.
public static IObservable<IObservable<T>> MyWindow<T>(
this IObservable<T> source,
int count)
{
IObservable<T> shared = source.Publish().RefCount();
IObservable<int> windowEdge = shared
.Select((i, idx) => idx % count)
.Where(mod => mod == 0)
.Publish()
.RefCount();
return shared.Window(windowEdge, _ => windowEdge);
}
如果现在想扩展这个方法以提供跳过功能, 需要有两个不同的序列:
一个用于打开, 一个用于关闭. 在订阅时打开一个窗口, 并且在 skip
个项目过去之后再次打开. 在窗口打开后经过 'count' 个项目时关闭那些窗口.
public static IObservable<IObservable<T>> MyWindow<T>(
this IObservable<T> source,
int count,
int skip)
{
if (count <= 0) throw new ArgumentOutOfRangeException();
if (skip <= 0) throw new ArgumentOutOfRangeException();
IObservable<T> shared = source.Publish().RefCount();
IObservable<int> index = shared
.Select((i, idx) => idx)
.Publish()
.RefCount();
IObservable<int> windowOpen = index.Where(idx => idx % skip == 0);
IObservable<int> windowClose = index.Skip(count - 1);
return shared.Window(windowOpen, _ => windowClose);
}
可以看到, 由于 windowClose 序列是从一个函数返回的,
所以每次打开一个窗口时它都会被重新订阅.
这使能够为每个窗口重新应用跳过(Skip(count - 1)). 目前, 忽略了
windowOpen 推送到 windowClose 选择器的值, 但如果您的某些逻辑需要它,
它是可供您使用的.
如您所见, Window 运算符可以非常强大. 甚至可以使用 Window
来复制其他运算符; 例如, 可以通过这种方式创建自己的 Buffer 实现.
可以让 SelectMany
运算符接受一个单个值(窗口)来产生零个或多个其他类型的值(在例子中,
一个单个 IList<T>). 为了在不阻塞的情况下创建 IList<T>, 可以应用
Aggregate 方法并使用一个新的 List<T> 作为种子.
public static IObservable<IList<T>> MyBuffer<T>(this IObservable<T> source, int count)
{
return source.Window(count)
.SelectMany(window =>
window.Aggregate(
new List<T>(),
(list, item) =>
{
list.Add(item);
return list;
}));
}
尝试用 Window 实现其他时间偏移方法, 如 Sample 或 Throttle,
是一个有趣的练习.
已经看到了几种有用的方法, 可以使用数据驱动的分组标准, 或者使用 Buffer
或 Window 的基于时间的分块, 将单个项目流分散到多个输出序列中.
在下一章中, 将研究可以将来自多个流的数据组合在一起的运算符.
第八章:(Combination)组合
数据源无处不在, 有时需要从多个数据源获取数据.
常见的有多个输入的示例包括: 价格波动, 传感器网络, 新闻源, 社交媒体聚合器, 文件监视器, 多点触控表面, 心跳/轮询服务器等.
处理这些多个刺激的方式也各不相同. 可能希望将其全部作为集成数据的洪流来处理, 或者一次处理一个序列作为顺序数据.
也可以有序地获取数据, 将来自两个源的数据值配对以便一起处理, 或者只获取对请求作出响应的第一个源的数据.
前面的章节也展示了一些"扇出和扇入" 式数据处理的示例, 在其中对数据进行分区, 并对每个分区执行处理, 以便在重新组合之前将大量数据转换为少量高价值事件. 这种重组流的能力极大地增强了操作符组合的优势.
如果Rx只允许将组合作为简单的线性处理链应用, 那么它的功能将会大打折扣.
能够将流分开给了更多的灵活性. 因此, 即使只有一个事件源, 通常仍需要在处理过程中组合多个可观察流.
序列组合使您能够跨多个数据源创建复杂查询. 这开启了编写一些非常强大而简洁的代码的可能性.
在前面的章节中已经使用了 SelectMany. 这是Rx中的基本操作符之一.
正如在 转换(Transform) 中看到的, 可以从 SelectMany 构建其他几个操作符,
其组合流的能力是其强大的部分原因.
但是还有更多专门的组合操作符可用, 这使得解决某些问题比使用 SelectMany
更容易.
此外, 之前见过的一些操作符(包括 TakeUntil 和
Buffer)有一些尚未探索过的重载, 可以组合多个序列.
顺序组合
将从最简单的组合操作符开始, 这些操作符不尝试并发组合. 它们一次处理一个源序列.
Concat
Concat可以说是组合序列的最简单方法.它与其他LINQ提供程序中的同名方法作用相同: 它连接两个序列.
结果序列产生第一个序列的所有元素, 然后是第二个序列的所有元素.
Concat的最简单签名如下:public static IObservable<TSource> Concat<TSource>( this IObservable<TSource> first, IObservable<TSource> second)由于
Concat是一个扩展方法, 可以将其作为任何序列的方法调用, 将第二个序列作为唯一参数传入:IObservable<int> s1 = Observable.Range(0, 3); IObservable<int> s2 = Observable.Range(5, 5); IObservable<int> c = s1.Concat(s2); IDisposable sub = c.Subscribe(Console.WriteLine, x => Console.WriteLine("Error: " + x));这个弹珠图展示了从两个源
s1和s2产生的项目, 以及Concat如何将它们组合成结果c:Rx的
Concat在有订阅者订阅它返回的IObservable<T>之前, 不会对其源进行任何操作.所以在这种情况下, 当对
c(Concat返回的源)调用Subscribe时, 它将订阅其第一个输入s1, 并且每次s1产生一个值时,c可观察对象将向其订阅者发出相同的值.如果在
s1完成之前调用sub.Dispose(),Concat将取消订阅第一个源, 并且永远不会订阅s2.如果
s1报告错误,c将向其订阅者报告相同的错误, 并且同样, 它将永远不会订阅s2.只有当
s1完成时,Concat操作符才会订阅s2, 此时它将转发第二个输入产生的任何项目, 直到第二个源完成或失败, 或者应用程序取消订阅连接的可观察对象.虽然Rx的
Concat与 LINQ to Objects中的 Concat 具有相同的逻辑行为, 但仍有一些Rx特定的细节需要注意.特别是, 在Rx中, 时间通常比其他LINQ实现更重要. 例如, 在Rx中, 区分热和冷源.
对于冷源, 通常在何时订阅并不重要, 但热源本质上是实时的, 所以只有在订阅时发生的事情才会通知.
这可能意味着热源可能不太适合与
Concat一起使用. 下面的弹珠图说明了一种可能产生意外结果的场景:由于
Concat在第一个输入完成之前不会订阅第二个输入, 所以它不会看到" 热" 源会传递给从一开始就监听的订阅者的前几个项目.这可能不是期望的行为: 它看起来肯定不像是将第一个序列的所有项目与第二个序列的所有项目连接起来了. 它看起来像是遗漏了"热" 中的A和B.
弹珠图的局限性
最后一个例子揭示了弹珠图忽略了一个细节: 它们显示了源何时开始, 何时产生值以及何时完成, 但它们忽略了一个事实, 即可观察源需要有订阅者才能产生元素.
如果没有任何订阅
IObservable<T>, 实际上不有任何产出.Concat在第一个输入完成之前不会订阅第二个输入, 所以可以说, 下面的图会更准确地展示这种情况:这使得更容易理解为什么
Concat会产生这样的输出. 但是由于 热 在这里是一个热源, 这个图没有传达出 热 完全按照自己的时间表产生项目的事实.在 热 有多个订阅者的场景中, 那么前面的图可能更好, 因为它正确地反映了 热 可用的每个事件 (无论在任何特定时刻可能有多少个监听者订阅).
但是, 尽管这种约定适用于热源, 但对于冷源却不适用, 冷源通常在订阅时开始产生项目.
由
Timer返回的源按照固定的时间表产生项目, 但该时间表从订阅发生的那一刻开始.这意味着如果有多个订阅, 就会有多个时间表.
即使只有一个由
Observable.Timer返回的IObservable<long>, 每个不同的订阅者也会按照自己的时间表获得项目–订阅者从他们订阅的那一刻开始, 以固定的时间间隔接收事件.所以对于冷可观察对象, 通常使用第二个图中的约定是有意义的, 在这个约定中, 查看的是一个特定订阅到源所接收到的事件.
大多数时候, 可以忽略这个微妙之处, 悄悄地使用适合任何约定.
套用 矮胖子(Humpty Dumpty)的话: 当使用弹珠图时, 它的意思就是选择它的意思–不多也不少. 但是当将热和冷源组合在一起时, 可能没有一种明显的最佳方式在弹珠图中表示这种情况. 甚至可以这样做, 将" 热" 所代表的事件与对" 热" 的特定订阅所看到的事件分开描述.
在弹珠图中使用一个单独的 通道 来表示对源的特定订阅所看到的事件.
使用这种技术, 还可以展示如果将相同的冷源两次传递给
Concat会发生什么:这突出了一个事实, 即作为冷源, 冷 为每个订阅分别提供元素. 从相同的源看到相同的三个值, 但在不同的时间.
连接多个源
如果想连接两个以上的序列怎么办?
Concat有一个重载, 接受多个可观察序列作为数组.这个重载用
params关键字注释, 所以不需要显式地构造数组.可以只传递任意数量的参数, C# 编译器会生成创建数组的代码.
还有一个重载接受
IEnumerable<IObservable<T>>public static IObservable<TSource> Concat<TSource>( params IObservable<TSource>[] sources) public static IObservable<TSource> Concat<TSource>( this IEnumerable<IObservable<TSource>> sources)IEnumerable<IObservable<T>>重载会延迟执行sources.在有人订阅
Concat返回的可观察对象之前, 它不会开始请求源可观察对象, 并且只有当当前源完成(意味着它准备开始下一个)时, 它才会再次调用生成的IEnumerator<IObservable<T>>上的MoveNext.为了说明这一点, 下面的示例是一个迭代器方法, 它返回一个序列的序列, 并穿插了日志记录.
它返回三个可观察序列, 每个序列都有一个单个值
[1],[2]和[3]. 每个序列在计时器延迟后返回其值.public IEnumerable<IObservable<long>> GetSequences() { Console.WriteLine("GetSequences() called"); Console.WriteLine("Yield 1st sequence"); yield return Observable.Create<long>(o => { Console.WriteLine("1st subscribed to"); return Observable.Timer(TimeSpan.FromMilliseconds(500)) .Select(i => 1L) .Finally(() => Console.WriteLine("1st finished")) .Subscribe(o); }); Console.WriteLine("Yield 2nd sequence"); yield return Observable.Create<long>(o => { Console.WriteLine("2nd subscribed to"); return Observable.Timer(TimeSpan.FromMilliseconds(300)) .Select(i => 2L) .Finally(() => Console.WriteLine("2nd finished")) .Subscribe(o); }); Thread.Sleep(1000); // 强制延迟 Console.WriteLine("Yield 3rd sequence"); yield return Observable.Create<long>(o => { Console.WriteLine("3rd subscribed to"); return Observable.Timer(TimeSpan.FromMilliseconds(100)) .Select(i=>3L) .Finally(() => Console.WriteLine("3rd finished")) .Subscribe(o); }); Console.WriteLine("GetSequences() complete"); }可以调用这个
GetSequences方法并将结果传递给Concat, 然后使用Dump扩展方法来查看发生了什么:GetSequences().Concat().Dump("Concat");以下是输出:
GetSequences() called Yield 1st sequence 1st subscribed to Concat-->1 1st finished Yield 2nd sequence 2nd subscribed to Concat-->2 2nd finished Yield 3rd sequence 3rd subscribed to Concat-->3 3rd finished GetSequences() complete Concat completed下面是
Concat操作符应用于GetSequences方法的弹珠图. "s1" , "s2" 和"s3" 分别代表序列1, 2和3. "rs" 代表结果序列.应该注意到, 一旦迭代器执行了它的第一个
yield return以返回第一个序列, 迭代器在第一个序列完成之前不会继续.迭代器在第一个
yield return之后立即调用Console.WriteLine来显示文本" Yield 2nd sequence", 可以看到该消息直到看到Concat-->1消息(显示Concat的第一个输出)以及1st finished消息(由Finally操作符产生, 该操作符仅在第一个序列完成后运行)才出现. (代码还使第一个源在产生其值之前延迟500毫秒, 所以如果运行这个, 可以看到一切都会停止一会儿, 直到第一个源产生其单个值然后完成. )一旦第一个源完成,
GetSequences方法继续(因为Concat会在第一个可观察源完成后向它请求下一个项目).当
GetSequences用另一个yield return提供第二个序列时,Concat订阅它, 并且再次GetSequences在第二个可观察序列完成之前不会有进一步的进展.当被要求提供第三个序列时, 迭代器本身等待一秒钟, 然后产生第三个也是最后一个值, 可以从图中
s2结束和s3开始之间的间隙看到这一点.前置操作(Prepend)
前置操作是
Concat支持一种特定的场景, 但方式有点繁琐.有时创建一个总是立即发出一些初始值的序列会很有用.
以在本书中经常使用的例子为例, 船舶发送AIS消息来报告其位置和其他信息:
在某些应用程序中, 可能不想等到船舶下次发送消息.
可以想象一个应用程序, 它记录任何船只的最后已知位置.
这将使应用程序能够提供, 比如说, 一个
IObservable<IVesselNavigation>, 它在订阅时立即报告最后已知信息, 然后如果船只产生任何新消息, 它将继续提供这些消息.如何实现这一点呢? 最初想要冷源般的行为, 但要过渡到热行为.
所以可以连接两个源. 可以使用
Observable.Return创建一个单元素冷源, 然后将其与实时流连接:IVesselNavigation lastKnown = ais.GetLastReportedNavigationForVessel(mmsi); IObservable<IVesselNavigation> live = ais.GetNavigationMessagesForVessel(mmsi); IObservable<IVesselNavigation> lastKnownThenLive = Observable.Concat( Observable.Return(lastKnown), live);这是一个足够常见的需求, Rx提供了
Prepend来实现类似的效果. 可以用以下代码替换最后一行:IObservable<IVesselNavigation> lastKnownThenLive = live.Prepend(lastKnown);这个可观察对象将完全做同样的事情: 订阅者将立即收到
lastKnown, 然后如果船只发出进一步的导航消息, 他们也将收到这些消息.顺便说一下, 对于这个场景, 可能还希望确保 最后已知 消息的查找尽可能晚地进行.
可以使用
Defer在订阅时延迟执行此操作:public static IObservable<IVesselNavigation> GetLastKnownAndSubsequenceNavigationForVessel(uint mmsi) { return Observable.Defer<IVesselNavigation>(() => { // 此lambda每次有人订阅时都会运行. IVesselNavigation lastKnown = ais.GetLastReportedNavigationForVessel(mmsi); IObservable<IVesselNavigation> live = ais.GetNavigationMessagesForVessel(mmsi); return live.Prepend(lastKnown); }); }StartWith可能会让人想起 第三章的BehaviorSubject<T>, 因为它也确保消费者在订阅时立即收到一个值.但它们并不完全相同:
BehaviorSubject<T>会缓存其自身源发出的最后一个值.可能认为这会使它成为实现此船只导航示例的更好方法.
然而, 由于此示例能够为任何船只返回一个源(
mmsi参数是唯一标识一艘船只的 海上移动业务识别码), 它需要为每个感兴趣的船只保持一个BehaviorSubject<T>运行, 这可能不太实际.BehaviorSubject<T>只能保存一个值, 这对于此AIS场景来说没问题,Prepend也有此限制. 但是, 如果需要一个源以某个特定序列开始呢?起始操作(StartWith)
StartWith是Prepend的泛化, 它使能够在订阅时立即提供任意数量的值的发出. 与Prepend一样, 它随后将转发源产生的任何进一步通知.从其签名可以看出, 此方法接受一个
params数组的值, 因此可以根据需要传递任意多个或少量的值:// 在可观察序列前添加一系列值. public static IObservable<TSource> StartWith<TSource>( this IObservable<TSource> source, params TSource[] values)还有一个重载接受一个
IEnumerable<T>. 请注意, Rx不会延迟对其的枚举.StartWith会在返回之前立即将IEnumerable<T>转换为数组.StartWith不是常见的LINQ操作符, 它的存在是Rx特有的. 如果想象一下StartWith在LINQ to Objects中会是什么样子, 它与 [Concat](https://learn.microsoft.com/en-us/dotnet/api/system.linq.enumerable.concat) 不会有太大区别.在Rx中存在差异是因为
StartWith有效地在拉取和推送世界之间架起了桥梁.它有效地将提供的项目转换为一个可观察对象, 然后将
source参数连接到该对象上.追加操作(Append)
Prepend的存在可能会让我们想知道是否有一个Append用于将单个项目添加到任何IObservable<T>的末尾.毕竟, 这是一个常见的LINQ操作符; 例如, LINQ to Objects有一个
Append实现.Rx确实提供了这样的功能:
IObservable<string> oneMore = arguments.Append("And another thing...");没有相应的
EndWith. 没有根本原因表明不能有这样的东西, 只是显然没有太多需求–Rx尚未收到相关功能请求.因此, 尽管
Prepend和Append的对称性确实表明在StartWith和尚未存在的假设性EndWith之间可能存在类似的对称性, 但缺少这个对应物似乎并没有造成任何问题.能够创建始终立即产生有用输出的可观察源显然是有价值的; 除了满足对对称性的渴望之外, 不清楚
EndWith会有什么用处.默认值操作(DefaultIfEmpty)
接下来要研究的下一个操作符并不严格执行顺序组合.
然而, 它与
Append和Prepend密切相关.与那些操作符一样, 它会发出其源所做的一切.
并且与那些操作符一样,
DefaultIfEmpty接受一个额外的项目. 不同之处在于它不会总是发出那个额外的项目.Prepend在开头发出其额外项目,Append在结尾发出其额外项目, 而DefaultIfEmpty仅在源完成且未产生任何内容时才发出额外项目.因此, 这提供了一种确保可观察对象不为空的方法.
不必为
DefaultIfEmpty提供一个值. 如果使用不提供此值的重载, 它将仅使用default(T).对于结构体类型, 这将是一个类似零的值, 对于引用类型, 这将是
null.重复操作(Repeat)
最后一个顺序组合序列的操作符是
Repeat. 它允许简单地重复一个序列. 它提供了重载, 可以在其中指定重复输入的次数, 以及一个无限重复的重载:// 重复可观察序列指定次数. public static IObservable<TSource> Repeat<TSource>( this IObservable<TSource> source, int repeatCount) // 无限且顺序地重复可观察序列. public static IObservable<TSource> Repeat<TSource>( this IObservable<TSource> source)Repeat会为每次重复重新订阅源. 这意味着只有当源每次订阅时都产生相同的项目时, 它才会严格重复.与
ReplaySubject<T>不同, 它不会存储和重放源产生的项目.这意味着通常不会想对热源调用
Repeat. (如果真的想要热源输出的重复,Replay和Repeat的组合可能符合要求.)如果使用无限重复的重载, 那么序列只有在出现错误或订阅被释放时才会停止.
指定重复次数的重载将在出现错误, 取消订阅或达到该次数时停止. 以下示例展示了序列
[0,1,2]被重复三次:var source = Observable.Range(0, 3); var result = source.Repeat(3); result.Subscribe( Console.WriteLine, () => Console.WriteLine("Completed"));输出:
#+end_src0 1 2 0 1 2 0 1 2 Completed
#+end_src并发序列
现在将转向用于组合可能并发产生值的可观察序列的操作符.
模糊选择操作(Amb)
Amb是一个名字奇怪的操作符. 它是" ambiguous" (模糊的)的缩写, 但这并没有比Amb本身告诉更多信息.如果对这个名字感到好奇, 可以在附录D中阅读
Amb的起源, 但现在, 让看看它实际做什么.Rx的
Amb接受任意数量的IObservable<T>源作为输入, 并等待查看哪个(如果有的话)首先产生某种输出.一旦发生这种情况, 它会立即取消订阅所有其他源, 并转发来自首先响应的源的所有通知.
为什么这很有用呢?
Amb的一个常见用例是当想要尽快产生某种结果, 并且有多个获取该结果的选项, 但事先不知道哪个最快时.也许有多个服务器都可能给想要的答案, 并且无法预测哪个响应时间最短.
可以向它们全部发送请求, 然后只使用第一个响应的.
如果将每个单独的请求建模为自己的
IObservable<T>,Amb可以处理这个问题.请注意, 这不是非常高效: 要求几个服务器都做同样的工作, 并且将丢弃大多数服务器的结果. (由于
Amb一旦第一个源响应就取消订阅它不会使用的所有源, 所以可能能够向所有其他服务器发送消息以取消请求. 但这仍然有点浪费. )但是在某些情况下, 及时性至关重要, 对于这些情况, 为了产生更快的结果, 容忍一点浪费的努力可能是值得的.
Amb大致类似于Task.WhenAny, 因为它检测多个源中的第一个何时做某事.然而, 这种类比并不精确.
Amb自动取消订阅所有其他源, 确保一切都被清理干净.对于
Task, 应该始终确保最终观察所有任务, 以防其中任何一个出错.为了说明
Amb的行为, 下面是一个弹珠图, 展示了三个序列s1,s2和s3, 每个序列都能够产生一系列值.标记为
r的线显示了将所有三个序列传递给Amb的结果.r提供的通知与s1完全相同, 因为在这个例子中,s1是第一个产生值的序列.以下代码创建了与该弹珠图中描述的完全相同的情况, 以验证这确实是
Amb的行为:var s1 = new Subject<int>(); var s2 = new Subject<int>(); var s3 = new Subject<int>(); var result = Observable.Amb(s1, s2, s3); result.Subscribe( Console.WriteLine, () => Console.WriteLine("Completed")); s1.OnNext(1); s2.OnNext(99); s3.OnNext(8); s1.OnNext(2); s2.OnNext(88); s3.OnNext(7); s2.OnCompleted(); s1.OnNext(3); s3.OnNext(6); s1.OnNext(4); s1.OnCompleted(); s3.OnCompleted();输出:
1 2 3 4 Completed如果改变顺序, 使
s2.OnNext(99)在调用s1.OnNext(1)之前发生, 那么s2将首先产生值, 弹珠图将如下所示.Amb有几个重载. 前面的示例使用了接受一个params数组序列的重载. 还有一个重载接受恰好两个源, 避免了使用params时发生的数组分配.最后, 可以传入一个
IEnumerable<IObservable<T>>. (请注意, 没有接受IObservable<IObservable<T>>的重载.Amb要求它监视的所有源可观察对象都预先提供. )// 传播首先响应的可观察序列. public static IObservable<TSource> Amb<TSource>( this IObservable<TSource> first, IObservable<TSource> second) {...} public static IObservable<TSource> Amb<TSource>( params IObservable<TSource>[] sources) {...} public static IObservable<TSource> Amb<TSource>( this IEnumerable<IObservable<TSource>> sources) {...}重用
Concat部分中的GetSequences方法, 看到Amb在订阅它返回的任何序列之前完全评估外部(IEnumerable)序列.GetSequences().Amb().Dump("Amb");输出:
GetSequences() called Yield 1st sequence Yield 2nd sequence Yield 3rd sequence GetSequences() complete 1st subscribed to 2nd subscribed to 3rd subscribed to Amb-->3 Amb completed以下是说明此代码行为的弹珠图:
请记住,
GetSequences一旦被请求就立即产生它的前两个可观察对象, 然后等待1秒, 然后产生第三个也是最后一个.但是与
Concat不同,Amb在从迭代器检索到所有源之前不会订阅任何源, 这就是为什么这个弹珠图显示所有三个源的订阅在1秒后开始. (前两个源更早可用–Amb一旦订阅发生就会开始枚举源, 但它等到有了所有三个源才订阅, 这就是为什么它们都出现在右边. )第三个序列在订阅和产生其值之间的延迟最短, 所以尽管它是最后返回的可观察对象, 但它能够最快地产生其值, 即使在它之前一秒有两个序列产生(由于
Thread.Sleep).合并操作(Merge)
Merge扩展方法接受多个序列作为输入. 任何时候这些输入序列中的任何一个产生一个值,Merge返回的可观察对象就会产生相同的值.如果输入序列在不同线程上同时产生值,
Merge会安全地处理这种情况, 确保一次传递一个项目.由于
Merge返回一个包含其所有输入序列的值的单个可观察序列, 在某种意义上它类似于Concat.但是,
Concat等待每个输入序列完成后再继续下一个, 而Merge支持并发活动的序列.一旦订阅了
Merge返回的可观察对象, 它会立即订阅其所有输入, 转发它们产生的任何内容.这个弹珠图展示了两个并发运行的序列
s1和s2,r展示了用Merge组合它们的效果: 两个源序列的值都从合并后的序列中出现.Merge的结果只有在所有输入序列完成后才会完成.然而, 如果任何输入序列错误终止(此时它将取消订阅其所有其他输入),
Merge操作符将出错.阅读创建可观察对象章节, 这是一个
Merge的示例.在Rx中表示文件系统事件部分的末尾使用它将表示
FileSystemWatcher提供的各种事件的单个序列组合成一个流.再举一个例子, 让再次看看AIS. 没有公开可用的单个全局源可以将全球所有的AIS消息作为
IObservable<IAisMessage>提供. 任何单个源可能只覆盖一个区域, 甚至可能只是一个AIS接收器. 使用Merge, 可以很容易地将这些组合成一个源:IObservable<IAisMessage> station1 = aisStations.GetMessagesFromStation("AdurStation"); IObservable<IAisMessage> station2 = aisStations.GetMessagesFromStation("EastbourneStation"); IObservable<IAisMessage> allMessages = station1.Merge(station2);如果想组合两个以上的源, 有几个选择:
- 将
Merge操作符链接在一起, 例如s1.Merge(s2).Merge(s3) - 将一个
params数组的序列传递给Observable.Merge静态方法, 例如Observable.Merge(s1,s2,s3) - 将
Merge操作符应用于一个IEnumerable<IObservable<T>> - 将
Merge操作符应用于一个IObservable<IObservable<T>>
重载如下所示:
/// 将两个可观察序列合并为一个可观察序列. /// 返回一个合并给定序列元素的序列. public static IObservable<TSource> Merge<TSource>( this IObservable<TSource> first, IObservable<TSource> second) {...} // 将所有可观察序列合并为一个可观察序列. // 合并可观察序列元素的可观察序列. #+begin_src csharp :results pp :exports both public static IObservable<TSource> Merge<TSource>( params IObservable<TSource>[] sources) {...} // 将一个可枚举的可观察序列集合合并为一个可观察序列. public static IObservable<TSource> Merge<TSource>( this IEnumerable<IObservable<TSource>> sources) {...} // 将一个可观察的可观察序列序列合并为一个可观察序列. // 将内部序列的所有元素合并到输出序列中. public static IObservable<TSource> Merge<TSource>( this IObservable<IObservable<TSource>> sources) {...}随着被合并的源数量增加, 接受集合的操作符相对于第一个重载具有优势. (即,
s1.Merge(s2).Merge(s3)的性能略逊于Observable.Merge(new[] { s1, s2, s3 }), 或等效的Observable.Merge(s1, s2, s3). )然而, 对于只有三个或四个源, 差异很小, 所以在实践中, 可以根据自己喜欢的风格在第一个两个重载之间进行选择. (如果要合并100个源或更多, 差异会更明显, 但到那时, 可能无论如何都不想使用链式调用风格了. )第三个和第四个重载允许合并可以在运行时延迟评估的序列.
最后一个接受序列的序列的
Merge重载特别有趣, 因为它使得被合并的源集合可以随着时间增长.只要代码仍然订阅
Merge返回的IObservable<T>,Merge就会一直订阅sources. 所以如果sources随着时间发出越来越多的IObservable<T>, 这些都会被Merge包含.这可能听起来很熟悉.
SelectMany操作符 能够将多个可观察源展平回一个可观察源.这只是另一个说明为什么将
SelectMany描述为Rx中的基本操作符的例子:严格来说, 不需要Rx提供给很多操作符, 因为可以使用
SelectMany构建它们. 以下是使用SelectMany对最后一个Merge重载的简单重新实现:public static IObservable<T> MyMerge<T>(this IObservable<IObservable<T>> sources) => sources.SelectMany(source => source);除了说明从技术上讲不需要Rx为提供最后一个
Merge之外, 这也是一个很好的例子, 说明为什么它提供这个操作符是有帮助的.它的作用不是很明显. 为什么要传递一个只返回其参数的lambda? 除非以前见过这个, 否则可能需要一些思考才能明白
SelectMany期望传递一个回调, 它会为每个传入的项目调用这个回调, 但输入项目已经是嵌套序列, 所以可以直接返回每个项目, 然后SelectMany会将其产生的所有内容合并到其输出流中.即使已经完全理解了
SelectMany, 以至于马上知道这只会展平sources, 可能仍然会发现Observable.Merge(sources)是更直接的意图表达. (此外, 由于Merge是一个更专门的操作符, Rx能够提供比上面显示的SelectMany版本稍微更高效的实现. )如果再次重用
GetSequences方法, 可以看到Merge操作符如何处理一个序列的序列.GetSequences().Merge().Dump("Merge");输出:
GetSequences() called Yield 1st sequence 1st subscribed to Yield 2nd sequence 2nd subscribed to Merge --> 2 Merge --> 1 Yield 3rd sequence 3rd subscribed to GetSequences() complete Merge --> 3 Merge completed从弹珠图中可以看出, s1和s2立即被产生并订阅. s3在一秒后才被产生并订阅. 一旦所有输入序列完成, 结果序列就完成.
对于每个接受可变数量源(通过数组,
IEnumerable<IObservable<T>>或IObservable<IObservable<T>>)的Merge重载, 都有一个额外的重载添加了一个maxconcurrent参数. 例如:public static IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources, int maxConcurrent)这使能够限制
Merge在任何单个时间接受输入的源的数量. 如果可用源的数量超过maxConcurrent(要么是因为传入了一个包含更多源的集合, 要么是因为使用了基于IObservable<IObservable<T>的重载并且源发出的嵌套源比maxConcurrent多),Merge将等待现有源完成后再处理新的源.maxConcurrent为1时,Merge的行为与Concat相同.- 将
切换操作(Switch)
Rx的
Switch操作符接受一个IObservable<IObservable<T>>, 并从最近的嵌套可观察对象产生通知.每次其源产生一个新的嵌套
IObservable<T>,Switch取消订阅前一个嵌套源(除非这是第一个源, 在这种情况下不会有前一个源)并订阅最新的一个.Switch可用于日历应用程序中的 该出发了 类型的功能. 实际上, 可以在 必应如何提供(或者至少曾经提供; 实现可能已经改变)告诉你该出发去赴约的通知的源代码 中看到.由于这是从一个真实示例派生的, 所以有点复杂, 所以在这里只描述其本质.
该出发了 通知的基本思想是, 使用地图和路线查找服务来计算到达约会地点的预期行程时间, 并使用
Timer操作符 创建一个IObservable<T>, 当该出发时它将产生一个通知. (具体来说, 此代码产生一个IObservable<TrafficInfo>, 它报告旅程的建议路线和预期旅行时间. )然而, 有两件事情可能会改变, 使初始预测的行程时间变得无用.首先, 交通状况可能会改变. 当用户创建他们的约会时, 必须根据一天中相关时间的正常交通流量来猜测预期行程时间. 然而, 如果当天交通非常拥堵, 估计值将需要向上修订, 将需要更早地通知用户.
另一件可能改变的事情是用户的位置. 这显然也会影响预测的行程时间.
为了处理这个问题, 系统需要可观察源来报告用户位置的变化以及影响建议行程的交通状况的变化.
每次这些中的任何一个报告变化时, 将需要产生一个新的估计行程时间, 以及一个新的
IObservable<TrafficInfo>, 当该出发时它将产生一个通知.每次修订估计时, 希望放弃之前创建的
IObservable<TrafficInfo>. (否则, 用户将收到大量令人困惑的通知, 告诉他们出发, 每次重新计算行程时间都会有一个通知. )只想使用最新的一个. 而这正是Switch所做的.可以在 Reaqtor存储库中查看该场景的示例.
在这里, 将展示一个不同的, 更简单的场景: 实时搜索. 当输入时, 文本被发送到搜索服务, 结果作为可观察序列返回.
大多数实现会在发送请求之前有一个轻微的延迟, 以避免不必要的工作. 想象想搜索" Rx入门" . 快速输入" Into to" , 然后意识到漏了字母" r" . 短暂停顿并将文本更改为" Intro ".
到目前为止, 已经向服务器发送了两个搜索. 第一个搜索将返回不想要的结果.
此外, 如果收到第一个搜索的结果与第二个搜索的结果合并在一起, 这对用户来说将是非常奇怪的体验. 真的只想要与最新搜索文本对应的结果. 这个场景非常适合
Switch方法.在这个例子中, 有一个
IObservable<string>源表示搜索文本–用户输入的每个新值都从这个源序列中出现. 还有一个搜索函数, 它为给定的搜索词产生一个单一的搜索结果:private IObservable<string> SearchResults(string query) { ... }这个函数只返回一个值, 但将其建模为一个
IObservable<string>, 部分原因是处理执行搜索可能需要一些时间的事实, 并且也为了能够与Rx一起使用.可以获取搜索词源, 然后使用
Select将每个新搜索值传递给这个SearchResults函数. 这创建了结果嵌套序列IObservable<IObservable<string>>.假设然后使用
Merge来处理结果:IObservable<string> searchValues =....; IObservable<IObservable<string>> search = searchValues.Select(searchText => SearchResults(searchText)); var subscription = search .Merge() .Subscribe(Console.WriteLine);如果幸运的话, 每个搜索在
searchValues产生下一个元素之前完成, 输出看起来会合理.然而, 更有可能的是, 多个搜索会导致重叠的搜索结果. 这个弹珠图展示了在这种情况下
Merge函数可能会做什么.注意搜索结果的值是如何混合在一起的. 一些搜索词比其他搜索词花费更长时间才能得到搜索结果, 这也意味着它们的顺序是错误的. 这不是想要的.
如果使用
Switch扩展方法, 将得到更好的结果.Switch将订阅外部序列, 并且每当产生一个内部序列时, 它将订阅新的内部序列并取消对前一个内部序列的订阅. 这将导致以下弹珠图:现在, 每次新的搜索词到达, 导致新的搜索开始, 一个对应于该搜索结果的新的
IObservable<string>出现, 导致Switch取消对前一个结果的订阅.这意味着任何来得太晚的结果(即, 当结果是针对不再在搜索框中的搜索词时)将被丢弃. 碰巧的是, 在这个特定的例子中, 这意味着只看到最后一个搜索词的结果.
在用户输入时看到的所有中间值都没有停留很长时间, 因为用户在收到前一个值的结果之前不断按下下一个键.
只有在最后, 当用户停止输入足够长时间以至于搜索结果在过时之前回来时, 才最终从
Switch看到一个值. 最终的效果是消除了令人困惑的过时结果.这是另一个弹珠图的模糊性导致轻微问题的例子. 已经展示了每个对
SearchResults的调用产生的单值可观察对象, 但在实践中,Switch在除了最后一个之外的所有这些可观察对象有机会产生值之前就取消了对它们的订阅. 所以这个图显示的是那些源可能产生的值, 而不是它们作为订阅的一部分实际传递的值, 因为订阅被提前终止了.
配对序列
前面的方法允许将共享公共类型的多个序列展平为相同类型的结果序列(具有各种决定包含什么和丢弃什么的策略).
本节中的操作符仍然接受多个序列作为输入, 但尝试将每个序列中的值配对, 以为输出序列产生单个值. 在某些情况下, 它们还允许提供不同类型的序列.
拉链操作(Zip)
Zip组合来自两个序列的成对项目. 所以它的第一个输出是通过组合一个输入的第一个项目与另一个输入的第一个项目创建的.第二个输出组合每个输入的第二个项目. 依此类推. 这个名字旨在让人联想到衣服或袋子上的拉链, 它一次将拉链的每一半的齿组合在一起一对.
由于
Zip严格按照顺序组合成对项目, 它将在第一个序列完成时完成.如果其中一个序列到达末尾, 那么即使另一个序列继续发出值, 也将没有任何东西与这些值配对, 所以
Zip此时将取消订阅, 丢弃无法配对的值, 并报告完成.如果任何一个序列产生错误,
Zip返回的序列将报告相同的错误.如果一个源序列比另一个序列更快地发布值, 发布的速率将由 较慢的 序列决定, 因为它只有在从每个源都有一个元素时才能发出一个组合元素.
以下是一个示例:
// 生成值0,1,2 var nums = Observable.Interval(TimeSpan.FromMilliseconds(250)) .Take(3); // 生成值a,b,c,d,e,f var chars = Observable.Interval(TimeSpan.FromMilliseconds(150)) .Take(6) .Select(i => Char.ConvertFromUtf32((int)i + 97)); // 将值组合在一起 nums.Zip(chars, (lhs, rhs) => (lhs, rhs))) .Dump("Zip");其效果可以在下面的弹珠图中看到:
以下是代码的实际输出:
#+end_src{ Left = 0, Right = a } { Left = 1, Right = b } { Left = 2, Right = c }
#+end_src请注意,
nums序列在完成之前只产生了三个值, 而chars序列产生了六个值. 结果序列产生了三个值, 这是它能够配对的对数.值得注意的是,
Zip有一个第二个重载, 它接受一个IEnumerable<T>作为第二个输入序列.// 将一个可观察序列和一个可枚举序列合并为一个可观察序列, // 通过使用选择器函数将元素成对组合的结果包含在其中. public static IObservable<TResult> Zip<TFirst, TSecond, TResult>( this IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector) {...}这允许将来自
IEnumerable<T>和IObservable<T>范式的序列进行拉链操作!序列相等操作(SequenceEqual)
还有另一个操作符处理来自两个源的成对项目:
SequenceEqual. 但是, 它不是为每对输入产生一个输出, 而是比较每对项目, 并最终产生一个单个值, 指示每对输入是否相等.在源产生不同值的情况下,
SequenceEqual一旦检测到这种情况就会产生一个单个的false值. 但是如果源相等, 它只能在两个源都完成时报告这一点, 因为在那之前, 它不知道是否可能稍后会有差异. 以下是一个示例说明其行为:var subject1 = new Subject<int>(); subject1.Subscribe( i => Console.WriteLine($"subject1.OnNext({i})"), () => Console.WriteLine("subject1 completed")); var subject2 = new Subject<int>(); subject2.Subscribe( i => Console.WriteLine($"subject2.OnNext({i})"), () => Console.WriteLine("subject2 completed")); var areEqual = subject1.SequenceEqual(subject2); areEqual.Subscribe( i => Console.WriteLine($"areEqual.OnNext({i})"), () => Console.WriteLine("areEqual completed")); subject1.OnNext(1); subject1.OnNext(2); subject2.OnNext(1); subject2.OnNext(2); subject2.OnNext(3); subject1.OnNext(3); subject1.OnCompleted(); subject2.OnCompleted();输出:
subject1.OnNext(1) subject1.OnNext(2) subject2.OnNext(1) subject2.OnNext(2) subject2.OnNext(3) subject1.OnNext(3) subject1 completed subject2 completed areEqual.OnNext(True) areEqual completed组合最新值操作(CombineLatest)
CombineLatest操作符与Zip类似, 它组合来自其源的成对项目. 然而, 它不是配对第一个项目, 然后是第二个项目, 依此类推,CombineLatest在其任何一个输入产生新值时产生一个输出.对于从一个输入中出现的每个新值,
CombineLatest使用该值以及来自另一个输入的最近看到的值.(确切地说, 它在每个输入至少产生一个值之前不会产生任何东西, 所以如果一个输入比另一个输入花费更长时间开始, 在其中一个输入产生其第一个值之前, 会有一段时间
CombineLatest实际上不会在每次其一个输入产生值时产生输出, 因为它在等待另一个输入产生其第一个值. )签名如下:// 通过使用选择器函数, 每当一个可观察序列产生一个元素时, // 将两个可观察序列组合为一个可观察序列. public static IObservable<TResult> CombineLatest<TFirst, TSecond, TResult>( this IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector) {...}下面的弹珠图展示了
CombineLatest与一个产生数字的序列(s1)和另一个产生字母的序列(s2)的用法.如果
resultSelector函数只是将数字和字母连接在一起作为一对, 这将产生底部行所示的结果.已经对每个输出进行了颜色编码, 以指示是哪个两个源导致它发出那个特定的结果, 每个输出都包括来自每个源的值.
如果慢慢浏览上面的弹珠图, 首先看到
s2产生字母" a".s1还没有产生任何值, 所以没有东西可以配对, 这意味着结果中不会产生值.接下来,
s1产生数字" 1" , 所以结果序列现在可以产生一对" 1,a" .然后从
s1收到数字" 2" . 最后一个字母仍然是" a" , 所以下一对是" 2,a" .然后字母" b" 被产生, 创建了对" 2,b" , 接着是" c" , 给出" 2,c" . 最后数字3被产生, 得到对" 3,c" .
这在需要评估某个状态的组合并且当该状态的任何单个组件发生变化时需要保持最新的情况下非常有用.
一个简单的例子是一个监控系统. 每个服务由一个序列表示, 该序列返回一个布尔值, 指示所述服务的可用性.
如果所有服务都可用, 监控状态为绿色; 可以通过让结果选择器执行逻辑与来实现这一点.
以下是一个示例:
IObservable<bool> webServerStatus = GetWebStatus(); IObservable<bool> databaseStatus = GetDBStatus(); // 当两个系统都启动时产生true. var systemStatus = webServerStatus .CombineLatest( databaseStatus, (webStatus, dbStatus) => webStatus && dbStatus);这个方法可能会产生很多重复的值. 例如, 如果Web服务器宕机, 结果序列将产生"
false" . 如果数据库随后宕机, 将产生另一个(不必要的)"false" 值.这将是使用
DistinctUntilChanged扩展方法的合适时机. 更正后的代码如下所示:// 当两个系统都启动时产生true, 并且仅在状态更改时产生. var systemStatus = webServerStatus .CombineLatest( databaseStatus, (webStatus, dbStatus) => webStatus && dbStatus) .DistinctUntilChanged();连接操作(Join)
Join操作符允许逻辑地连接两个序列. 而Zip操作符会根据项目在序列中的位置将两个序列的值配对,Join操作符允许根据元素的发射时间连接序列.由于可观察源产生值在逻辑上是一个瞬时事件, 连接使用相交窗口模型.
回想一下, 使用
Window操作符, 可以使用一个可观察序列定义每个窗口的持续时间.Join操作符使用类似的概念: 对于每个源, 可以定义一个时间窗口, 在该窗口内每个元素被认为是" 当前的" , 并且如果来自不同源的两个元素的时间窗口重叠, 它们将被连接.与
Zip操作符一样, 还需要提供一个选择器函数来从每对值中产生结果项目. 以下是Join操作符:public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> ( this IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector )这是一个复杂的签名, 一下子理解起来有点困难, 所以让逐个参数来看.
IObservable<TLeft> left是第一个源序列.IObservable<TRight> right 是第二个源序列. ~Join旨在产生成对的项目, 每对包含来自left的一个元素和来自right的一个元素.leftDurationSelector参数使能够为来自left的每个项目定义时间窗口. 源项目的时间窗口在源发出该项目时开始.为了确定来自
left的项目的窗口何时关闭,Join将调用leftDurationSelector, 传入left刚刚产生的值.这个选择器必须返回一个可观察源. (这个源的元素类型完全不重要, 因为
Join只关心它何时执行操作. )项目的时间窗口在leftDurationSelector为该项目返回的源产生一个值或完成时立即结束.rightDurationSelector参数为来自right的每个项目定义时间窗口. 它的工作方式与leftDurationSelector完全相同.最初, 没有当前项目. 但是随着
left和right产生项目, 这些项目的窗口将开始, 所以Join可能会有多个项目, 它们的窗口当前都打开着.每次
left产生一个新项目时,Join会查看是否有来自right的项目仍然有打开的窗口.如果有,
left现在将与它们中的每一个配对. (所以一个源的单个项目可能与另一个源的多个项目配对. )Join会为每个这样的配对调用resultSelector. 同样, 每次right产生一个项目时, 如果有来自left的项目当前有打开的窗口, 那么来自right的新项目将与这些项目中的每一个配对, 并且再次,resultSelector将为每个这样的配对被调用.Join返回的可观察对象产生每次调用resultSelector的结果.现在让想象一个场景, 其中左序列产生值的速度是右序列的两倍.
想象一下, 此外从不关闭左窗口; 可以通过总是从
leftDurationSelector函数返回Observable.Never<Unit>()来做到这一点.并且想象使右窗口尽快关闭, 可以通过使
rightDurationSelector返回Observable.Empty<Unit>()来实现这一点. 下面的弹珠图说明了这一点:每次左持续时间窗口与右持续时间窗口相交时, 就会得到一个输出.
右持续时间窗口实际上都是零长度, 但这并不妨碍它们与左持续时间窗口相交, 因为那些窗口都永远不会结束.
所以来自right的第一个项目有一个(零长度)窗口, 它落在left项目的两个窗口内, 所以
Join产生两个结果.在图中将它们垂直堆叠显示, 以表明它们实际上几乎同时发生.
当然,
IObserver<T>的规则意味着它们实际上不能同时发生:Join必须等到消费者的OnNext处理完0,A后才能继续产生1,A. 但是, 只要一个源的单个事件与另一个源的多个窗口重叠, 它就会尽快产生所有配对.如果也立即通过返回
Observable.Empty<Unit>或也许Observable.Return(0)来关闭左窗口, 那么窗口将永远不会重叠, 所以永远不会产生任何配对. (理论上, 如果left和right同时产生项目, 那么也许可能会得到一个配对, 但由于事件的时间永远不会绝对精确, 设计一个依赖于此的系统将是一个坏主意. )如果想确保来自
right的项目只与来自left的单个值相交呢? 在那种情况下, 需要确保左持续时间不重叠.一种方法是让的
leftDurationSelector总是返回与作为left序列传递的相同序列.这将导致
Join对同一源进行多次订阅, 对于某些类型的源, 这可能会引入不必要的副作用, 但是Publish和RefCount操作符提供了一种处理这种情况的方法, 所以这实际上是一个合理的策略. 如果这样做, 结果看起来更像这样:最后一个例子与
CombineLatest非常相似, 除了它只在右序列改变时产生一个配对. 可以通过改变右持续时间使其与左持续时间工作方式相同来轻松使其工作方式相同. 以下代码展示了如何做到这一点(包括使用Publish和RefCount来确保尽管多次将底层left和right源提供给Join, 但只对其进行一次订阅).public static IObservable<TResult> MyCombineLatest<TLeft, TRight, TResult> ( IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, TRight, TResult> resultSelector ) { var refcountedLeft = left.Publish().RefCount(); var refcountedRight = right.Publish().RefCount(); return Observable.Join( refcountedLeft, refcountedRight, value => refcountedLeft, value => refcountedRight, resultSelector); }显然没有必要编写这个–可以直接使用内置的
CombineLatest. (而且它会稍微更高效, 因为它有一个专门的实现. )但这表明Join是一个强大的操作符.分组连接操作(GroupJoin)
当
Join操作符将窗口重叠的值配对时, 它会将标量值left和right传递给resultSelector.GroupJoin操作符基于相同的重叠窗口概念, 但其选择器的工作方式略有不同:GroupJoin仍然从左源传递一个单个(标量)值, 但它将一个IObservable<TRight>作为第二个参数传递. 这个参数表示在为其调用的特定左值的窗口内发生的右序列的所有值.所以这缺乏
Join的对称性, 因为左和右源的处理方式不同.GroupJoin会为left源产生的每个项目恰好调用一次resultSelector.当一个左值的窗口与多个右值的窗口重叠时,
Group会通过为每个这样的配对调用选择器一次来处理这种情况, 但GroupJoin通过让作为resultSelector的第二个参数传递的可观察对象发出与该左值重叠的每个右项目来处理这种情况. (如果一个左项目与右项目没有重叠,resultSelector仍将用该项目调用, 只是它将被传递一个不产生任何项目的IObservable<TRight>. )GroupJoin的签名与Join非常相似, 但请注意resultSelector参数的差异.public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> ( this IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IOObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector )如果回到第一个
Join示例, 其中有left产生值的速度是right的两倍,- 左窗口永远不关闭,
- 右窗口立即关闭
这个图再次展示了那些相同的输入, 并且还展示了
GroupJoin会为left产生的每个项目传递给resultSelector的可观察对象:这产生了与
Join产生的所有相同事件相对应的事件, 它们只是分布在六个不同的IObservable<TRight>源中.同时, 使用
GroupJoin, 可以通过这样做有效地重新创建自己的Join方法:public IObservable<TResult> MyJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>( IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector) { return Observable.GroupJoin ( left, right, leftDurationSelector, rightDurationSelector, (leftValue, rightValues) => rightValues.Select(rightValue=>resultSelector(leftValue, rightValue)) ) .Merge(); }甚至可以用这段代码创建一个粗糙版本的
Window:public IObservable<IObservable<T>> MyWindow<T>(IObservable<T> source, TimeSpan windowPeriod) { return Observable.Create<IObservable<T>>(o =>; { var sharedSource = source .Publish() .RefCount(); var intervals = Observable.Return(0L) .Concat(Observable.Interval(windowPeriod)) .TakeUntil(sharedSource.TakeLast(1)) .Publish() .RefCount(); return intervals.GroupJoin( sharedSource, _ => intervals, _ => Observable.Empty<Unit>(), (left, sourceValues) => sourceValues) .Subscribe(o); }); }Rx通过允许查询巧合序列, 提供了另一种查询动态数据的方法. 解决在从多个源进行匹配时管理状态和并发的内在复杂问题. 通过封装这些低级操作, Rx以富有表现力和可测试的方式设计软件. 使用Rx操作符作为构建块, 的代码有效地成为许多简单操作符的组合. 这使得领域代码的复杂性成为焦点, 而不是其他附带的支持代码.
与-然后-当操作(And-Then-When)
Zip只能接受两个序列作为输入. 如果这是个问题, 那么可以使用And,Then和When这三个方法的组合.这些方法的使用方式与大多数其他Rx方法略有不同. 在这三个方法中,
And是IObservable<T>的唯一扩展方法.与大多数Rx操作符不同, 它不返回一个序列; 相反, 它返回神秘的类型
Pattern<T1, T2>.Pattern<T1, T2>类型是公共的(显然), 但其所有属性都是内部的.可以对
Pattern<T1, T2>做的仅有的两件(有用的)事情是调用其And或Then方法.在
Pattern<T1, T2>上调用And方法返回一个Pattern<T1, T2, T3>. 在该类型上, 也会找到And和Then方法.泛型
Pattern类型的存在是为了允许将多个And方法链接在一起, 每个方法将泛型类型参数列表扩展一个.然后用
Then方法重载将它们全部组合在一起.Then方法返回一个Plan类型. 最后, 将这个Plan传递给Observable.When方法以创建序列.这可能听起来非常复杂, 但比较一些代码示例应该会使其更容易理解.
要将三个序列拉链在一起, 可以像这样链接
Zip方法:IObservable<long> one = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5); IObservable<long> two = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(10); IObservable<long> three = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(14); // lhs代表'Left Hand Side'(左手边) // rhs代表'Right Hand Side'(右手边) IObservable<(long One, long Two, long Three)> zippedSequence = one .Zip(two, (lhs, rhs) => (One: lhs, Two: rhs)) .Zip(three, (lhs, rhs) => (lhs.One, lhs.Two, Three: rhs)); zippedSequence.Subscribe( v => Console.WriteLine($"One: {v.One}, Two: {v.Two}, Three: {v.Three}"), () => Console.WriteLine("Completed"));也许使用
And/Then/When的更简洁语法:Pattern<long, long, long> pattern = one.And(two).And(three); Plan<(long One, long Two, long Three)> plan = pattern.Then((first, second, third) => (One: first, Two: second, Three: third)); IObservable<(long One, long Two, long Three)> zippedSequence = Observable.When(plan); zippedSequence.Subscribe( v => Console.WriteLine($"One: {v.One}, Two: {v.Two}, Three: {v.Three}"), () => Console.WriteLine("Completed"));如果愿意, 这可以进一步简化为:
IObservable<(long One, long Two, long Three)> zippedSequence = Observable.When( one.And(two).And(three) .Then((first, second, third) => (One: first, Two: second, Three: third)) ); zippedSequence.Subscribe( v => Console.WriteLine($"One: {v.One}, Two: {v.Two}, Three: {v.Three}"), () => Console.WriteLine("Completed"));And/Then/When这一组有更多的重载, 能够组合更多数量的序列. 它们还允许提供多个"计划" (Then方法的输出).总结
本章介绍了一组允许组合可观察序列的方法. 这使结束了第二部分的内容. 已经研究了主要用于定义想要对数据执行的计算的操作符. 在第三部分, 将转向实际问题, 如管理调度, 副作用和错误处理.