首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >php 操作kafka的实践

php 操作kafka的实践

作者头像
友儿
发布2022-07-27 16:22:29
发布2022-07-27 16:22:29
1K0
举报
文章被收录于专栏:友儿友儿

安装kafka的php扩展

代码语言:javascript
复制
先安装rdkfka库文件
  git clone https://github.com/edenhill/librdkafka.git
  cd librdkafka/
  ./configure 
  make
  sudo make install
  git clone https://github.com/arnaud-lb/php-rdkafka.git
  cd php-rdkafka
  phpize
  ./configure
  make all -j 5
  sudo make install
  
  vim php.ini
extension=rdkafka.so

php代码实践

生产者

代码语言:javascript
复制
<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
  file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
  file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
// -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);

$option = 'qkl';
for ($i = 0; $i < 20; $i++) {
  //RD_KAFKA_PARTITION_UA自动选择分区
  //$option可选
  $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
}

$len = $rk->getOutQLen();
while ($len > 0) {
  $len = $rk->getOutQLen();
  var_dump($len);
  $rk->poll(50);
}
#运行生产者
#php producer.php
#output

int(20)
int(20)
int(20)
int(20)
int(0)

你可以查看你刚才上面启动的消费者shell应该会输出消息

代码语言:javascript
复制
qkl . 0
qkl . 1
qkl . 2
...
qkl . 19

Low Level 消费者

代码语言:javascript
复制
<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
  file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
  file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

//设置消费组
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
//在interval.ms的时间内自动提交确认、建议不要启动
//$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);

// 设置offset的存储为file
//$topicConf->set('offset.store.method', 'file');
// 设置offset的存储为broker
 $topicConf->set('offset.store.method', 'broker');
//$topicConf->set('offset.store.path', __DIR__);

//smallest:简单理解为从头开始消费,其实等价于上面的 earliest
//largest:简单理解为从最新的开始消费,其实等价于上面的 latest
//$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test", $topicConf);

// 参数1消费分区0
// RD_KAFKA_OFFSET_BEGINNING 重头开始消费
// RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
// RD_KAFKA_OFFSET_END 最后一条消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
  //参数1表示消费分区,这里是分区0
  //参数2表示同步阻塞多久
  $message = $topic->consume(0, 12 * 1000);
  if (is_null($message)) {
      sleep(1);
      echo "No more messages\n";
      continue;
  }
  switch ($message->err) {
      case RD_KAFKA_RESP_ERR_NO_ERROR:
          var_dump($message);
          break;
      case RD_KAFKA_RESP_ERR__PARTITION_EOF:
          echo "No more messages; will wait for more\n";
          break;
      case RD_KAFKA_RESP_ERR__TIMED_OUT:
          echo "Timed out\n";
          break;
      default:
          throw new \Exception($message->errstr(), $message->err);
          break;
  }
}

High LEVEL消费者

代码语言:javascript
复制
<?php
/**
 * Created by PhpStorm.
 * User: qkl
 * Date: 2018/8/22
 * Time: 17:58
 */
$conf = new \RdKafka\Conf();

function rebalance(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
  global $offset;
  switch ($err) {
      case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
          echo "Assign: ";
          var_dump($partitions);
          $kafka->assign();
//            $kafka->assign([new RdKafka\TopicPartition("qkl01", 0, 0)]);
          break;

      case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
          echo "Revoke: ";
          var_dump($partitions);
          $kafka->assign(NULL);
          break;

      default:
          throw new \Exception($err);
  }
}

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
  rebalance($kafka, $err, $partitions);
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'test-110-g100');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '192.168.216.122');

$topicConf = new \RdKafka\TopicConf();

$topicConf->set('request.required.acks', -1);
//在interval.ms的时间内自动提交确认、建议不要启动
$topicConf->set('auto.commit.enable', 0);
//$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);

// 设置offset的存储为file
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', __DIR__);
// 设置offset的存储为broker
// $topicConf->set('offset.store.method', 'broker');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new \RdKafka\KafkaConsumer($conf);

//$KafkaConsumerTopic = $consumer->newTopic('qkl01', $topicConf);

// Subscribe to topic 'test'
$consumer->subscribe(['qkl01']);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
  $message = $consumer->consume(120*1000);
  switch ($message->err) {
      case RD_KAFKA_RESP_ERR_NO_ERROR:
          var_dump($message);
//            $consumer->commit($message);
//            $KafkaConsumerTopic->offsetStore(0, 20);
          break;
      case RD_KAFKA_RESP_ERR__PARTITION_EOF:
          echo "No more messages; will wait for more\n";
          break;
      case RD_KAFKA_RESP_ERR__TIMED_OUT:
          echo "Timed out\n";
          break;
      default:
          throw new \Exception($message->errstr(), $message->err);
          break;
  }
}
  • 消费组特别说明
    • 特别注意,High LEVEL消费者设置的消费组,kafka服务器才会记录, Low Level消费者设置的消费组,服务器不会记录
  • 分享一个打包好的php-rdkafka的类库
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装kafka的php扩展
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档