Laravel中的异步处理服务

导语:有了事件与队列,异步处理真easy

一、前言

  往往业务场景中需要具备这些特性:轻量、种类多、自产自销、准实时,但Kafka这种重型队列中间件往往是用不着的。杀鸡焉用牛刀,采用Laravel作为基础框架,该框架自带优良的事件与队列处理机制,已经可以很好地支持上述业务场景。

二、任务如何生成

  异步任务的生成借助了Laravel的事件系统机制,该事件系统提供了一个简单的观察者的实现,能够监听和订阅应用中发生的各种事件。简单地说,事件系统由事件和监听器两部分组成,其中又把能监听多种事件的监听器称为订阅者,一种事件能被多个监听器监听,一个订阅者也能订阅多种事件。这种事件机制让不同的应用彼此隔离,实现代码层解耦。事件可以定义成字符串或者事件类,监听器则可定义为监听闭包或者监听类。

1. 服务注册

  事件服务注册其实是将事件与相关的监听者关联起来,称为绑定。Laravel的EventServiceProvider类提供了绑定的场所,其中的Listen数组用于绑定事件与监听器的关系,subscribe数组用于注册所有订阅者。具体的实现文件一般是:将事件放在app/Events路径下,将监听器放在app/Listeners路径下,将订阅者放在app/Subscribers路径下。

  比如,我们在EventServiceProvider可以这样注册事件、监听器和订阅者:

protected $listen = [ //绑定事件和监听器
    'App\Events\DoSomethingAEvent' => ['App\Listeners\NotifySomeOneAListener'],
    'App\Events\DoSomethingBEvent' => ['App\Listeners\NotifySomeOneBListener','App\Listeners\NotifySomeOneCListener'],
];
    
protected $subscribe = [ //注册订阅者
    'App\Subscribers\ReceiveSomethingASubscriber',
    'App\Subscribers\ReceiveSomethingBSubscriber',
];

2. 任务定义

  Laravel的事件就是保存任务信息的容器,一个最基本的事件类如下:它的构造函数接收一个Object类对象作为参数,并赋值给自身的成员变量$object

class SimpleEvent
{
    use SerializesModels;

    public $object;
    
    /**
     * Create a new event instance.
     */
    public function __construct(Object $obj)
    {
        $this->object$obj;
    }
}

  如果我们要想这个事件能携带更多的功能,可以添加上相应的trait,Laravel提供多种trait可供选用:SerializesModels用于序列化Eloquent模型,InteractsWithSockets用于websockets通信,Dispatchable用于异步队列分发。

  任务事件抛出后,有时候需要多方响应处理,这个时候监听器的工作也就来了。下面介绍如何定义一个简单的监听器:

class SimpleOperationListener
{
    // 队列定义在指定配置文件中
    const QUEUE'params.artisan_queue.simple_operation';

    /**
     * SimpleOperationListener constructor.
     */
    public function __construct()
    {
    }

    /**
     * Handle the event.
     * @param SimpleEvent $event
     */
    public function handle(SimpleEvent $event)
    {
        if (Env::isDevelopment()) { //开发环境:同步处理
            (new SimpleOperationJob($event))->handle();
        } else {                    //其他环境:异步处理
            dispatch((new SimpleOperationJob($event))->onQueue(config(self::QUEUE));
        }
    }
}

  例子中的SimpleOperationListener监听到SimpleEvent事件后,会主动执行handle方法。这里面引入了一个SimpleOperationJob的job类,它承担所有与任务处理相关的业务,也与Listener相互对应,这样的拆分让Listener无需关心业务处理逻辑而是专注于监听事件。注意到handle里根据所处环境不同分成了同步与异步两种处理方式,其中异步处理时,使用了dispatch的公共函数,这个函数用于将任务分发到队列,具体留待下面介绍。

3. 任务分发

  任务分发主要分为两步:一是业务方将任务事件抛出(事件分发到监听器),二是任务接收方将事件分发到处理方(监听器调用工作类)。

  第一步,用到了Laravel的全局辅助函数event(),它通过调用事件系统的Facade的方法将事件分发到所有已经注册的与之相关联的监听器上。比如:上个例子中event(new SimpleEvent($object))即分发了SimpleEvent事件。

    /**
     * Dispatch an event and call the listeners.
     *
     * @param  string|object  $event
     * @param  mixed  $payload
     * @param  bool  $halt
     * @return array|null
     */
    function event(...$args)
    {
        return app('events')->dispatch(...$args);
    }

  第二步,我们可以选择异步或者同步处理方式,对于异步处理,这里用到了Laravel的全局辅助函数dispatch(),该函数会将任务分配到指定的处理器上执行,同时要求处理器实现shouldQueue接口(下文会介绍)。如果需要采用不同的队列,dispatcher还可以指定队列驱动或者是队列名称,分别可用onCollectiononQueue来处理,比如:dispatch((new SimpleOperationJob($event))->onCollection('redis')->onQueue('default')是将任务分发到redis引擎的default队列上。目前Laravel提供了file、mysql、redis等队列驱动,经验表明:推荐使用redis做队列引擎。

    /**
     * Dispatch a job to its appropriate handler.
     *
     * @param  mixed  $job
     * @return \Illuminate\Foundation\Bus\PendingDispatch
     */
    function dispatch($job)
    {
        return new PendingDispatch($job);
    }

  至此,我们的任务主要通过Laravel事件系统已经分发到指定队列上了。

三、任务如何存储

  任务存储和处理主要涉及到Laravel队列系统的工作。队列存储引擎我们放弃使用mysql类的DB,是因为考虑到队列服务是读写峰值都较高的操作,由于DB事务锁的缘故,队列服务常常被阻塞,性能很容易遇到瓶颈。事实也证明,使用redis性能要好很多。

1. redis驱动配置

  为了使用redis做队列驱动,我们需要在Laravel配置文件中添加上几个配置项:

  首先,数据库配置文件中添加上redis连接配置:config/database.php中,添加如下

'redis' => [
    'client' => 'predis'// default timeout is 5.0 http://github.com/nrk/predis/wiki/Connection-Parameters
    
    // redis队列驱动
    'database_redis_queue' => [
        'host'               =>  '127.0.0.1',
        'port'               =>  6379,
        'timeout'            => 5.0,
        'read_write_timeout' => 60,
    ],
]

  然后,队列配置文件中添加redis驱动配置以及失败任务存储table:config/queue.php中,添加如下:

// 队列驱动连接
'connections' => [
    'redis_queue' => [
        'driver'      => 'redis',
        'connection'  => 'database_redis_queue',
        'queue'       => '{default}',
        'retry_after' => 90,
    ],
]

// 失败任务存储在mysql驱动的failed_jobs表中
'failed' => [
    'database' => 'mysql',
    'table'    => 'failed_jobs',
],

  最后,修改默认队列驱动为新配置的redis_queueconfg/queue.php中,修改如下:

// 默认驱动改为redis_queue
'default' => 'redis_queue'`

2. redis队列运转流程

  队列系统可以指定任务即时分发与延时分发,如果想延迟执行一个队列中的任务,我们可以用任务实例的delay方法。例如,我们指定一个任务在分配后1分钟内不可被处理:

dispatch((new SimpleOperationJob($event))->delay(Carbon::now()->addMinutes(1));

  Laravel的redis队列系统的运转流程图如下所示,以我们设置的default队列为例,redis会自动创建三种数据结构:queue:default默认队列(list类型)、queue:default:reserved待处理队列(zset类型)、queue:default:delayed延迟队列(zset类型)。

image.png   具体流程是:工作进程默认从queue:default中取任务,如果有任务,先把任务暂存到queues:default:reserved中(过期时间60秒,Redis Queue里面写一个任务最多执行60秒)。任务执行成功后会把queues:default:reserved 中的任务删除,如果执行失败,也会把queues:default:reserved中的任务删除,然后把任务扔进 queues:default:delayed,delay 1分钟(因为我们上面参数配置的是延迟1分钟)后再处理。

四、任务如何处理

1. 任务处理器

  异步任务处理器类需要实现ShouldQueue接口,我们可以通过引入相关的trait来实现。比如:SerializesModels用于序列化Eloquent模型,InteractsWithQueu用于队列交互,Dispatchable用于异步队列分发,Queueable实现了队列的连接方式。下面说明了一个基础的处理器类的定义,其中failed方法会在任务执行失败时接收处理进程抛出的异常,我们可以在此定义相关的通知服务。

class BaseJob implements ShouldQueue
{
    use DispatchableInteractsWithQueueQueueableSerializesModels;
    
    /**
     * 任务最大尝试次数。
     * @var int
     */
    public $tries 5;
    
    /**
     * 任务运行的超时时间。
     * @var int
     */
    public $timeout 120;

    public function __construct()
    {
        //
    }

    /**
     * 执行任务
     */
    public function handle()
    {
        //
    }
    
    /**
     * 要处理的失败任务。
     * @param  Exception  $exception
     */
    public function failed(Exception $exception)
    {
        // 给用户发送失败通知,等等...
    }
}

2. 任务处理进程

  任务分发到指定的队列后,需要由常驻的daemon进程来处理队列任务,比如,我们可以使用如下命令来启动一个Laravel artisan常驻进程。其中,--queue指定队列,--tries表示失败后的重试次数,--sleep表示进程睡眠时间,--daemon表示进程以daemon形式运行。其他命令可参考官网。

php artisan queue:work --queue=default --tries=3 --sleep=5 --daemon

如果要查看重试多次后仍然失败的任务,可以用命令:php artisan queue:failed

3. 任务调度流程

  任务队列工作流程如下示意图,daemon进程在执行完上一个任务,等待下一个任务时,会监听外界暂停、重启或退出等命令,如判断是上述几种命令则工作任务中断,执行暂停或退出操作,否则取出任务继续执行。

image.png

五、任务如何管理

  由于Laravel的daemon进程可能会出现异常退出的情况,这里我们采用Supervisor来进行异步任务进程的管理。Supervisor能够监控业务进程的运行状态,必要时会自动拉起异常退出的进程,并且它还具备良好的进程管理操作界面。更多的Supervisor操作可以参照:

  可以为不同的项目建立不同Supervisor配置进程配置文件,比如百草的配置文件为:script/supervisor/article-pools.conf,这里可以定义若干后台进程的管理配置,比如:

; 队列群组
[group:article-pools]
programs=queue_default

; 默认队列进程
[program:queue_default]
directory=/data/project
command=php artisan queue:work --queue=default --tries=3 --sleep=5 --daemon
process_name=%(program_name)s-%(process_num)02d
numprocs=1
user=nobody
umask=011
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=logs/queue/default.log
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10

六、结语

  本文介绍了如何Laravel事件和队列系统实现的异步处理服务,也从原理、配置、书写代码等角度详细介绍了事件系统与队列系统如何使用。最后,文中若有不当之处,欢迎指正!若有使用Laravel的同仁欢迎一起交流。

全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务