前言
先引入php-amqplib包:
1 |
composer require php-amqplib/php-amqplib |
场景
订单超时未支付,关闭订单
用户下单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public function index() { //创建订单 $order = new Order(); $order->order_sn = date('YmdHis').time(); $order->user_id = 1; $order->product_id = 1; $order->save(); //推送至队列 (new OrderService())->push($order); //返回相关信息 return true; } |
队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
<?php namespace App\Service; use App\Models\Order; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; class OrderService { const HOST = '192.168.1.199'; const PORT = '5672'; const LOGIN = 'guest'; const PASSWORD = 'guest'; const VHOST = '/'; //交换机名称 public $exchangeName = 'laravel_exchange_name'; //普通队列名称和路由key public $queueName = 'laravel_queue_name'; public $routeKey = 'laravel_route_key'; //延迟队列和路由 public $delayQueueName = 'laravel_delay_queue_name'; public $delayRouteKey = 'laravel_delay_route_key'; //延迟时长 public $delaySecond = 10; public $channel; public function __construct() { $connection = new AMQPStreamConnection(self::HOST,self::PORT,self::LOGIN,self::PASSWORD); $this->channel = $connection->channel(); $this->init(); } public function init() { // 声明交换机 $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false); $this->declareConsumeQueue(); $this->declareDelayQueue(); } //消费队列 private function declareConsumeQueue() { //声明消费队列 $this->channel->queue_declare($this->queueName, false, true, false, false); //绑定交换机及队列 $this->channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey); } //延迟队列 private function declareDelayQueue() { //设置消息过期时间 $tab = new AMQPTable([ 'x-dead-letter-exchange' => $this->exchangeName, //消息过期后推送至此交换机 'x-dead-letter-routing-key' => $this->routeKey, //消息过期后推送至此路由地址 //也就是我们消费的正常队列 与①对应 'x-message-ttl' => intval($this->delaySecond) * 1000, //10秒 ]); //声明延迟队列 $this->channel->queue_declare($this->delayQueueName,false,true,false,false,false,$tab); //绑定交换机及延迟队列 $this->channel->queue_bind($this->delayQueueName, $this->exchangeName, $this->delayRouteKey); } //入队列 public function push($order) { $message = json_encode([ 'id' => $order->id ]); //创建消息 $msg = new AMQPMessage($message, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]); //推送至队列 //消息 //交换机名称 //路由 推送至延迟队列中 $this->channel->basic_publish($msg, $this->exchangeName, $this->delayRouteKey); } //出队列 public function consume() { //消费 普通消费队列 //① $this->channel->basic_consume($this->queueName, '', false, false, false, false, [$this, 'process_message']); while (count($this->channel->callbacks)) { $this->channel->wait(); } } //开始消费 public function process_message($message) { $obj = json_decode($message->body); try { $order = Order::find($obj->id); if (strtotime($order->created_at) + $this->delaySecond > time()){ throw new \Exception('取消订单时间未到', 404); } //更改数据库状态 $order->status = 10; $order->colsed_at = date('Y-m-d H:i:s'); $res = $order->save(); if (!$res){ throw new \Exception('取消订单失败', 404); } } catch (\Exception $e) { //记录日志 } //确认消息处理完成 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); } } |
用户下单:
当用户下单以后,延迟队列就会出现一条待消费的记录,这里队列名称和我们代码中生成的名称一致,laravel_delay_queue_name
当消息过期以后,此消息就会被推送至我们设置好的队列中,也就是 laravel_queue_name,从而会被消费掉,达到超时未支付,取消订单的效果
定义调度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
<?php namespace App\Console; use App\Console\Commands\OrderNopay; use Illuminate\Console\Scheduling\Schedule; use Illuminate\Foundation\Console\Kernel as ConsoleKernel; class Kernel extends ConsoleKernel { //省略其他代码 protected $commands = [ OrderNopay::class, ]; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
<?php namespace App\Console\Commands; use App\Service\OrderService; use Illuminate\Console\Command; class OrderNopay extends Command { protected $signature = 'order:nopay'; protected $description = '订单超时未支付'; public function __construct() { parent::__construct(); } public function handle(OrderService $order) { $order->consume(); } } |
执行
可以使用 php artisan order:nopay
命令,或者通过 supervisor
来跑
最后
生产者通过代码把数据发布到RabbitMQ,消费者通过命令在后台监听。