PHP+RabbitMQ实现消息队列(代码全篇)
前言
先安装 PHP 对应的 RabbitMQ 扩展,这里用的是 php_amqp;不同的扩展在实现方式上会有细微的差异。php 扩展地址:http://pecl.php.net/package/amqp ;具体以官网为准:http://www.rabbitmq.com/getstarted.html
介绍
config.php:配置信息;BaseMQ.php:MQ 基类;ProductMQ.php:生产者类;ConsumerMQ.php:消费者类;Consumer2MQ.php:消费者 2(可有多个)。
config.php
<?php
// Settings shared by the RabbitMQ producer/consumer examples.
// The array is handed back to whichever script require()s this file.
//
// NOTE: this file must stay multi-line. When everything is written on one
// line, the first "//" comment swallows the rest of the array and the file
// no longer parses.
return [
    // Broker credentials; BaseMQ passes this sub-array to the AMQPConnection constructor.
    'host' => [
        'host' => '127.0.0.1',
        'port' => '5672',
        'login' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
    ],
    // Name of the exchange the examples publish to and bind queues against.
    'exchange' => 'word',
    // Routing keys (not read by the visible example classes).
    'routes' => [],
];
BaseMQ.php
<?php
/**
* Created by PhpStorm.
* User: pc
* Date: 2018/12/13
* Time: 14:11
*/
namespace MyObjSummary\rabbitMQ;
/**
 * Base class shared by the RabbitMQ producer and consumer examples.
 *
 * Holds one AMQPConnection (opened in the constructor) and lazily creates
 * the php_amqp helper objects built on top of it: AMQPChannel, AMQPExchange,
 * AMQPQueue and AMQPEnvelope. Connection credentials and the exchange name
 * are read from config.php located next to this file.
 *
 * @package MyObjSummary\rabbitMQ
 */
class BaseMQ
{
    /**
     * Channel created on first call to channel().
     * @var \AMQPChannel|null
     */
    public $AMQPChannel;

    /**
     * Connection to the broker, opened by the constructor.
     * @var \AMQPConnection
     */
    public $AMQPConnection;

    /**
     * Envelope created on first call to envelope().
     * @var \AMQPEnvelope|null
     */
    public $AMQPEnvelope;

    /**
     * Exchange object created on first call to exchange().
     * @var \AMQPExchange|null
     */
    public $AMQPExchange;

    /**
     * Queue object created on first call to queue().
     * @var \AMQPQueue|null
     */
    public $AMQPQueue;

    /**
     * Connection credentials (the 'host' section of config.php).
     * @var array
     */
    public $conf;

    /**
     * Exchange name (the 'exchange' entry of config.php).
     * @var string
     */
    public $exchange;

    /**
     * Loads config.php and opens the broker connection.
     *
     * @throws \AMQPConnectionException when the configuration is unusable or
     *                                  the connection cannot be established
     */
    public function __construct()
    {
        // Anchor the path to this file so the lookup does not depend on the
        // include_path or on the directory the script was started from.
        $conf = require __DIR__ . '/config.php';

        // require yields 1 when the file has no return statement, so check
        // the shape explicitly instead of relying on truthiness alone.
        if (!is_array($conf)
            || !isset($conf['host'], $conf['exchange'])
            || !is_array($conf['host'])
        ) {
            throw new \AMQPConnectionException('config error!');
        }

        $this->conf = $conf['host'];
        $this->exchange = $conf['exchange'];

        $this->AMQPConnection = new \AMQPConnection($this->conf);
        if (!$this->AMQPConnection->connect()) {
            throw new \AMQPConnectionException("Cannot connect to the broker!\n");
        }
    }

    /**
     * Closes the broker connection if it is still open.
     *
     * The cached channel, exchange and queue objects belong to the closed
     * connection, so they are dropped as well.
     */
    public function close()
    {
        if ($this->AMQPConnection && $this->AMQPConnection->isConnected()) {
            $this->AMQPConnection->disconnect();
        }

        $this->AMQPChannel = null;
        $this->AMQPExchange = null;
        $this->AMQPQueue = null;
    }

    /**
     * Returns the channel, creating it on first use.
     *
     * @return \AMQPChannel
     * @throws \AMQPConnectionException
     */
    public function channel()
    {
        if (!$this->AMQPChannel) {
            $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
        }

        return $this->AMQPChannel;
    }

    /**
     * Returns the exchange object, creating it on first use and naming it
     * after the configured exchange. The exchange is not declared here.
     *
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function exchange()
    {
        if (!$this->AMQPExchange) {
            $this->AMQPExchange = new \AMQPExchange($this->channel());
            $this->AMQPExchange->setName($this->exchange);
        }

        return $this->AMQPExchange;
    }

    /**
     * Returns the queue object, creating it on first use. Name, flags and
     * declaration are left to the caller.
     *
     * @return \AMQPQueue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     */
    public function queue()
    {
        if (!$this->AMQPQueue) {
            $this->AMQPQueue = new \AMQPQueue($this->channel());
        }

        return $this->AMQPQueue;
    }

    /**
     * Returns an envelope object, creating it on first use.
     *
     * @return \AMQPEnvelope
     */
    public function envelope()
    {
        if (!$this->AMQPEnvelope) {
            $this->AMQPEnvelope = new \AMQPEnvelope();
        }

        return $this->AMQPEnvelope;
    }
}
ProductMQ.php
<?php
// Producer (P)
namespace MyObjSummary\rabbitMQ;

// Resolve relative to this file and guard against double inclusion.
require_once __DIR__ . '/BaseMQ.php';

/**
 * Example producer: publishes one random message to every routing key in
 * $routes inside a single AMQP transaction.
 */
class ProductMQ extends BaseMQ
{
    /**
     * Routing keys the message is published with.
     * @var string[]
     */
    private $routes = ['hello', 'word'];

    /**
     * ProductMQ constructor.
     *
     * @throws \AMQPConnectionException
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Publishes the message to every route and commits the transaction.
     *
     * Only delivery to the broker is covered; whether a consumer received
     * the message is not tracked. If any publish reports failure the
     * transaction is rolled back instead of committed. The connection is
     * closed before returning or rethrowing.
     *
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function run()
    {
        $channel = $this->channel();
        $ex = $this->exchange();
        $message = 'product message ' . rand(1, 99999);

        $channel->startTransaction();

        try {
            $allSent = true;
            foreach ($this->routes as $route) {
                $sendEd = $ex->publish($message, $route);
                echo "Send Message:" . $sendEd . "\n";

                // Compare against false explicitly: some php_amqp versions
                // return nothing from publish() and signal failure only by
                // throwing, so a null result must not count as an error.
                if ($sendEd === false) {
                    $allSent = false;
                }
            }

            // Exactly one of commit/rollback ends the transaction.
            if ($allSent) {
                $channel->commitTransaction();
            } else {
                $channel->rollbackTransaction();
            }
        } catch (\Exception $e) {
            try {
                $channel->rollbackTransaction();
            } catch (\Exception $ignored) {
                // The channel may already be unusable; the original error
                // below is the one worth reporting.
            }
            $this->close();

            throw $e;
        }

        $this->close();
    }
}

try {
    (new ProductMQ())->run();
} catch (\Exception $exception) {
    var_dump($exception->getMessage());
}
ConsumerMQ.php
<?php
// Consumer (C)
namespace MyObjSummary\rabbitMQ;

// Resolve relative to this file and guard against double inclusion.
require_once __DIR__ . '/BaseMQ.php';

/**
 * Example consumer: binds a durable queue to the configured exchange and
 * prints every message it receives, acknowledging each one manually.
 */
class ConsumerMQ extends BaseMQ
{
    /**
     * Queue name.
     * @var string
     */
    private $q_name = 'hello';

    /**
     * Routing key the queue is bound with.
     * @var string
     */
    private $route = 'hello';

    /**
     * ConsumerMQ constructor.
     *
     * @throws \AMQPConnectionException
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Declares the exchange and queue, binds them and consumes messages.
     *
     * Messages are acknowledged only after they have been printed, so
     * unacknowledged messages are delivered again after a restart.
     *
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     * @throws \AMQPQueueException
     */
    public function run()
    {
        // Direct, durable exchange. The type and flags only take effect
        // once the exchange is actually declared.
        $ex = $this->exchange();
        $ex->setType(AMQP_EX_TYPE_DIRECT);
        $ex->setFlags(AMQP_DURABLE);
        $ex->declareExchange();

        // Durable queue. It has to exist before it can be bound, so declare
        // it rather than assuming an earlier run created it.
        $q = $this->queue();
        $q->setName($this->q_name);
        $q->setFlags(AMQP_DURABLE);
        $q->declareQueue();

        // Bind the queue to the exchange (by name) with the routing key.
        echo 'Queue Bind: ' . $q->bind($this->exchange, $this->route) . "\n";

        echo "Message:\n";

        try {
            // consume() blocks and keeps invoking the callback for each
            // message until the callback returns false; this callback never
            // does, so no surrounding loop is needed.
            $q->consume(function ($envelope, $queue) {
                $msg = $envelope->getBody();
                echo $msg . "\n";

                // Manual ACK once the message has been handled.
                $queue->ack($envelope->getDeliveryTag());
            });
        } catch (\Exception $e) {
            $this->close();

            throw $e;
        }

        $this->close();
    }
}

try {
    (new ConsumerMQ)->run();
} catch (\Exception $exception) {
    var_dump($exception->getMessage());
}