卡飞资源网

专业编程技术资源共享平台

Python每日一库|Celery (一)

什么是Celery

Celery 是一个简单、灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。

官方网站:https
://docs.celeryproject.org/en/stable/

上面的定义听起来很复杂吧?不用担心。我将用一个真实世界的例子来简化定义。

假设你在你最喜欢的餐厅与你的女朋友共进晚餐。

  1. 服务员已经来找你并接受了你的订单。
  2. 服务员去了厨房,将你的订单告知Cook。
  3. Cook在 Queue 中有很多订单。他们正在从队列中一个一个地处理。
  4. 你的订单来自队列。一个厨子做好了菜,确认了服务员。
  5. 服务员把菜端到你的餐桌上。
  6. 你和你女朋友吃得很开心。

现在让我们将上述事件与 Celery 联系起来。与上面的例子类比。

  1. 订单是一条消息。消息是关于要执行什么任务和该任务的输入参数的信息。
  2. 煮菜是在 Celery 中执行的任务。一个任务(在编程中是一个函数)并且包含作用于输入并产生一些输出的动作/代码。
  3. Cook是Celery 的一名工人。工人是执行这些任务的程序,即;执行任务/功能。餐厅中可以有一名或多名工人,就像一名或多名厨师一样。
  4. Order Queue 是 Celery 中的一个任务队列。任务队列是要由工作人员执行的任务队列。

注意:Celery 使用 Message Broker 和它的 Messaging Queue 来进行操作。你可以阅读有关此主题的深入了解。但在这里我试图简化事情,让你清楚地了解 Celery 是如何工作的。

在 Celery 中,你可以假设订单或消息是要执行的任务,Waiter 是消息代理,订单队列是任务队列,而厨师是执行任务的 Worker。

当人们在餐厅点单时,任务队列会填满煮菜任务,例如 Task1、Task2、Task3……等等……Cook/工作人员从订单队列中获取订单或消息或任务并进行处理。

在 Celery 中,你可以创建可由工作人员执行的任务。客户端可以调用任务,任务由工作人员而不是客户端执行。一台或多台机器上可以有一个或多个 Celery worker(这就是为什么它在定义中被称为分布式)。

好的,足够的理论。现在让我们进入技术并使用 Celery 编写一个简单的程序。

准备工作

  1. 你需要 Redis/RabbitMQ 之一。我在这里使用 Redis。我已经安装了 Redis 并在 6379 端口上运行。
  2. pip安装相关的库
pip install celery
pip install redis

开始使用Celery

让我们创建一个文件夹 CeleryTest

在上述文件夹( CeleryTest)中创建文件 tasks.py 并在文件中写入以下代码。请仔细阅读代码。

from celery import Celery

BROKER_URL = 'redis://localhost:6379/0'

celery_app = Celery('Restaurant', broker=BROKER_URL)

@celery_app.task
def cooking_task(table_no, dishes):
    print("Started cooking for Table Number : "+table_no)
    for dish in dishes:
        print("Cooking : "+dish)
    print("Done cooking for Table Number : "+table_no)
  • 我们从 celery 包中导入了 Celery 类。
  • 我们使用 Redis 创建了 BROKER_URL。
  • 我们使用 Celery 类创建了 celery_app 实例,方法是将模块名称作为 Restaurant 传递,将 broker 作为 Redis 传递。
  • 我们用 @celery_app.task 装饰器装饰了 cooking_task 函数。用 @celery_app.task 装饰器装饰的函数被认为是celery任务。

现在转到命令提示符并在你的 tasks.py 所在的同一文件夹中,运行以下命令以运行我们可以执行此任务的 celery worker。

celery -A tasks worker --pool=solo --loglevel=info

你应该看到上面的输出。确保你看到以红线标记的日志,以确保我们的工作人员正常运行。好的,现在我们的 worker 正在运行,可以执行任务 cooking_task。

现在我们将任务提交给我们的工人。

在上面的文件夹( CeleryTest)中创建文件 test.py 并在文件中写入以下代码。

from tasks import cooking_task

table_1_dishes = ["Chicken Biryani", "Lemon chicken", "Pepper chicken"]
result = cooking_task.delay("Table-1", table_1_dishes)
print(result)
  • 我们从任务中导入了 cooking_task
  • 我们通过 cooking_task.delay(*args, **kwargs) 函数通过传递相应的输入(例如 Table-No 和菜式)来调用此任务。

在 test.py 控制台中,你将看到打印的任务 ID。

转到工作人员控制台并检查。

我们的工作人员收到了任务并执行了它。

编辑 test.py 并开始为我们的工作人员分配更多任务

from tasks import cooking_task


table_1_dishes = ["Chicken Biryani", "Lemon chicken", "Pepper chicken"]


result = cooking_task.delay("Table-1", table_1_dishes)

print(result)

table_2_dishes = ["Mutton Biryani", "Egg Biryani"]
result2 = cooking_task.apply_async(args=["Table-2", table_2_dishes])
print(result2)

table_3_dishes = ["Butter Naan", "Andhra Chicken"]
result3 = cooking_task.apply_async(args=["Table-3", table_3_dishes])
print(result3)

table_4_dishes = ["Chicken Manchurian", "Chicken Noodles"]
result4 = cooking_task.apply_async(args=["Table-4", table_4_dishes])
print(result4)

在 test.py 控制台中,我们打印了 4 个任务 ID


返回工作人员控制台并检查。

工人执行四项任务

你已经了解了什么是 celery、如何在 celery 中编写任务、如何运行 worker 以及如何调用 celery 任务。我们将在下一篇文章中深入探讨Celery的使用场景跟Celery Workers、Pool 及其并发配置。

如果你发现我的任何文章对你有帮助或者有用,麻烦点赞或者转发。 谢谢!

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言