User Tools

Site Tools


celery

Celery

Установка

Предположим, что переменное окружение уже создано и брокер очередей уже установлен, тут используем RabbitMQ
Устанавливаем

pip install celery

Вуаля!

Запуск

Создадим простой модуль для проверки

from celery import Celery
 
app = Celery('tasks', brocker='amqp://localhost')
 
@app.task
def reverse(string):
    return string[::-1]

в cli запускаем celery

celery -A tasks worker --loglevel=info

Из другой cli проверяем

reverse("nevvad")
# отправляем
revers.delay("nevvad")

В windows полетели ошибки

celery ValueError: not enough values to unpack (expected 3, got 0)

решения фикс зыс трабл

pip install eventlet

снова запускаем

celery -A tasks worker --pool=eventlet -l info
# delay - пашет

Подключаем Базу данных

Документация по подключению бд искать на оффсайте celery
Дополнительно добавляем задержку на очередь в 10 сек.

from celery import Celery
import time
 
app= Celery('tasks', brocker='amqp://localhost', backend='db+sqlite:///celery.db')
 
@app.task
def reverse(string):
    time.sleep(10)
    return string[::-1]

проверяем в cli

result = reverse.delay('nevvad')
# сразу смотрим статус
result.status
# получаем 
'PENDING'
# через несколько секунд еще раз смотрим статус, задержка отработала
'SUCCESS'
# так же все delay присутствуют в бд

еще проверка

result = reverse.delay('nevvad')
result.ready()
# получаем False
# через несколько секунд
# получаем True

result.get()
'davven'

flask

Берем с оффсайта колбы макет для celery называем task_celery.py

from celery import Celery
 
def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)
 
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
 
    celery.Task = ContextTask
    return celery

Создаем модуль flask c именем celery_example.py

from flask import Flask
from task_celery import make_celery
 
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'amqp://localhost'
app.config['CELERY_RESULT_BACKEND'] = 'db+sqlite:///celery.db'
 
celery = make_celery(app)
 
@app.route('/process/<name>')
def process(name):
    return name
 
@celery.task(name='celery_example.reverse')
def reverse(string):
    return string[::-1]
 
if __name__ == '__main__':
    app.run(debug=True)

стартуем celery

celery -A celery_example.celery worker --pool=eventlet -l info

проверяем в браузере

http://127.0.0.1:5000/process/nevvad
проверяем бд, должна появится запись

Отправка почты








celery.txt · Last modified: 2023/04/06 10:28 (external edit)