架构设计图:
实操:
一、安装tp6
// 克隆 git clone https://gitee.com/top-think/think // 安装依赖 cd think && composer install
二、安装阿里云AMPQ环境
// 安装 stomp composer require stomp-php/stomp-php
三、TP6对接AMPQ,执行命令
php think make:command Aliyun_stomp
这时候会自动生成 app\command\Aliyun_stomp.php文件,在其修改内容后如下:
<?php declare (strict_types=1); namespace app\command; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output; use think\facade\Config; use Stomp\Client; use Stomp\Network\Observer\Exception\HeartbeatException; use Stomp\Network\Observer\ServerAliveObserver; use Stomp\StatefulStomp; class Aliyun_stomp extends Command { protected function configure() { // 指令配置 $this->setName('app\command\aliyun_stomp') ->setDescription('阿里云物联网AMPQ订阅'); } protected function execute(Input $input, Output $output) { // 指令输出 $output->writeln('start'); //参数说明,请参见AMQP客户端接入说明文档。 $accessKey = Config::get('app.config.accessKey'); $accessSecret = Config::get('app.config.accessSecret'); $consumerGroupId = "DEFAULT_GROUP"; //iotInstanceId:实例ID。 $iotInstanceId = Config::get('app.config.iotInstanceId'); //随意填写 $clientId = "12346989"; $timeStamp = round(microtime(true) * 1000); //签名方法:支持hmacmd5,hmacsha1和hmacsha256。 $signMethod = "hmacsha1"; //userName组装方法,请参见AMQP客户端接入说明文档。 //若使用二进制传输,则userName需要添加encode=base64参数,服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节“二进制消息体说明”。 $userName = $clientId . "|authMode=aksign" . ",signMethod=" . $signMethod . ",timestamp=" . $timeStamp . ",authId=" . $accessKey . ",iotInstanceId=" . $iotInstanceId . ",consumerGroupId=" . $consumerGroupId . "|"; $signContent = "authId=" . $accessKey . "×tamp=" . $timeStamp; //计算签名,password组装方法,请参见AMQP客户端接入说明文档。 $password = base64_encode(hash_hmac("sha1", $signContent, $accessSecret, $raw_output = TRUE)); //接入域名,请参见AMQP客户端接入说明文档。下方 123456 替换为你的阿里云账号id, cn-shanghai 替换为你的地区代码 若是PHP开发,端口号是 61614 $client = new Client('ssl://'.Config::get('app.config.accountId').'.iot-amqp.cn-shanghai.aliyuncs.com:61614'); $sslContext = ['ssl' => ['verify_peer' => true, 'verify_peer_name' => false],]; $client->getConnection()->setContext($sslContext); //服务端心跳监听。 $observer = new ServerAliveObserver(); $client->getConnection()->getObservers()->addObserver($observer); //心跳设置,需要云端每50s发送一次心跳包。 $client->setHeartbeat(0, 5000); $client->setLogin($userName, $password); try { $client->connect(); } catch (StompException $e) { echo "failed to connect to server, msg:" . $e->getMessage(), PHP_EOL; } //无异常时继续执行。 $stomp = new StatefulStomp($client); $stomp->subscribe('/topic/#'); $output->writeln('connect success .'); while (true) { try { // 检查连接状态 if (!$client->isConnected()) { echo "connection not exists, will reconnect after 10s.", PHP_EOL; sleep(10); $client->connect(); $stomp->subscribe('/topic/#'); echo "connect success.", PHP_EOL; } $msg = $stomp->read(); if (($msg) != "") { // 处理消息业务逻辑。 $output->writeln("Read Topic:" . $msg->getHeaders()["topic"]); $output->writeln("Read PayLoad:" . $msg->getBody()); } } catch (HeartbeatException $e) { echo 'The server failed to send us heartbeats within the defined interval.', PHP_EOL; $stomp->getClient()->disconnect(); } catch (Exception $e) { echo 'process message occurs error: ' . $e->getMessage(), PHP_EOL; $stomp->getClient()->disconnect(); } } } }
在app\config\console修改指令定义,指向具体的文件:
<?php return [ // 指令定义 'commands' => [ 'start_ampq' => 'app\command\Aliyun_stomp', ], ];其次,代码部分结束, 切换到命令行工具,定位到我们的项目根目录(think文件所在的目录),执行:
php think start_ampq
如设备端上报信息,控制台就会打印出来了。