« ^ »

celery beatとdjango-celery-beat

所要時間: 約 8分

TL;DR

  • Celeryの定期実行機能であるbeatの使い方を確認した。
  • Django Adminで使用可能なbeatの機能としてdjango-celery-beatもある。


Pythonの非同期ジョブを実行するためのフレームワークとしてCeleryというものがある。Celeryには定期的に処理を呼び出すような仕組みとして beat という機能が組み込まれている。今回はこの Celery beat について調べた。

パッケージのインストール

Redisをブローカーとして使用する場合

pip install -U celery[redis]

全体的に影響のある設定

ブローカー

broker_url = "redis://localhost:6379/0"
celeryconfig.py(抜粋)

インポートするモジュール

imports = (
    'tasks',
)
celeryconfig.py(抜粋)

タスクの定義

タスクの定義方法はいくつかあるが、ここではshared_taskデコレータを用いた方法を使用する。

from celery import shared_task
tasks.py(抜粋)

非同期タスクとして実行したい関数にshared_taskデコレータを付けることで、その関数を非同期タスクとして呼び出すことが可能になる。

@shared_task
def say_hello():
    print("Hello")
tasks.py(抜粋)

ワーカーの起動

ワーカーはCeleryの非同期タスクとして定義された処理を実際に実行する役割を持つ。簡略化したイメージとしては、呼び出された処理はブローカーを通して、このワーカーに呼び出されたイベントが送られ、ワーカーによって処理が実行される。実行した結果は結果バックエンド(result backend)に送られる。


+--------------+  +--------------+  +--------------+
|              |  |              |  |              |
| Web API      |  | Batc         |  | Shell        |
|              |  |              |  |              |
+------+-------+  +-------+------+  +-----+--------+
       |                  |               |
       v                  v               v
+------+------------------+---------------+--------+
|                                                  |
| Broker (Message transporter)                     |
|                                                  |
+----+---------------------+-----------------------+
     |                     |
     v                     v
+----+---------+    +------+-------+
|              |    |              |
| Worker1      |    | Worker2      |
|              |    |              |
+----+---------+    +------+-------+
     |                     |
     v                     v
+--------------------------+-----------------------+
|                                                  |
| Backend (Result backend)                         |
|                                                  |
+--------------------------------------------------+
celery worker

タスクのプリフェッチ

Celery workerは処理するタスクをブローカーから取得するが、必ずしも、タスクを1つ取得し、それを処理するという順番で処理を行っているわけではない。ワーカーモデルに何を採用しているか、またワーカープロセスや並行数や並列数によって、1つのワーカーで同時に処理の実行なタスクの数が異なる。Celery workerはタスクの取得と、タスクの実行に関しては分離することができ、それぞれにその数を変更することができる。ここではタスクの取得数について注目する。下の図のFetchとして表記した部分でブローカーからタスクを取得しているが、ここで複数のタスクを取得し、プールしておくことができる。



                     +-----------+
                     |           |
                     | Broker    |
                     |           |
                     +----+------+
                          |
+-------------------------+-------------------------------+
|                         |                               |
| Celery worker           v                               |
|                    +--------------+                     |
|                    |              |                     |
|                    | Fetch        |                     |
|                    |              |                     |
|                    +----+---------+                     |
|                         |                               |
|        +----------------+---------------------+         |
|        |                |                     |         |
|        v                v                     v         |
| +------+-------+   +----+---------+   +-------+------+  |
| |              |   |              |   |              |  |
| | Execute      |   | Execute      |   | Execute      |  |
| |              |   |              |   |              |  |
| +--------------+   +--------------+   +--------------+  |
|                                                         |
+---------------------------------------------------------+

この値はworker_prefetch_multiplierという設定によって変更でき、デフォルトは4に設定されている。動作を確認するだけであれば、設定を1にすることで、動きをシンプルに出来る。パフォーマンス改善の一環としてこの値を変更できるということを知っておくと、対処できる幅が広がる。

worker_prefetch_multiplier = 1
celeryconfig.py(抜粋)

https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-worker_prefetch_multiplier

それぞれのワーカーモデルについては https://blog.symdon.info/posts/1617189961/ にまとめた。

beat

ローカルファイルとしてデータベースを保持する

beatは定期実行や時間指定タスクの実行に用いるため機構としてCeleryに組み込まれている。 beat サブコマンドを実行すると、時間に応じて実行する処理の管理が行われる。beatはデフォルトでは celerybeat-schedule という名前のファイルを作成する。このファイルは、shelveモジュールで生成したデータベースファイルであり、実行が予約されたタスクの情報が格納される。


+--------------------------------------------------+
|                                                  |
| beat  +------------------------------+           |
|       | celerybeat-schedule(shelve)  |           |
|       +------------------------------+           |
+------+-------------------------------------------+
       |
       v
+------+-------------------------------------------+
|                                                  |
| Broker (Message transporter)                     |
|                                                  |
+----+---------------------+-----------------------+
     |                     |
     v                     v
+----+---------+    +------+-------+
|              |    |              |
| Worker1      |    | Worker2      |
|              |    |              |
+----+---------+    +------+-------+
     |                     |
     v                     v
+--------------------------+-----------------------+
|                                                  |
| Backend (Result backend)                         |
|                                                  |
+--------------------------------------------------+

例としてsay_helloタスクを1分毎に起動する定期実行を設定する。

beat_schedule = {
    'add-every-60-seconds': {
        'task': 'tasks.say_hello',
        'schedule': 60.0,
        'args': (),
    },
}
celeryconfig.py(抜粋)

時刻を設定する際には、それがどのタイムゾーンなのかを適切に設定する必要がある。不適だと時刻がずれる可能性がある。

timezone = 'UTC'
celeryconfig.py(抜粋)

celery beatコマンドを用いることで、サービスを開始できる。

celery beat

beatを実行すると、celerybeat-scheduleファイルが作成される。このファイルはshelveモジュールによって永続化されたデータベースだ。実際、以下のコードでデータベースに格納された情報を伺い知ることができる。

import shelve
import pprint as pp

with shelve.open("celerybeat-schedule") as db:
    pp.pprint(db["entries"])
{'celery.backend_cleanup': <ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>}
celerybeat-scheduleに格納された情報を確認する

日付を指定してタスクを実行する

apply_async()にetaを渡すことで日付を指定してタスクを実行することができる。これはbeatとは別の機構を用いているため、celerybeat-scheduleデータベースに反映されることはなく、ブローカー側で管理される。

import tasks
from datetime import datetime, timedelta

tomorrow = datetime.utcnow() + timedelta(days=1)
result = tasks.say_hello.apply_async(eta=tomorrow)

このようなタスクのブローカー上での扱われ方は、当然ブローカーの種類によって異なる。例えばRedisをブローカーに採用した場合、タスクの情報はunackedやunacked_indexに登録される。もし何らかの理由で挙動の調査が必要になった場合は、このキーを足掛りに調査をすると良い。

数秒後にタスクを実行する

apply_async()にcountdownを渡すことで発火タイミングを指定して、タスクを実行することができる。これもetaと同様で、beatとは別の機構を用いているため、celerybeat-scheduleデータベースに反映されることはなく、ブローカー側で管理される。

import tasks

result = tasks.say_hello.apply_async(countdown=60)

Djangoとの連携

CeleryはDjangoを強く意識しており、Djangoと統合するためのパッケージが幾つかあり、また機能が取り込まれたりしたものもある。

Djangoの設定ファイルを読み込む

djappは django-admin startproject djapp . というコマンドで作成したDjangoプロジェクトとする。通常は、このディレクトリにCelery用のファイルを作成する。公式のドキュメントでは celery.py という名前でファイルを作成している。 暗黙的相対インポートが有効の場合、名前解決に失敗しそうではあるため、この方法はあまり良くないのではないかと思うが、ここではそういった事を気にせず公式のドキュメントに従い実装する。djappディレクトリ配下はこのようになる。

tree djapp
djapp
|-- __init__.py
|-- asgi.py
|-- celery.py
|-- settings.py
|-- urls.py
`-- wsgi.py

celery.pyは以下のコードを実装する。

import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djapp.settings")

app = Celery("djapp")

app.config_from_object("django.conf:settings", namespace="CELERY")

app.autodiscover_tasks()

import tasks

設定はsettings.pyに記述する。設定可能な値はceleryconfig.pyで設定可能な値とほぼ同じだが、変数名に CELERY_ 接頭辞を追加し、全ての文字を大文字にする必要がある。設定は以下のようになる。Celeryの設定は下部に記述している。

from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
SECRET_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
DEBUG = True
ALLOWED_HOSTS = []

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "django_celery_beat",
]

MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    "django.contrib.sessions.middleware.SessionMiddleware",
    "django.middleware.common.CommonMiddleware",
    "django.middleware.csrf.CsrfViewMiddleware",
    "django.contrib.auth.middleware.AuthenticationMiddleware",
    "django.contrib.messages.middleware.MessageMiddleware",
    "django.middleware.clickjacking.XFrameOptionsMiddleware",
]

ROOT_URLCONF = "djapp.urls"

TEMPLATES = [
    {
        "BACKEND": "django.template.backends.django.DjangoTemplates",
        "DIRS": [],
        "APP_DIRS": True,
        "OPTIONS": {
            "context_processors": [
                "django.template.context_processors.debug",
                "django.template.context_processors.request",
                "django.contrib.auth.context_processors.auth",
                "django.contrib.messages.context_processors.messages",
            ],
        },
    },
]

WSGI_APPLICATION = "djapp.wsgi.application"


DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.sqlite3",
        "NAME": BASE_DIR / "db.sqlite3",
    }
}

AUTH_PASSWORD_VALIDATORS = [
    {
        "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
    },
    {
        "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
    },
    {
        "NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
    },
    {
        "NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
    },
]

LANGUAGE_CODE = "en-us"
TIME_ZONE = "UTC"
USE_I18N = True
USE_TZ = True
STATIC_URL = "static/"
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"

# for celery
from celery.schedules import crontab

CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERYBEAT_SCHEDULE = {
    'add-every-60-seconds': {
        'task': 'tasks.say_hello',
        'schedule': crontab(),
        'args': (),
    },
}
CELERY_TIMEZONE = 'UTC'

起動時には先ほど作成したdjapp/celery.pyをドッテド名にし -A オプションに渡す。例としてworkerを起動する例を示す。

celery -A djapp.celery worker

django-celery-beat - Djangoでデータベースを管理する

定期タスクの実行だけであれば、通常のCeleryの場合と、Djangoを用いたCeleryの場合とでは、特に違いなく実行できる。ただしDjangoの管理画面を用いて定期実行の管理をするためには、django-celery-beatパッケージをインストールする必要がある。このパッケージを利用すると、shelve形式の簡易データベースとしてcelerybeat-scheduleファイルを作成するのではなく、Djangoが管理するデータベースに定期タスクのデータを格納する。

#+begin_srcv bash pip install django-celery-beat #+end_src

そしてINSTALLED_APPSにdjango_celery_beatを追加する。

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "django_celery_beat",
]
djapp/settings.py(抜粋)

マイグレーションを実行すると、django-celery-beat用のテーブルが作成される。

sqlite> .tables
.tables
auth_group                           django_celery_beat_crontabschedule
auth_group_permissions               django_celery_beat_intervalschedule
auth_permission                      django_celery_beat_periodictask
auth_user                            django_celery_beat_periodictasks
auth_user_groups                     django_celery_beat_solarschedule
auth_user_user_permissions           django_content_type
django_admin_log                     django_migrations
django_celery_beat_clockedschedule   django_session

テーブルは次のように定義される。

CREATE TABLE IF NOT EXISTS "django_celery_beat_intervalschedule" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "every" integer NOT NULL, "period" varchar(24) NOT NULL);
CREATE TABLE IF NOT EXISTS "django_celery_beat_periodictasks" ("ident" smallint NOT NULL PRIMARY KEY, "last_update" datetime NOT NULL);
CREATE TABLE IF NOT EXISTS "django_celery_beat_solarschedule" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "event" varchar(24) NOT NULL, "latitude" decimal NOT NULL, "longitude" decimal NOT NULL);
CREATE UNIQUE INDEX "django_celery_beat_solarschedule_event_latitude_longitude_ba64999a_uniq" ON "django_celery_beat_solarschedule" ("event", "latitude", "longitude");
CREATE TABLE IF NOT EXISTS "django_celery_beat_periodictask" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "name" varchar(200) NOT NULL UNIQUE, "task" varchar(200) NOT NULL, "args" text NOT NULL, "kwargs" text NOT NULL, "queue" varchar(200) NULL, "exchange" varchar(200) NULL, "routing_key" varchar(200) NULL, "expires" datetime NULL, "enabled" bool NOT NULL, "last_run_at" datetime NULL, "total_run_count" integer unsigned NOT NULL CHECK ("total_run_count" >= 0), "date_changed" datetime NOT NULL, "description" text NOT NULL, "crontab_id" integer NULL REFERENCES "django_celery_beat_crontabschedule" ("id") DEFERRABLE INITIALLY DEFERRED, "interval_id" integer NULL REFERENCES "django_celery_beat_intervalschedule" ("id") DEFERRABLE INITIALLY DEFERRED, "solar_id" integer NULL REFERENCES "django_celery_beat_solarschedule" ("id") DEFERRABLE INITIALLY DEFERRED, "one_off" bool NOT NULL, "start_time" datetime NULL, "priority" integer unsigned NULL CHECK ("priority" >= 0), "headers" text NOT NULL, "clocked_id" integer NULL REFERENCES "django_celery_beat_clockedschedule" ("id") DEFERRABLE INITIALLY DEFERRED, "expire_seconds" integer unsigned NULL CHECK ("expire_seconds" >= 0));
CREATE INDEX "django_celery_beat_periodictask_crontab_id_d3cba168" ON "django_celery_beat_periodictask" ("crontab_id");
CREATE INDEX "django_celery_beat_periodictask_interval_id_a8ca27da" ON "django_celery_beat_periodictask" ("interval_id");
CREATE INDEX "django_celery_beat_periodictask_solar_id_a87ce72c" ON "django_celery_beat_periodictask" ("solar_id");
CREATE TABLE IF NOT EXISTS "django_celery_beat_clockedschedule" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "clocked_time" datetime NOT NULL);
CREATE INDEX "django_celery_beat_periodictask_clocked_id_47a69f82" ON "django_celery_beat_periodictask" ("clocked_id");
CREATE TABLE IF NOT EXISTS "django_celery_beat_crontabschedule" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "minute" varchar(240) NOT NULL, "hour" varchar(96) NOT NULL, "day_of_week" varchar(64) NOT NULL, "day_of_month" varchar(124) NOT NULL, "month_of_year" varchar(64) NOT NULL, "timezone" varchar(63) NOT NULL);

参考資料

https://docs.celeryq.dev/en/stable/getting-started/introduction.html#get-started https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html

脚注