Pythonの非同期ジョブを実行するためのフレームワークとしてCeleryというものがある。Celeryには定期的に処理を呼び出すような仕組みとして beat
という機能が組み込まれている。今回はこの Celery beat
について調べた。
パッケージのインストール
Redisをブローカーとして使用する場合
pip install -U celery[redis]
全体的に影響のある設定
ブローカー
インポートするモジュール
タスクの定義
タスクの定義方法はいくつかあるが、ここではshared_taskデコレータを用いた方法を使用する。
非同期タスクとして実行したい関数にshared_taskデコレータを付けることで、その関数を非同期タスクとして呼び出すことが可能になる。
ワーカーの起動
ワーカーはCeleryの非同期タスクとして定義された処理を実際に実行する役割を持つ。簡略化したイメージとしては、呼び出された処理はブローカーを通して、このワーカーに呼び出されたイベントが送られ、ワーカーによって処理が実行される。実行した結果は結果バックエンド(result backend)に送られる。
+--------------+ +--------------+ +--------------+
| | | | | |
| Web API | | Batc | | Shell |
| | | | | |
+------+-------+ +-------+------+ +-----+--------+
| | |
v v v
+------+------------------+---------------+--------+
| |
| Broker (Message transporter) |
| |
+----+---------------------+-----------------------+
| |
v v
+----+---------+ +------+-------+
| | | |
| Worker1 | | Worker2 |
| | | |
+----+---------+ +------+-------+
| |
v v
+--------------------------+-----------------------+
| |
| Backend (Result backend) |
| |
+--------------------------------------------------+
celery worker
タスクのプリフェッチ
Celery workerは処理するタスクをブローカーから取得するが、必ずしも、タスクを1つ取得し、それを処理するという順番で処理を行っているわけではない。ワーカーモデルに何を採用しているか、またワーカープロセスや並行数や並列数によって、1つのワーカーで同時に処理の実行なタスクの数が異なる。Celery workerはタスクの取得と、タスクの実行に関しては分離することができ、それぞれにその数を変更することができる。ここではタスクの取得数について注目する。下の図のFetchとして表記した部分でブローカーからタスクを取得しているが、ここで複数のタスクを取得し、プールしておくことができる。
+-----------+
| |
| Broker |
| |
+----+------+
|
+-------------------------+-------------------------------+
| | |
| Celery worker v |
| +--------------+ |
| | | |
| | Fetch | |
| | | |
| +----+---------+ |
| | |
| +----------------+---------------------+ |
| | | | |
| v v v |
| +------+-------+ +----+---------+ +-------+------+ |
| | | | | | | |
| | Execute | | Execute | | Execute | |
| | | | | | | |
| +--------------+ +--------------+ +--------------+ |
| |
+---------------------------------------------------------+
この値はworker_prefetch_multiplierという設定によって変更でき、デフォルトは4に設定されている。動作を確認するだけであれば、設定を1にすることで、動きをシンプルに出来る。パフォーマンス改善の一環としてこの値を変更できるということを知っておくと、対処できる幅が広がる。
https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-worker_prefetch_multiplier
それぞれのワーカーモデルについては https://blog.symdon.info/posts/1617189961/ にまとめた。
beat
ローカルファイルとしてデータベースを保持する
beatは定期実行や時間指定タスクの実行に用いるため機構としてCeleryに組み込まれている。 beat
サブコマンドを実行すると、時間に応じて実行する処理の管理が行われる。beatはデフォルトでは celerybeat-schedule
という名前のファイルを作成する。このファイルは、shelveモジュールで生成したデータベースファイルであり、実行が予約されたタスクの情報が格納される。
+--------------------------------------------------+
| |
| beat +------------------------------+ |
| | celerybeat-schedule(shelve) | |
| +------------------------------+ |
+------+-------------------------------------------+
|
v
+------+-------------------------------------------+
| |
| Broker (Message transporter) |
| |
+----+---------------------+-----------------------+
| |
v v
+----+---------+ +------+-------+
| | | |
| Worker1 | | Worker2 |
| | | |
+----+---------+ +------+-------+
| |
v v
+--------------------------+-----------------------+
| |
| Backend (Result backend) |
| |
+--------------------------------------------------+
例としてsay_helloタスクを1分毎に起動する定期実行を設定する。
時刻を設定する際には、それがどのタイムゾーンなのかを適切に設定する必要がある。不適だと時刻がずれる可能性がある。
celery beatコマンドを用いることで、サービスを開始できる。
celery beat
beatを実行すると、celerybeat-scheduleファイルが作成される。このファイルはshelveモジュールによって永続化されたデータベースだ。実際、以下のコードでデータベースに格納された情報を伺い知ることができる。
日付を指定してタスクを実行する
apply_async()にetaを渡すことで日付を指定してタスクを実行することができる。これはbeatとは別の機構を用いているため、celerybeat-scheduleデータベースに反映されることはなく、ブローカー側で管理される。
import tasks
from datetime import datetime, timedelta
tomorrow = datetime.utcnow() + timedelta(days=1)
result = tasks.say_hello.apply_async(eta=tomorrow)
このようなタスクのブローカー上での扱われ方は、当然ブローカーの種類によって異なる。例えばRedisをブローカーに採用した場合、タスクの情報はunackedやunacked_indexに登録される。もし何らかの理由で挙動の調査が必要になった場合は、このキーを足掛りに調査をすると良い。
数秒後にタスクを実行する
apply_async()にcountdownを渡すことで発火タイミングを指定して、タスクを実行することができる。これもetaと同様で、beatとは別の機構を用いているため、celerybeat-scheduleデータベースに反映されることはなく、ブローカー側で管理される。
import tasks
result = tasks.say_hello.apply_async(countdown=60)
Djangoとの連携
CeleryはDjangoを強く意識しており、Djangoと統合するためのパッケージが幾つかあり、また機能が取り込まれたりしたものもある。
Djangoの設定ファイルを読み込む
djappは django-admin startproject djapp .
というコマンドで作成したDjangoプロジェクトとする。通常は、このディレクトリにCelery用のファイルを作成する。公式のドキュメントでは celery.py
という名前でファイルを作成している。
暗黙的相対インポートが有効の場合、名前解決に失敗しそうではあるため、この方法はあまり良くないのではないかと思うが、ここではそういった事を気にせず公式のドキュメントに従い実装する。djappディレクトリ配下はこのようになる。
tree djapp
djapp |-- __init__.py |-- asgi.py |-- celery.py |-- settings.py |-- urls.py `-- wsgi.py
celery.pyは以下のコードを実装する。
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djapp.settings")
app = Celery("djapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
import tasks
設定はsettings.pyに記述する。設定可能な値はceleryconfig.pyで設定可能な値とほぼ同じだが、変数名に CELERY_
接頭辞を追加し、全ての文字を大文字にする必要がある。設定は以下のようになる。Celeryの設定は下部に記述している。
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
SECRET_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
DEBUG = True
ALLOWED_HOSTS = []
INSTALLED_APPS = [
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"django_celery_beat",
]
MIDDLEWARE = [
"django.middleware.security.SecurityMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
]
ROOT_URLCONF = "djapp.urls"
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
],
},
},
]
WSGI_APPLICATION = "djapp.wsgi.application"
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": BASE_DIR / "db.sqlite3",
}
}
AUTH_PASSWORD_VALIDATORS = [
{
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
},
{
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
},
{
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
{
"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
},
]
LANGUAGE_CODE = "en-us"
TIME_ZONE = "UTC"
USE_I18N = True
USE_TZ = True
STATIC_URL = "static/"
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
# for celery
from celery.schedules import crontab
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERYBEAT_SCHEDULE = {
'add-every-60-seconds': {
'task': 'tasks.say_hello',
'schedule': crontab(),
'args': (),
},
}
CELERY_TIMEZONE = 'UTC'
起動時には先ほど作成したdjapp/celery.pyをドッテド名にし -A
オプションに渡す。例としてworkerを起動する例を示す。
celery -A djapp.celery worker
django-celery-beat - Djangoでデータベースを管理する
定期タスクの実行だけであれば、通常のCeleryの場合と、Djangoを用いたCeleryの場合とでは、特に違いなく実行できる。ただしDjangoの管理画面を用いて定期実行の管理をするためには、django-celery-beatパッケージをインストールする必要がある。このパッケージを利用すると、shelve形式の簡易データベースとしてcelerybeat-scheduleファイルを作成するのではなく、Djangoが管理するデータベースに定期タスクのデータを格納する。
#+begin_srcv bash pip install django-celery-beat #+end_src
そしてINSTALLED_APPSにdjango_celery_beatを追加する。
マイグレーションを実行すると、django-celery-beat用のテーブルが作成される。
sqlite> .tables .tables auth_group django_celery_beat_crontabschedule auth_group_permissions django_celery_beat_intervalschedule auth_permission django_celery_beat_periodictask auth_user django_celery_beat_periodictasks auth_user_groups django_celery_beat_solarschedule auth_user_user_permissions django_content_type django_admin_log django_migrations django_celery_beat_clockedschedule django_session
テーブルは次のように定義される。
CREATE TABLE IF NOT EXISTS "django_celery_beat_intervalschedule" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "every" integer NOT NULL, "period" varchar(24) NOT NULL); CREATE TABLE IF NOT EXISTS "django_celery_beat_periodictasks" ("ident" smallint NOT NULL PRIMARY KEY, "last_update" datetime NOT NULL); CREATE TABLE IF NOT EXISTS "django_celery_beat_solarschedule" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "event" varchar(24) NOT NULL, "latitude" decimal NOT NULL, "longitude" decimal NOT NULL); CREATE UNIQUE INDEX "django_celery_beat_solarschedule_event_latitude_longitude_ba64999a_uniq" ON "django_celery_beat_solarschedule" ("event", "latitude", "longitude"); CREATE TABLE IF NOT EXISTS "django_celery_beat_periodictask" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "name" varchar(200) NOT NULL UNIQUE, "task" varchar(200) NOT NULL, "args" text NOT NULL, "kwargs" text NOT NULL, "queue" varchar(200) NULL, "exchange" varchar(200) NULL, "routing_key" varchar(200) NULL, "expires" datetime NULL, "enabled" bool NOT NULL, "last_run_at" datetime NULL, "total_run_count" integer unsigned NOT NULL CHECK ("total_run_count" >= 0), "date_changed" datetime NOT NULL, "description" text NOT NULL, "crontab_id" integer NULL REFERENCES "django_celery_beat_crontabschedule" ("id") DEFERRABLE INITIALLY DEFERRED, "interval_id" integer NULL REFERENCES "django_celery_beat_intervalschedule" ("id") DEFERRABLE INITIALLY DEFERRED, "solar_id" integer NULL REFERENCES "django_celery_beat_solarschedule" ("id") DEFERRABLE INITIALLY DEFERRED, "one_off" bool NOT NULL, "start_time" datetime NULL, "priority" integer unsigned NULL CHECK ("priority" >= 0), "headers" text NOT NULL, "clocked_id" integer NULL REFERENCES "django_celery_beat_clockedschedule" ("id") DEFERRABLE INITIALLY DEFERRED, "expire_seconds" integer unsigned NULL CHECK ("expire_seconds" >= 0)); CREATE INDEX "django_celery_beat_periodictask_crontab_id_d3cba168" ON "django_celery_beat_periodictask" ("crontab_id"); CREATE INDEX "django_celery_beat_periodictask_interval_id_a8ca27da" ON "django_celery_beat_periodictask" ("interval_id"); CREATE INDEX "django_celery_beat_periodictask_solar_id_a87ce72c" ON "django_celery_beat_periodictask" ("solar_id"); CREATE TABLE IF NOT EXISTS "django_celery_beat_clockedschedule" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "clocked_time" datetime NOT NULL); CREATE INDEX "django_celery_beat_periodictask_clocked_id_47a69f82" ON "django_celery_beat_periodictask" ("clocked_id"); CREATE TABLE IF NOT EXISTS "django_celery_beat_crontabschedule" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "minute" varchar(240) NOT NULL, "hour" varchar(96) NOT NULL, "day_of_week" varchar(64) NOT NULL, "day_of_month" varchar(124) NOT NULL, "month_of_year" varchar(64) NOT NULL, "timezone" varchar(63) NOT NULL);
参考資料
https://docs.celeryq.dev/en/stable/getting-started/introduction.html#get-started https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html