RQ-轻量级Python任务队列

RQ 是一个基于 Redis 的轻量级任务队列,依赖 Redis >= 2.7.0。RQ 将任务、执行结果 pickle 序列化后存储于 Redis 当中,在较小规模的应用中可以替代 Celery 执行异步任务。

另外我觉得比较好的一点是,RQ 的 worker 不会预先读取任务函数。因此,任务函数更改后,不需要重启 RQ 的 worker。同时,推荐以下基于 RQ 的项目:

  • rq-scheduler 基于 RQ 的定时任务;
  • Flask-RQ2 在 Flask 中集成 RQ 的扩展;
  • rq-dashboard RQ 的 web 监控工具,使用 Flask 开发,可以方便的集成到 Flask 应用中。

RQ 的不足在于,依赖于 fork() 进程,因此不能在 Windows 系统中使用

本文基于 RQ 0.12.0 版本。

QuickStart

安装 RQ:

1
pipenv install rq

编写任务函数:

1
2
3
4
5
import requests

def count_words_at_url(url):
resp = requests.get(url)
return len(resp.text.split())

创建 RQ 队列:

1
2
3
4
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())

调用任务函数

1
2
3
from my_module import count_words_at_url

result = q.enqueue(count_words_at_url, 'http://python-rq.org')

启动 RQ worker:

1
2
3
4
5
6
7
$ rq worker
22:52:34 RQ worker 'rq:worker:InvokerPro.10689' started, version 0.12.0
22:52:34 *** Listening on default...
22:52:34 Cleaning registries for queue: default
22:52:34 default: job.count_words_at_url('http://python-rq.org') (c3b8fcab-347f-4529-ab4f-0865407fbaa2)
22:52:34 default: Job OK (c3b8fcab-347f-4529-ab4f-0865407fbaa2)
22:52:34 Result is kept for 500 seconds

获取结果(需要延时一段时间使异步任务执行完成)

1
2
3
4
5
6
7
8
import time

for i in range(20):
if result.
while result.status != 'finished':
time.sleep(1)

print(result.result)

Queue 队列

任务(job)是一个供 RQ 后台 worker 调用的 Python 函数对象。把该函数和运行参数压入队列的过程称为入队(enqueue)。

新建队列

首先,声明一个任务函数,这里不再赘述。

新建队列(Queue),可以在实例化时根据需要指定 Queue 名称,常见的命名模式是按照优先级命名队列(例如 highmediumlow)。

1
q = Queue('low', connection=redis_conn)

入队:enqueue 和 enqueue_call

使用 enqueue(f, *args, **kwargs) 方法将任务入队:

1
q.enqueue(job_func, job_arg1, job_arg2, job_kwarg1=value)

除此之外,使用 enqueue 入队时,可以接收以下参数控制任务执行:

  • timeout 任务超时时间,超时后将会被标记为 failed 状态。默认单位为秒,可以传入整数,或者能够被转换为整数的字符串,例如 2 或者 '2’。另外,也可以传入包含时分秒单位的字符串,例如 1h3m10s
  • result_ttl 在 Redis 中存储的任务结果的过期时间,过期后任务结果会被删除,默认 500 秒。
  • ttl 任务加入队列后,被取消之前的等待执行时间;超过该时间后任务会被取消执行。如果设置为 -1,任务将永远不会被取消,一直等待。
  • depends_on 指定另一个依赖任务(或者 job ID),依赖任务执行完毕后,当前任务才会入队。
  • job_id 指定 job_id。
  • at_front 将任务放在队列的前面,即优先插队执行。
  • kwargsargs 存放传入任务函数的关键字参数和可变参数。

举个例子:

1
2
3
4
5
q.enqueue_call(
func=count_words_at_url,
args=('http://nvie.com',),
timeout=30
)

另外,在一些场景中,入队任务进程可能无法访问在 worker 中运行的源代码,此时该函数也可以传入字符串:

1
q.enqueue('my_package.my_module.my_func', 3, 4)

队列使用

这里介绍一些 Queue 实例的其他 method。

获取队列中任务数量:

1
len(q)

获取队列中任务 job id 列表:

1
q.job_ids

获取任务实例列表:

1
q.jobs

根据 job id 获取任务实例:

1
q.fetch_job('my_id')

删除队列:

1
q.delete(delete_jobs=True)		# delete_jobs=True时,也会删除队列中的所有任务

RQ 依赖于 pickle 来序列化任务存入 Redis,因此只适用于 Python 系统。

获取执行结果

当任务入队时,该 queue.enqueue() 方法返回一个 Job 实例。这是一个可以用来检查运行结果的 proxy 对象。

该实例的 result 属性,在任务未完成时返回 None,在任务完成后返回任务函数的返回值(前提是有返回值)。

@job 装饰器

使用 Celery @task 的任务函数装饰器。(RQ >= 0.3)

1
2
3
4
5
6
7
8
9
from rq.decorators import job

@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
return x + y

job = add.delay(3, 4)
time.sleep(1)
print(job.result)

同步执行

不经过 worker,直接在当前进程中同步阻塞执行任务函数。(RQ >= 0.3.1)

需要在 Queue 实例化时传递参数 is_async=False

1
2
3
4
>>> q = Queue('low', is_async=False, connection=my_redis_conn)
>>> job = q.enqueue(fib, 8)
>>> job.result
21

注意这种情况下,仍然要建立 Redis 连接以存储任务执行状态。

链式执行

在任务入队时传入 depends_on 参数以保证任务链式执行。(RQ >= 0.4.0)

1
2
3
4
5
q = Queue('low', connection=my_redis_conn)

report_job = q.enqueue(generate_report)

q.enqueue_call(func=send_report, depends_on=report_job)

任务注意事项

  • 确保该函数的 __module__ 能够被 worker 导入。这意味着无法将 __main__ 模块中声明的函数作为任务入队。
  • 确保 worker 和 worker 生成器共享完全相同的源代码
  • 确保函数调用不依赖于其上下文。不要在任务函数中使用全局变量、Web应用程序中的 current_user 或者 current_request 对象。当 worker 运行任务函数时,函数所依赖的任何状态都不存在。如果要访问这些信息,应该将这些信息作为参数传递给 worker。

Worker 工作进程

工作进程(worker)是一个通常在后台运行的用于执行阻塞或长时任务的 Python 进程。

启动

RQ worker 基于 fork() 创建新进程。如果不使用 Windows Subsystem for Linux 并在 shell 中运行,那么 RQ 无法在 Windows 系统上执行任务。

启动 Worker

在项目根目录执行:

1
2
3
4
5
6
$ rq worker high normal low
09:07:05 RQ worker 'rq:worker:InvokerPro.11898' started, version 0.12.0
09:07:05 *** Listening on high, normal, low...
09:07:05 Cleaning registries for queue: high
09:07:05 Cleaning registries for queue: normal
09:07:05 Cleaning registries for queue: low

Worker 将会在无限循环中从给定的 Queue 中依次读取任务,因此启动 Worker 时 Queue 参数的顺序很重要,应该让高优先级任务 Queue 排在前面。

一个 Worker 每次只能执行一个任务,不能并发处理。如果要同时执行任务,只需要启动多个 worker 即可。

Burst mode 突发模式

默认情况下,Worker 启动后会立即开始处理任务,处理完成后阻塞等待新任务。

使用 —-burst 参数可以让 Worker 以突发模式启动。在此模式下,Worker 会在给定队列清空(即完成所有任务)后退出。

1
2
3
4
5
6
7
$ rq worker --burst high normal low
09:24:03 RQ worker 'rq:worker:InvokerPro.13525' started, version 0.12.0
09:24:03 *** Listening on high, normal, low...
09:24:03 Cleaning registries for queue: high
09:24:03 Cleaning registries for queue: normal
09:24:03 Cleaning registries for queue: low
09:24:03 RQ worker 'rq:worker:InvokerPro.13525' done, quitting

突发模式可以应用于:

  • 定期执行的批量任务,单独开 Worker 执行,在执行完毕后退出;
  • 在任务积压时,临时增加 Worker;

启动参数

—-burst 外,Worker 还支持以下启动参数:

  • —-url -u 指定 Redis 数据库连接,例如 redis://:secrets@example.com:1234/9
  • —-path -p 指定 import 路径,可以传多个值;
  • —-config -c 指定配置文件路径;
  • —-worker-class -w 指定使用的 RQ Worker 类;
  • --job-class -j 指定使用的 RQ Job 类;
  • --queue-class 指定使用的 RQ Queue 类;
  • —-connection-class 指定要使用的 Redis 连接类,默认 redis.StrictRedis
  • —-log-format 指定 Worker 日志格式,默认为 '%(asctime)s %(message)s'
  • --date-format 指定 Worker 日志的日期时间格式,默认为 '%H:%M:%S'

生命周期

Worker 的生命周期包括几个阶段:

  1. Boot。加载Python环境。
  2. Birth registration。Worker 将自己注册到系统。
  3. Start listening。从给定的 Redis 队列中取出任务。若所有队列都为空,如果 Worker 以突发模式运行则 Worker 结束运行,否则阻塞等待任务。
  4. Prepare job execution。Worker 把要执行的任务状态设置为 busy,并在 StartedJobRegistry 中注册该任务,告知系统准备执行该任务
  5. Fork a child process。Fork 一个子进程(被称为 work horse),该子进程在故障安全上下文(fail-safe context)中执行任务。
  6. Process work。子进程执行任务。
  7. Cleanup job execution。Worker 将任务状态设置为 idle,将任务结果结果存储到 Redis 中并根据 result_ttl 设置过期时间。把任务从 StartedJobRegistry 中删除,如果执行成功将任务添加到 FinishedJobRegistry 中,如果执行失败将任务添加到 FailedQueue 中。
  8. Loop。从第3步开始重复。

提升性能

RQ Worker shell 脚本基本上是一个 fetch-fork-execute 循环。这样做的好处是 RQ 不会泄露内存。但当任务需要进行冗长的设置,或任务都依赖于相同的模块时,每次运行任务都要耗费这一部分时间(因为要在 fork 出新进程后再进行 import)。

可以在 fork 之前就 import 必要的模块,以改进性能。RQ Worker 没有这样的设置项,但你可以在开始 worker loop 之前进行 import。

为此,你要自己实现 Worker 启动脚本,而不是使用 rq worker。举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python
import sys
from rq import Connection, Worker

# 提前导入必要模块
import library_that_you_want_preloaded

# qs 用于获取 Queue 名
with Connection():
qs = sys.argv[1:] or ['default']

w = Worker(qs)
w.work()

Worker 信息

Worker 名称一般是主机名和当前 PID 的组合,也可以在启动时通过 —-name 指定。

Worker 实例的运行时信息存储于 Redis 中,可以使用 rq.Worker.all 查询。注意每次查询都会从 Redis 中取信息构建 Worker 实例,也就是说,每次查询得到的实例不是同一内存对象。

1
2
3
4
5
6
7
8
9
10
11
from redis import Redis
from rq import Queue, Worker

redis = Redis()

# 返回当前 Redis 连接中注册的所有 Worker
workers = Worker.all(connection=redis)

# 返回指定 Queue 的所有 Worker(RQ >= 0.10.0)
queue = Queue('queue_name', connection=redis)
workers = Worker.all(queue=queue)

如果只是想得到 Worker 数量,可以使用 rq.Worker.count 方法(RQ >= 0.10.0):

1
2
3
4
5
6
7
8
9
from redis import Redis
from rq import Queue, Worker

redis = Redis()

workers = Worker.count(connection=redis)

queue = Queue('queue_name', connection=redis)
workers = Worker.count(queue=queue)

另外还可以通过 Worker 实例获得一些统计信息(RQ >= 0.9.0)。首先通过 Worker.find_by_key 方法获得 Worker 实例,传参为 Redis key,格式为 rq:worker:<name>。再通过实例属性查看统计信息。

1
2
3
4
5
6
7
from rq.worker import Worker

worker = Worker.find_by_key('rq:worker:name')

worker.successful_job_count # 执行成功任务数量
worker.failed_job_count. # 执行失败任务数量
worker.total_working_time # 总执行时间

停止 Worker

当 Worker 收到 SIGINT 信号(通过 Ctrl + C)或 SIGTERM 信号(通过 kill)时,会等待当前任务运行结束后,结束任务循环并注册自己的死亡。

如果在等待期间再次收到 SIGINT 或者 SIGTERM 信号,Worker 将会发送 SIGKILL 信号强行中止子进程,但仍然会尝试注册自己的死亡。

使用配置文件

要求 RQ >= 0.3.2。

配置文件需要为 Python 文件,在启动 Worker 时通过 -c 参数指定从哪个模块读取配置。以下是配置文件支持的配置项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
REDIS_URL = 'redis://localhost:6379/1'

# 或者通过以下参数指定 Redis 数据库
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'

# 指定监听的 Queue
QUEUES = ['high', 'normal', 'low']

# Sentry 设置
# The 'sync+' prefix is required for raven: https://github.com/nvie/rq/issues/350#issuecomment-43592410
SENTRY_DSN = 'sync+http://public:secret@example.com/1'

# 自定义 Worker 名称
NAME = 'worker-1024'

注意: QUEUESREDIS_PASSWORD 设置是0.3.3以后的新设置。

指定配置文件:

1
$ rq worker -c settings

自定义DeathPenalty类

当任务超时时,Worker 将尝试使用 death_penalty_class(默认值 UnixSignalDeathPenalty)提供的方法将其终止。如果您希望尝试以特定应用程序或“更干净”的方式杀死任务,则可以覆盖此项。

DeathPenalty 类使用以下参数构造 BaseDeathPenalty(timeout, JobTimeoutException, job_id=job.id)

自定义异常处理程序

要求 RQ >= 0.5.5。

如果要针对不同类型的作业以不同方式处理错误,或者想自定义 默认的错误处理,可以使用 --exception-handler 指定错误处理类:

1
2
3
4
$ rq worker --exception-handler 'path.to.my.ErrorHandler'

# Multiple exception handlers is also supported
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'

Result 执行结果

处理结果

如果一个任务有非 None 的返回值,Worker 会把返回值经过 pickle 序列化后,写入到任务在 Redis 中对应记录(key 为 rq:job:[hash] 格式,value 为 Hash 类型)的 result 字段中,默认将会在 500 秒后失效。

将任务入队时返回的 Job 实例是一个代理对象,绑定了任务 ID,以便能从任务执行结果中取到数据。

若 RQ >= 0.3.1 版本,可以在调用 enqueueenqueue_call 入队时,使用 result_ttl 参数指定存储结果删除时间:

  • 不设置,使用默认值 500,500 秒后删除;
  • 设置为 0,立即删除;
  • 设置为 -1,永不删除,此时要注意自己清理 Redis,以免 Redis 无限增长;
  • 设置为其他正整数数值 N,在 N 秒后删除;
1
2
3
4
q.enqueue(foo)  # result expires after 500 secs (the default)
q.enqueue(foo, result_ttl=86400) # result expires after 1 day
q.enqueue(foo, result_ttl=0) # result gets deleted immediately
q.enqueue(foo, result_ttl=-1) # result never expires--you should delete jobs manually

文档中的这一段与实际测试结果不符,实际测试没有返回值的情况下也使用默认的 500 秒的超时时间。

Additionally, you can use this for keeping around finished jobs without return values, which would be deleted immediately by default.

1
2
> q.enqueue(func_without_rv, result_ttl=500)  # job kept explicitly
>

异常处理

任务执行失败时会抛出异常,为了引起足够的注意,失败任务在 Redis 中的记录永不过期。RQ 目前没有可靠的或自动的方法判断某些失败的任务能否安全重试。

失败任务中抛出的异常,会被 Worker 捕获,pickle 序列化后存储到任务在 Redis 中记录的 exc_info 字段中,而 失败任务的 Job 对象则会放入 failed 队列中。

Job 对象本身有一些有用的属性,可以用于辅助检查:

  • 任务原始创建时间;
  • 最后入队日期;
  • 入队始发队列;
  • 函数调用的文本描述;
  • 异常信息;

可以根据这些信息手动检查和判断问题并可以重新提交任务。

中断处理

当 Worker 进程被以“礼貌的”方式杀死时(Ctrl + C 或 kill),RQ 会努力不丢失任何任务,会在当前任务处理完成后停止处理新的任务。

但是,Worker 进程也可以通过 kill -9 的方式强行杀死,此时 Worker 不能“优雅地”完成工作,没有时间把任务加入 failed 队列中。因此,强行杀死进程可能会导致异常。

超时处理

默认情况下,任务应该在 180 秒内执行完毕。否则,Worker 会杀死 work horse 进程,并将任务加入到 failed 队列中,表明工作超时。

根据任务需要,我们可以自定义超时时间。

在 Job 维度,入队时使用 timeout 参数设置:

1
2
q = Queue()
q.enqueue_call(mytask, args=(foo,), kwargs={'bar': qux}, timeout=600) # 10 mins

在 Queue 维度,新建 Queue 时使用 timeout 参数设置,对队列中所有任务都有效。

1
2
3
4
# High prio jobs should end in 8 secs, while low prio
# work may take up to 10 mins
high = Queue('high', default_timeout=8) # 8 secs
low = Queue('low', default_timeout=600) # 10 mins

Job 维度的设置项优先级高于 Queue 维度。

Jobs 任务

从 Redis 中获取 Job

所有的任务信息都存储于 Redis 中,可以使用 Job.fetch(id, connection=redis) 方法获取 Job 实例:

1
2
3
4
5
6
from redis import Redis
from rq.job import Job

connection = Redis()
job = Job.fetch('my_job_id', connection=redis)
print('Status: %s' $ job.get_status())

该 Job 对象的一些属性包括:

  • job.status
  • job.func_name
  • job.args
  • job.kwargs
  • job.result
  • job.enqueued_at
  • job.started_at
  • job.ended_at
  • job.exc_info

读写当前 Job 实例

由于任务函数是常规的 Python 函数,因此在任务函数中,只能通过 RQ 的 get_current_job 函数获得当前 Job 的实例。

1
2
3
4
5
6
from rq import get_current_job

def add(x, y):
job = get_current_job()
print('Current job: %s' % (job.id,))
return x + y

通过 Job 实例的 meta 属性 和 save_meta() 方法,可以向 Job 实例中写入数据。(RQ >= 0.8.0)

1
2
3
4
5
6
7
8
def add(x, y):
job = get_current_job()
job.meta['timestamp'] = time.time()
job.save_meta()

# do more work
time.sleep(1)
return x + y

注意,以上内容只在任务函数中有效

Job 的等待执行时间

一个任务有两个 TTL:一个用于执行结果,另一个用于任务 Job 本身。后者表示任务在队列中等待多久后会被取消。该 TTL 可以在创建任务或入队时指定:

1
2
3
4
5
6
7
from rq.job import Job

# 在任务创建时指定
job = Job.create(func=say_hello, ttl=100)

# 在任务入队时指定
job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43)

设置为 -1 时任务将一直等待,不会被取消。被取消的 Job 会立即被从 Redis 中立即删除。

执行失败的 Job

如果任务执行失败,Worker 会把任务放入 failed 队列中,同时 Redis 中 Job 实例的 is_failed 属性会被置为 True。使用 get_failed_queue 可以获取所有失败的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from redis import StrictRedis
from rq import push_connection, get_failed_queue, Queue
from rq.job import Job


con = StrictRedis()
push_connection(con)

def div_by_zero(x):
return x / 0

job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()

# 获取 failed queue 对象
fq = get_failed_queue()

# 把 job 加入到 failed queue 中
fq.quarantine(job, Exception('Some fake error'))
assert fq.count == 1

# 把 job 重新加入到执行队列中,并从 failed queue 中删除
fq.requeue(job.id)

assert fq.count == 0
assert Queue('fake').count == 1

Monitoring 监控

rq-dashboard

RQ dashboard 是一个单独分发的,轻量级的 Web 前端监控工具,基于 Flask 开发。

安装方式如下:

1
pip install rq-dashboard

运行:

1
rq-dashboard

与 Flask 集成

1
2
3
4
5
6
7
8
9
10
11
12
13
from flask import Flask
import rq_dashboard

app = Flask(__name__)
app.config.from_object(rq_dashboard.default_settings)
app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")

@app.route("/")
def hello():
return "Hello World!"

if __name__ == "__main__":
app.run()

Console 工具

RQ 自带了 console 监控工具,启动命令为 rq info

1
2
3
4
5
6
7
8
9
$ rq info
failed |██ 2
default | 0
2 queues, 2 jobs total

InvokerPro.53243 idle: default
1 workers, 2 queues

Updated: 2018-10-29 23:03:07.478540

查询指定队列

通过 rq info queue1 queue2 … 可以返回指定队列的信息:

1
2
3
4
5
6
7
8
$ rq info default
default | 0
1 queues, 0 jobs total

InvokerPro.53243 idle: default
1 workers, 1 queues

Updated: 2018-10-29 23:22:17.640684

按队列展示

默认情况下,rq info 输出活跃 Worker 和它们监听的 Queue。

通过设置 -R 或者 --by-queue,可以让 RQ 按照队列组织展示,即展示队列和监听队列的 Worker。

1
2
3
4
5
6
7
8
9
10
$ rq info -R
failed |██ 2
default | 0
2 queues, 2 jobs total

failed: –
default: InvokerPro.53243 (idle)
1 workers, 2 queues

Updated: 2018-10-29 23:33:58.255413

定时轮询

默认情况下,rq info 打印信息后就会退出。可以使用 —-interval 参数指定轮询间隔,以不断刷新监控信息。

1
$ rq info --interval 1

注意,如果 interval 设置的过低,会加重 Redis 的负载。

Connection 连接

RQ 维护一个 Redis 连接堆栈,每个 RQ 对象实例在创建时,会使用堆栈最顶层的 Redis 连接。因此我们可以使用 with 上下文管理器创建连接,并在其中新建 RQ 对象实例。

1
2
3
4
5
from rq import Queue, Connection
from redis import Redis

with Connection(Redis()):
q = Queue()

或者新建 RQ 对象实例时显式地指定连接:

1
2
3
4
5
from rq import Queue
from redis import Redis

conn = Redis('localhost', 6379)
q = Queue('foo', connection=conn)

多 Redis 连接

使用显式连接实现——准确但乏味:

1
2
3
4
5
6
7
8
from rq import Queue
from redis import Redis

conn1 = Redis('localhost', 6379)
conn2 = Redis('remote.host.org', 9836)

q1 = Queue('foo', connection=conn1)
q2 = Queue('bar', connection=conn2)

使用 with 上下文管理器实现:

1
2
3
4
5
6
7
8
9
10
11
12
from rq import Queue, Connection
from redis import Redis

with Connection(Redis('localhost', 6379)):
q1 = Queue('foo')
with Connection(Redis('remote.host.org', 9836)):
q2 = Queue('bar')
q3 = Queue('qux')

assert q1.connection != q2.connection
assert q2.connection != q3.connection
assert q1.connection == q3.connection

push/pop 连接

如果代码不允许使用 with 语句(例如在单元测试中),则可以使用 push_connection()pop_connection() 方法替代上下文管理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import unittest
from rq import Queue
from rq import push_connection, pop_connection

class MyTest(unittest.TestCase):
def setUp(self):
# 将新连接压入连接堆栈
push_connection(Redis())

def tearDown(self):
# 从连接堆栈中丢弃连接
pop_connection()

def test_foo(self):
"""Any queues created here use local Redis."""
q = Queue()
...

结合 Sentinel

要使用 redis sentinel,必须在配置文件中指定字典。将此设置与带有自动重启选项的 systemd 或 docker 容器结合使用,以便允许 worker 和 RQ 通过容错连接(fault-tolerant connection)连接 Redis。

1
2
3
4
5
6
7
8
9
10
11
SENTINEL: {
'INSTANCES':[
('remote.host1.org', 26379),
('remote.host2.org', 26379),
('remote.host3.org', 26379)
],
'SOCKET_TIMEOUT': None,
'PASSWORD': 'secret',
'DB': 2,
'MASTER_NAME': 'master'
}

Exception 异常处理

任务在发生异常时会执行失败,当 RQ Worker 在后台运行时,你怎么才能知道任务失败了呢?

failed 队列

默认情况下,RQ 会将失败的任务放入到 failed 队列中,包含了它们的异常信息(类型,值,堆栈)。这只能被动保存发生的异常,但不会有任何主动通知。

自定义异常处理

RQ支持注册自定义异常处理程序(RQ >= 0.3.1)。这样就可以在发生异常时采取其他步骤,或者替换默认地将失败任务发送到 failed 队列的行为。

在创建 Worker 时,使用 exception_handlers 参数指定异常处理程序列表。

1
2
3
4
from rq.handlers import move_to_failed_queue	# RQ 默认的异常处理行为——发送任务到 failed 队列

w = Worker([q], exception_handlers=[my_handler, move_to_failed_queue])
...

异常处理 handler 是一个函数,其参数为:

  • job 任务对象
  • exc_type 异常类型
  • exc_value 异常值
  • traceback 异常堆栈
1
2
3
4
def my_handler(job, exc_type, exc_value, traceback):
# do custom things here
# for example, write the exception info to a DB
...

或者使用可变参数定义:job*exc_info

1
2
3
def my_handler(job, *exc_info):
# do custom things here
...

链式异常处理

异常处理程序可以决定是否完成处理异常,还是由堆栈中的后续程序继续处理异常。这通过返回值来控制:

  • 若返回 True 表示继续,并进入下一个异常处理程序;
  • 若返回 False 表示停止处理异常;
  • 若没有返回值,即返回 None,则认为是 True,继续进入下一个异常处理程序;

如果要替换默认的错误处理行为,错误处理程序应该返回 False

1
2
def black_hole(job, *exc_info):
return False

Testing 测试

单元测试中的 Worker

许多框架在执行单元测试时使用内存数据库,这些数据库与 RQ 默认使用的 fork() 不太兼容。

因此在单元测试用,应该使用 SimpleWorker 类来避免 fork(),且建议以突发模式运行。

1
2
3
4
5
6
7
8
from redis import Redis
from rq import SimpleWorker, Queue

queue = Queue(connection=Redis())
queue.enqueue(my_long_running_job)
worker = SimpleWorker([queue], connection=queue.connection)
worker.work(burst=True) # Runs enqueued job
# Check for result...

在单元测试中执行任务

另一种解决方案是,在创建队列时使用 is_async=False 参数,使任务在同一个线程中立即执行,而不是将其分配给 Worker。此时不需要启动 RQ worker。

除此之外,还可以使用 FakeStrictRedis 替代 Redis,也不必再启动 Redis 服务器,FakeStrictRedis 的实例可以直接作为连接参数传递给队列。

1
2
3
4
5
6
from fakeredis import FakeStrictRedis
from rq import Queue

queue = Queue(is_async=False, connection=FakeStrictRedis())
job = queue.enqueue(my_long_running_job)
assert job.is_finished

参考链接

觉得有用可以请作者喝杯咖啡呀~
0%