教你运用mixphp打造多进程异步邮件发送
留意:这个是 MixPHP V1 的典范
邮件发送是很常见的需求,由于发送邮件的操纵一样是比力耗时的,所以我们一样采纳异步处置来晋升会员体验,而异步平常我们使用新闻队列来实现。
传统 MVC 框架由于缺少多进程开发能力,平常是采纳统一个足本施行屡次,发生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,会员能非常简便的开发出功效完美的高可用多进程利用。
引荐:《PHP视频教程》
下面演示一个异步邮件发送系统的开发历程,触及知识点:
- 异步
- 新闻队列
- 多进程
- 守护进程
怎样使用新闻队列实现异步
PHP 使用新闻队列平常是使用中心件来实现,常用的新闻中心件有:
- redis
- rabbitmq
- kafka
本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现新闻队列,使用以下命令:
// 入列 $redis->lpush($key, $data); // 出列 $data = $redis->rpop($key); // 堵塞出列 $data = $redis->brpop($key, 10);
架构设计
本实例由传统 MVC 框架送达邮件发送需求,MixPHP 多进程施行发送任务。
邮件发送库选型
以往我们平常使用框架供给的邮件发送库,或者网上下载别的会员分享的库,composer 显现后,https://packagist.org/ 上有大量优良的库,我们只需选中一个最好的即可,本例选中 swiftmailer。
由于发送任务是由 MixPHP 施行,所以 swiftmailer 是安置在 MixPHP 项目中,在项目根名目中施行以下命令安置:
composer require swiftmailer/swiftmailer
生产者开发
在邮件发送这个需求中生产者是指送达发送任务的一方,这一方平常是一个接口或网页,这个部分并不必然需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息送达到新闻队列中即可。
在传统 MVC 框架的操纵器中增添如下代码:
平常框架中使用 redis 会安置一个类库来使用,本例使用原生代码,便于懂得。
// 连接 $redis = new \Redis(); if (!$redis->connect('127.0.0.1', 6379)) { throw new \Exception('Redis Connect Failure'); } $redis->auth(''); $redis->select(0); // 送达任务 $data = [ 'to' => ['***@qq.com' => 'A name'], 'body' => 'Here is the message itself', 'subject' => 'The title content', ]; $redis->lpush('queue:email', serialize($data));
平常异步开发中,送达完成后就会马上响应一个新闻给会员,当然此时该任务并没有施行。
消耗者开发
本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,平常使用常驻进程来处置队列的消耗,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。
TaskExecutor 的 MODE_PUSH 模式有二种进程:
左进程:负责从新闻队列取出任务数据,投放给中进程。
中进程:负责施行邮件发送任务。
PushCommand.php 代码如下:
<?php namespace apps\daemon\commands; use mix\console\ExitCode; use mix\facades\Input; use mix\facades\Redis; use mix\task\CenterProcess; use mix\task\LeftProcess; use mix\task\TaskExecutor; /** * 推送模式典范 * @author 刘健 <coder.liu@qq.com> */ class PushCommand extends BaseCommand { // 配置信息 const HOST = 'smtpdm.aliyun.com'; const PORT = 465; const SECURITY = 'ssl'; const USERNAME = '****@email.***.com'; const PASSWORD = '****'; // 初始化事件 public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 猎取程序名称 $this->programName = Input::getCommandName(); // 设定pidfile $this->pidFile = "/var/run/{$this->programName}.pid"; } /** * 猎取效劳 * @return TaskExecutor */ public function getTaskService() { return create_object( [ // 类途径 'class' => 'mix\task\TaskExecutor', // 效劳名称 'name' => "mix-daemon: {$this->programName}", // 施行类型 'type' => \mix\task\TaskExecutor::TYPE_DAEMON, // 施行模式 'mode' => \mix\task\TaskExecutor::MODE_PUSH, // 左进程数 'leftProcess' => 1, // 中进程数 'centerProcess' => 5, // 任务超不时间 (秒) 'timeout' => 5, ] ); } // 启动 public function actionStart() { // 预处置 if (!parent::actionStart()) { return ExitCode::UNSPECIFIED_ERROR; } // 启动效劳 $service = $this->getTaskService(); $service->on('LeftStart', [$this, 'onLeftStart']); $service->on('CenterStart', [$this, 'onCenterStart']); $service->start(); // 返回退出码 return ExitCode::OK; } // 左进程启动事件回调函数 public function onLeftStart(LeftProcess $worker) { try { // 模型内使用长连接版本的数据库组件,这样组件会主动帮你保护连接不竭线 $queueModel = Redis::getInstance(); // 保持任务施行状态,轮回完毕后当前进程会退出,主进程会重新启动一个新进程连续施行任务,这样做是为了不长时间施行内存溢出 for ($j = 0; $j < 16000; $j++) { // 从新闻队列中心件堵塞猎取一条新闻 $data = $queueModel->brpop('queue:email', 10); if (empty($data)) { continue; } list(, $data) = $data; // 将新闻推送给中进程去处置,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html) $worker->push($data, false); } } catch (\Exception $e) { // 歇息一会,幸免 CPU 显现 100% sleep(1); // 抛出错误 throw $e; } } // 中进程启动事件回调函数 public function onCenterStart(CenterProcess $worker) { // 保持任务施行状态,轮回完毕后当前进程会退出,主进程会重新启动一个新进程连续施行任务,这样做是为了不长时间施行内存溢出 for ($j = 0; $j < 16000; $j++) { // 从进程新闻队列中抢占一条新闻 $data = $worker->pop(); if (empty($data)) { continue; } // 处置新闻 try { // 处置新闻,比方:发送短信、发送邮件、微信推送 var_dump($data); $ret = self::sendEmail($data); var_dump($ret); } catch (\Exception $e) { // 回退数据到新闻队列 $worker->rollback($data); // 歇息一会,幸免 CPU 显现 100% sleep(1); // 抛出错误 throw $e; } } } // 发送邮件 public static function sendEmail($data) { // Create the Transport $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY)) ->setUsername(self::USERNAME) ->setPassword(self::PASSWORD); // Create the Mailer using your created Transport $mailer = new \Swift_Mailer($transport); // Create a message $message = (new \Swift_Message($data['subject'])) ->setFrom([self::USERNAME => '**网']) ->setTo($data['to']) ->setBody($data['body']); // Send the message $result = $mailer->send($message); return $result; } }
测试
1.在 shell 中启动 push 常驻程序。[root@localhost bin]# ./mix-daemon push start mix-daemon 'push' start successed.1.调取接口往新闻队列投听任务。
此时 shell 终端将打印:
成功收到测试邮件:
MixPHP
GitHub: https://github.com/mix-php/mix
官网:http://www.mixphp.cn/
以上就是教你使用mixphp打造多进程异步邮件发送的具体内容,更多请关注百分百源码网其它相关文章!