librdkafka 是一个高性能的 C/C++ Kafka 客户端库,提供了可靠的消息传递机制和丰富的功能。要使用 librdkafka,需要将其安装到系统中。
安装 librdkafka 的步骤:
克隆 librdkafka 仓库:使用 Git 克隆 librdkafka 的代码仓库到本地。
git clone https://github.com/edenhill/librdkafka.git
切换到指定版本 (推荐):为了保证稳定性和兼容性,建议切换到特定的 librdkafka 版本【可查看 librdkafka 的 Releases 页面 (https://github.com/edenhill/librdkafka/releases) 选择一个稳定的版本】。 这里以 v1.7.0
为例:
cd librdkafka git checkout v1.7.0
配置编译环境:进入 librdkafka
目录后,执行 ./configure
命令来配置编译环境。 这个命令会检查系统依赖项,并生成 Makefile 文件。
./configure
如果需要指定安装路径或启用/禁用某些功能,可以使用 ./configure
命令的选项。 例如,要将 librdkafka 安装到 /opt/librdkafka
目录,可以使用以下命令:
./configure --prefix=/opt/librdkafka
配置完成后,执行 make
命令来编译 librdkafka(这个过程可能需要一些时间):
make
编译完成后,使用 sudo make install
命令将 librdkafka 安装到系统中。 需要管理员权限才能执行此命令。
sudo make install
最后,执行 sudo ldconfig
命令来更新动态链接器缓存。 这可以确保系统能够找到新安装的 librdkafka 库。
sudo ldconfig
如果在步骤 3 中指定了非标准安装路径,需要将该路径添加到 /etc/ld.so.conf
文件中,然后再运行 sudo ldconfig
。
安装完成后,librdkafka 就已经成功安装到系统中了。
示例程序和参数:在 librdkafka
的 examples
目录下,包含了一些示例程序,可以帮助快速了解如何使用 librdkafka。 例如,consumer
示例程序演示了如何从 Kafka 集群消费消息。
要运行 consumer
示例程序,需要指定以下参数:
% Usage: ./consumer <broker> <group.id> <topic1> <topic2>..
<broker>
: Kafka Broker 的地址,例如 localhost:9092
。 可以指定多个 Broker 地址,用逗号分隔。
<group.id>
: 消费者组的 ID。 同一个消费者组中的消费者会协同消费 Kafka topic 的不同分区。
<topic1> <topic2> ...
: 要订阅的 Kafka Topic 的名称。 可以指定多个 Topic,用空格分隔。
以下是一个运行 consumer
示例程序的例子:
./consumer localhost:9092 my-group test-topic
这个命令会启动一个 consumer
实例,连接到 localhost:9092
的 Kafka Broker,加入 my-group
消费者组,并订阅 test-topic
Topic。
了解 librdkafka 中常用的缩略语可以更好地理解代码和文档。 下表列出了一些常见的缩略语及其含义:
缩略语 | 缩略语全称 | 示例或说明 |
---|---|---|
rd | Rapid Development |
|
rk | RdKafka | 代表 librdkafka 的主要结构体,用于创建 Kafka 客户端实例。 |
toppar | Topic Partition |
|
rep | Reply |
|
msgq | Message Queue |
|
rkb | RdKafka Broker | Kafka 代理服务器。 |
rko | RdKafka Operation | Kafka 操作,例如发送消息、获取元数据等。 |
rkm | RdKafka Message | Kafka 消息,包含了消息的内容、Headers 和元数据。 |
payload | 存在 Kafka 上的消息内容(或叫 Log)。 是 Kafka 消息体的内容,也就是实际要传输的数据。 |
在开始使用 Kafka 之前,需要先启动 ZooKeeper 和 Kafka 服务。 ZooKeeper 用于管理 Kafka 集群的元数据,Kafka 则负责消息的存储和传递。
ZooKeeper 可以独立搭建集群,提供更高的可用性和容错性。 但为了简化演示,这里可以直接使用 Kafka 自带的 ZooKeeper 服务。
进入 Kafka 安装目录的 bin
目录。 这个目录包含了启动和管理 Kafka 服务的脚本。 例如 Kafka 安装目录是 /opt/kafka
,那么可以使用以下命令进入 bin
目录:
cd /opt/kafka/bin
在 bin
目录下,可以使用 zookeeper-server-start.sh
脚本来启动 ZooKeeper 服务。
前台运行:使用以下命令可以在前台运行 ZooKeeper 服务。 前台运行会直接在终端显示 ZooKeeper 的日志输出,方便查看运行状态。
sh zookeeper-server-start.sh ../config/zookeeper.properties
后台运行 (推荐):使用 -daemon
参数可以在后台运行 ZooKeeper 服务。 这样可以避免终端被 ZooKeeper 服务占用。
sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
参数解释:
zookeeper-server-start.sh
: 启动 ZooKeeper 服务的脚本。
-daemon
: 指定在后台运行服务。
../config/zookeeper.properties
: ZooKeeper 的配置文件。 该文件包含了 ZooKeeper 服务的端口、数据存储路径等配置信息。
使用 lsof
命令来检查 ZooKeeper 服务是否已经成功启动,并监听了 2181 端口 (默认端口)。
lsof -i:2181
如果 ZooKeeper 服务已经启动成功,会看到类似以下的输出:
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 74930 fly 96u IPv6 734467 0t0 TCP *:2181 (LISTEN)
这表示 Java 进程正在监听 2181 端口,ZooKeeper 服务运行正常。
ZooKeeper 启动后,就可以启动 Kafka 服务了。
进入 Kafka 的 bin
目录 (如果还没有进入):
cd /opt/kafka/bin
使用 kafka-server-start.sh
脚本来启动 Kafka 服务。
sh kafka-server-start.sh -daemon ../config/server.properties
参数解释:
kafka-server-start.sh
: 启动 Kafka 服务的脚本。
-daemon
: 指定在后台运行服务。
../config/server.properties
: Kafka 服务的配置文件。 该文件包含了 Kafka 服务的 Broker ID、监听端口、ZooKeeper 连接地址等配置信息。
默认情况下,Kafka 服务会监听 9092 端口。 可以在 server.properties
文件中修改端口配置。
使用 jps
命令查看 Kafka 服务进程是否存在。
jps
如果 Kafka 服务已经启动成功,会看到类似以下的输出:
[PID] Kafka [PID] QuorumPeerMain
其中 [PID]
是进程 ID,Kafka
表示 Kafka 服务进程,QuorumPeerMain
表示 ZooKeeper 服务进程(如果使用 Kafka 自带的 ZooKeeper)。
在 Kafka 中,消息是按照 Topic 进行组织的。 在开始生产和消费消息之前,需要先创建 Topic。
进入 Kafka 的 bin
目录 (如果还没有进入):
cd /opt/kafka/bin
使用 kafka-topics.sh
脚本来创建 Topic。
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
参数解释:
kafka-topics.sh
: 用于管理 Kafka Topic 的脚本。
--create
: 指定创建 Topic 的操作。
--zookeeper localhost:2181
: 指定 Kafka 连接的 ZooKeeper 服务地址。 localhost:2181
表示 ZooKeeper 服务运行在本地的 2181 端口。
--replication-factor 1
: 指定 Topic 的副本因子。 副本因子是指每个消息在 Kafka 集群中保存的份数。 设置为 1 表示每个消息只有一个副本。 在生产环境中,建议将副本因子设置为大于 1 的值,以提高数据的可靠性。
--partitions 1
: 指定 Topic 的分区数。 分区是 Topic 的并行处理单元。 增加分区数可以提高 Topic 的吞吐量。
--topic test
: 指定要创建的 Topic 的名称。 这里创建的 Topic 名称为 test
。
如果 Topic 创建成功,会看到类似以下的输出:
Created topic "test".
现在,ZooKeeper, Kafka 服务都已启动,并且创建了一个名为 "test" 的Topic。 可以开始使用 Kafka 生产和消费消息了。 使用 kafka-console-producer.sh
生产消息,使用 kafka-console-consumer.sh
消费消息,来测试 Kafka 环境是否工作正常。
librdkafka 提供了 C 语言接口,可以使用 C 语言编写 Kafka 客户端程序。 在 librdkafka/examples
目录下,包含了一些示例代码。
consumer.c
文件是一个简单的 Kafka 消费者示例,演示了如何使用 C 语言从 Kafka Topic 消费消息。
/**
* Simple high-level balanced Apache Kafka consumer
* using the Kafka driver from librdkafka
* (https://github.com/edenhill/librdkafka)
*/
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is builtin from within the librdkafka source tree and thus differs. */
//#include <librdkafka/rdkafka.h>
#include "rdkafka.h"
static volatile sig_atomic_t run = 1;
/**
* @brief Signal termination of program
*/
static void stop (int sig) {
run = 0;
}
/**
* @returns 1 if all bytes are printable, else 0.
*/
static int is_printable (const char *buf, size_t size) {
size_t i;
for (i = 0 ; i < size ; i++)
if (!isprint((int)buf[i]))
return 0;
return 1;
}
int main (int argc, char **argv) {
rd_kafka_t *rk; /* Consumer instance handle */
rd_kafka_conf_t *conf; /* Temporary configuration object */
rd_kafka_resp_err_t err; /* librdkafka API error code */
char errstr[512]; /* librdkafka API error reporting buffer */
const char *brokers; /* Argument: broker list */
const char *groupid; /* Argument: Consumer group id */
char **topics; /* Argument: list of topics to subscribe to */
int topic_cnt; /* Number of topics to subscribe to */
rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
int i;
/*
* Argument validation
*/
if (argc < 4) {
fprintf(stderr,
"%% Usage: "
"%s <broker> <group.id> <topic1> <topic2>..\n",
argv[0]);
return 1;
}
brokers = argv[1];
groupid = argv[2];
topics = &argv[3];
topic_cnt = argc - 3;
/*
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new(); // 创建配置文件
/* Set bootstrap broker(s) as a comma-separated list of
* host or host:port (default port 9092).
* librdkafka will use the bootstrap brokers to acquire the full
* set of brokers from the cluster. */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/* Set the consumer group id.
* All consumers sharing the same group id will join the same
* group, and the subscribed topic' partitions will be assigned
* according to the partition.assignment.strategy
* (consumer config property) to the consumers in the group. */
if (rd_kafka_conf_set(conf, "group.id", groupid,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/* If there is no previously committed offset for a partition
* the auto.offset.reset strategy will be used to decide where
* in the partition to start fetching messages.
* By setting this to earliest the consumer will read all messages
* in the partition if there was no previously committed offset. */
if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/*
* Create consumer instance.
*
* NOTE: rd_kafka_new() takes ownership of the conf object
* and the application must not reference it again after
* this call.
*/
// 创建一个kafka消费者
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr,
"%% Failed to create new consumer: %s\n", errstr);
return 1;
}
conf = NULL; /* Configuration object is now owned, and freed,
* by the rd_kafka_t instance. */
/* Redirect all messages from per-partition queues to
* the main queue so that messages can be consumed with one
* call from all assigned partitions.
*
* The alternative is to poll the main queue (for events)
* and each partition queue separately, which requires setting
* up a rebalance callback and keeping track of the assignment:
* but that is more complex and typically not recommended. */
rd_kafka_poll_set_consumer(rk);// poll机制,设置消费者实例到poll中
/* Convert the list of topics to a format suitable for librdkafka */
// 创建主题分区列表
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0 ; i < topic_cnt ; i++)
rd_kafka_topic_partition_list_add(subscription,
topics[i],
/* the partition is ignored
* by subscribe() */
RD_KAFKA_PARTITION_UA);
/* Subscribe to the list of topics */
err = rd_kafka_subscribe(rk, subscription);
if (err) {
fprintf(stderr,
"%% Failed to subscribe to %d topics: %s\n",
subscription->cnt, rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_destroy(rk);
return 1;
}
fprintf(stderr,
"%% Subscribed to %d topic(s), "
"waiting for rebalance and messages...\n",
subscription->cnt);
rd_kafka_topic_partition_list_destroy(subscription);
/* Signal handler for clean shutdown */
signal(SIGINT, stop);
/* Subscribing to topics will trigger a group rebalance
* which may take some time to finish, but there is no need
* for the application to handle this idle period in a special way
* since a rebalance may happen at any time.
* Start polling for messages. */
while (run) {
rd_kafka_message_t *rkm;
rkm = rd_kafka_consumer_poll(rk, 100);
if (!rkm)
continue; /* Timeout: no message within 100ms,
* try again. This short timeout allows
* checking for `run` at frequent intervals.
*/
/* consumer_poll() will return either a proper message
* or a consumer error (rkm->err is set). */
if (rkm->err) {
/* Consumer errors are generally to be considered
* informational as the consumer will automatically
* try to recover from all types of errors. */
fprintf(stderr,
"%% Consumer error: %s\n",
rd_kafka_message_errstr(rkm->err));
rd_kafka_message_destroy(rkm);
continue;
}
/* Proper message. */
printf("Message on %s [%"PRId32"] at offset %"PRId64":\n",
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset);
/* Print the message key. */
if (rkm->key && is_printable(rkm->key, rkm->key_len))
printf(" Key: %.*s\n",
(int)rkm->key_len, (const char *)rkm->key);
else if (rkm->key)
printf(" Key: (%d bytes)\n", (int)rkm->key_len);
/* Print the message value/payload. */
if (rkm->payload && is_printable(rkm->payload, rkm->len))
printf(" Value: %.*s\n",
(int)rkm->len, (const char *)rkm->payload);
else if (rkm->payload)
printf(" Value: (%d bytes)\n", (int)rkm->len);
rd_kafka_message_destroy(rkm);
}
/* Close the consumer: commit final offsets and leave the group. */
fprintf(stderr, "%% Closing consumer\n");
rd_kafka_consumer_close(rk);
/* Destroy the consumer */
rd_kafka_destroy(rk);
return 0;
}
代码流程:
参数解析: 从命令行参数中获取 Broker 地址、Group ID 和 Topic 列表。
配置创建: 使用 rd_kafka_conf_new()
创建一个配置对象。
配置设置: 使用 rd_kafka_conf_set()
设置配置参数,例如 bootstrap.servers
(Broker 地址)、group.id
(消费者组 ID) 和 auto.offset.reset
(Offset 重置策略)。
消费者创建: 使用 rd_kafka_new()
创建一个 Kafka 消费者实例。 参数 RD_KAFKA_CONSUMER
指定了创建的是消费者。
设置轮询: 使用 rd_kafka_poll_set_consumer()
将消费者实例设置到轮询中。
订阅 Topic: 使用 rd_kafka_topic_partition_list_new()
创建一个 Topic 分区列表,然后使用 rd_kafka_topic_partition_list_add()
将要订阅的 Topic 添加到列表中。 最后使用 rd_kafka_subscribe()
订阅 Topic 列表。
消息轮询: 在一个循环中,使用 rd_kafka_consumer_poll()
轮询消息。 该函数会阻塞一段时间,直到有消息到达或超时。
消息处理: 如果 rd_kafka_consumer_poll()
返回了消息,则处理该消息。 rkm->payload
包含了消息的内容,rkm->len
包含了消息的长度,rkm->topic
包含了消息所属的 Topic,rkm->partition
包含了消息所属的分区,rkm->offset
包含了消息的 Offset。
关闭消费者: 程序结束时,使用 rd_kafka_consumer_close()
关闭消费者,然后使用 rd_kafka_destroy()
销毁消费者实例。
关键函数:
函数调用 | 含义 |
---|---|
| 创建配置文件 |
| 设置参数。可以设置 broker、group id、auto.offset.reset 等 |
| 创建一个 Kafka 消费者 |
| 设置到 poll 里面 |
| 创建主题分区列表 |
| 将主题添加到列表中,有订阅多个就添加多个 |
| 订阅主题 |
| 轮询数据,可以设置超时,从分配的分区中消费消息。 |
producer.c
文件是一个简单的 Kafka 生产者示例,演示了如何使用 C 语言向 Kafka Topic 发送消息。
/**
* Simple Apache Kafka producer
* using the Kafka driver from librdkafka
* (https://github.com/edenhill/librdkafka)
*/
#include <stdio.h>
#include <signal.h>
#include <string.h>
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"
static volatile sig_atomic_t run = 1;
/**
* @brief Signal termination of program
*/
static void stop (int sig) {
run = 0;
fclose(stdin); /* abort fgets() */
}
/**
* @brief Message delivery report callback.
*
* This callback is called exactly once per message, indicating if
* the message was succesfully delivered
* (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
* failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
*
* The callback is triggered from rd_kafka_poll() and executes on
* the application's thread.
*/
static void dr_msg_cb (rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err)
fprintf(stderr, "%% Message delivery failed: %s\n",
rd_kafka_err2str(rkmessage->err));
else
fprintf(stderr,
"%% Message delivered (%zd bytes, "
"partition %"PRId32")\n",
rkmessage->len, rkmessage->partition);
/* The rkmessage is destroyed automatically by librdkafka */
}
int main (int argc, char **argv) {
rd_kafka_t *rk; /* Producer instance handle */
rd_kafka_conf_t *conf; /* Temporary configuration object */
char errstr[512]; /* librdkafka API error reporting buffer */
char buf[512]; /* Message value temporary buffer */
const char *brokers; /* Argument: broker list */
const char *topic; /* Argument: topic to produce to */
/*
* Argument validation
*/
if (argc != 3) {
fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
return 1;
}
brokers = argv[1];
topic = argv[2];
/*
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();
/* Set bootstrap broker(s) as a comma-separated list of
* host or host:port (default port 9092).
* librdkafka will use the bootstrap brokers to acquire the full
* set of brokers from the cluster. */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}
/* Set the delivery report callback.
* This callback will be called once per message to inform
* the application if delivery succeeded or failed.
* See dr_msg_cb() above.
* The callback is only triggered from rd_kafka_poll() and
* rd_kafka_flush(). */
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
/*
* Create producer instance.
*
* NOTE: rd_kafka_new() takes ownership of the conf object
* and the application must not reference it again after
* this call.
*/
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr,
"%% Failed to create new producer: %s\n", errstr);
return 1;
}
/* Signal handler for clean shutdown */
signal(SIGINT, stop);
fprintf(stderr,
"%% Type some text and hit enter to produce message\n"
"%% Or just hit enter to only serve delivery reports\n"
"%% Press Ctrl-C or Ctrl-D to exit\n");
while (run && fgets(buf, sizeof(buf), stdin)) {
size_t len = strlen(buf);
rd_kafka_resp_err_t err;
if (buf[len-1] == '\n') /* Remove newline */
buf[--len] = '\0';
if (len == 0) {
/* Empty line: only serve delivery reports */
rd_kafka_poll(rk, 0/*non-blocking */);
continue;
}
/*
* Send/Produce message.
* This is an asynchronous call, on success it will only
* enqueue the message on the internal producer queue.
* The actual delivery attempts to the broker are handled
* by background threads.
* The previously registered delivery report callback
* (dr_msg_cb) is used to signal back to the application
* when the message has been delivered (or failed).
*/
retry:
err = rd_kafka_producev(
/* Producer handle */
rk,
/* Topic name */
RD_KAFKA_V_TOPIC(topic),
/* Make a copy of the payload. */
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
/* Message value and length */
RD_KAFKA_V_VALUE(buf, len),
/* Per-Message opaque, provided in
* delivery report callback as
* msg_opaque. */
RD_KAFKA_V_OPAQUE(NULL),
/* End sentinel */
RD_KAFKA_V_END);
if (err) {
/*
* Failed to *enqueue* message for producing.
*/
fprintf(stderr,
"%% Failed to produce to topic %s: %s\n",
topic, rd_kafka_err2str(err));
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
/* If the internal queue is full, wait for
* messages to be delivered and then retry.
* The internal queue represents both
* messages to be sent and messages that have
* been sent or failed, awaiting their
* delivery report callback to be called.
*
* The internal queue is limited by the
* configuration property
* queue.buffering.max.messages */
rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
goto retry;
}
} else {
fprintf(stderr, "%% Enqueued message (%zd bytes) "
"for topic %s\n",
len, topic);
}
/* A producer application should continually serve
* the delivery report queue by calling rd_kafka_poll()
* at frequent intervals.
* Either put the poll call in your main loop, or in a
* dedicated thread, or call it after every
* rd_kafka_produce() call.
* Just make sure that rd_kafka_poll() is still called
* during periods where you are not producing any messages
* to make sure previously produced messages have their
* delivery report callback served (and any other callbacks
* you register). */
rd_kafka_poll(rk, 0/*non-blocking*/);
}
/* Wait for final messages to be delivered or fail.
* rd_kafka_flush() is an abstraction over rd_kafka_poll() which
* waits for all messages to be delivered. */
fprintf(stderr, "%% Flushing final messages..\n");
rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);
/* If the output queue is still not empty there is an issue
* with producing messages to the clusters. */
if (rd_kafka_outq_len(rk) > 0)
fprintf(stderr, "%% %d message(s) were not delivered\n",
rd_kafka_outq_len(rk));
/* Destroy the producer instance */
rd_kafka_destroy(rk);
return 0;
}
代码流程:
参数解析: 从命令行参数中获取 Broker 地址和 Topic 名称。
配置创建: 使用 rd_kafka_conf_new()
创建一个配置对象。
配置设置: 使用 rd_kafka_conf_set()
设置配置参数,例如 bootstrap.servers
(Broker 地址)。
交付报告回调设置: 使用 rd_kafka_conf_set_dr_msg_cb()
设置交付报告回调函数。 交付报告回调函数会在消息发送成功或失败时被调用。
生产者创建: 使用 rd_kafka_new()
创建一个 Kafka 生产者实例。 参数 RD_KAFKA_PRODUCER
指定了创建的是生产者。
消息发送: 从标准输入读取消息,然后使用 rd_kafka_producev()
发送消息到 Kafka。 RD_KAFKA_V_TOPIC(topic)
指定了消息要发送到的 Topic,RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY)
指定了消息内容会被复制,RD_KAFKA_V_VALUE(buf, len)
指定了消息的内容和长度。
轮询事件: 使用 rd_kafka_poll()
轮询事件。 rd_kafka_poll()
会调用交付报告回调函数,并处理其他 Kafka 事件。
刷新消息: 程序结束时,使用 rd_kafka_flush()
刷新消息队列,确保所有消息都被发送到 Kafka。
销毁生产者: 使用 rd_kafka_destroy()
销毁生产者实例。
关键函数:
函数调用 | 含义 |
---|---|
| 创建配置文件 |
| 设置参数。设置 bootstrap.servers |
| 设置交付报告回调 |
| 创建一个 Kafka 生产者,RD_KAFKA_PRODUCER |
| 发送数据,是发送消息的主要函数,可以指定 Topic、Key、Value、Headers 等信息 |
使用 consumer
示例程序启动消费者。
./consumer localhost:9092 0 test
启动成功后,会显示以下信息:
% Subscribed to 1 topic(s), waiting for rebalance and messages...
使用 producer
示例程序启动生产者。
./producer localhost:9092 test
启动成功后,会显示以下信息:
% Type some text and hit enter to produce message % Or just hit enter to only serve delivery reports % Press Ctrl-C or Ctrl-D to exit
在生产者终端输入消息并按下回车键,例如输入 "hello consumer"。
$ ./producer localhost:9092 test % Type some text and hit enter to produce message % Or just hit enter to only serve delivery reports % Press Ctrl-C or Ctrl-D to exit hello consumer % Enqueued message (14 bytes) for topic test
消费者终端会接收到该消息,并显示以下信息:
$ ./consumer localhost:9092 0 test % Subscribed to 1 topic(s), waiting for rebalance and messages... Message on test [0] at offset 4: Value: hello consumer
这表明生产者成功地将消息发送到了 Kafka,消费者成功地从 Kafka 消费了该消息。
本文深入浅出地介绍了如何使用 C 语言操作 Apache Kafka 消息队列。详细讲解了 Kafka 环境的搭建,包括 ZooKeeper 和 Kafka 服务的启动,以及 Topic 的创建。重点介绍了 librdkafka 库,并结合其提供的 consumer.c
和 producer.c
示例代码,详细分析了 C 语言 Kafka 消费者的实现流程和生产者实现流程。
演示了生产者和消费者之间的交互过程,快速掌握 C 语言 Kafka 客户端开发,为高性能、低延迟的 Kafka 应用开发提供了新的选择。使用 C 语言不仅能更有效地利用系统资源,还能避免 JVM 带来的额外开销,在特定场景下具有显著优势。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有