CeleryはPython製の非同期タスク用フレームワークであり、Djangoで記述され
たプロダクトでは特に良く採用される。非同期タスクは、実行時間、メモリ、
CPUなどの計算リソースについて、あまり消費しない "軽い"
ものから、多く消
費するような "重い"
タスクが実装されることもある。
非同期タスクの "重さ"
によってマシンのCPUの種類やメモリのサイズを変更したい。
つまりどのタスクをどのワーカーで処理するかというルーティングを設定することになる。
Celeryではtask_routesを使用してタスクのルーティングができるように用意されている。 今回はそのタスクのルーティングの設定を確認した。
まず各種サードパーティパッケージをインストールする。
pip install celery[redis] flower
次にCeleryの非同期タスクを実装する。今回は3つの非同期タスクを実装した。
- heavy_weight_task
- normal_weight_task
- light_weight_task
from celery import Celery
from kombu import Queue
app = Celery(
"testing",
broker="redis://localhost:6379/0",
result_backend="redis://localhost:6379/0",
)
app.conf.task_routes = {
"main.light_weight_task": {"queue": "light"},
}
app.conf.task_default_queue = "default"
@app.task
def heavy_weight_task(*args, **kargs):
return "heavy!!"
@app.task
def normal_weight_task(*args, **kargs):
return "normal!!"
@app.task
def light_weight_task(*args, **kargs):
return "light!!"
app.conf.task_routes
で heavy_weight_task
が light
Queueにルーティングされるように設定している。
また app.conf.task_default_queue
でデフォルトのQueueを default
に設定しているため、他のタスクは default
Queueにルーティングされる。
各種ワーカーの起動は次のように行う。
flower: celery --broker='redis://localhost:6379/0' -A main.app flower
heavy: celery -A main.app worker -n heavy-1 -Q heavy
light: celery -A main.app worker -n light-1 -Q light
default: celery -A main.app worker -n default-1 -Q default
Pythonを起動して先程実装したタスクをimportし、それぞれのタスクを実行する。
import main
main.heavy_weight_task.delay()
main.normal_weight_task.delay()
main.light_weight_task.delay()
すると main.light_weight_task
のみが light
Queueからタスクを取得するワーカーで処理されることがわかる。
その他のタスクは default
Queueからタスクを取得するワーカーで処理される。
flowerを起動すればどのワーカーでタスクが処理されたかを確認しやすい。