操作场景
本文以 Node.js 客户端为介绍使用 VPC 网络接入消息队列 CKafka 版并收发消息的过程。
前提条件
下载 Demo
已参考 SDK 概述,获取相关的客户端连接参数
操作步骤
准备工作
1. 将下载的 Demo 中的 nodejskafkademo 上传至 Linux 服务器。
2. 登录 Linux 服务器,进入 nodejskafkademo 目录。
步骤1:安装 C++ 依赖库
1. 执行以下命令切换到 yum 源配置目录
/etc/yum.repos.d/。cd /etc/yum.repos.d/
2. 创建 yum 源配置文件 confluent.repo。
[Confluent.dist]name=Confluent repository (dist)baseurl=https://packages.confluent.io/rpm/5.1/7gpgcheck=1gpgkey=https://packages.confluent.io/rpm/5.1/archive.keyenabled=1[Confluent]name=Confluent repositorybaseurl=https://packages.confluent.io/rpm/5.1gpgcheck=1gpgkey=https://packages.confluent.io/rpm/5.1/archive.keyenabled=1
3. 执行以下命令安装 C++ 依赖库。
yum install librdkafka-devel
步骤2:安装 Node.js 依赖库
1. 执行以下命令为预处理器指定 OpenSSL 头文件路径。
export CPPFLAGS=-I/usr/local/opt/openssl/include
2. 执行以下命令为连接器指定 OpenSSL 库路径。
export LDFLAGS=-L/usr/local/opt/openssl/lib
3. 执行以下命令安装 Node.js 依赖库。
npm install i --unsafe-perm node-rdkafka
步骤3:准备配置
创建消息队列 CKafka 版配置文件 setting.js。
module.exports = {'bootstrap_servers': ["xxx.xx.xxx:xxxx"],'topic_name': 'xxx','group_id': 'xxx'}
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例基本信息页面接入方式模块的网络列复制。 |
topic_name | Topic 名称,您可以在控制台上 Topic 列表 页面复制。 |
group_id | 您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。 |
步骤4:发送消息
1. 编写生产消息程序 producer.js。
const Kafka = require('node-rdkafka');const config = require('./setting');console.log("features:" + Kafka.features);console.log(Kafka.librdkafkaVersion);var producer = new Kafka.Producer({'api.version.request': 'true',// 设置入口服务,请通过控制台获取对应的服务地址。'bootstrap.servers': config['bootstrap_servers'],'dr_cb': true,'dr_msg_cb': true,// 请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失'retries': '0',// 发送请求失败时到下一次重试请求之间的时间"retry.backoff.ms": 100,// producer 网络请求的超时时间。'socket.timeout.ms': 6000,});var connected = falseproducer.setPollInterval(100);producer.connect();producer.on('ready', function() {connected = trueconsole.log("connect ok")});producer.on("disconnected", function() {connected = false;producer.connect();})producer.on('event.log', function(event) {console.log("event.log", event);});producer.on("error", function(error) {console.log("error:" + error);});function produce() {try {producer.produce(config['topic_name'],null,new Buffer('Hello CKafka Default'),null,Date.now());} catch (err) {console.error('Error occurred when sending message(s)');console.error(err);}}producer.on('delivery-report', function(err, report) {console.log("delivery-report: producer ok");});producer.on('event.error', function(err) {console.error('event.error:' + err);})setInterval(produce, 1000, "Interval");
2. 执行以下命令发送消息。
node producer.js
3. 查看运行结果。


4. 在 CKafka 控制台 的 Topic 列表页面,选择对应的 Topic,单击更多 > 消息查询,查看刚刚发送的消息。

步骤5:订阅消息
1. 创建消费消息程序 consumer.js。
const Kafka = require('node-rdkafka');const config = require('./setting');console.log(Kafka.features);console.log(Kafka.librdkafkaVersion);console.log(config)var consumer = new Kafka.KafkaConsumer({'api.version.request': 'true',// 设置入口服务,请通过控制台获取对应的服务地址。'bootstrap.servers': config['bootstrap_servers'],'group.id' : config['group_id'],// 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,// 认为该消费者故障失败,Broker 发起重新 Rebalance 过程。'session.timeout.ms': 10000,// 客户端请求超时时间,如果超过这个时间没有收到应答,则请求超时失败'metadata.request.timeout.ms': 305000,// 设置客户端内部重试间隔。'reconnect.backoff.max.ms': 3000});consumer.connect();consumer.on('ready', function() {console.log("connect ok");consumer.subscribe([config['topic_name']]);consumer.consume();})consumer.on('data', function(data) {console.log(data);});consumer.on('event.log', function(event) {console.log("event.log", event);});consumer.on('error', function(error) {console.log("error:" + error);});consumer.on('event', function(event) {console.log("event:" + event);});
2. 执行以下命令消费消息。
node consumer.js
3. 查看运行结果


4. 在 CKafka 控制台Consumer Group 页面,选择对应的消费组,在主题名称输入 Topic 名称,单击查看详情,查看消费详情。
