前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【C++】开源:ZeroMQ消息中间件配置与使用

【C++】开源:ZeroMQ消息中间件配置与使用

作者头像
DevFrank
发布2024-07-24 15:26:58
4450
发布2024-07-24 15:26:58
举报
文章被收录于专栏:C++开发学习交流

😏1. ZMQ介绍

官网:https://zeromq.org/

Github:https://github.com/zeromq/libzmq

ZMQZeroMQ)是一种高性能的异步消息传递库,它可以在不同的进程和机器之间进行消息传递。它提供了多种传输协议、通信模式和编程语言支持,并且非常易于使用。

ZMQ 的核心思想是将网络通信抽象出来成为 socket 概念,使用不同类型的 socket 可以实现不同的消息传递模式,例如请求-应答模式、发布-订阅模式、推送-拉取模式等。ZMQ 提供了 TCP、IPC、inproc 等多种传输协议,可以根据需要选择合适的协议。

几种模式之间的区别和联系:

ZMQ 还提供了众多编程语言的封装,包括 C、C++、Python、Java 等,使得开发者可以方便地在各种平台上进行开发,并且具有很好的可扩展性和高效性。

总的来说,ZMQ 是一个轻量级、高效、灵活的消息传递库,适用于分布式系统、并发处理、网络爬虫等场景。

😊2. ZMQ安装

源码安装
代码语言:javascript
复制
sudo apt-get install libtool pkg-config build-essential autoconf automake
# 安装Sodium加密库
git clone https://github.com/jedisct1/libsodium.git
cd libsodium
./autogen.sh -s
./configure 
make check
sudo make install 
sudo ldconfig 
# 编译安装ZMQ核心库(ZMQ的核心库和C/C++依赖是分开的。)
git clone https://github.com/zeromq/libzmq
./autogen.sh
./configure
make check
sudo make install
sudo ldconfig
# 编译安装ZMQ的C依赖
git clone https://github.com/zeromq/czmq.git
cd czmq
./autogen.sh
./configure
make check
sudo make install
sudo ldconfig
编译方式:`gcc -lczmq -lzmq main.c -o main`
# 添加ZMQ的C++依赖,将头文件添加到系统目录即可
git clone https://github.com/zeromq/cppzmq.git
cd cppzmq
sudo cp zmq.hpp /usr/local/include/
apt安装
代码语言:javascript
复制
# 这种方式简单方便
sudo apt install libzmq3-dev libzmqpp-dev
# g++编译
g++ -o main main.cpp -lzmq

😆3. ZMQ入门案例

官方示例
代码语言:javascript
复制
git clone https://github.com/imatix/zguide.git
# C
cd zguide/examples/C
./build all
./hwserver
./hwclient
# C++
./build all
./hwserver
./hwclient

运行如下:

ZMQ支持多种模式和多种协议,常用的ZeroMQ URL格式如下:

代码语言:javascript
复制
TCP: "tcp://<address>:<port>"(使用TCP协议)
in-process: "inproc://<name>"(进程内通信)
inter-process: "ipc://<path>" (Unix系统) 或 "ipc://<name>" (Windows系统)(进程间通信)
多播: "epgm://<address>:<port>" (使用PGM协议) 或 "epub://<address>:<port>" (使用UDP协议)
请求-应答模式

server.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <iostream>

int main() {
    // 创建上下文和套接字
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::rep);

    // 绑定到指定地址
    socket.bind("tcp://*:5555");

    while (true) {
        // 接收请求
        zmq::message_t request;
        socket.recv(request, zmq::recv_flags::none);

        // 打印请求内容
        std::cout << "Received request: " << std::string(static_cast<char*>(request.data()), request.size()) << std::endl;

        // 发送响应
        zmq::message_t response(5);
        memcpy(response.data(), "World", 5);
        socket.send(response, zmq::send_flags::none);
    }

    return 0;
}

client.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <iostream>

int main() {
    // 创建上下文和套接字
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::req);

    // 连接到服务端地址
    socket.connect("tcp://localhost:5555");

    // 发送请求
    zmq::message_t request(5);
    memcpy(request.data(), "Hello", 5);
    socket.send(request, zmq::send_flags::none);

    // 接收响应
    zmq::message_t response;
    socket.recv(response, zmq::recv_flags::none);

    // 打印响应
    std::cout << "Received response: " << std::string(static_cast<char*>(response.data()), response.size()) << std::endl;

    return 0;
}
发布-订阅模式

publisher.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

int main() {
    // 创建上下文和套接字
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::pub);

    // 绑定到指定地址
    socket.bind("tcp://*:5555");

    // 发布消息
    int count = 0;
    while (true) {
        std::string topic = "Topic";
        std::string message = "Message " + std::to_string(count);

        // 发布主题和消息
        zmq::message_t topicMsg(topic.size());
        memcpy(topicMsg.data(), topic.data(), topic.size());
        socket.send(topicMsg, zmq::send_flags::sndmore);

        zmq::message_t messageMsg(message.size());
        memcpy(messageMsg.data(), message.data(), message.size());
        socket.send(messageMsg, zmq::send_flags::none);

        std::cout << "Published: " << topic << " - " << message << std::endl;

        count++;
        sleep(1); // 每秒发布一条消息
    }

    return 0;
}

subscriber.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <string>
#include <iostream>

int main() {
    // 创建上下文和套接字
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::sub);

    // 连接到发布者地址
    socket.connect("tcp://localhost:5555");

    // 订阅所有主题
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);

    // 接收并处理消息
    while (true) {
        // 接收主题
        zmq::message_t topicMsg;
        socket.recv(topicMsg, zmq::recv_flags::none);
        std::string topic(static_cast<char*>(topicMsg.data()), topicMsg.size());

        // 接收消息
        zmq::message_t messageMsg;
        socket.recv(messageMsg, zmq::recv_flags::none);
        std::string message(static_cast<char*>(messageMsg.data()), messageMsg.size());

        std::cout << "Received: " << topic << " - " << message << std::endl;
    }

    return 0;
}
推送-拉取模式

pusher.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

int main() {
    // 创建上下文和套接字
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::push);

    // 绑定到指定地址
    socket.bind("tcp://*:5555");

    // 发送消息
    int count = 0;
    while (true) {
        std::string message = "Message " + std::to_string(count);

        // 发送消息
        zmq::message_t messageMsg(message.size());
        memcpy(messageMsg.data(), message.data(), message.size());
        socket.send(messageMsg, zmq::send_flags::none);

        std::cout << "Pushed: " << message << std::endl;

        count++;
        sleep(1); // 每秒推送一条消息
    }

    return 0;
}

puller.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <string>
#include <iostream>

int main() {
    // 创建上下文和套接字
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::pull);

    // 连接到推送者地址
    socket.connect("tcp://localhost:5555");

    // 接收消息
    while (true) {
        zmq::message_t messageMsg;
        socket.recv(messageMsg, zmq::recv_flags::none);
        std::string message(static_cast<char*>(messageMsg.data()), messageMsg.size());

        std::cout << "Pulled: " << message << std::endl;
    }

    return 0;
}
进程间通信示例

sender.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <string>
#include <iostream>

int main() {
    // 创建上下文和套接字
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::push);

    // 连接到接收者的地址
    socket.connect("ipc:///tmp/zmq_ipc_example");

    // 发送消息
    std::string message = "Hello from sender!";
    zmq::message_t messageMsg(message.size());
    memcpy(messageMsg.data(), message.data(), message.size());
    socket.send(messageMsg, zmq::send_flags::none);

    return 0;
}

receiver.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <string>
#include <iostream>

int main() {
    // 创建上下文和套接字
    zmq::context_t context(1);
    zmq::socket_t socket(context, zmq::socket_type::pull);

    // 绑定到指定地址
    socket.bind("ipc:///tmp/zmq_ipc_example");

    // 接收消息
    zmq::message_t messageMsg;
    socket.recv(messageMsg, zmq::recv_flags::none);
    std::string message(static_cast<char*>(messageMsg.data()), messageMsg.size());

    std::cout << "Received message: " << message << std::endl;

    return 0;
}
Router-Dealer消息路由

Router 模式是 ZeroMQ 中的一种复杂通信模式,用于创建灵活的消息路由系统。在 Router 模式下,ROUTER套接字可以接收来自多个客户端的请求,并将这些请求分发给多个工作线程或服务DEALER套接字。

Router-Dealer 通信模式可以用于实现负载均衡、消息路由和复杂的请求-响应模式,非常适合需要多个客户端和多个服务端进行交互的场景。

server.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <iostream>
#include <string>

int main() {
    zmq::context_t context(1);
    zmq::socket_t router(context, ZMQ_ROUTER);

    // 绑定到端口 5555
    router.bind("tcp://*:5555");

    while (true) {
        zmq::message_t identity;
        zmq::message_t message;

        // 接收来自 Dealer 的身份和消息
        router.recv(&identity);
        router.recv(&message);

        std::string identity_str = std::string(static_cast<char*>(identity.data()), identity.size());
        std::string message_str = std::string(static_cast<char*>(message.data()), message.size());

        std::cout << "Received message from " << identity_str << ": " << message_str << std::endl;

        // 回复消息给 Dealer
        std::string reply_message = "Hello from Router";
        zmq::message_t reply(reply_message.size());
        memcpy(reply.data(), reply_message.c_str(), reply_message.size());

        router.send(identity, ZMQ_SNDMORE);
        router.send(reply);
    }

    return 0;
}

client.cpp

代码语言:javascript
复制
// g++ -o client client.cpp -lzmq -lpthread
#include <zmq.hpp>
#include <iostream>
#include <string>
#include <thread>

void client_task() {
    zmq::context_t context(1);
    zmq::socket_t dealer(context, ZMQ_DEALER);

    // 连接到 Router
    dealer.connect("tcp://localhost:5555");

    std::string identity = "Client1";

    // 设置 Dealer 的身份
    dealer.setsockopt(ZMQ_IDENTITY, identity.c_str(), identity.size());

    // 发送消息给 Router
    std::string request_message = "Hello from Dealer";
    zmq::message_t request(request_message.size());
    memcpy(request.data(), request_message.c_str(), request_message.size());

    dealer.send(request);

    // 接收 Router 的回复
    zmq::message_t reply;
    dealer.recv(&reply);

    std::string reply_str = std::string(static_cast<char*>(reply.data()), reply.size());

    std::cout << "Received reply from Router: " << reply_str << std::endl;
}

int main() {
    std::thread client_thread(client_task);
    client_thread.join();

    return 0;
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-06-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 😏1. ZMQ介绍
  • 😊2. ZMQ安装
    • 源码安装
      • apt安装
      • 😆3. ZMQ入门案例
        • 官方示例
          • 请求-应答模式
            • 发布-订阅模式
              • 推送-拉取模式
                • 进程间通信示例
                  • Router-Dealer消息路由
                  相关产品与服务
                  负载均衡
                  负载均衡(Cloud Load Balancer,CLB)提供安全快捷的四七层流量分发服务,访问流量经由 CLB 可以自动分配到多台后端服务器上,扩展系统的服务能力并消除单点故障。轻松应对大流量访问场景。 网关负载均衡(Gateway Load Balancer,GWLB)是运行在网络层的负载均衡。通过 GWLB 可以帮助客户部署、扩展和管理第三方虚拟设备,操作简单,安全性强。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档