RQ-轻量级Python任务队列
RQ 是一个基于 Redis 的轻量级任务队列,依赖 Redis >= 2.7.0。RQ 将任务、执行结果 pickle 序列化后存储于 Redis 当中,在较小规模的应用中可以替代 Celery 执行异步任务。
另外我觉得比较好的一点是,RQ 的 worker 不会预先读取任务函数。因此,任务函数更改后,不需要重启 RQ 的 worker。同时,推荐以下基于 RQ 的项目:
- rq-scheduler 基于 RQ 的定时任务;
- Flask-RQ2 在 Flask 中集成 RQ 的扩展;
- rq-dashboard RQ 的 web 监控工具,使用 Flask 开发,可以方便的集成到 Flask 应用中。
RQ 的不足在于,依赖于 fork()
进程,因此不能在 Windows 系统中使用。
本文基于 RQ 0.12.0 版本。
QuickStart
安装 RQ:
|
|
编写任务函数:
|
|
创建 RQ 队列:
|
|
调用任务函数
|
|
启动 RQ worker:
|
|
获取结果(需要延时一段时间使异步任务执行完成)
|
|
Queue 队列
任务(job)是一个供 RQ 后台 worker 调用的 Python 函数对象。把该函数和运行参数压入队列的过程称为入队(enqueue)。
新建队列
首先,声明一个任务函数,这里不再赘述。
新建队列(Queue),可以在实例化时根据需要指定 Queue 名称,常见的命名模式是按照优先级命名队列(例如 high
,medium
,low
)。
|
|
入队:enqueue 和 enqueue_call
使用 enqueue(f, *args, **kwargs)
方法将任务入队:
|
|
除此之外,使用 enqueue
入队时,可以接收以下参数控制任务执行:
timeout
任务超时时间,超时后将会被标记为failed
状态。默认单位为秒,可以传入整数,或者能够被转换为整数的字符串,例如2
或者'2’
。另外,也可以传入包含时分秒单位的字符串,例如1h
、3m
和10s
。result_ttl
在 Redis 中存储的任务结果的过期时间,过期后任务结果会被删除,默认 500 秒。ttl
任务加入队列后,被取消之前的等待执行时间;超过该时间后任务会被取消执行。如果设置为 -1,任务将永远不会被取消,一直等待。depends_on
指定另一个依赖任务(或者 job ID),依赖任务执行完毕后,当前任务才会入队。job_id
指定 job_id。at_front
将任务放在队列的前面,即优先插队执行。kwargs
和args
存放传入任务函数的关键字参数和可变参数。
举个例子:
|
|
另外,在一些场景中,入队任务进程可能无法访问在 worker 中运行的源代码,此时该函数也可以传入字符串:
|
|
队列使用
这里介绍一些 Queue 实例的其他 method。
获取队列中任务数量:
|
|
获取队列中任务 job id 列表:
|
|
获取任务实例列表:
|
|
根据 job id 获取任务实例:
|
|
删除队列:
|
|
RQ 依赖于 pickle 来序列化任务存入 Redis,因此只适用于 Python 系统。
获取执行结果
当任务入队时,该 queue.enqueue()
方法返回一个 Job
实例。这是一个可以用来检查运行结果的 proxy 对象。
该实例的 result
属性,在任务未完成时返回 None
,在任务完成后返回任务函数的返回值(前提是有返回值)。
@job 装饰器
使用 Celery @task
的任务函数装饰器。(RQ >= 0.3)
|
|
同步执行
不经过 worker,直接在当前进程中同步阻塞执行任务函数。(RQ >= 0.3.1)
需要在 Queue 实例化时传递参数 is_async=False
。
|
|
注意这种情况下,仍然要建立 Redis 连接以存储任务执行状态。
链式执行
在任务入队时传入 depends_on
参数以保证任务链式执行。(RQ >= 0.4.0)
|
|
任务注意事项
- 确保该函数的
__module__
能够被 worker 导入。这意味着无法将__main__
模块中声明的函数作为任务入队。 - 确保 worker 和 worker 生成器共享完全相同的源代码。
- 确保函数调用不依赖于其上下文。不要在任务函数中使用全局变量、Web应用程序中的 current_user 或者 current_request 对象。当 worker 运行任务函数时,函数所依赖的任何状态都不存在。如果要访问这些信息,应该将这些信息作为参数传递给 worker。
Worker 工作进程
工作进程(worker)是一个通常在后台运行的用于执行阻塞或长时任务的 Python 进程。
启动
RQ worker 基于 fork()
创建新进程。如果不使用 Windows Subsystem for Linux 并在 shell 中运行,那么 RQ 无法在 Windows 系统上执行任务。
启动 Worker
在项目根目录执行:
|
|
Worker 将会在无限循环中从给定的 Queue 中依次读取任务,因此启动 Worker 时 Queue 参数的顺序很重要,应该让高优先级任务 Queue 排在前面。
一个 Worker 每次只能执行一个任务,不能并发处理。如果要同时执行任务,只需要启动多个 worker 即可。
Burst mode 突发模式
默认情况下,Worker 启动后会立即开始处理任务,处理完成后阻塞等待新任务。
使用 —-burst
参数可以让 Worker 以突发模式启动。在此模式下,Worker 会在给定队列清空(即完成所有任务)后退出。
|
|
突发模式可以应用于:
- 定期执行的批量任务,单独开 Worker 执行,在执行完毕后退出;
- 在任务积压时,临时增加 Worker;
启动参数
除 —-burst
外,Worker 还支持以下启动参数:
—-url
-u
指定 Redis 数据库连接,例如redis://:[email protected]:1234/9
—-path
-p
指定 import 路径,可以传多个值;—-config
-c
指定配置文件路径;—-worker-class
-w
指定使用的 RQ Worker 类;--job-class
-j
指定使用的 RQ Job 类;--queue-class
指定使用的 RQ Queue 类;—-connection-class
指定要使用的 Redis 连接类,默认redis.StrictRedis
;—-log-format
指定 Worker 日志格式,默认为'%(asctime)s %(message)s'
;--date-format
指定 Worker 日志的日期时间格式,默认为'%H:%M:%S'
;
生命周期
Worker 的生命周期包括几个阶段:
- Boot。加载Python环境。
- Birth registration。Worker 将自己注册到系统。
- Start listening。从给定的 Redis 队列中取出任务。若所有队列都为空,如果 Worker 以突发模式运行则 Worker 结束运行,否则阻塞等待任务。
- Prepare job execution。Worker 把要执行的任务状态设置为
busy
,并在StartedJobRegistry
中注册该任务,告知系统准备执行该任务 - Fork a child process。Fork 一个子进程(被称为 work horse),该子进程在故障安全上下文(fail-safe context)中执行任务。
- Process work。子进程执行任务。
- Cleanup job execution。Worker 将任务状态设置为
idle
,将任务结果结果存储到 Redis 中并根据result_ttl
设置过期时间。把任务从StartedJobRegistry
中删除,如果执行成功将任务添加到FinishedJobRegistry
中,如果执行失败将任务添加到FailedQueue
中。 - Loop。从第3步开始重复。
提升性能
RQ Worker shell 脚本基本上是一个 fetch-fork-execute 循环。这样做的好处是 RQ 不会泄露内存。但当任务需要进行冗长的设置,或任务都依赖于相同的模块时,每次运行任务都要耗费这一部分时间(因为要在 fork 出新进程后再进行 import)。
可以在 fork 之前就 import 必要的模块,以改进性能。RQ Worker 没有这样的设置项,但你可以在开始 worker loop 之前进行 import。
为此,你要自己实现 Worker 启动脚本,而不是使用 rq worker
。举个例子:
|
|
Worker 信息
Worker
名称一般是主机名和当前 PID 的组合,也可以在启动时通过 —-name
指定。
Worker
实例的运行时信息存储于 Redis 中,可以使用 rq.Worker.all
查询。注意每次查询都会从 Redis 中取信息构建 Worker
实例,也就是说,每次查询得到的实例不是同一内存对象。
|
|
如果只是想得到 Worker 数量,可以使用 rq.Worker.count
方法(RQ >= 0.10.0):
|
|
另外还可以通过 Worker
实例获得一些统计信息(RQ >= 0.9.0)。首先通过 Worker.find_by_key
方法获得 Worker 实例,传参为 Redis key,格式为 rq:worker:<name>
。再通过实例属性查看统计信息。
|
|
停止 Worker
当 Worker 收到 SIGINT
信号(通过 Ctrl + C)或 SIGTERM
信号(通过 kill)时,会等待当前任务运行结束后,结束任务循环并注册自己的死亡。
如果在等待期间再次收到 SIGINT
或者 SIGTERM
信号,Worker 将会发送 SIGKILL
信号强行中止子进程,但仍然会尝试注册自己的死亡。
使用配置文件
要求 RQ >= 0.3.2。
配置文件需要为 Python 文件,在启动 Worker 时通过 -c
参数指定从哪个模块读取配置。以下是配置文件支持的配置项:
|
|
注意: QUEUES
和 REDIS_PASSWORD
设置是0.3.3以后的新设置。
指定配置文件:
|
|
自定义DeathPenalty类
当任务超时时,Worker 将尝试使用 death_penalty_class
(默认值 UnixSignalDeathPenalty
)提供的方法将其终止。如果您希望尝试以特定应用程序或“更干净”的方式杀死任务,则可以覆盖此项。
DeathPenalty 类使用以下参数构造 BaseDeathPenalty(timeout, JobTimeoutException, job_id=job.id)
自定义异常处理程序
要求 RQ >= 0.5.5。
如果要针对不同类型的作业以不同方式处理错误,或者想自定义 默认的错误处理,可以使用 --exception-handler
指定错误处理类:
|
|
Result 执行结果
处理结果
如果一个任务有非 None 的返回值,Worker 会把返回值经过 pickle 序列化后,写入到任务在 Redis 中对应记录(key 为 rq:job:[hash]
格式,value 为 Hash 类型)的 result
字段中,默认将会在 500 秒后失效。
将任务入队时返回的 Job
实例是一个代理对象,绑定了任务 ID,以便能从任务执行结果中取到数据。
若 RQ >= 0.3.1 版本,可以在调用 enqueue
和 enqueue_call
入队时,使用 result_ttl
参数指定存储结果删除时间:
- 不设置,使用默认值 500,500 秒后删除;
- 设置为 0,立即删除;
- 设置为 -1,永不删除,此时要注意自己清理 Redis,以免 Redis 无限增长;
- 设置为其他正整数数值 N,在 N 秒后删除;
|
|
文档中的这一段与实际测试结果不符,实际测试没有返回值的情况下也使用默认的 500 秒的超时时间。
Additionally, you can use this for keeping around finished jobs without return values, which would be deleted immediately by default.
1
q.enqueue(func_without_rv, result_ttl=500) # job kept explicitly
异常处理
任务执行失败时会抛出异常,为了引起足够的注意,失败任务在 Redis 中的记录永不过期。RQ 目前没有可靠的或自动的方法判断某些失败的任务能否安全重试。
失败任务中抛出的异常,会被 Worker 捕获,pickle 序列化后存储到任务在 Redis 中记录的 exc_info
字段中,而 失败任务的 Job
对象则会放入 failed
队列中。
Job
对象本身有一些有用的属性,可以用于辅助检查:
- 任务原始创建时间;
- 最后入队日期;
- 入队始发队列;
- 函数调用的文本描述;
- 异常信息;
可以根据这些信息手动检查和判断问题并可以重新提交任务。
中断处理
当 Worker 进程被以“礼貌的”方式杀死时(Ctrl + C 或 kill
),RQ 会努力不丢失任何任务,会在当前任务处理完成后停止处理新的任务。
但是,Worker 进程也可以通过 kill -9
的方式强行杀死,此时 Worker 不能“优雅地”完成工作,没有时间把任务加入 failed
队列中。因此,强行杀死进程可能会导致异常。
超时处理
默认情况下,任务应该在 180 秒内执行完毕。否则,Worker 会杀死 work horse 进程,并将任务加入到 failed
队列中,表明工作超时。
根据任务需要,我们可以自定义超时时间。
在 Job 维度,入队时使用 timeout
参数设置:
|
|
在 Queue 维度,新建 Queue 时使用 timeout
参数设置,对队列中所有任务都有效。
|
|
Job 维度的设置项优先级高于 Queue 维度。
Jobs 任务
从 Redis 中获取 Job
所有的任务信息都存储于 Redis 中,可以使用 Job.fetch(id, connection=redis)
方法获取 Job 实例:
|
|
该 Job 对象的一些属性包括:
job.status
job.func_name
job.args
job.kwargs
job.result
job.enqueued_at
job.started_at
job.ended_at
job.exc_info
读写当前 Job 实例
由于任务函数是常规的 Python 函数,因此在任务函数中,只能通过 RQ 的 get_current_job
函数获得当前 Job 的实例。
|
|
通过 Job 实例的 meta
属性 和 save_meta()
方法,可以向 Job 实例中写入数据。(RQ >= 0.8.0)
|
|
注意,以上内容只在任务函数中有效。
Job 的等待执行时间
一个任务有两个 TTL:一个用于执行结果,另一个用于任务 Job 本身。后者表示任务在队列中等待多久后会被取消。该 TTL 可以在创建任务或入队时指定:
|
|
设置为 -1
时任务将一直等待,不会被取消。被取消的 Job 会立即被从 Redis 中立即删除。
执行失败的 Job
如果任务执行失败,Worker 会把任务放入 failed
队列中,同时 Redis 中 Job 实例的 is_failed
属性会被置为 True。使用 get_failed_queue
可以获取所有失败的任务。
|
|
Monitoring 监控
rq-dashboard
RQ dashboard 是一个单独分发的,轻量级的 Web 前端监控工具,基于 Flask 开发。
安装方式如下:
|
|
运行:
|
|
与 Flask 集成
|
|
Console 工具
RQ 自带了 console 监控工具,启动命令为 rq info
:
|
|
查询指定队列
通过 rq info queue1 queue2 …
可以返回指定队列的信息:
|
|
按队列展示
默认情况下,rq info
输出活跃 Worker 和它们监听的 Queue。
通过设置 -R
或者 --by-queue
,可以让 RQ 按照队列组织展示,即展示队列和监听队列的 Worker。
|
|
定时轮询
默认情况下,rq info
打印信息后就会退出。可以使用 —-interval
参数指定轮询间隔,以不断刷新监控信息。
|
|
注意,如果 interval 设置的过低,会加重 Redis 的负载。
Connection 连接
**RQ 维护一个 Redis 连接堆栈,每个 RQ 对象实例在创建时,会使用堆栈最顶层的 Redis 连接。**因此我们可以使用 with 上下文管理器创建连接,并在其中新建 RQ 对象实例。
|
|
或者新建 RQ 对象实例时显式地指定连接:
|
|
多 Redis 连接
使用显式连接实现——准确但乏味:
|
|
使用 with 上下文管理器实现:
|
|
push/pop 连接
如果代码不允许使用 with
语句(例如在单元测试中),则可以使用 push_connection()
和 pop_connection()
方法替代上下文管理器。
|
|
结合 Sentinel
要使用 redis sentinel,必须在配置文件中指定字典。将此设置与带有自动重启选项的 systemd 或 docker 容器结合使用,以便允许 worker 和 RQ 通过容错连接(fault-tolerant connection)连接 Redis。
|
|
Exception 异常处理
任务在发生异常时会执行失败,当 RQ Worker 在后台运行时,你怎么才能知道任务失败了呢?
failed
队列
默认情况下,RQ 会将失败的任务放入到 failed
队列中,包含了它们的异常信息(类型,值,堆栈)。这只能被动保存发生的异常,但不会有任何主动通知。
自定义异常处理
RQ支持注册自定义异常处理程序(RQ >= 0.3.1)。这样就可以在发生异常时采取其他步骤,或者替换默认地将失败任务发送到 failed
队列的行为。
在创建 Worker 时,使用 exception_handlers
参数指定异常处理程序列表。
|
|
异常处理 handler 是一个函数,其参数为:
job
任务对象exc_type
异常类型exc_value
异常值traceback
异常堆栈
|
|
或者使用可变参数定义:job
和 *exc_info
。
|
|
链式异常处理
异常处理程序可以决定是否完成处理异常,还是由堆栈中的后续程序继续处理异常。这通过返回值来控制:
- 若返回
True
表示继续,并进入下一个异常处理程序; - 若返回
False
表示停止处理异常; - 若没有返回值,即返回
None
,则认为是True
,继续进入下一个异常处理程序;
如果要替换默认的错误处理行为,错误处理程序应该返回 False
。
|
|
Testing 测试
单元测试中的 Worker
许多框架在执行单元测试时使用内存数据库,这些数据库与 RQ 默认使用的 fork()
不太兼容。
因此在单元测试用,应该使用 SimpleWorker
类来避免 fork()
,且建议以突发模式运行。
|
|
在单元测试中执行任务
另一种解决方案是,在创建队列时使用 is_async=False
参数,使任务在同一个线程中立即执行,而不是将其分配给 Worker。此时不需要启动 RQ worker。
除此之外,还可以使用 FakeStrictRedis 替代 Redis,也不必再启动 Redis 服务器,FakeStrictRedis 的实例可以直接作为连接参数传递给队列。
|
|