前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布

ZMQ

作者头像
为为为什么
发布2024-09-26 09:28:06
840
发布2024-09-26 09:28:06
举报
文章被收录于专栏:又见苍岚

ZMQ(ZeroMQ)是一个开源的库,用于在应用程序中实现消息传递, 本文记录相关内容。

ZMQ

ZeroMQ (也称为 ØMQ,0MQ,或 zmq)看起来像一个可嵌入的网络库,但其作用类似于并发框架。它提供了跨进程、进程间、 TCP 和多播等各种传输方式携带原子消息的套接字。您可以使用诸如扇出、发布-订阅、任务分配和请求-应答等模式将套接字 N 到 N 连接起来。它的速度足以成为集群产品的结构。它的异步 I/O 模型为您提供了可伸缩的多核应用程序,构建为异步消息处理任务。它有许多语言 API,并且运行在大多数操作系统上。

它被设计为类似于socket的API,但其运作方式更像是消息队列(message queue)或企业消息传递系统(enterprise messaging system)。

特点

  1. 高级抽象:ZMQ提供高级的抽象,使得消息传递变得简单,无需担心底层网络细节。
  2. 模式多样:支持多种通信模式,如请求-应答(request-reply)、发布-订阅(publish-subscribe)、推-拉(push-pull)等。
  3. 跨平台:可以在多种操作系统和编程语言上使用。
  4. 性能优异:经过优化,具有很高的消息吞吐量。
  5. 无中心:ZMQ不需要一个中心节点,每个节点既是客户端也是服务器。

通信方式: ZeroMQ的三种通信模式分别是:Request-ReplyPublisher-subscriber, Parallel Pipeline

官方网站https://zeromq.org/

PyZMQhttps://pyzmq.readthedocs.io/en/latest/

Python 安装

PyZMQ 需要在 python 3.7 及以上的版本上使用,当前(2024.09)最新版本为 26.2

  • 安装 zmq
代码语言:txt
复制
pip install pyzmq

Request-Reply(应答模式)

应答模式特点:

  1. 客户端提出请求,服务端必须回答请求,每个请求只回答一次
  2. 客户端没有收到答复前,不能再次进行请求
  3. 可以有多个客户端提出请求,服务端能保证各个客户端只接收到自己的答复
代码语言:txt
复制
1.  如果服务端断掉或者客户端断掉会产生怎样的影响?  如果是客户端断掉,对服务端没有任何影响,如果客户端随后又重新启动,那么两方继续一问一答,但是如果是服务端断掉了,就可能会产生一些问题,这要看服务端是在什么情况下断掉的,如果服务端收是在回答完问题后断掉的,那么没影响,重启服务端后,双发继续一问一答,但如果服务端是在收到问题后断掉了,还没来得及回答问题,这就有问题了,那个提问的客户端迟迟得不到答案,就会一直等待答案,因此不会再发送新的提问,服务端重启后,客户端迟迟不发问题,所以也就一直等待提问。
 
python 实现 1
  • 客户端和服务端代码如下:

zmq_server.py

代码语言:txt
复制
import zmq


context = zmq.Context()            #创建上下文
socket = context.socket(zmq.REP)   #创建Response服务端socket
socket.bind("tcp://*:5555")        #socket绑定,*表示本机ip,端口号为5555,采用tcp协议通信

while True:
    message = socket.recv()
    print(type(message))          #接收到的消息也会bytes类型(字节)
    print("收到消息:{}".format(message))
    socket.send(b"new message")   #发送消息,字节码消息

zmq_client.py

代码语言:txt
复制
#coding:utf-8

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

socket.send(b"A message")
response = socket.recv()
print(response)

常用数据发送API如下:

代码语言:txt
复制
#发送数据
socket.send_json(data)   #data 会被json序列化后进行传输 (json.dumps)
socket.send_string(data, encoding="utf-8")   #data为unicode字符串,会进行编码成子节再传输
socket.send_pyobj(obj)    #obj为python对象,采用pickle进行序列化后传输
socket.send_multipart(msg_parts)   # msg_parts, 发送多条消息组成的迭代器序列,每条消息是子节类型,
                                    # 如[b"message1", b"message2", b"message2"]

#接收数据
socket.recv_json()
socket.recv_string()
socket.recv_pyobj()
socket.recv_multipart()

python 实现 2

sever.py

代码语言:txt
复制
import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
    try:
        print("wait for client ...")
        message = socket.recv()
        print("message from client:", message.decode('utf-8'))
        socket.send(message)
    except Exception as e:
        print('异常:',e)
        sys.exit()

client.py

代码语言:txt
复制
import zmq
import sys
context = zmq.Context()
print("Connecting to server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
while True:

    input1 = input("请输入内容:").strip()
    if input1 == 'b':
        sys.exit()
    socket.send(input1.encode('utf-8'))

    message = socket.recv()
    print("Received reply: ", message.decode('utf-8'))

Publisher-Subscriber (发布-订阅模式)

publiser广播消息到所有客户端,客户端根据订阅主题过滤消息。

广播所有client,没有队列缓存,断开连接数据将永远丢失。

PUB发送,send。SUB接收,recv。和PUSH-PULL模式不同,PUB将消息同时发给和他建立的链接,类似于广播。另外发布订阅模式也可以使用订阅过滤来实现只接收特定的消息。订阅过滤是在服务器上进行过滤的,如果一个订阅者设定了过滤,那么发布者将只发布满足他订阅条件的消息。

这个就是广播和收听的关系。PUB-SUB模式虽然没有使用网络的广播功能,但是它内部是异步的。也就是一次发送没有结束立刻开始下一次发送。

广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤。

Python 实现

python实现代码如下, 其中publisher发布两条消息,第一条消息的topic为client1, 被第一个subscriber接收到;第二条消息的topic为client2, 被第二个subscriber接收到。

注意的是 subscriber 在匹配时,并不是完全匹配的,消息的topic为client1开头的字符串都会被匹配到,如果topic为"client1cient2", 也会被第一个subscriber接收到

server.py

代码语言:txt
复制
import zmq
import time
import sys
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")

while True:
    msg = input("请输入要发布的信息:").strip()
    if msg == 'b':
        sys.exit()
    socket.send(msg.encode('utf-8'))
    time.sleep(1)

client1.py

代码语言:txt
复制
import zmq


context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8'))  # 接收所有消息
while True:
    response = socket.recv().decode('utf-8');
    print("response: %s" % response)

client2.py

代码语言:txt
复制
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,'123'.encode('utf-8'))  # 消息过滤  只接受123开头的信息
while True:
    response = socket.recv().decode('utf-8');
    print("response: %s" % response)

C++ 实现

server:

代码语言:txt
复制
#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();     void * socket = zmq_socket(context, ZMQ_PUB);
    zmq_bind(socket, "tcp://*:5556");

    srandom((unsigned)time(NULL));

    while(1)
    {
        int zipcode = randof(100000);   // 邮编: 0 ~ 99999
        int temp = randof(84) - 42;     // 温度: -42 ~ 41
        int relhumidity = randof(50) + 10;  // 相对湿度: 10 ~ 59

        char msg[20];
        snprintf(msg, sizeof(msg), "%5d %d %d", zipcode, temp, relhumidity);
        s_send(socket, msg);
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

client:

代码语言:txt
复制
#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket = zmq_socket(context, ZMQ_SUB);
    zmq_connect(socket, "tcp://localhost:5556");

    char * zipcode = "10001";
    zmq_setsockopt(socket, ZMQ_SUBSCRIBE, zipcode, strlen(zipcode));

    for(int i = 0; i < 50; ++i)
    {
        char * string = s_recv(socket);
        printf("[Subscriber] Received weather report msg: %s\n", string);
        free(string);
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);
    
    return 0;
}

  1. ZMQ_PUB类型的socket, 如果没有任何client与其相连, 其所有消息都将被简单就地抛弃
  2. ZMQ_SUB类型的socket, 即是client, 可以与多个ZMQ_PUB类型的socket相连, 即村民可以同时收听多个msg 但必须为每个msg都设置过滤器. 否则默认情况下, zmq认为client不关心msg里的所有内容.
  3. 当一个cline收听多个时, 接收消息采用公平队列策略
  4. 如果存在至少一个clint在收听, 那么这个消息就不会被随意抛弃: 这句话的意思是, 当消息过多, 而client的消化能力比较低的话, 未发送的消息会缓存在msg里.
  5. 在ZMQ大版本号在3以上的版本里, 当msg与client的速度不匹配时. 若使用的传输层协议是tcpipc这种面向连接的协议, 则堆积的消息缓存在里, 当使用epgm这种协议时, 堆积的消息缓存了client里. 在ZMQ 大版本号为2的版本中, 所有情况下, 消息都将堆积在clinet里

Parallel Pipeline(管道模型)

由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。

当连接被断开,数据不会丢失,重连后数据继续发送到对端。

分治套路里有三个角色:

  1. Ventilator. 包工头, 向手下各个工程队分派任务. (一个)
  2. Worker. 工程队, 从包工头里接收任务, 干活. (多个)
  3. Sink. 甲方监理, 工程队干完活后, 向甲方监理报告. 所以工程队的活干完之后, 监理统一收集所有工程队的成果. (一个)
Python 实现

server.py

代码语言:txt
复制
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

while True:
    msg = input("请输入要发布的信息:").strip()
    socket.send(msg.encode('utf-8'))
    print("已发送")
    time.sleep(1)

worker.py

代码语言:txt
复制
import zmq
context = zmq.Context()
receive = context.socket(zmq.PULL)
receive.connect('tcp://127.0.0.1:5557')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')

while True:
    data = receive.recv()
    print("正在转发...")
    sender.send(data)

client.py

代码语言:txt
复制
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")

while True:
    response = socket.recv().decode('utf-8')
    print("response: %s" % response)

C++ 实现

包工头代码:

代码语言:txt
复制
#include <zmq.h>
#include <stdio.h>
#include <time.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
    void * socket_to_worker = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(socket_to_sink, "tcp://localhost:5558");
    zmq_bind(socket_to_worker, "tcp://*:5557");

    printf("Press Enter when all workers get ready:");
    getchar();
    printf("Sending tasks to workers...\n");

    s_send(socket_to_sink, "Get ur ass up");    // 通知监理, 干活了

    srandom((unsigned)time(NULL));

    int total_ms = 0;
    for(int i = 0; i < 100; ++i)
    {
        int workload = randof(100) + 1;     // 工作需要的耗时, 单位ms
        total_ms += workload;
        char string[10];
        snprintf(string, sizeof(string), "%d", workload);
        s_send(socket_to_worker, string);   // 将工作分派给工程队
    }

    printf("Total expected cost: %d ms\n", total_ms);

    zmq_close(socket_to_sink);
    zmq_close(socket_to_worker);
    zmq_ctx_destroy(context);

    return 0;
}

工程队代码:

代码语言:txt
复制
#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL);
    void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(socket_to_ventilator, "tcp://localhost:5557");
    zmq_connect(socket_to_sink, "tcp://localhost:5558");

    while(1)
    {
        char * msg = s_recv(socket_to_ventilator);
        printf("Received msg: %s\n", msg);
        fflush(stdout);
        s_sleep(atoi(msg));     // 干活, 即睡眠指定毫秒
        free(msg);
        s_send(socket_to_sink, "DONE"); // 活干完了通知监理
    }

    zmq_close(socket_to_ventilator);
    zmq_close(socket_to_sink);
    zmq_ctx_destroy(context);

    return 0;
}

监理代码:

代码语言:txt
复制
#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h"

int main(void)
{
    void * context = zmq_ctx_new();
    void * socket_to_worker_and_ventilator = zmq_socket(context, ZMQ_PULL);
    zmq_bind(socket_to_worker_and_ventilator, "tcp://*:5558");

    char * msg = s_recv(socket_to_worker_and_ventilator);
    printf("Received msg: %s", msg);    // 接收来自包工头的开始干活的消息
    free(msg);

    int64_t start_time = s_clock();

    for(int i = 0; i < 100; ++i)
    {
        // 接收100个worker干完活的消息
        char * msg = s_recv(socket_to_worker_and_ventilator);
        free(msg);

        if(i / 10 * 10 == i)
            printf(":");
        else
            printf(".");
        fflush(stdout);
    }

    printf("Total elapsed time: %d ms]\n", (int)(s_clock() - start_time));

    zmq_close(socket_to_worker_and_ventilator);
    zmq_ctx_destroy(context);

    return 0;
}

这个示例程序的逻辑流程是这样的:

  1. 包工头向两个角色发送消息: 向工程队发送共计100个任务, 向监理发送消息, 通知监理开始干活
  2. 工程队接收来自包工头的消息, 并按消息里的数值, 睡眠指定毫秒. 每个任务结束后都通知监理.
  3. 监理先是接收来自包工头的消息, 开始计时. 然后统计来自工程队的消息, 当收集到100个任务完成的消息后, 计算实际耗时.

包工头里输出的预计耗时是100个任务的共计耗时, 在监理那里统计的实际耗时则是由多个工程队并行处理100个任务实际的耗时.

这里个例子中需要注意的点有:

  1. 这个例子中使用了ZMQ_PULLZMQ_PUSH两种socket. 分别供消息分发方与消息接收方使用. 看起来略微有点类似于发布-订阅套路, 具体之间的区别后续章节会讲到.
  2. 工程队上接包工头, 下接监理. 在任务执行过程中, 你可以随意的增加工程队的数量.
  3. 我们通过让包工头通知监理, 以及手动输入enter来启动任务分发的方式, 手动同步了工程队/包工头/监理. PUSH/PULL模式虽然和PUB/SUB不一样, 不会丢失消息. 但如果不手动同步的话, 最先建立连接的工程队将几乎把所有任务都接收到手, 导致后续完成连接的工程队拿不到任务, 任务分配不平衡.
  4. 包工头分派任务使用的是轮流/平均分配的方式.这是一种简单的负载均衡
  5. 监理接收多个工程队的消息, 使用的是公平队列策略.

参考资料

文章链接:

https://cloud.tencent.com/developer/article/2453741

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-9-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ZMQ
    • Python 安装
    • Request-Reply(应答模式)
      • python 实现 1
        • python 实现 2
        • Publisher-Subscriber (发布-订阅模式)
          • Python 实现
            • C++ 实现
            • Parallel Pipeline(管道模型)
              • Python 实现
                • C++ 实现
                • 参考资料
                相关产品与服务
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档