NSQ是一个分布式实时消息平台。NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。
PHP-NSQ 是一个用于 PHP 7 和 PHP 8 的 NSQ 客户端扩展。NSQ 是一个类似 Kafka 的消息队列系统,此扩展允许 PHP 应用程序与 NSQ 消息队列系统进行交互,实现消息的发布和订阅功能。
通过Docker安装
$ docker pull nsqio/nsq
Using default tag: latest
latest: Pulling from nsqio/nsq
709515475419: Pull complete
efd1c5a69d15: Pull complete
fa61d00bb52d: Pull complete
Digest: sha256:fad1937a88fec5b66fb9f4837b72ad3b70012692826aed5c6435f93c5a23b690
Status: Downloaded newer image for nsqio/nsq:latest
启动后
生产一条消息
curl -d 'Hello Linux localhost Msg' 'http://127.0.0.1:4151/pub?topic=test'
查看消息界面
安装此扩展需要依赖libevent
库,可以使用不同的包管理工具进行安装:
# Debian/Ubuntu
apt-get install libevent-dev
# CentOS/RHEL
yum install libevent-devel
# macOS
brew install libevent
pecl install nsq
<?php
namespaceapp\index\controller;
class NsqController extends Controller
{
publicfunction index()
{
ini_set('memory_limit', '8M');
$nsqdAddr = [
"127.0.0.1:4151",
"127.0.0.1:4150"
];
$nsq = new \Nsq();
$isTrue = $nsq->connectNsqd($nsqdAddr);
for ($i = ; $i < ; $i++) {
$nsq->publish("test", "Hi Tinywan");
}
$nsq->closeNsqdConnection();
halt($isTrue);
// Deferred publish
//function : deferredPublish(string topic,string message, int millisecond);
//millisecond default : [0 < millisecond < 3600000]
$deferred = new \Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for ($i = ; $i < ; $i++) {
$deferred->deferredPublish("test", "message daly", );
}
$deferred->closeNsqdConnection();
}
publicfunction nsqSubMessage()
{
$nsq_lookupd = new \NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr
$nsq = new \Nsq();
$config = array(
"topic" => "test",
"channel" => "struggle",
"rdy" => , //optional , default 1
"connect_num" => , //optional , default 1
"retry_delay_time" => , //optional, default 0 , if run callback failed, after 5000 msec, message will be retried
"auto_finish" => true, //default true
);
$nsq->subscribe($nsq_lookupd, $config, function ($msg, $bev) {
echo $msg->payload . "\n";
echo $msg->attempts . "\n";
echo $msg->messageId . "\n";
echo $msg->timestamp . "\n";
});
}
}
使用命令行模式,在console 显示信息
php think pay nsq
Connect succeed
nihao
f0f1ae3bef9002
Hi Tinywan
f0f1c2262f9000
Hi Tinywan
f0f1c2266f9001
Hi Tinywan
f0f1ec952f9000
Hi Tinywan
f0f1ec956f9001
Hi Tinywan
f0f1ec95af9000