CeleryはPython製の非同期タスク用フレームワークであり、Djangoで記述され たプロダクトでは特に良く採用される。非同期タスクは、実行時間、メモリ、 CPUなどの計算リソースについて、あまり消費しない "軽い" ものから、多く消 費するような "重い" タスクが実装されることもある。

非同期タスクの "重さ" によってマシンのCPUの種類やメモリのサイズを変更したい。 つまりどのタスクをどのワーカーで処理するかというルーティングを設定することになる。

Celeryではtask_routesを使用してタスクのルーティングができるように用意されている。 今回はそのタスクのルーティングの設定を確認した。

まず各種サードパーティパッケージをインストールする。

pip install celery[redis] flower

次にCeleryの非同期タスクを実装する。今回は3つの非同期タスクを実装した。

  • heavy_weight_task

  • normal_weight_task

  • light_weight_task

from celery import Celery
from kombu import Queue

app = Celery(
    "testing",
    broker="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
)

app.conf.task_routes = {
    "main.light_weight_task": {"queue": "light"},
}
app.conf.task_default_queue = "default"


@app.task
def heavy_weight_task(*args, **kargs):
    return "heavy!!"


@app.task
def normal_weight_task(*args, **kargs):
    return "normal!!"


@app.task
def light_weight_task(*args, **kargs):
    return "light!!"

app.conf.task_routesheavy_weight_tasklight Queueにルーティングされるように設定している。 また app.conf.task_default_queue でデフォルトのQueueを default に設定しているため、他のタスクは default Queueにルーティングされる。

各種ワーカーの起動は次のように行う。

flower: celery --broker='redis://localhost:6379/0' -A main.app flower
heavy: celery -A main.app worker -n heavy-1 -Q heavy
light: celery -A main.app worker -n light-1 -Q light
default: celery -A main.app worker -n default-1 -Q default

Pythonを起動して先程実装したタスクをimportし、それぞれのタスクを実行する。

import main


main.heavy_weight_task.delay()
main.normal_weight_task.delay()
main.light_weight_task.delay()

すると main.light_weight_task のみが light Queueからタスクを取得するワーカーで処理されることがわかる。 その他のタスクは default Queueからタスクを取得するワーカーで処理される。

flowerを起動すればどのワーカーでタスクが処理されたかを確認しやすい。