前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >用 C 语言操作 Kafka :基于 librdkafka 的开发指南

用 C 语言操作 Kafka :基于 librdkafka 的开发指南

作者头像
Lion 莱恩呀
发布于 2025-04-25 14:38:25
发布于 2025-04-25 14:38:25
19600
代码可运行
举报
概述
本文介绍使用 C 语言操作 Kafka 消息队列的方法。通过 Kafka 环境搭建和 librdkafka 示例代码分析,详细讲解了 C 语言 Kafka 消费者和生产者的实现,并演示了二者交互过程。
文章被收录于专栏:后端开发技术后端开发技术
运行总次数:0
代码可运行

一、安装 librdkafka 客户端库

librdkafka 是一个高性能的 C/C++ Kafka 客户端库,提供了可靠的消息传递机制和丰富的功能。要使用 librdkafka,需要将其安装到系统中。

安装 librdkafka 的步骤:

  1. 克隆 librdkafka 仓库:使用 Git 克隆 librdkafka 的代码仓库到本地。

    git clone https://github.com/edenhill/librdkafka.git

  2. 切换到指定版本 (推荐):为了保证稳定性和兼容性,建议切换到特定的 librdkafka 版本【可查看 librdkafka 的 Releases 页面 (https://github.com/edenhill/librdkafka/releases) 选择一个稳定的版本】。 这里以 v1.7.0 为例:

    cd librdkafka git checkout v1.7.0

  3. 配置编译环境:进入 librdkafka 目录后,执行 ./configure 命令来配置编译环境。 这个命令会检查系统依赖项,并生成 Makefile 文件。

    ./configure

    如果需要指定安装路径或启用/禁用某些功能,可以使用 ./configure 命令的选项。 例如,要将 librdkafka 安装到 /opt/librdkafka 目录,可以使用以下命令:

    ./configure --prefix=/opt/librdkafka

  4. 配置完成后,执行 make 命令来编译 librdkafka(这个过程可能需要一些时间):

    make

  5. 编译完成后,使用 sudo make install 命令将 librdkafka 安装到系统中。 需要管理员权限才能执行此命令。

    sudo make install

  6. 最后,执行 sudo ldconfig 命令来更新动态链接器缓存。 这可以确保系统能够找到新安装的 librdkafka 库。

    sudo ldconfig

    如果在步骤 3 中指定了非标准安装路径,需要将该路径添加到 /etc/ld.so.conf 文件中,然后再运行 sudo ldconfig

安装完成后,librdkafka 就已经成功安装到系统中了。

示例程序和参数:在 librdkafka 的 examples 目录下,包含了一些示例程序,可以帮助快速了解如何使用 librdkafka。 例如,consumer 示例程序演示了如何从 Kafka 集群消费消息。

要运行 consumer 示例程序,需要指定以下参数:

代码语言:Bash
换行
自动换行
AI代码解释
% Usage: ./consumer <broker> <group.id> <topic1> <topic2>..
  • <broker>: Kafka Broker 的地址,例如 localhost:9092。 可以指定多个 Broker 地址,用逗号分隔。

  • <group.id>: 消费者组的 ID。 同一个消费者组中的消费者会协同消费 Kafka topic 的不同分区。

  • <topic1> <topic2> ...: 要订阅的 Kafka Topic 的名称。 可以指定多个 Topic,用空格分隔。

以下是一个运行 consumer 示例程序的例子:

代码语言:Bash
换行
自动换行
AI代码解释
./consumer localhost:9092 my-group test-topic

这个命令会启动一个 consumer 实例,连接到 localhost:9092 的 Kafka Broker,加入 my-group 消费者组,并订阅 test-topic Topic。

了解 librdkafka 中常用的缩略语可以更好地理解代码和文档。 下表列出了一些常见的缩略语及其含义:

缩略语

缩略语全称

示例或说明

rd

Rapid Development

rd.h 文件包含了 librdkafka 的通用定义。

rk

RdKafka

代表 librdkafka 的主要结构体,用于创建 Kafka 客户端实例。

toppar

Topic Partition

struct rd_kafka_toppar_t { }; 表示 Topic 的一个分区,包含了分区 ID、offset 等信息。

rep

Reply

struct rd_kafka_t { rd_kafka_q_t *rk_rep }; 通常用于异步操作的回复队列。

msgq

Message Queue

struct rd_kafka_msgq_t { }; 消息队列,用于存储待发送或待处理的消息。

rkb

RdKafka Broker

Kafka 代理服务器。

rko

RdKafka Operation

Kafka 操作,例如发送消息、获取元数据等。

rkm

RdKafka Message

Kafka 消息,包含了消息的内容、Headers 和元数据。

payload

存在 Kafka 上的消息内容(或叫 Log)。 是 Kafka 消息体的内容,也就是实际要传输的数据。

二、启动 Kafka 相关服务

在开始使用 Kafka 之前,需要先启动 ZooKeeper 和 Kafka 服务。 ZooKeeper 用于管理 Kafka 集群的元数据,Kafka 则负责消息的存储和传递。

2.1 启动 ZooKeeper

ZooKeeper 可以独立搭建集群,提供更高的可用性和容错性。 但为了简化演示,这里可以直接使用 Kafka 自带的 ZooKeeper 服务。

  1. 进入 Kafka 安装目录的 bin 目录。 这个目录包含了启动和管理 Kafka 服务的脚本。 例如 Kafka 安装目录是 /opt/kafka,那么可以使用以下命令进入 bin 目录:

    cd /opt/kafka/bin

  2. 在 bin 目录下,可以使用 zookeeper-server-start.sh 脚本来启动 ZooKeeper 服务。

    • 前台运行:使用以下命令可以在前台运行 ZooKeeper 服务。 前台运行会直接在终端显示 ZooKeeper 的日志输出,方便查看运行状态。

      sh zookeeper-server-start.sh ../config/zookeeper.properties

    • 后台运行 (推荐):使用 -daemon 参数可以在后台运行 ZooKeeper 服务。 这样可以避免终端被 ZooKeeper 服务占用。

      sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties

      参数解释:

      • zookeeper-server-start.sh: 启动 ZooKeeper 服务的脚本。

      • -daemon: 指定在后台运行服务。

      • ../config/zookeeper.properties: ZooKeeper 的配置文件。 该文件包含了 ZooKeeper 服务的端口、数据存储路径等配置信息。

  3. 使用 lsof 命令来检查 ZooKeeper 服务是否已经成功启动,并监听了 2181 端口 (默认端口)。

    lsof -i:2181

    如果 ZooKeeper 服务已经启动成功,会看到类似以下的输出:

    COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 74930 fly 96u IPv6 734467 0t0 TCP *:2181 (LISTEN)

    这表示 Java 进程正在监听 2181 端口,ZooKeeper 服务运行正常。

2.2 启动 Kafka 服务

ZooKeeper 启动后,就可以启动 Kafka 服务了。

  1. 进入 Kafka 的 bin 目录 (如果还没有进入):

    cd /opt/kafka/bin

  2. 使用 kafka-server-start.sh 脚本来启动 Kafka 服务。

    sh kafka-server-start.sh -daemon ../config/server.properties

    参数解释:

    • kafka-server-start.sh: 启动 Kafka 服务的脚本。

    • -daemon: 指定在后台运行服务。

    • ../config/server.properties: Kafka 服务的配置文件。 该文件包含了 Kafka 服务的 Broker ID、监听端口、ZooKeeper 连接地址等配置信息。

    默认情况下,Kafka 服务会监听 9092 端口。 可以在 server.properties 文件中修改端口配置。

  3. 使用 jps 命令查看 Kafka 服务进程是否存在。

    jps

    如果 Kafka 服务已经启动成功,会看到类似以下的输出:

    [PID] Kafka [PID] QuorumPeerMain

    其中 [PID] 是进程 ID,Kafka 表示 Kafka 服务进程,QuorumPeerMain 表示 ZooKeeper 服务进程(如果使用 Kafka 自带的 ZooKeeper)。

2.3 创建 Topic

在 Kafka 中,消息是按照 Topic 进行组织的。 在开始生产和消费消息之前,需要先创建 Topic。

  1. 进入 Kafka 的 bin 目录 (如果还没有进入):

    cd /opt/kafka/bin

  2. 使用 kafka-topics.sh 脚本来创建 Topic。

    sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    参数解释:

    • kafka-topics.sh: 用于管理 Kafka Topic 的脚本。

    • --create: 指定创建 Topic 的操作。

    • --zookeeper localhost:2181: 指定 Kafka 连接的 ZooKeeper 服务地址。 localhost:2181 表示 ZooKeeper 服务运行在本地的 2181 端口。

    • --replication-factor 1: 指定 Topic 的副本因子。 副本因子是指每个消息在 Kafka 集群中保存的份数。 设置为 1 表示每个消息只有一个副本。 在生产环境中,建议将副本因子设置为大于 1 的值,以提高数据的可靠性。

    • --partitions 1: 指定 Topic 的分区数。 分区是 Topic 的并行处理单元。 增加分区数可以提高 Topic 的吞吐量。

    • --topic test: 指定要创建的 Topic 的名称。 这里创建的 Topic 名称为 test

  3. 如果 Topic 创建成功,会看到类似以下的输出:

    Created topic "test".

现在,ZooKeeper, Kafka 服务都已启动,并且创建了一个名为 "test" 的Topic。 可以开始使用 Kafka 生产和消费消息了。 使用 kafka-console-producer.sh 生产消息,使用 kafka-console-consumer.sh 消费消息,来测试 Kafka 环境是否工作正常。

三、 C 语言操作 Kafka 示例

librdkafka 提供了 C 语言接口,可以使用 C 语言编写 Kafka 客户端程序。 在 librdkafka/examples 目录下,包含了一些示例代码。

3.1 消费者示例 (consumer.c)

consumer.c 文件是一个简单的 Kafka 消费者示例,演示了如何使用 C 语言从 Kafka Topic 消费消息。

代码语言:C
换行
代码运行次数:0
自动换行
运行AI代码解释
/** * Simple high-level balanced Apache Kafka consumer * using the Kafka driver from librdkafka * (https://github.com/edenhill/librdkafka) */ #include <stdio.h> #include <signal.h> #include <string.h> #include <ctype.h> /* Typical include path would be <librdkafka/rdkafka.h>, but this program * is builtin from within the librdkafka source tree and thus differs. */ //#include <librdkafka/rdkafka.h> #include "rdkafka.h" static volatile sig_atomic_t run = 1; /** * @brief Signal termination of program */ static void stop (int sig) { run = 0; } /** * @returns 1 if all bytes are printable, else 0. */ static int is_printable (const char *buf, size_t size) { size_t i; for (i = 0 ; i < size ; i++) if (!isprint((int)buf[i])) return 0; return 1; } int main (int argc, char **argv) { rd_kafka_t *rk; /* Consumer instance handle */ rd_kafka_conf_t *conf; /* Temporary configuration object */ rd_kafka_resp_err_t err; /* librdkafka API error code */ char errstr[512]; /* librdkafka API error reporting buffer */ const char *brokers; /* Argument: broker list */ const char *groupid; /* Argument: Consumer group id */ char **topics; /* Argument: list of topics to subscribe to */ int topic_cnt; /* Number of topics to subscribe to */ rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */ int i; /* * Argument validation */ if (argc < 4) { fprintf(stderr, "%% Usage: " "%s <broker> <group.id> <topic1> <topic2>..\n", argv[0]); return 1; } brokers = argv[1]; groupid = argv[2]; topics = &argv[3]; topic_cnt = argc - 3; /* * Create Kafka client configuration place-holder */ conf = rd_kafka_conf_new(); // 创建配置文件 /* Set bootstrap broker(s) as a comma-separated list of * host or host:port (default port 9092). * librdkafka will use the bootstrap brokers to acquire the full * set of brokers from the cluster. */ if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; } /* Set the consumer group id. * All consumers sharing the same group id will join the same * group, and the subscribed topic' partitions will be assigned * according to the partition.assignment.strategy * (consumer config property) to the consumers in the group. */ if (rd_kafka_conf_set(conf, "group.id", groupid, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; } /* If there is no previously committed offset for a partition * the auto.offset.reset strategy will be used to decide where * in the partition to start fetching messages. * By setting this to earliest the consumer will read all messages * in the partition if there was no previously committed offset. */ if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; } /* * Create consumer instance. * * NOTE: rd_kafka_new() takes ownership of the conf object * and the application must not reference it again after * this call. */ // 创建一个kafka消费者 rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr); return 1; } conf = NULL; /* Configuration object is now owned, and freed, * by the rd_kafka_t instance. */ /* Redirect all messages from per-partition queues to * the main queue so that messages can be consumed with one * call from all assigned partitions. * * The alternative is to poll the main queue (for events) * and each partition queue separately, which requires setting * up a rebalance callback and keeping track of the assignment: * but that is more complex and typically not recommended. */ rd_kafka_poll_set_consumer(rk);// poll机制,设置消费者实例到poll中 /* Convert the list of topics to a format suitable for librdkafka */ // 创建主题分区列表 subscription = rd_kafka_topic_partition_list_new(topic_cnt); for (i = 0 ; i < topic_cnt ; i++) rd_kafka_topic_partition_list_add(subscription, topics[i], /* the partition is ignored * by subscribe() */ RD_KAFKA_PARTITION_UA); /* Subscribe to the list of topics */ err = rd_kafka_subscribe(rk, subscription); if (err) { fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n", subscription->cnt, rd_kafka_err2str(err)); rd_kafka_topic_partition_list_destroy(subscription); rd_kafka_destroy(rk); return 1; } fprintf(stderr, "%% Subscribed to %d topic(s), " "waiting for rebalance and messages...\n", subscription->cnt); rd_kafka_topic_partition_list_destroy(subscription); /* Signal handler for clean shutdown */ signal(SIGINT, stop); /* Subscribing to topics will trigger a group rebalance * which may take some time to finish, but there is no need * for the application to handle this idle period in a special way * since a rebalance may happen at any time. * Start polling for messages. */ while (run) { rd_kafka_message_t *rkm; rkm = rd_kafka_consumer_poll(rk, 100); if (!rkm) continue; /* Timeout: no message within 100ms, * try again. This short timeout allows * checking for `run` at frequent intervals. */ /* consumer_poll() will return either a proper message * or a consumer error (rkm->err is set). */ if (rkm->err) { /* Consumer errors are generally to be considered * informational as the consumer will automatically * try to recover from all types of errors. */ fprintf(stderr, "%% Consumer error: %s\n", rd_kafka_message_errstr(rkm->err)); rd_kafka_message_destroy(rkm); continue; } /* Proper message. */ printf("Message on %s [%"PRId32"] at offset %"PRId64":\n", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset); /* Print the message key. */ if (rkm->key && is_printable(rkm->key, rkm->key_len)) printf(" Key: %.*s\n", (int)rkm->key_len, (const char *)rkm->key); else if (rkm->key) printf(" Key: (%d bytes)\n", (int)rkm->key_len); /* Print the message value/payload. */ if (rkm->payload && is_printable(rkm->payload, rkm->len)) printf(" Value: %.*s\n", (int)rkm->len, (const char *)rkm->payload); else if (rkm->payload) printf(" Value: (%d bytes)\n", (int)rkm->len); rd_kafka_message_destroy(rkm); } /* Close the consumer: commit final offsets and leave the group. */ fprintf(stderr, "%% Closing consumer\n"); rd_kafka_consumer_close(rk); /* Destroy the consumer */ rd_kafka_destroy(rk); return 0; }

代码流程:

  1. 参数解析: 从命令行参数中获取 Broker 地址、Group ID 和 Topic 列表。

  2. 配置创建: 使用 rd_kafka_conf_new() 创建一个配置对象。

  3. 配置设置: 使用 rd_kafka_conf_set() 设置配置参数,例如 bootstrap.servers (Broker 地址)、group.id (消费者组 ID) 和 auto.offset.reset (Offset 重置策略)。

  4. 消费者创建: 使用 rd_kafka_new() 创建一个 Kafka 消费者实例。 参数 RD_KAFKA_CONSUMER 指定了创建的是消费者。

  5. 设置轮询: 使用 rd_kafka_poll_set_consumer() 将消费者实例设置到轮询中。

  6. 订阅 Topic: 使用 rd_kafka_topic_partition_list_new() 创建一个 Topic 分区列表,然后使用 rd_kafka_topic_partition_list_add() 将要订阅的 Topic 添加到列表中。 最后使用 rd_kafka_subscribe() 订阅 Topic 列表。

  7. 消息轮询: 在一个循环中,使用 rd_kafka_consumer_poll() 轮询消息。 该函数会阻塞一段时间,直到有消息到达或超时。

  8. 消息处理: 如果 rd_kafka_consumer_poll() 返回了消息,则处理该消息。 rkm->payload 包含了消息的内容,rkm->len 包含了消息的长度,rkm->topic 包含了消息所属的 Topic,rkm->partition 包含了消息所属的分区,rkm->offset 包含了消息的 Offset。

  9. 关闭消费者: 程序结束时,使用 rd_kafka_consumer_close() 关闭消费者,然后使用 rd_kafka_destroy() 销毁消费者实例。

关键函数:

函数调用

含义

rd_kafka_conf_new()

创建配置文件

rd_kafka_conf_set(...)

设置参数。可以设置 broker、group id、auto.offset.reset 等

rd_kafka_new(...)

创建一个 Kafka 消费者

rd_kafka_poll_set_consumer(...)

设置到 poll 里面

rd_kafka_topic_partition_list_new(...)

创建主题分区列表

rd_kafka_topic_partition_list_add(...)

将主题添加到列表中,有订阅多个就添加多个

rd_kafka_subscribe(...)

订阅主题

rd_kafka_consumer_poll(...)

轮询数据,可以设置超时,从分配的分区中消费消息。

3.2 生产者示例 (producer.c)

producer.c 文件是一个简单的 Kafka 生产者示例,演示了如何使用 C 语言向 Kafka Topic 发送消息。

代码语言:C
换行
代码运行次数:0
自动换行
运行AI代码解释
/** * Simple Apache Kafka producer * using the Kafka driver from librdkafka * (https://github.com/edenhill/librdkafka) */ #include <stdio.h> #include <signal.h> #include <string.h> /* Typical include path would be <librdkafka/rdkafka.h>, but this program * is builtin from within the librdkafka source tree and thus differs. */ #include "rdkafka.h" static volatile sig_atomic_t run = 1; /** * @brief Signal termination of program */ static void stop (int sig) { run = 0; fclose(stdin); /* abort fgets() */ } /** * @brief Message delivery report callback. * * This callback is called exactly once per message, indicating if * the message was succesfully delivered * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR). * * The callback is triggered from rd_kafka_poll() and executes on * the application's thread. */ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->err) fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err)); else fprintf(stderr, "%% Message delivered (%zd bytes, " "partition %"PRId32")\n", rkmessage->len, rkmessage->partition); /* The rkmessage is destroyed automatically by librdkafka */ } int main (int argc, char **argv) { rd_kafka_t *rk; /* Producer instance handle */ rd_kafka_conf_t *conf; /* Temporary configuration object */ char errstr[512]; /* librdkafka API error reporting buffer */ char buf[512]; /* Message value temporary buffer */ const char *brokers; /* Argument: broker list */ const char *topic; /* Argument: topic to produce to */ /* * Argument validation */ if (argc != 3) { fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]); return 1; } brokers = argv[1]; topic = argv[2]; /* * Create Kafka client configuration place-holder */ conf = rd_kafka_conf_new(); /* Set bootstrap broker(s) as a comma-separated list of * host or host:port (default port 9092). * librdkafka will use the bootstrap brokers to acquire the full * set of brokers from the cluster. */ if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); return 1; } /* Set the delivery report callback. * This callback will be called once per message to inform * the application if delivery succeeded or failed. * See dr_msg_cb() above. * The callback is only triggered from rd_kafka_poll() and * rd_kafka_flush(). */ rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); /* * Create producer instance. * * NOTE: rd_kafka_new() takes ownership of the conf object * and the application must not reference it again after * this call. */ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "%% Failed to create new producer: %s\n", errstr); return 1; } /* Signal handler for clean shutdown */ signal(SIGINT, stop); fprintf(stderr, "%% Type some text and hit enter to produce message\n" "%% Or just hit enter to only serve delivery reports\n" "%% Press Ctrl-C or Ctrl-D to exit\n"); while (run && fgets(buf, sizeof(buf), stdin)) { size_t len = strlen(buf); rd_kafka_resp_err_t err; if (buf[len-1] == '\n') /* Remove newline */ buf[--len] = '\0'; if (len == 0) { /* Empty line: only serve delivery reports */ rd_kafka_poll(rk, 0/*non-blocking */); continue; } /* * Send/Produce message. * This is an asynchronous call, on success it will only * enqueue the message on the internal producer queue. * The actual delivery attempts to the broker are handled * by background threads. * The previously registered delivery report callback * (dr_msg_cb) is used to signal back to the application * when the message has been delivered (or failed). */ retry: err = rd_kafka_producev( /* Producer handle */ rk, /* Topic name */ RD_KAFKA_V_TOPIC(topic), /* Make a copy of the payload. */ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), /* Message value and length */ RD_KAFKA_V_VALUE(buf, len), /* Per-Message opaque, provided in * delivery report callback as * msg_opaque. */ RD_KAFKA_V_OPAQUE(NULL), /* End sentinel */ RD_KAFKA_V_END); if (err) { /* * Failed to *enqueue* message for producing. */ fprintf(stderr, "%% Failed to produce to topic %s: %s\n", topic, rd_kafka_err2str(err)); if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) { /* If the internal queue is full, wait for * messages to be delivered and then retry. * The internal queue represents both * messages to be sent and messages that have * been sent or failed, awaiting their * delivery report callback to be called. * * The internal queue is limited by the * configuration property * queue.buffering.max.messages */ rd_kafka_poll(rk, 1000/*block for max 1000ms*/); goto retry; } } else { fprintf(stderr, "%% Enqueued message (%zd bytes) " "for topic %s\n", len, topic); } /* A producer application should continually serve * the delivery report queue by calling rd_kafka_poll() * at frequent intervals. * Either put the poll call in your main loop, or in a * dedicated thread, or call it after every * rd_kafka_produce() call. * Just make sure that rd_kafka_poll() is still called * during periods where you are not producing any messages * to make sure previously produced messages have their * delivery report callback served (and any other callbacks * you register). */ rd_kafka_poll(rk, 0/*non-blocking*/); } /* Wait for final messages to be delivered or fail. * rd_kafka_flush() is an abstraction over rd_kafka_poll() which * waits for all messages to be delivered. */ fprintf(stderr, "%% Flushing final messages..\n"); rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */); /* If the output queue is still not empty there is an issue * with producing messages to the clusters. */ if (rd_kafka_outq_len(rk) > 0) fprintf(stderr, "%% %d message(s) were not delivered\n", rd_kafka_outq_len(rk)); /* Destroy the producer instance */ rd_kafka_destroy(rk); return 0; }

代码流程:

  1. 参数解析: 从命令行参数中获取 Broker 地址和 Topic 名称。

  2. 配置创建: 使用 rd_kafka_conf_new() 创建一个配置对象。

  3. 配置设置: 使用 rd_kafka_conf_set() 设置配置参数,例如 bootstrap.servers (Broker 地址)。

  4. 交付报告回调设置: 使用 rd_kafka_conf_set_dr_msg_cb() 设置交付报告回调函数。 交付报告回调函数会在消息发送成功或失败时被调用。

  5. 生产者创建: 使用 rd_kafka_new() 创建一个 Kafka 生产者实例。 参数 RD_KAFKA_PRODUCER 指定了创建的是生产者。

  6. 消息发送: 从标准输入读取消息,然后使用 rd_kafka_producev() 发送消息到 Kafka。 RD_KAFKA_V_TOPIC(topic) 指定了消息要发送到的 Topic,RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY) 指定了消息内容会被复制,RD_KAFKA_V_VALUE(buf, len) 指定了消息的内容和长度。

  7. 轮询事件: 使用 rd_kafka_poll() 轮询事件。 rd_kafka_poll() 会调用交付报告回调函数,并处理其他 Kafka 事件。

  8. 刷新消息: 程序结束时,使用 rd_kafka_flush() 刷新消息队列,确保所有消息都被发送到 Kafka。

  9. 销毁生产者: 使用 rd_kafka_destroy() 销毁生产者实例。

关键函数:

函数调用

含义

rd_kafka_conf_new()

创建配置文件

rd_kafka_conf_set(...)

设置参数。设置 bootstrap.servers

rd_kafka_conf_set_dr_msg_cb(...)

设置交付报告回调

rd_kafka_new(...)

创建一个 Kafka 生产者,RD_KAFKA_PRODUCER

rd_kafka_producev(...)

发送数据,是发送消息的主要函数,可以指定 Topic、Key、Value、Headers 等信息

3.3 生产者和消费者的交互

  1. 使用 consumer 示例程序启动消费者。

    ./consumer localhost:9092 0 test

    启动成功后,会显示以下信息:

    % Subscribed to 1 topic(s), waiting for rebalance and messages...

  2. 使用 producer 示例程序启动生产者。

    ./producer localhost:9092 test

    启动成功后,会显示以下信息:

    % Type some text and hit enter to produce message % Or just hit enter to only serve delivery reports % Press Ctrl-C or Ctrl-D to exit

  3. 在生产者终端输入消息并按下回车键,例如输入 "hello consumer"。

    $ ./producer localhost:9092 test % Type some text and hit enter to produce message % Or just hit enter to only serve delivery reports % Press Ctrl-C or Ctrl-D to exit hello consumer % Enqueued message (14 bytes) for topic test

    消费者终端会接收到该消息,并显示以下信息:

    $ ./consumer localhost:9092 0 test % Subscribed to 1 topic(s), waiting for rebalance and messages... Message on test [0] at offset 4: Value: hello consumer

    这表明生产者成功地将消息发送到了 Kafka,消费者成功地从 Kafka 消费了该消息。

四、总结

本文深入浅出地介绍了如何使用 C 语言操作 Apache Kafka 消息队列。详细讲解了 Kafka 环境的搭建,包括 ZooKeeper 和 Kafka 服务的启动,以及 Topic 的创建。重点介绍了 librdkafka 库,并结合其提供的 consumer.c 和 producer.c 示例代码,详细分析了 C 语言 Kafka 消费者的实现流程和生产者实现流程。

演示了生产者和消费者之间的交互过程,快速掌握 C 语言 Kafka 客户端开发,为高性能、低延迟的 Kafka 应用开发提供了新的选择。使用 C 语言不仅能更有效地利用系统资源,还能避免 JVM 带来的额外开销,在特定场景下具有显著优势。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验