什么是Celery
Celery 是一个简单、灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
官方网站:https
://docs.celeryproject.org/en/stable/
上面的定义听起来很复杂吧?不用担心。我将用一个真实世界的例子来简化定义。
假设你在你最喜欢的餐厅与你的女朋友共进晚餐。
- 服务员已经来找你并接受了你的订单。
- 服务员去了厨房,将你的订单告知Cook。
- Cook在 Queue 中有很多订单。他们正在从队列中一个一个地处理。
- 你的订单来自队列。一个厨子做好了菜,确认了服务员。
- 服务员把菜端到你的餐桌上。
- 你和你女朋友吃得很开心。
现在让我们将上述事件与 Celery 联系起来。与上面的例子类比。
- 订单是一条消息。消息是关于要执行什么任务和该任务的输入参数的信息。
- 煮菜是在 Celery 中执行的任务。一个任务(在编程中是一个函数)并且包含作用于输入并产生一些输出的动作/代码。
- Cook是Celery 的一名工人。工人是执行这些任务的程序,即;执行任务/功能。餐厅中可以有一名或多名工人,就像一名或多名厨师一样。
- Order Queue 是 Celery 中的一个任务队列。任务队列是要由工作人员执行的任务队列。
注意:Celery 使用 Message Broker 和它的 Messaging Queue 来进行操作。你可以阅读有关此主题的深入了解。但在这里我试图简化事情,让你清楚地了解 Celery 是如何工作的。
在 Celery 中,你可以假设订单或消息是要执行的任务,Waiter 是消息代理,订单队列是任务队列,而厨师是执行任务的 Worker。
当人们在餐厅点单时,任务队列会填满煮菜任务,例如 Task1、Task2、Task3……等等……Cook/工作人员从订单队列中获取订单或消息或任务并进行处理。
在 Celery 中,你可以创建可由工作人员执行的任务。客户端可以调用任务,任务由工作人员而不是客户端执行。一台或多台机器上可以有一个或多个 Celery worker(这就是为什么它在定义中被称为分布式)。
好的,足够的理论。现在让我们进入技术并使用 Celery 编写一个简单的程序。
准备工作
- 你需要 Redis/RabbitMQ 之一。我在这里使用 Redis。我已经安装了 Redis 并在 6379 端口上运行。
- pip安装相关的库
pip install celery
pip install redis
开始使用Celery
让我们创建一个文件夹 CeleryTest
并在上述文件夹( CeleryTest)中创建文件 tasks.py 并在文件中写入以下代码。请仔细阅读代码。
from celery import Celery
BROKER_URL = 'redis://localhost:6379/0'
celery_app = Celery('Restaurant', broker=BROKER_URL)
@celery_app.task
def cooking_task(table_no, dishes):
print("Started cooking for Table Number : "+table_no)
for dish in dishes:
print("Cooking : "+dish)
print("Done cooking for Table Number : "+table_no)
- 我们从 celery 包中导入了 Celery 类。
- 我们使用 Redis 创建了 BROKER_URL。
- 我们使用 Celery 类创建了 celery_app 实例,方法是将模块名称作为 Restaurant 传递,将 broker 作为 Redis 传递。
- 我们用 @celery_app.task 装饰器装饰了 cooking_task 函数。用 @celery_app.task 装饰器装饰的函数被认为是celery任务。
现在转到命令提示符并在你的 tasks.py 所在的同一文件夹中,运行以下命令以运行我们可以执行此任务的 celery worker。
celery -A tasks worker --pool=solo --loglevel=info
你应该看到上面的输出。确保你看到以红线标记的日志,以确保我们的工作人员正常运行。好的,现在我们的 worker 正在运行,可以执行任务 cooking_task。
现在我们将任务提交给我们的工人。
在上面的文件夹( CeleryTest)中创建文件 test.py 并在文件中写入以下代码。
from tasks import cooking_task
table_1_dishes = ["Chicken Biryani", "Lemon chicken", "Pepper chicken"]
result = cooking_task.delay("Table-1", table_1_dishes)
print(result)
- 我们从任务中导入了 cooking_task
- 我们通过 cooking_task.delay(*args, **kwargs) 函数通过传递相应的输入(例如 Table-No 和菜式)来调用此任务。
在 test.py 控制台中,你将看到打印的任务 ID。
转到工作人员控制台并检查。
我们的工作人员收到了任务并执行了它。
编辑 test.py 并开始为我们的工作人员分配更多任务
from tasks import cooking_task
table_1_dishes = ["Chicken Biryani", "Lemon chicken", "Pepper chicken"]
result = cooking_task.delay("Table-1", table_1_dishes)
print(result)
table_2_dishes = ["Mutton Biryani", "Egg Biryani"]
result2 = cooking_task.apply_async(args=["Table-2", table_2_dishes])
print(result2)
table_3_dishes = ["Butter Naan", "Andhra Chicken"]
result3 = cooking_task.apply_async(args=["Table-3", table_3_dishes])
print(result3)
table_4_dishes = ["Chicken Manchurian", "Chicken Noodles"]
result4 = cooking_task.apply_async(args=["Table-4", table_4_dishes])
print(result4)
在 test.py 控制台中,我们打印了 4 个任务 ID
返回工作人员控制台并检查。
工人执行四项任务
你已经了解了什么是 celery、如何在 celery 中编写任务、如何运行 worker 以及如何调用 celery 任务。我们将在下一篇文章中深入探讨Celery的使用场景跟Celery Workers、Pool 及其并发配置。
如果你发现我的任何文章对你有帮助或者有用,麻烦点赞或者转发。 谢谢!