Асинхронные задачи с использованием Flask, Redis и Celery

| Python

В стремлении справиться с возросшим трафиком или усложнением функциональности веб-приложений иногда допустимо отложить вычисление на время и вернуть результаты клиенту позднее. Таким образом, веб-приложение не заставляет пользователя ждать неопределенное время, а вместо этого отправляет результаты позже. Достигается это с использованием фоновых задач при низком уровне загруженности или обработки в пакетном режиме.

Одним из решений, которое можно использовать для достижения этой цели, является Celery. Он помогает разбивать сложную работу на части и выполнять их на разных компьютерах для снижения нагрузки и сокращения времени её выполнения.

Что такое очередь задач?

Очередь задач – это механизм для распределения небольших порций работы или задач, выполняемые без вмешательства в цикл запрос-ответ большинства веб-приложений.

Очереди задач полезны при делегировании работы, которая в противном случае замедляла бы работу приложений в ожидании ответов. Их также можно использовать для выполнения ресурсоемких задач.

Что такое Celery?

Celery – асинхронная очередь задач, основанная на распределенной передаче сообщений для распределения рабочей нагрузки по компьютерам или потокам. Система Celery состоит из клиентов, брокера и нескольких воркеров.

Воркеры отвечают за выполнение задач или работ, помещенных в очередь, и возвращения результатов. С помощью Celery можно создавать локальных, так и удаленных воркеров, поэтому работа может быть делегирована на различные и более высокопроизводительные компьютеры через Интернет и результаты возвращаться обратно клиенту.

Таким образом, нагрузка на основной компьютер снижается и для обработки пользовательских запросов расходуется меньше ресурсов.

Клиент отвечает за постановку заданий воркерам, общение с которыми производится с помощью брокера сообщений. Брокер облегчает связь между клиентами и воркерами через очередь сообщений.

Примеры брокеров сообщений (серверы очередей) могут быть: Redis, RabbitMQ, Kafka и т.д.

Зачем нужен Celery?

Существуют различные причины, по которым может использоваться Celery для повседневных задач. Во-первых, он достаточно масштабируемый, что позволяет добавлять больше воркеров по требованию для обработки возросшей нагрузки или трафика. Celery также находится в активной разработке с краткой документацией и активным сообществом пользователей.

Еще одним преимуществом является то, что Celery легко интегрируется с популярными веб-фреймворками, при этом большинство из них содержат библиотеки для облегчения интеграции.

Он также предоставляет функциональные возможности для взаимодействия с другими веб-приложениями через webhooks, где нет библиотеки для поддержки взаимодействия.

Celery также может использовать различные брокеры сообщений, что предоставляет дополнительную гибкость. Рекомендуется RabbitMQ, но он также может работать с Redis, Beanstalk и т.д.

Демо приложение

В этой статье будет создано простое Flask приложение, которое будет рассылать пользователям напоминания в виде электронных писем. Также будет реализована функциональность для настройки количества времени, прежде чем сообщение или напоминание будет обработано, и электронное письмо будет отправлено пользователю.

Как и любой другой проект, работа будет проходить в виртуальной среде, которую будем создавать с помощью модуля venv. Будет использоваться Python версии 3.5. Операционная система – Ubuntu Linux 18.04.

$ python3 -m venv ./example
$ source ./example/bin/activate

Для этого проекта нам потребуется также установить пакеты Flask, Redis и Celery:

$ pip install flask celery redis

Файловая структура Flask приложения:

├── app.py
├── config.py
├── index.html
├── __pycache__
│   ├── app.cpython-35.pyc
│   └── config.cpython-35.pyc
├── README.md
├── req.txt
└── templates
    └── index.html

2 directories, 8 files

Для нашего проекта будет использоваться Redis в качестве брокера сообщений. Инструкции по установке и настройке Redis для вашей операционной системы легко нагугливаются.

Реализация приложения

Начнем с создания Flask приложения, которое будет отображать форму, позволяющую пользователям вводить детали сообщения, отправляемые эл. письмом в будущем.

Добавим следующее в файл app.py:

from flask import Flask, flash, render_template, request, redirect, url_for

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        email = request.form['email']
        first_name = request.form['first_name']
        last_name = request.form['last_name']
        message = request.form['message']
        duration = request.form['duration']
        duration_unit = request.form['duration_unit']

        flash("Message scheduled")
        return redirect(url_for('index'))


if __name__ == '__main__':
    app.run(debug=True)

Это простое приложение с одним маршрутом обработки запросов GET и POST. Как только детали сообщения отправлены, данные передаются в функцию, планировщиков работы.

Чтобы очистить основной файл приложения от лишнего кода, поместим переменные конфигурации в отдельный файл config.py и загрузим конфигурацию из файла:

app.config.from_object("config")

Файл config.py будет находиться в той же папке, что и файл app.py, и содержать основные настройки:

SECRET_KEY = 'sdsds213123kldfklsd232'

Далее реализуем целевую страницу index.html:

{% for message in get_flashed_messages() %}
  <p style="color: red;">{{ message }}</p>
{% endfor %}

<form method="POST">
    First Name: <input id="first_name" name="first_name" type="text"><br/>
    Last Name: <input id="last_name" name="last_name" type="text"><br/>
    Email: <input id="email" name="email" type="email"><br/>
    Message: <textarea id="textarea" name="message"></textarea><br/>
    Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text"><br/>

   <select name="duration_unit">
      <option value="" disabled selected>Choose the duration</option>
      <option value="1">Minutes</option>
      <option value="2">Hours</option>
      <option value="3">Days</option>
   </select>

   <button type="submit" name="action">Submit </button>
</form>

Стилизация и форматирование были сокращены для краткости, не стесняйтесь форматировать и стилизовать HTML так, как вам хочется. Для этих целей воспользоваться одним из CSS фреймворков, например Bootstrap.

Теперь запустим приложение:

 $ python3 app.py

Рассылка писем с помощью Flask-Mail

Чтобы отправлять электронные письма из нашего Flask приложения, будет использоваться библиотека flask-mail, которая добавляется в проект следующим образом:

$ pip install flask-mail

Далее подружим наше Flask приложением с flask-mail в файле app.py:

from flask_mail import Mail, Message

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']


mail = Mail(app)

def send_mail(data):
    """ Функция отправки эл. писем.
    """
    with app.app_context():
        msg = Message("Ping!",
                    sender="admin.ping",
                    recipients=[data['email']])
        msg.body = data['message']
        mail.send(msg)

Функция send_main(data) будет получать отправляемое сообщение и получателя электронного письма. Затем она будет вызвана по истечении заданного времени для отправки электронного письма пользователю.

Также необходимо добавить переменные в config.py, чтобы Flask-Mail работал:

# Flask-Mail
MAIL_SERVER = 'smtp.mail.ru'
MAIL_PORT = 465
MAIL_USE_TLS = True
MAIL_USERNAME = '[email protected]'
MAIL_PASSWORD = '12345678'

Интеграция с Celery

Теперь, когда Flask приложение готово и оснащено функцией отправки электронной почты, мы можем подключить Celery, чтобы запланировать рассылку писем на более позднее время.

Внесем изменения в app.py:

# Existing imports are maintained
from celery import Celery

# Flask app and flask-mail configuration truncated

# Set up celery client
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)

# Add this decorator to our send_mail function
@client.task
def send_mail(data):
    # Function remains the same

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        data = {}
        data['email'] = request.form['email']
        data['first_name'] = request.form['first_name']
        data['last_name'] = request.form['last_name']
        data['message'] = request.form['message']
        duration = int(request.form['duration'])
        duration_unit = request.form['duration_unit']

        if duration_unit == 'minutes':
            duration *= 60
        elif duration_unit == 'hours':
            duration *= 3600
        elif duration_unit == 'days':
            duration *= 86400

        send_mail.apply_async(args=[data], countdown=duration)
        mes = 'Email will be sent to {email} in {duration} {duration_unit}'.format(email=data["email"], 
                                                                               duration=request.form["duration"],
                                                                               duration_unit=duration_unit)
        flash(mes)

        return redirect(url_for('index'))

Использовалась возможность форматирования строк в Python, описанная [здесь][4]. Производим импорт celery и используем его для инициализации клиента Celery , прикрепляя URL для брокера обмена сообщениями. В нашем случае будет использоваться Redis в качестве посредника, поэтому добавим следующее в config.py:

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

Чтобы функция send_mail() выполнялась как фоновая задача, добавим декоратор @client.task, чтобы клиент Celery знал об этом.

После настройки клиента Celery, основная функция, которая также обрабатывает ввод формы, изменяется.

Сначала упаковываются входные данные для функции send_mail() в словарь. Затем вызывается почтовая функцию через API вызова задач Celery, используя функцию apply_async.

Устанавливается необязательный параметр обратного отсчета, определяющий задержку между запуском кода и выполнением задачи.

Эта продолжительность указана в секундах, поэтому производится конвертация продолжительности, передаваемую пользователем, в секунды в зависимости от выбранной им единицы времени.

После того, как пользователь отправил форму, подтвердим получение и уведомим его с помощью информационного сообщения – баннера, когда сообщение будет отправлено.

Собираем все вместе

Чтобы запустить наш проект, нам понадобятся два терминала, один для запуска Flask приложения, а другой для запуска Celery воркера, который будет отправлять сообщения в фоновом режиме.

Запустим приложение в первом терминале:

$ python app.py

Во втором терминале активируем виртуальную среду, а затем запустим воркера Celery:

# запуск virtualenv
$ source bin/activate
$ celery worker -A app.client --loglevel=info

Если все пойдет хорошо, то получим следующий вывод в терминале:

Теперь перейдем по адресу http://localhost:5000 и заполним поля формы, определяющие прибытие электронного письма через 2 минуты после отправки.

Над формой появится сообщение с указанием адреса, по которому будет получено электронное письмо, и продолжительность, после которой электронное письмо будет отправлено. В терминале Celery также сможем увидеть запись в журнале, которая означает, что отправка электронного письма была запланирована:

[2019-11-08 15:05:44,805: INFO/MainProcess] Received task: app.send_mail[627d24b4-8b98-4b60-91d5-1b5780d2882c]  ETA:[2019-11-08 10:05:46.502260+00:00]

Раздел ETA записи показывает, когда будет вызвана функция send_email() и когда будет отправлено электронное письмо.

Теперь электронные письма попадают в очередь и отправляются в указанное время, однако, отсутствует одна важная возможность. Нет визуализации задач до или после их выполнения, и нет возможности определить, действительно ли письмо отправлено или нет.

Поэтому реализуем мониторинг и просмотр фоновых задач, а также быть в курсе, если что-то пойдет не так и задачи не будут вовремя.

Мониторинг Celery кластера Flower

Flower - это веб-инструмент, который обеспечивает наглядность настроек Celery и предоставляет функции для просмотра хода выполнения задачи, истории, подробностей и статистики, включая показатели успешности или неудачи. Также можно отслеживать всех воркеров кластера и задачи, которые они в данный момент выполняются.

Установить Flower так же просто, как Flask:

$ source bin/activate
$ pip install flower

Ранее уже были описаны детали клиента Celery в файле app.py. Теперь нужно передать клиента в Flower, чтобы его мониторинга.

Для этого откроем третье окно терминала, активировав виртуальную среду и запустить инструмент мониторинга:

$ flower -A app.client —port=5555

При запуске Flower указываем клиента Celery, передавая его через аргумент приложения (-A), а также определяя порт, через аргумент --port.

Настроив мониторинг, запланируем еще одно электронное письмо, а затем перейдем по адресу http://localhost:5555

На этой странице видим список воркеров кластера Celery, который в настоящее время состоит только из одного компьютера.

Чтобы просмотреть электронное письмо, которое только что запланировали, нажмите кнопку «Tasks» в верхней левой части панели инструментов, чтобы можно увидеть запланированные задачи:

В этом разделе также видим время, когда текст был получен и когда он был выполнен.

В разделе «Monitor» представлены графики, отображающие успешность и частоту отказов фоновых задач.

Планировать рассылку сообщений можно в любое время, но это также означает, что воркер должен быть подключен к сети и функционировать в это же время, когда предполагается выполнение задачи.