Queue多节点使用
定义第一个队列(自定义nodeId)
<?php
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
use EasySwoole\Queue\QueueDriverInterface;
class MyQueue1 extends Queue
{
use Singleton;
public function __construct(QueueDriverInterface $driver)
{
parent::__construct($driver);
$this->setNodeId('xxxxx1');
}
}
定义第二个队列(自动生成nodeId)
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
class MyQueue2 extends Queue
{
use Singleton;
}
获取节点id
<?php
namespace App\Utility;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;
class QueueProcess extends AbstractProcess
{
protected function run($arg)
{
go(function () {
MyQueue1::getInstance()->consumer()->listen(function (Job $job) {
var_dump($job->getNodeId());
var_dump($job->getJobId());
});
MyQueue2::getInstance()->consumer()->listen(function (Job $job) {
var_dump($job->getNodeId());
var_dump($job->getJobId());
});
});
}
}
可以多进程,多协程消费
驱动注册
<?php
namespace EasySwoole\EasySwoole;
use App\Utility\MyQueue1;
use App\Utility\MyQueue2;
use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\Queue\Job;
class EasySwooleEvent implements Event
{
public static function initialize()
{
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
//redis pool使用请看redis 章节文档
\EasySwoole\RedisPool\RedisPool::getInstance()->register(new \EasySwoole\Redis\Config\RedisConfig(
[
'host' => '127.0.0.1',
'port' => '6379'
]
), 'queue');
$redisPool = \EasySwoole\RedisPool\RedisPool::getInstance()->getPool('queue');
$driver = new \EasySwoole\Queue\Driver\Redis($redisPool, 'queue');
// 这里是重点
MyQueue1::getInstance($driver);
MyQueue2::getInstance($driver);
//注册一个消费进程
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new QueueProcess());
//模拟生产者,可以在任意位置投递
$register->add($register::onWorkerStart, function ($ser, $id) {
if ($id == 0) {
Timer::getInstance()->loop(3000, function () {
$job = new Job();
$job->setJobData(['time' => \time()]);
// 这里是重点
MyQueue1::getInstance()->producer()->push($job);
MyQueue2::getInstance()->producer()->push($job);
});
}
});
}
}