December 1, 2023
By: Kevin

ZeroMQ/NetMQ 编程指南

  1. Hello World
    1. Server
    2. Client
  2. Bind vs Connect
  3. 何时使用bind或connect?
  4. 多段消息(Frame)
  5. 通讯模式
  6. NetMQ的参数选项
  7. 发送及接收
  8. MQ的消息
    1. 消息结构
    2. 创建多段消息
    3. 读取多段消息
    4. 一个完整的示例
  9. 传输协议
    1. TCP
    2. InProc 进程间(通讯)
    3. PGM
  10. 清除
    1. 为什么NetMQ需要清除
    2. 什么是Linger?
    3. 如何清除?
  11. NetMQ的组件(Components )
    1. Pollers
    2. Timers
    3. 加入/移除 sockets/timers
    4. 控制polling
    5. 一个更复杂的例子
    6. 执行效率
    7. NetMQ Actor Model
    8. Timer
    9. Queue
    10. Proactor
  12. 通讯模式
    1. Request - Response
    2. Pub - Sub
    3. Push - Pull
  13. 参考

zmq

ZeroMQ是一个高性能, 适用于几乎所有场景的消息函数库,希望通过阅读这篇文章,能够帮助读者快速地掌握它, 以及它在.NET平台上的化身NetMQ.

NetMQ不是那些你下载后,看一下示例就会用的函数库,它背后有一些原则,要先了解后才能顺利应用,所以最佳的开始的地方是ZeroMQ guide,读一或两次后,再回来这篇文章。

ZeroMQ的哲理是从Zero开始.Zero是指无中介(Zero broker),零延迟(Zero delay), 零拷贝(Zero Copy) 及零管理(Zero management).

更进一步说,zero代表渗透整个项目的极简主义的文化. 精妙的上层抽象可以在消除复杂性的同时增加功能. ZeroMQ被称为ZMQ, 或者ØMQ, 下文中这几个词都是一个意思.

可以从NuGet获取函数库.

由于NetMQ就是关于sockets的,处理IO收发是它的本命.

Hello World

从Hello world开始:

Server

using (var server = new ResponseSocket())
{
    server.Bind("tcp://*:5555");

    while (true)
    {
        var message = server.ReceiveFrameString(Encoding.UTF8);

        Console.WriteLine("Received {0}", message);

        // processing the request
        Thread.Sleep(100);

        Console.WriteLine("Sending World");
        server.SendFrame("World", false);
    }
}

服务器创建了一个response的socket类型(在request-response章节有更多介绍),将它绑定到port 5555然后等待消息.

可以看到我们没有其他设置,只简单发送字符串.

注意, NetMQ中对字符串的编码的处理, frame在传输的时候是字节数组, 如果是字符串, 收发都涉及到转码的问题.

NetMQ不只可以发送字符串,虽然它没有实现序列化的功能(各种类型都需要自己实现), 本质上ZMQ是基于frame的, 可以很方便的发送Multipart messages.

Client

using (var client = new RequestSocket())
{
    client.Connect("tcp://localhost:5555");

    for (int i = 0; i < 10; i++)
    {
        Console.WriteLine("Sending Hello");
        client.SendFrame("Hello");

        var message = client.ReceiveFrameString();
        Console.WriteLine("Received {0}", message);
    }
}

Client端创建了一个request类型的socket,连接(connect)并开始发(send)送消息.

发送及接收函数默认是阻塞式的.对接收来说很简单:如果没有收到消息它会阻塞;

而发送较复杂一点,且跟它的socket类型有关.

对request sockets来说,如果到达高水位(high watermark),且没有另一端的连接,函数会阻塞.

然而你可以调用TrySendTryReceive以避免等待,如果需要等待(发不出去),这类带try的函数发送会返回false.

string message;
if (client.TryReceiveFrameString(out message))
    Console.WriteLine("Received {0}", message);
else
    Console.WriteLine("No message received");

Bind vs Connect

在上述示例中很显眼的是server端使用Bind而client端使用Connect.

BindConnect是表层抽象, 队列(queue)和连接(connection)是底层实现.

ZMQ为每个潜在的连接建立队列.如果你的socket连接到三个socket端点,背后实际有三个队列存在.

使用Bind,可以让其他端点建立连接,而且因为不知道未来会有多少端点且无法先建立队列,相反,队列会在每个端点bound后建立.

使用Connect,ZeroMQ知道至少会有一个端点,因此它会马上建立队列,除了ROUTE类型外的所有类型都是如此,而ROUTE类型只会在我们连接的每个端点有了回应后才建立队列.

因此,当发送消息至没有绑定的端点的socket或至没有连接的ROUTE时,将没有可以存储消息的队列存在.

何时使用bind或connect?

作为一般规则,在架构中最稳定的端点上使用Bind,在动态的、易变的端点上使用Connect.

request/reply型来说,服务端使用bind,而client端使用connect,如同传统的TCP一样.

如果你无法确认那一部分会比较稳定(例如点对点连接),可以考虑在中间放一个稳定的可让所有端点连接的设备.

你可 进一步阅读ZeroMQ FAQ中的”Why do I see different behavior when I bind a socket versus connect a socket?”部分.

多段消息(Frame)

ZMQ的内容收发围绕frame这一核心概念,大多数的消息都可以想成含有一或多个frame.

入门ZMQ的时候, 例子一般都拿一些函数收送字符串消息,然而复杂应用的核心始终是多段frame的序列化/反序列化.

这一话题在Message章节有更多说明.

通讯模式

当代分布式应用系统中, 有多种分布式的通讯模式, ZMQ编程指南讲述了所有需要知道的知识,以帮助应用这些模式.

在开始用NetMQ前请先确保你已读过下列章节.

  • Chapter 2 - Sockets and Patterns
  • Chapter 3 - Advanced Request-Reply Patterns
  • Chapter 4 - Reliable Request-Reply Patterns
  • Chapter 5 - Advanced Pub-Sub Patterns

NetMQ也提供了以NetMQ API撰写的几个模式的示例.

其他模式在看过ZMQ指南后很简单的改用NetMQ实现.

这里有一些已用NetMQ实现的示例程序:

  • Brokerless Reliability Pattern - Freelance Model one
  • Load Balancer Patterns
  • Lazy Pirate Pattern
  • Simple Pirate Pattern

其余的示例,多看看ZMQ指南吧.

ZMQ模式是以特定类型实现的sockets配对.

换句话说,要了解ZeroMQ的模式,要先知道有那些socket类型及它们如何配合.

这部分只需需要学习和适应, 并不那么显而易见.

ZeroMQ内建的核心模式是:

  • 请求-回应,将一组客户端连接至一组服务端.这是一种远程程序调用和task分布模式.
  • 发布-订阅,连接一组发布者至一组订阅者.这是一种数据分布式模式.
  • 管道,连接在一个有多步骤及回环的fan-out/fan-in模式中的节点,这是一种 parallel task distribution and collection.
  • Exclusive pair,独占式地连接两个socket.这是一种在process中连接两个执行线程的模式,不要和一般的socket配对混淆.

以下是NetMQ有效的connect/bind的socket合并配对(任何一方都可以bind):

  • PublisherSocketSubscriberSocket
  • RequestSocketResponseSocket
  • RequestSocketRouterSocket
  • DealerSocketResponseSocket
  • DealerSocketRouterSocket
  • DealerSocketDealerSocket
  • RouterSocketRouterSocket
  • PushSocketPullSocket
  • PairSocketPairSocket

任何其他的配对方式会产生不可靠的结果,ZMQ未来的版本可能会在你尝试时告知错误.

NetMQ的参数选项

NetMQ提供了数个会影响动作的选项.

根据你使用的socket类型或你尝试建立的拓扑,可能发现需要设置一些ZeroMQ的选项.

NetMQ中,可通过NetMQSocket.Options属性完成.

以下是你可以在NetMQSocket.Options上设置的可用属性的列表.

要设置哪些值取决于你想要实现什么.

这里能做的是列出选项, 如下所示:

  • Affinity
  • BackLog
  • CopyMessages
  • DelayAttachOnConnect
  • Endian
  • GetLastEndpoint
  • IPv4Only
  • Identity
  • Linger
  • MaxMsgSize
  • MulticastHops
  • MulticastRate
  • MulticastRecoveryInterval
  • ReceiveHighWaterMark
  • ReceiveMore
  • ReceiveBuffer
  • ReconnectInterval
  • ReconnectIntervalMax
  • SendHighWaterMark
  • SendTimeout
  • SendBuffer
  • TcpAcceptFilter
  • TcpKeepAlive
  • TcpKeepaliveIdle
  • TcpKeepaliveInterval
  • XPubVerbose

这里不会讲到所有选项,在用到时才会提.现在只要注意,如果你已经在ZeroMQ指南中读过某些选项,那么这应是你需要设置/读取的地方.

发送及接收

接收

IReceivingSocket(所有socket皆继承自此接口)有两个函数:

- void Receive(ref Msg msg);
- bool TryReceive(ref Msg msg, TimeSpan timeout);

第一个函数会永远阻塞直到消息到达,第二个让我们提供一个超时时间(可能是零).

这些函数依靠Msg对象的重复使用提供我们很高的效率(避免对象创建/回收的代价).

然而大多时间会想要更方便的函数来帮你接收stringbyte[]等类型,NetMQReceivingSocketExtensions类别中提供了很多IReceivingSocket类型的方便函数:

// Receiving byte[]
byte[] ReceiveFrameBytes()
byte[] ReceiveFrameBytes(out bool more)
bool TryReceiveFrameBytes(out byte[] bytes)
bool TryReceiveFrameBytes(out byte[] bytes, out bool more)
bool TryReceiveFrameBytes(TimeSpan timeout, out byte[] bytes)
bool TryReceiveFrameBytes(TimeSpan timeout, out byte[] bytes, out bool more)
List<byte[]> ReceiveMultipartBytes()
void ReceiveMultipartBytes(ref List<byte[]> frames)
bool TryReceiveMultipartBytes(ref List<byte[]> frames)
bool TryReceiveMultipartBytes(TimeSpan timeout, ref List<byte[]> frames)

// Receiving strings
string ReceiveFrameString()
string ReceiveFrameString(out bool more)
string ReceiveFrameString(Encoding encoding)
string ReceiveFrameString(Encoding encoding, out bool more)
bool TryReceiveFrameString(out string frameString)
bool TryReceiveFrameString(out string frameString, out bool more)
bool TryReceiveFrameString(Encoding encoding, out string frame

String)
bool TryReceiveFrameString(Encoding encoding, out string frameString, out bool more)
bool TryReceiveFrameString(TimeSpan timeout, out string frameString)
bool TryReceiveFrameString(TimeSpan timeout, out string frameString, out bool more)
bool TryReceiveFrameString(TimeSpan timeout, Encoding encoding, out string frameString)
bool TryReceiveFrameString(TimeSpan timeout, Encoding encoding, out string frameString, out bool more)
List<string> ReceiveMultipartStrings()
List<string> ReceiveMultipartStrings(Encoding encoding)
bool TryReceiveMultipartStrings(ref List<string> frames)
bool TryReceiveMultipartStrings(Encoding encoding, ref List<string> frames)
bool TryReceiveMultipartStrings(TimeSpan timeout, ref List<string> frames)
bool TryReceiveMultipartStrings(TimeSpan timeout, Encoding encoding, ref List<string> frames)

// Receiving NetMQMessage
NetMQMessage ReceiveMultipartMessage()
bool TryReceiveMultipartMessage(ref NetMQMessage message)
bool TryReceiveMultipartMessage(TimeSpan timeout, ref NetMQMessage message)

// Receiving signals
bool ReceiveSignal()
bool TryReceiveSignal(out bool signal)
bool TryReceiveSignal(TimeSpan timeout, out bool signal)

// Skipping frames
void SkipFrame()
void SkipFrame(out bool more)
bool TrySkipFrame()
bool TrySkipFrame(out bool more)
bool TrySkipFrame(TimeSpan timeout)
bool TrySkipFrame(TimeSpan timeout, out bool more)

注意为了可读性this IReceivingSocket socket参数被省略掉了.

这些扩展函数应符合大多数的需求,如果没有的话也可以很简单的建立自己需要的.

这里是上述扩展函数之一实现的方式,可以帮助建立自己的实现:

public static string ReceiveFrameString(this IReceivingSocket socket, Encoding encoding, out bool more)
{
    var msg = new Msg();
    msg.InitEmpty();
    socket.Receive(ref msg);
    more = msg.HasMore;
    var str = msg.Size > 0
        ? encoding.GetString(msg.Data, 0, msg.Size)
        : string.Empty;
    msg.Close();
    return str;
}

发送

一个NetMQSocket(所有socket皆继承至此)有一个Send函数.

public virtual void Send(ref Msg msg, SendReceiveOptions options)

如果你不想使用这个函数,也可以用为了IOutgoingSocket建立的其他方便的扩展函数.

以下列出这些扩展函数,应该够使用,若是不足也可以自行建立.

public static class OutgoingSocketExtensions
{
    public static void Send(this IOutgoingSocket socket, byte[] data);
    public static void Send(this IOutgoingSocket socket, byte[] data, int length, SendReceiveOptions options);
    public static void Send(this IOutgoingSocket socket, string message, bool dontWait = false, bool sendMore = false);
    public static void Send(this IOutgoingSocket socket, string message, Encoding encoding, SendReceiveOptions options);
    public static void Send(this IOutgoingSocket socket, byte[] data, int length, bool dontWait = false, bool sendMore = false);
    public static void Send(this IOutgoingSocket socket, string message, Encoding encoding, bool dontWait = false, bool sendMore = false);
    public static void SendMessage(this IOutgoingSocket socket, NetMQMessage message, bool dontWait = false);
    public static IOutgoingSocket SendMore(this IOutgoingSocket socket, byte

[] data, bool dontWait = false);
    public static IOutgoingSocket SendMore(this IOutgoingSocket socket, string message, bool dontWait = false);
    public static IOutgoingSocket SendMore(this IOutgoingSocket socket, byte[] data, int length, bool dontWait = false);
    public static IOutgoingSocket SendMore(this IOutgoingSocket socket, string message, Encoding encoding, bool dontWait = false);
    ....
    ....
}

这里是上述扩展函数之一实现的方式,可以帮助建立自己的:

public static void Send(this IOutgoingSocket socket, string message,
                        Encoding encoding, SendReceiveOptions options)
{
    var msg = new Msg();
    msg.InitPool(encoding.GetByteCount(message));
    encoding.GetBytes(message, 0, message.Length, msg.Data, 0);
    socket.Send(ref msg, options);
    msg.Close();
}

MQ的消息

消息结构

using (var server = new ResponseSocket("@tcp://127.0.0.1:5556"))
using (var client = new RequestSocket(">tcp://127.0.0.1:5556"))
{
    client.Send("Hello");

    string fromClientMessage = server.ReceiveFrameString();

    Console.WriteLine("From Client: {0}", fromClientMessage);

    server.SendFrame("Hi Back");

    string fromServerMessage = client.ReceiveFrameString();

    Console.WriteLine("From Server: {0}", fromServerMessage);

    Console.ReadLine();
}

NetMQ的socket有一个ReceiveFrameString()函数,这是一个很好且有用的函数,但如果你认为只能用它那就不对了.

ZeroMQ/NetMQ是完全基于frame的,意味着它们实现某种形式的协议.

虽然如果你希望设计一些复杂和精细的架构,最好能设计出一个好的协议,但幸运的是,我们并不总是需要这样做.

这在很大程度上归功于ZeroMQ/NetMQ的智能socket,它为我们抽象了很多内容,可以将socket视为构建复杂架构的基础结构.

一个例子是RouterSocket,它与众不同且聪明地使用frame,在发送者消息上加了一层代表返回地址的信息,所以当它接收到一个返回消息(从另一个工作的socket),它可以使用收到的frame消息来获取来源地址,并依此地址返回消息.

注意的一个内建的frame的使用的例子,frame并不限制在RouterSocket类型,可以在所有的地方使用,如下列示例:

如果想让frame[0]表示接下来的1frame的类型,这让接收者可以去掉不感兴趣的消息,且不需要花费时间反序列化消息,NetMQPub-Sub sockets中使用这个想法,可以替换或是扩展它.

也许想让frame[0]代表某种命令,frame[1]代表参数,frame[2]代表实际消息内容(也许包含序列化的对象).

这只是一些示例,实际上可以用任何你想的方式来操作frame,虽然一些socket类型会期待或产生特定的frame结构.

当使用多段消息(frames)时需要一次发送/接收所有段的消息.

有一个内建的more的概念可以让你整合使用,稍后会有更多

创建多段消息

创建多段消息很简单,有两种方式.

创建消息对象

你可以创建NetMQMessage对象并通过Append(...)重载函数来加上frame数据,也有其他重载可以让你加上Blob, NetMQFrame, byte[], int, longstring等.

下列是一个加上两个frame的消息的示例,每个frame都包含一个字符串值:

var message = new NetMQMessage();
message.Append("IAmFrame0");
message.Append("IAmFrame1");
server.SendMessage(message);

逐帧发送

另一个发送多段消息的方法是使用SendMoreFrame扩展函数,这不像SendMessage一样有很多重载,但是它可以很简单地发送byte[],string数据.这是一个和前述示例相像的示例:

server.SendMoreFrame("IAmFrame0")
      .SendFrame("IAmFrame1");

要发送超过两个frame,可将多个SendMoreFrame呼叫链结在一起,只要确定最后一个是SendFrame.

读取多段消息

读取多段消息也有两个方法.

逐帧接收 可以从socket中一次读出一个frame.Out参数more会告诉你目前是不是最后一个消息.

你也可以使用方便的NetMQ函数ReceiveFrameString(out more)多次,只需要知道是不是还有frame待读取,所以要追踪more变量的状态,如下示例:

// server sends a message with two frames
server.SendMoreFrame("A")
      .SendFrame("Hello");

// client receives all frames in the message, one by one
bool more = true;
while (more)
{
    string frame = client.ReceiveFrameString(out more);
    Console.WriteLine("frame={0}", frame);
    Console.WriteLine("more={0}", more);
}

这个循环将执行两次.第一次,more将被设为true.第二次,false.输出将是:

frame=A
more=true
frame=Hello
more=false

读取整段消息

一个更简单的方法是使用ReceiveMultipartMessage()函数,它提供一个包含消息的所有frame的对象.

NetMQMessage message = client.ReceiveMultipartMessage();
Console.WriteLine("message.FrameCount={0}", message.FrameCount);
Console.WriteLine("message[0]={0}", message[0].ConvertToString());
Console.WriteLine("message[1]={0}", message[1].ConvertToString());

输出会是:

message.FrameCount=2
message[0]=A
message[1]=Hello

也有其他功能,如:

IEnumerable<string> framesAsStrings = client.ReceiveMultipartStrings();
IEnumerable<byte[]> framesAsByteArrays = client.ReceiveMultipartBytes();

一个完整的示例

这里有一个完整的示例,以加深至目前为止我们谈论的印象:

using (var server = new ResponseSocket("@tcp://127.0.0.1:5556"))
using (var client = new RequestSocket(">tcp://127.0.0.1:5556"))


{
    // client sends message consisting of two frames
    Console.WriteLine("Client sending");
    client.SendMoreFrame("A").SendFrame("Hello");

    // server receives frames
    bool more = true;
    while (more)
    {
        string frame = server.ReceiveFrameString(out more);
        Console.WriteLine("Server received frame={0} more={1}",
            frame, more);
    }

    Console.WriteLine("================================");

    // server sends message, this time using NetMqMessage
    var msg = new NetMQMessage();
    msg.Append("From");
    msg.Append("Server");

    Console.WriteLine("Server sending");
    server.SendMultipartMessage(msg);

    // client receives the message
    msg = client.ReceiveMultipartMessage();
    Console.WriteLine("Client received {0} frames", msg.FrameCount);

    foreach (var frame in msg)
        Console.WriteLine("Frame={0}", frame.ConvertToString());

    Console.ReadLine();
}

输出如下:

Client sending
Server received frame=A more=true
Server received frame=Hello more=false
================================
Server sending
Client received 2 frames
Frame=From
Frame=Server

传输协议

NetMQ支持三种主要的协议:

  • TCP (tcp://)
  • InProc (inproc://)
  • PGM (pgm://) — 需要MSMQ并以管理员身份运行

下面将逐一介绍.

TCP

TCP是最常用到的协议,因此,大部分的代码会使用TCP展示.

示例

再来一个简单的示例:

using (var server = new ResponseSocket())
using (var client = new RequestSocket())
{
    server.Bind("tcp://*:5555");
    client.Connect("tcp://localhost:5555");

    Console.WriteLine("Sending Hello");
    client.SendFrame("Hello");

    var message = server.ReceiveFrameString();
    Console.WriteLine("Received {0}", message);

    Console.WriteLine("Sending World");
    server.SendFrame("World");

    message = client.ReceiveFrameString();
    Console.WriteLine("Received {0}", message);
}

输出:

Sending Hello
Received Hello
Sending World
Received World

地址格式

注意地址格式字符串会传送给Bind()及Connect()函数. 在TCP连接中,它会被组成:

tcp://*:5555

这由三个部分构成:

  • 协议(tcp)
  • 主机(IP地址,主机名或匹配"*"的wildcard)
  • 端口号(5555)

InProc 进程间(通讯)

InProc (in-process)让你可以在同一个process中用sockets连接通讯,这很有用,有几个理由:

  • 取消共享状态/锁.当你传送数据至socket时不需要担心共享状态.Socket的每一端都有自己的副本.
  • 能够在系统的不同的部分之间进行通信.
  • NetMQ提供了几个使用InProc的组件,例如Actor模型和Devices,在相关文件中会再讨论.

示例

现在让我们通过在两个执行线程之间传送一个字符串(为了简单起见)展示一个简单的InProc.

using (var end1 = new PairSocket())
using (var end2 = new PairSocket())
{
    end1.Bind("inproc://inproc-demo");
    end2.Connect("inproc://inproc-demo");

    var end1Task = Task.Run(() =>
    {
        Console.WriteLine("ThreadId = {0}", Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("Sending hello down the inproc pipeline");
        end1.SendFrame("Hello");
    });
    var end2Task = Task.Run(() =>
    {
        Console.WriteLine("ThreadId = {0}", Thread.CurrentThread.ManagedThreadId);
        var message = end2.ReceiveFrameString();
        Console.WriteLine(message);
    });
    Task.WaitAll(new[] { end1Task, end2Task });
}

输出:

ThreadId = 12
ThreadId = 6
Sending hello down the inproc pipeline
Hello

地址格式

注意地址格式字符串会传送给Bind()及Connect()函数. 在InProc连接中,它会被组成:

  • inproc://inproc-demo 这由两个部分构成:
  1. 协议(inproc)
  2. 标识名称(inproc-demo可以是任何字符串,在process范围内是唯一的名称)

PGM

Pragmatic General Multicast (PGM)是一种可靠的多播传输协议,用于需要有序、无序、不重复等可从多个来源至多个接收者的多播数据.

PGM保证群组中的接收者可接收来自不管是传送或修复,或可检测无法复原的数据封包的遗失.PGM被设计为一个拥有基本的可靠度需求的解决方案.它的中心设计目标是操作的简易性且保证其弹性及网络效率.

要使用NetMQ的PGM,我们不用做太多,只需遵循以下三点:

  • Sockets类型现在是PublisherSocket and SubscriberSocket,在pub-sub pattern会有更详细的介绍.
  • 确定你以”Administrator”等级执行软件.
  • 确定已打开”Multicasting Support”,可依以下方式:

示例 这里是一个使用PGM的小示例,以及PublisherSocket and SubscriberSocket和几个选项值.

const int MegaBit = 1024;
const int MegaByte = 1024;
using (var pub = new PublisherSocket())
using (var sub1 = new SubscriberSocket())
using (var sub2 = new SubscriberSocket())
{
    pub.Options.MulticastHops = 2;
    pub.Options.MulticastRate = 40 * MegaBit; // 40 megabit
    pub.Options.MulticastRecoveryInterval = TimeSpan.FromMinutes(10);
    pub.Options.SendBuffer = MegaByte * 10; // 10 megabyte
    pub.Connect("pgm://224.0.0.1:5555");

    sub1.Options.ReceiveBuffer = MegaByte * 10;
    sub1.Bind("pgm://224.0.0.1:5555");
    sub1.Subscribe("");

    sub2.Bind("pgm://224.0.0.1:5555");
    sub2.Options.ReceiveBuffer = MegaByte * 10;
    sub2.Subscribe("");

    Console.WriteLine("Server sending 'Hi'");
    pub.Send("Hi");

    bool more;
    Console.WriteLine("sub1 received = '{0}'", sub1.ReceiveString(out more));
    Console.WriteLine("sub2 received = '{0}'", sub2.ReceiveString(out more));
}

执行后输出如下:

Server sending 'Hi'
sub1 received = 'Hi'
sub2 received = 'Hi'

地址格式 注意传入Bind() and Connect()的字符串地址格式,对InProc连接来说,会类似:

pgm://224.0.0.1:5555

它以三个部分组成:

  • 协议(pgm)
  • 主机(如244.0.0.1之类的IP地址,主机名称,或通配符*的匹配)
  • 端口号(5555) 另一个不错的PGM的资料是PGM unit tests.

清除

在NetMQ第4版中我们拿掉了NetMQContext,现在我们可以用新的运算子建立sockets了,虽然这让函数库较简单,但也增加了一些需要清除的复杂性.

为什么NetMQ需要清除

NetMQ在背景建立了一些执行线程.因此,当你在Socket上呼叫Dispose时,这个处理是非同步的且发生在背景执行线程中.而因为NetMQ的执行线程是属于背景执行线程,所以你实际上可以不正确清除并离开程序,但不建议.

当离开AppDomain时会更复杂,所以你需要清除NetMQ.

什么是Linger?

Linger是socket在被dispose时传送当下尚未传送所有消息的允许时间.所以当我们在一个Linger设为1秒的socket上呼叫Dispose时,它会最多花费一秒直到socket被disposed,此时函数库会试着传送所有等待中的消息,如果它在linger时间到达前传送完成,此socket会马上被disposed.

正如所说,这一切发生在背景中,所以若linger有被设置,但我们没有正确清除函数库,linger会被略过.如果linger对你很重要,要确保你正确的清除函数库.

在第四版中预设的Linger值是零,表示函数库不会在dispose前等待.你可以变更单一socket的linger值,也可以透过NetMQConfig.Linger设定所有linger的值.

如何清除?

关于cleanup最重要的是你要在呼叫Cleanup前呼叫所有socket的Dispose,也要确保NetMQ函数库中的其它资源如NetMQPoller、NetMQQueue等被正确cleanup,如果socket没有被disposed,那NetMQConfig.Cleanup会永远阻塞.

最后你需要呼叫NetMQConfig.Cleanup,你可以如下所示的方式:

static void Main(string[] args)
{
    try
    {
        // Do you logic here
    }
    finally
    {
        NetMQConfig.Cleanup();
    }
}

如果你很懒惰,不关心清理函数库,你也可以呼叫NetMQConfig.Cleanup并将block参数设为false.当设为false时,cleanup不会等待Sockets发送所有消息,并且只会kill背景执行线程.

如果你在你的测试中使用NetMQ,你也要确保你正确的对函数库做cleanup.

这边建议可加一个全域的tear down在你的测试中,并呼叫NetMQConfig.Cleanup. 示范若是在NUnit中可以:

[SetUpFixture]
public class Setup
{
    [OneTimeTearDown]
    public void TearDown()
    {
        NetMQConfig.Cleanup(false);
    }
}

在测试中,呼叫Cleanup并代入false可让你在测试失败时不让程序中断.

NetMQ的组件(Components )

Pollers

动机1:效率

NetMQPoller有很多示例.首先让我们来看一个简单的服务器:

using (var rep = new ResponseSocket("@tcp://*:5002"))
{
    // process requests, forever...
    while (true)
    {
        // receive a request message
        var msg = rep.ReceiveFrameString();

        // send a canned response
        rep.Send("Response");
    }
}

这个服务器会很快且永远处理回应.

如果我们想在同一个执行线程中处理两个不同的response sockets中呢?

using (var rep1 = new ResponseSocket("@tcp://*:5001"))
using (var rep2 = new ResponseSocket("@tcp://*:5002"))
{
    while (true)
    {
        // Hmmm....
    }
}

我们要如何公平的处理两个response sockets的服务?不能一次处理一个吗?

// blocks until a message is received
var msg1 = rep1.ReceiveString();

// might never reach this code!
var msg2 = rep2.ReceiveString();

一个等待接收的函数会阻塞直到有消息抵达.如果我们在rep1等待接收,那传送给rep2的所有消息会被忽略,直到rep1收到消息-也可能永远收不到,所以这当然不是一个好方法.

相反的,我们可以在rep1和rep2上用非阻塞式的接收函数,但这可能会在没有消息的状况下让当前CPU的负载过高,所以,这也不是一个好方法…

我们可以引进使用非阻塞式函数中的timeout参数.然而,什么值比较合适呢?如果我们用10ms,那如果rep1没有收到消息,那rep2最多只能取得每秒100个消息(反之也成立),这严重限制了吞吐量,而且无法有效地利用资源.

所以我们需要一个较好的方式.

动机2: 正确性 接续上面的示例,也许你会考虑每个socket放在不同的执行线程当中,并且采用阻塞式呼叫,虽然这在一些状况下是个好方法,但是它有一些限制.

对ZeroMQ/NetMQ来说,为了发挥最大效能,所存在的限制是我们使用socket的方式.特别地说,

NetMQSocket不是执行线程安全的,在多个执行线程中同步使用同一個socket是无效的.

举例来说,考虑我们在Thread A中有一个socket A的循环在服务,在Thread B中有一个socket B的循环在服务,若试着在socket A中接收消息,并传送至socket B,是无效的.Socket不是执行线程安全的,所以试着在执行线程A和B中同步使用可能会导致错误.

事实上,这里描述的模式被称为proxy,并且也被内置在NetMQ中.在这一点上,你可能不会讶异地发现它由NetMQPoller来实作.

使用ReceiveReady

让我们使用一个Poller来从一个执行线程简单地服务两个sockets:

using (var rep1 = new ResponseSocket("@tcp://*:5001"))
using (var rep2 = new ResponseSocket("@tcp://*:5002"))
using (var poller = new NetMQPoller { rep1, rep2 })
{
    // these event will be raised by the Poller
    rep1.ReceiveReady += (s, a) =>
    {
        // receive won't block as a message is ready
        string msg = a.Socket.ReceiveString();
        // send a response
        a.Socket.Send("Response");
    };
    rep2.ReceiveReady += (s, a) =>
    {
        // receive won't block as a message is ready
        string msg = a.Socket.ReceiveString();
        // send a response
        a.Socket.Send("Response");
    };

    // start polling (on this thread)
    poller.Run();
}

这段程序设置了两个sockets,并绑定到不同的地址,并在一个NetMQPoller中使用集合初始化加入这两个sockets(也可以使用Add(NetMQSocket)函数),并在各别socket的ReceiveReady事件加上处理函数,最后poller由Run()启动,并开始阻塞直到Poller的Stop函数被呼叫为止.

在内部,NetMQPoller以最佳方式解决上述问题.

使用SendReady 暂缺

Timers

Pollers有一个额外的功能:Timer.

如果需要在一个执行线程当中对一或多个sockets,执行一些周期性的操作,你可以在NetMQPoller中加上一个NetMQTimer.

这个示例会每秒推送一个消息至所有已连接的端点.

var timer = new NetMQTimer(TimeSpan.FromSeconds(1));

using (var pub = new PublisherSocket("@tcp://*:5001"))
using (var poller = new NetMQPoller { pub, timer })
{
    pub.ReceiveReady += (s, a) => { /* ... */ };

    timer.Elapsed += (s, a) =>
    {
        pub.Send("Beep!");
    };

    poller.Run();
}

加入/移除 sockets/timers

Sockets和timers在执行时可以被安全的加入至或从Poller中移除. 注意NetMQSocket,NetMQActorNetMQBeacon都实现了ISocketPollable,所以NetMQPoller可以监示所有这些类型.

  • AddSocket(ISocketPollable)
  • RemoveSocket(ISocketPollable)
  • AddTimer(NetMQTimer)
  • RemoveTimer(NetMQTimer)
  • AddPollInSocket(System.Net.Sockets.Socket, Action)
  • RemovePollInSocket(System.Net.Sockets.Socket)

控制polling

到目前为止,我们知道了可以使用Plller的了Run函数.这让执行线程用于轮询活动,直到Poller被从socket/timer事件处理程序或从另一个执行线程中取消.

如果希望继续使被调用执行线程进行其他操作,可以RunAsync,它会在新执行线程中调用Run.

要停止Poller,使用StopStopAsync. 后者会等待直到Poller的循环在返回之前结束.

一个更复杂的例子

让我们看一个较复杂的示例,使用我们目前为止看到的大部分工具.

我们在接收到第一条消息时将从NetMQPoller中删除一个ResponseSocket,即使消息是正确的,ReceiveReady也不会被触发.

using (var rep = new ResponseSocket("@tcp://127.0.0.1:5002"))
using (var req = new RequestSocket(">tcp://127.0.0.1:5002"))
using (var poller = new NetMQPoller { rep })
{
    // this event will be raised by the Poller
    rep.ReceiveReady += (s, a) =>
    {
        bool more;
        string messageIn = a.Socket.ReceiveFrameString(out more);
        Console.WriteLine("messageIn = {0}", messageIn);
        a.Socket.SendFrame("World");

        // REMOVE THE SOCKET!
        poller.Remove(a.Socket);
    };

    // start the poller
    poller.RunAsync();

    // send a request
    req.SendFrame("Hello");

    bool more2;
    string messageBack = req.ReceiveFrameString(out more2);
    Console.WriteLine("messageBack = {0}", messageBack);

    // SEND ANOTHER MESSAGE
    req.SendFrame("Hello Again");

    // give the message a chance to be processed (though it won't be)
    Thread.Sleep(1000);
}

输出如下:

messageIn = Hello
messageBack = World

看到为什么Hello Again没有收到吗?这是因为在RecieiveReady中处理第一条消息时将ResponseSocketNetMQPoller中移除.

执行效率

使用Poller接收消息比在Socket上直接调用Receive函数慢.当处理数千条消息时,第二个或更多的Poller可能是瓶颈.

但是解决方案很简单,我们只需要使用Try*函数获取当前可用的socket的所有消息.以下是一个示例:

rep1.ReceiveReady += (s, a) =>
{
    string msg;
    // receiving all messages currently available in the socket before returning to the poller
    while (a.Socket.TryReceiveFrameString(out msg))
    {
        // send a response
        a.Socket.Send("Response");
    }
};

如果socket载入了不会停止的消息流,则上述解决方案可能导致其他socket的Starving.要解决这个问题,你可以限制一个批次中可以提取的消息数量.

rep1.ReceiveReady += (s, a) =>
{
    string msg;
    //  receiving 1000 messages or less if not available
    for (int count = 0; count < 1000; i++)
    {
        // exit the for loop if failed to receive a message
        if (!a.Socket.TryReceiveFrameString(out msg))
            break;

        // send a response
        a.Socket.Send("Response");
    }
};

以上就是关于NetMQ的一些基本使用和注意事项,希望对你有所帮助.在实际使用中,你可能还需要根据具体的业务需求和场景,对其进行适当的调整和优化.

NetMQ Actor Model

一个很好的思考Actors的方式是─他们是用来减轻一些在同步化时使用共享数据结构需要注意的地方.这是在你的程序中与actor通过消息传送/接收实现的.Actor本身可以将消息传送给其他actor,或者处理传送的消息本身.通过使用消息传送而不是使用共享数据结构,它有助于让你认为actor(或其发送消息的任何后续actor)实际上是在数据的拷贝上工作,而不是在相同的共享数据结构上工作.让我们摆脱了多执行线程程序中需要担心的可怕事情,如锁和任何讨厌的定时问题.如果actor使用自己的数据拷贝,那么我们应该没有其他的执行线程想要使用此actor所拥有的数据的问题,因为数据只在actor本身之内可见,unless we pass another message to a different actor.如果我们这样做,新的消息给另一个actor也只是另一个数据的拷贝,因此也是执行线程安全的.

在多执行线程当中共用数据 一个相当普遍的事情是用多个执行线程运行以加快速度,然后你发现到你的执行线程需要改变一些共享数据的状态,那么你会涉及到执行线程同步(最常见的 lock(..) statements, 以建立自己的 critical sections).这有用,但现在你正引入人为的延迟,由于必须等待锁被释放,所以你可以执行执行线程X的程序.

enter image description here

更进一步,让我们看看一些程序,可以说明这一点.想像一下,我们有一个数据结构代表非常简单的银行账户:

public class Account
{
    public Account(int id, string name, string sortCode, decimal balance)
    {
        Id = id;
        Name = name;
        SortCode = sortCode;
        Balance = balance;
    }

    public int Id { get; set; }
    public string Name { get; set; }
    public string SortCode { get; set; }
    public decimal Balance { get; set; }

    public override string ToString()
    {
        return string.Format("Id: {0}, Name: {1}, SortCode: {2}, Balance: {3}",
            Id, Name, SortCode, Balance);
    }
}

这里没有什么特别的,只是一些字段.让我们来看一些执行线程程序,我选择只显示两个执行线程共用Account实体的程序.

static void Main()
{
    var account = new Account(1, "sacha barber", "112233", 0);
    var syncLock = new object();

    // start two asynchronous tasks that both mutate the account balance

    var task1 = Task.Run(() =>
    {
        var threadId = Thread.CurrentThread.ManagedThreadId

以下是处理帐户操作的Actor的完整程序.这个例子是故意简化的,我们只用一笔金额借/贷一个帐户.你可以发送任何命令到Actor,而Actor只是一个一般化的处理消息的系统. 
代码如下:

```csharp
public class AccountActioner
{
    public class ShimHandler : IShimHandler
    {
        private PairSocket shim;
        private NetMQPoller poller;

        public void Initialise(object state)
        {
        }

        public void Run(PairSocket shim)
        {
            this.shim = shim;
            shim.ReceiveReady += OnShimReady;
            shim.SignalOK();

            poller = new NetMQPoller { shim };
            poller.Run();
        }

        private void OnShimReady(object sender, NetMQSocketEventArgs e)
        {
            string command = e.Socket.ReceiveFrameString();

            switch (command)
            {
                case NetMQActor.EndShimMessage:
                    Console.WriteLine("Actor received EndShimMessage");
                    poller.Stop();
                    break;
                case "AmmendAccount":
                    Console.WriteLine("Actor received AmmendAccount message");
                    string accountJson = e.Socket.ReceiveFrameString();
                    Account account
                        = JsonConvert.DeserializeObject<Account>(accountJson);
                    string accountActionJson = e.Socket.ReceiveFrameString();
                    AccountAction accountAction
                        = JsonConvert.DeserializeObject<AccountAction>(
                            accountActionJson);
                    Console.WriteLine("Incoming Account details are");
                    Console.WriteLine(account);
                    AmmendAccount(account, accountAction);
                    shim.SendFrame(JsonConvert.SerializeObject(account));
                    break;
            }
        }

        private void AmmendAccount(Account account, AccountAction accountAction)
        {
            switch (accountAction.TransactionType)
            {
                case TransactionType.Credit:
                    account.Balance += accountAction.Amount;
                    break;
                case TransactionType.Debit:
                    account.Balance -= accountAction.Amount;
                    break;
            }
        }
    }

    private NetMQActor actor;

    public void Start()
    {
        if (actor != null)
            return;

        actor = NetMQActor.Create(new ShimHandler());
    }

    public void Stop()
    {
        if (actor != null)
        {
            actor.Dispose();
            actor = null;
        }
    }

    public void SendPayload(Account account, AccountAction accountAction)
    {
        if (actor == null)
            return;

        Console.WriteLine("About to send person to Actor");

        var message = new NetMQMessage();
        message.Append("AmmendAccount");
        message.Append(JsonConvert.SerializeObject(account));
        message.Append(JsonConvert.SerializeObject(accountAction));
        actor.SendMultipartMessage(message);
    }

    public Account GetPayLoad()
    {
        return JsonConvert.DeserializeObject<Account>(actor.ReceiveFrameString());
    }
}

你可以使用以下代码和Actor沟通,再次地说,你可以使用任何命令,这个例子只显示对一个帐户的借/贷.

class Program
{
    static void Main(string[] args)
    {
        // CommandActioner uses an NetMq.Actor internally
        var accountActioner = new AccountActioner();

        var account = new Account(1, "Doron Somech", "112233", 0);
        PrintAccount(account);

        accountActioner.Start();
        Console.WriteLine("Sending account to AccountActioner/Actor");
        accountActioner.SendPayload(account,
            new AccountAction(TransactionType.Credit, 15));

        account = accountActioner.GetPayLoad();
        Print

```csharp
        PrintAccount(account);

        accountActioner.Stop();
        Console.WriteLine();
        Console.WriteLine("Sending account to AccountActioner/Actor");
        accountActioner.SendPayload(account,
            new AccountAction(TransactionType.Credit, 15));
        PrintAccount(account);

        Console.ReadLine();
    }

    static void PrintAccount(Account account)
    {
        Console.WriteLine("Account now");
        Console.WriteLine(account);
        Console.WriteLine();
    }
}

执行时应可以看见如下输出: enter image description here

我们希望这可以让你知道可以用一个Actor做些什么事…

NetMQBeacon实现了在局域网中点对点的发现服务.

一个beacon可以在局域网中通过UDP做广播或捕捉service announcements,你可以定义广播出去的beacon,也可以设置过滤器以过滤接收到的beacons.Beacons会在背景异步的执行发送及接收的动作.

我们可以使用NetMQBeacon自动地在网路中寻找及连接至其它NetMQ/CZMQ的服务而不需要一个中央的设置.请注意若要使用NetMQBeacon在你的架构中需要支援广播(broadcast)服务.而目前大部份的服端服务商并不支援.

这个实现使用IPv4 UDP广播,属于zbeacon from czmq并加上维护网路相容性的扩充函数.

示例:Implementing a Bus NetMQBeacon可以用来建立简单的bus系统,让一组节点仅需透过一个共享的端口号即可找到其它的节点.

每个bus的节点绑定至一个subscriber socket且靠publisher socket连接至其它节点. 每个节点会透过NetMQBeacon去公告它的存在及寻找其它节点,我们将使用NetMQActor来实作我们的节点. 示例在此:

Timer

一个NetMQTimer可以执行周期性的动作.Timer实体可以加至NetMQPoller中,且它的Elapsed事件会依指定的IntervalEnabled属性值被触发.

以下事件在poller执行线程中被唤起.

var timer = new NetMQTimer(TimeSpan.FromMilliseconds(100));

timer.Elapsed += (sender, args) => { /* handle timer event */ };

using (var poller = new NetMQPoller { timer })
{
    poller.Run();
}

Queue

NetMQQueue<T>是一个支持多个生产者及单一消费者的生产者/消费者队列.

你应该将队列加至NetMQPoller中,且在ReceiveReady事件中加上消费代码,而生产者会呼叫Enque(T)将数据加入.

此类别籍由将众多操作集结在单一执行线程中免去了撰写冗余代码的时间浪费.

using (var queue = new NetMQQueue<ICommand>())
using (var poller = new NetMQPoller { queue })
{
    queue.ReceiveReady += (sender, args) => ProcessCommand(queue.Dequeue());

    poller.RunAsync();

    // Then, from various threads...
    queue.Enqueue(new DoSomethingCommand());
    queue.Enqueue(new DoSomethingElseCommand());
}

Proactor

NetMQProactor会使用专有的执行线程处理在socket上收到的消息.

using (var receiveSocket = new DealerSocket

(">tcp://localhost:5555"))
using (var proactor = new NetMQProactor(receiveSocket, 
        (socket, message) => ProcessMessage(message)))
{
    // ...
}

在内部,proactor为socket建了一个NetMQPoller,以及一个NetMQActor处理poller执行线程及disposal.

通讯模式

Request - Response

Request/Response 应该是所有NetMQ socket 组合中最简单的一种了.

这不是说RequestSocket和ResponseSocket必须总是一起使用,只是会有很多时候你想将某一种socket和另一种socket一起使用.

有一些特定的socket的组合,刚好很适合在一起使用,而RequestSocket和ResponseSocket就是这样的一个模式.

可以无间配合的socket组合在ZeroMQ指南中皆已清楚描述.

Request / Response模式是两个NetMQ sockets协调工作的一个配置.这种组合类似于你在发出一个web request时看到的模式,也就是说,一方提出请求,且期望得到回应.

RequestSocketResponseSocket 是同步式、阻塞的,如果你试着以错误的顺序读取消息,会得到一个异常.

应该使用RequestSocketResponseSockets连接的方式如下:

  • 从RequestSocket传送消息
  • ResponseSocket读取请求的消息
  • ResponseSocket传送回应消息
  • RequestSocket接收来自ResponseSocket的消息

不你相信与否,你应该已看过这种范例很多次,因为它已是最简单的示范了.

其中RequestSocket和ResponseSockets可以都在同一个process中,也可以很容易地放在两个不同的process中.

我们尽可能保持简单以用于展示的目的.

using (var responseSocket = new ResponseSocket("@tcp://*:5555"))
using (var requestSocket = new RequestSocket(">tcp://localhost:5555"))
{
    Console.WriteLine("requestSocket : Sending 'Hello'");
    requestSocket.SendFrame("Hello");

    var message = responseSocket.ReceiveFrameString();

    Console.WriteLine("responseSocket : Server Received '{0}'", message);

    Console.WriteLine("responseSocket Sending 'World'");
    responseSocket.SendFrame("World");

    message = requestSocket.ReceiveFrameString();
    Console.WriteLine("requestSocket : Received '{0}'", message);

    Console.ReadLine();
}

输出如下: enter image description here

Request/Response 是阻塞式的

如上所述,RequestSocket和ResponseSocket是阻塞的,这意味着任何意外的发送或接呼叫将会导致异常.这里是这种异常的范例.

这个范例中我们试着在RequestSocket中执行两次Send().

或者这个范例,我们尝试执行RecieveString()两次,但只有一个消息从RequestSocket传送.

所以要小心用Request/Response模式.

魔鬼总在细节里.

Pub - Sub

发布/订阅(Publish/subscribe 或pub/sub)是一种消息规范,消息的传送者(发布者)不是计划传送其消息给特定的接收者(订阅者).而是发布的消息分为不同的类别,而不需要知道什么样的订阅者订阅.订阅者对一个或多个类别表达兴趣,于是只接收感兴趣的消息,而不需要知道什么样的发布者发布的消息.这种发布者和订阅者的解耦可以允许更好的可延伸性和更为动态的网络拓扑.

上述所谓的类别也可以当成是一个”主题”或”过滤器”.

NetMQ用两种socket类型支持Pub/Sub模式:

  • PublisherSocket
  • SubscriberSocket Topics 主题 ZeroMQ/NetMQ使用多段消息传送主题信息,可用byte数组来表示主题,或是字符串并加上适当的System.Text.Encoding.
// send a message on the 'status' topic
pub.SendMoreFrame("status").SendFrame("All is well");

订阅者使用SubscriberSocket的Subscribe函数指定他们有兴趣的主题.

// subscribe to the 'status' topic
sub.Subscribe("status");

Topic heirarchies 主题阶级 一个消息的主题会用prefix检查和订阅者的订阅主题比较.

也就是说,订阅主题的订阅者会接收具有主题的消息:

topic topic/subtopic topical 然而它不会接受这些主题:

topi TOPIC(记住,它是以byte做为比较方式) 使用prefix比对行为的结果,可以让你以空字符串来订阅所有发布的消息.

sub.Subscribe(""); // subscribe to all topics

范例 到了介绍范例的时间了,这范例很简单,且遵守下列规则:

  • 有一个发布者的process,会以500ms的时间随机发布主题为TopicA或’TopicB`的消息.
  • 可能会有很多订阅者,欲订阅的主题名称会以命令列参数代入程序中.

Publichser

using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

namespace Publisher
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rand = new Random(50);

            using (var pubSocket = new PublisherSocket())
            {
                Console.WriteLine("Publisher socket binding...");
                pubSocket.Options.SendHighWatermark = 1000;
                pubSocket.Bind("tcp://localhost:12345");

                for (var i = 0; i < 100; i++)
                {
                    var randomizedTopic = rand.NextDouble();
                    if (randomizedTopic > 0.5)
                    {
                        var msg = "TopicA

msg-" + i;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicA").SendFrame(msg);
                    }
                    else
                    {
                        var msg = "TopicB msg-" + i;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicB").SendFrame(msg);
                    }

                    Thread.Sleep(500);
                }
            }
        }
    }
}

Subscriber

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace SubscriberA
{
    class Program
    {
        public static IList<string> allowableCommandLineArgs
            = new [] { "TopicA", "TopicB", "All" };

        static void Main(string[] args)
        {
            if (args.Length != 1 || !allowableCommandLineArgs.Contains(args[0]))
            {
                Console.WriteLine("Expected one argument, either " +
                                  "'TopicA', 'TopicB' or 'All'");
                Environment.Exit(-1);
            }

            string topic = args[0] == "All" ? "" : args[0];
            Console.WriteLine("Subscriber started for Topic : {0}", topic);

            using (var subSocket = new SubscriberSocket())
            {
                subSocket.Options.ReceiveHighWatermark = 1000;
                subSocket.Connect("tcp://localhost:12345");
                subSocket.Subscribe(topic);
                Console.WriteLine("Subscriber socket connecting...");
                while (true)
                {
                    string messageTopicReceived = subSocket.ReceiveFrameString();
                    string messageReceived = subSocket.ReceiveFrameString();
                    Console.WriteLine(messageReceived);
                }
            }
        }
    }
}

在这边提供三个批次档,让你方便执行,不过要稍微修改一下路径等一适合你的环境.

RunPubSub.bat

start RunPublisher.bat
start RunSubscriber "TopicA"
start RunSubscriber "TopicB"
start RunSubscriber "All"

RunPublisher.bat

cd Publisher\bin\Debug
Publisher.exe

RunSubscriber.bat

set "topic=%~1"
cd Subscriber\bin\Debug
Subscriber.exe %topic%

执行时输出如下: enter image description here

Other Considerations High water mark SendHighWaterMark / ReceiveHighWaterMark选项可设定指定socket的high water mark.High water mark是对未完成消息的最大数量的限制,NetMQ会将正在与指定的socket通讯的任何端点的消息排入队列中.

如果到达此限制,socket会进入异常状态,并且根据socket类型,NetMQ应采取适当的措施,如阻止或丢弃发送的消息.

预设的SendHighWaterMark / ReceiveHighWaterMark值为1000.零值表示“无限制”.

你也可以使用xxxSocket.Options属性值设定下列两个属性:

pubSocket.Options.SendHighWatermark = 1000;
pubSocket.Options.ReceiveHighWatermark = 1000;

Slow subscribers ZeroMQ指南有提到.

Late joining subscribers ZeroMQ指南有提到.

Push - Pull

NetMQ提供了PushSocket和PullSocket,这些是什么?要如何使用?

嗯,PushSocket一般是用来推送消息至PullSocket,而PullSocket是用来从PushSocket获取消息,听起来很对吧!

你通常使用这种设置的socket来生成一些分布式的工作,有点像divide and conquer的安排.

这个想法是,你有一些生成工作的东西,然后将工作分配给多个工人.工人每个都做一些工作,并将结果推送到其他工序(可能是一个执行线程),工人的产出在那里累积.

在ZeroMQ指南中,它显示了一个例子,其中work generator只是告诉每个工人睡眠一段时间.

我们试图创建一个比这更复杂的例子,但是最终觉得这个例子的简单性是相当重要的,所以我们让每个工人的工作量变成一个代入值,告诉工作休眠几毫秒(从而模拟一些实际工作).这个例子,正如我所说,是从ZeroMQ指南借来的.

在实际生活中,工作显然可以是任何事情,尽管你可能更希望工作是可以被切割和分发的,而工作生成器并不关心/知道有多少工人.

这里是我们试图实现的:

using System;
using NetMQ;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");

            using (var sender = new PushSocket("@tcp://*:5557"))
            using (var sink = new PushSocket(">tcp://localhost:5558"))
            {
                Console.WriteLine("Press enter when worker are ready");
                Console.ReadLine();

                //the first message it "0" and signals start of batch
                //see the Sink.csproj Program.cs file for where this is used
                Console.WriteLine("Sending start of batch to Sink");
                sink.SendFrame("0");

                Console.WriteLine("Sending tasks to workers");

                //initialise random number generator
                Random rand = new Random(0);

                //expected costs in Ms
                int totalMs = 0;

                //send 100 tasks (workload for tasks, is just some random sleep time that
                //the workers can perform, in real life each work would do more than sleep
                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    //Random workload from 1 to 100 msec
                    int workload = rand.Next(0, 100);
                    totalMs += workload;
                    Console.WriteLine("Workload : {0}", workload);
                    sender.SendFrame(workload.ToString());
                }
                Console.WriteLine("Total expected cost : {0} msec", totalMs);
                Console.WriteLine("Press Enter to quit");
                Console.ReadLine();
            }
        }
    }
}


#### Push推 - Pull拉

NetMQ提供了`PushSocket`和`PullSocket`.

`PushSocket`通常用来推送消息至`PullSocket`,而`PullSocket`则用来从`PushSocket`获取消息.

你通常使用这种设置的socket来生成一些分布式的工作,有点像divide and conquer的安排.

这个想法是,有一些生成工作的东西,然后将工作分配给多个工人.工人每个都做一些工作,并将结果推送到其他工序(可能是一个执行线程),工人的产出在那里汇聚.

在`ZMQ`指南中,有一个例子,其中work generator只是告诉每个工人睡眠一段时间.

我们试图创建一个比这更复杂的例子,但是最终觉得这个例子的简单性是相当重要的,所以我们让每个工人的工作量变成一个代入值,告诉工作休眠几毫秒(从而模拟一些实际工作).这个例子,是从ZeroMQ指南借来的.

在现实生活中,工作当然可以是任何事情,尽管你可能更希望工作是可以被切割和分发的,而工作生成器并不关心/知道有多少工人.


```csharp
Ventilator
using System;
using NetMQ;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");

            using (var sender = new PushSocket("@tcp://*:5557"))
            using (var sink = new PushSocket(">tcp://localhost:5558"))
            {
                Console.WriteLine("Press enter when worker are ready");
                Console.ReadLine();

                //the first message it "0" and signals start of batch
                //see the Sink.csproj Program.cs file for where this is used
                Console.WriteLine("Sending start of batch to Sink");
                sink.SendFrame("0");

                Console.WriteLine("Sending tasks to workers");

                //initialise random number generator
                Random rand = new Random(0);

                //expected costs in Ms
                int totalMs = 0;

                //send 100 tasks (workload for tasks, is just some random sleep time that
                //the workers can perform, in real life each work would do more than sleep
                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    //Random workload from 1 to 100 msec
                    int workload = rand.Next(0, 100);
                    totalMs += workload;
                    Console.WriteLine("Workload : {0}", workload);
                    sender.SendFrame(workload.ToString());
                }
                Console.WriteLine("Total expected cost : {0} msec", totalMs);
                Console.WriteLine("Press Enter to quit");
                Console.ReadLine();
            }
        }
    }
}

Worker

using System;
using System.Threading;
using NetMQ;

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // Collects workloads from ventilator via that socket
            Console.WriteLine("====== WORKER ======");

            using (var receiver = new PullSocket("@tcp://localhost:5557"))
            using (var sender = new PushSocket(">tcp://localhost:5558"))
            {
                Console.WriteLine("Worker ready, press any key to start");
                Console.ReadKey();

                Console.WriteLine("Working...");

                while (true)
                {
                    string workload = receiver.ReceiveFrameString();
                    //simulate doing some 'work'
                    Thread.Sleep(int.Parse(workload));

                    //send results to sink
                    sender.SendFrame(workload);
                }
            }
        }
    }
}

Sink

using System;
using System.Diagnostics;
using NetMQ;

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Sink
            // Binds PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");

            using (var receiver = new PullSocket("@tcp://*:5558"))
            {
                //wait for start of batch
                string startOfBatchTrigger = receiver.ReceiveFrameString();
                Console.WriteLine("Received start of batch");

                //start our clock now
                Stopwatch watch = new Stopwatch();
                watch.Start();

                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    string workerDoneTrigger = receiver.ReceiveFrameString();
                    if ((taskNumber / 10) * 10 == taskNumber)
                    {
                        Console.Write(":");
                    }
                    else
                    {
                        Console.Write(".");
                    }
                }
                watch.Stop();
                Console.WriteLine();
                Console.WriteLine("Total elapsed time : {0} msec", watch.ElapsedMilliseconds);
                Console.WriteLine("Press Enter to quit");
                Console.ReadLine();
            }
        }
    }
}

以上是一个简单的Ventilator-Worker-Sink模型的实现,Ventilator生成任务并通过PushSocket发送给Worker,Worker接收任务并处理,处理完后通过PushSocket发送给Sink,Sink接收所有Worker的处理结果.

参考

https://gihomn.blogspot.com/2017/01/netmqing.html

Tags: c# ZeroMQ