« ^ »

Celery Onceを使ってタスクの多重実行を防止する

所要時間: 約 2分

Pythonでよく使われるJob QueueにCeleryがあります。 Celeryの定期実行タスクなどを使っていると多重起動を防止したくなったりします。 Celery Onceはそれ用のライブラリです。

import time

from celery import Celery, shared_task
from celery_once import QueueOnce


BROKER_URL = 'redis://localhost:6379/0'
app = Celery('tasks', broker=BROKER_URL)
app.conf.ONCE_REDIS_URL = 'redis://localhost:6379/1'
app.conf.ONCE_DEFAULT_TIMEOUT = 60 * 60


@shared_task(base=QueueOnce)
def slow_task():
    print('START')
    for ii in range(30):
        print(ii)
        time.sleep(1)
    print('END')
    return "Done!"

workerを起動します。

celery -A sample.app worker
 -------------- [email protected] v4.0.2 (latentcall)
---- **** -----
--- * ***  * -- Darwin-16.1.0-x86_64-i386-64bit 2017-03-04 18:50:04
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x107963550
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

ついでにflowerも起動します。 flowerはCeleryのモニタリングツールです。 http://127.0.0.1:5555 でアクセスできます。

celery -A sample.app flower

```

別のターミナルでタスクをenqueueします。

from sample import slow_task

slow_task.delay()
Pythonでタスクをenqueueする。

workerの標準出力には次が出力されます。

[2017-03-04 18:50:06,348: WARNING/PoolWorker-2] START
[2017-03-04 18:50:06,349: WARNING/PoolWorker-2] 0
[2017-03-04 18:50:07,350: WARNING/PoolWorker-2] 1
[2017-03-04 18:50:08,351: WARNING/PoolWorker-2] 2
[2017-03-04 18:50:09,352: WARNING/PoolWorker-2] 3
[2017-03-04 18:50:10,353: WARNING/PoolWorker-2] 4
[2017-03-04 18:50:11,354: WARNING/PoolWorker-2] 5
[2017-03-04 18:50:12,356: WARNING/PoolWorker-2] 6
[2017-03-04 18:50:13,357: WARNING/PoolWorker-2] 7
[2017-03-04 18:50:14,359: WARNING/PoolWorker-2] 8
[2017-03-04 18:50:15,360: WARNING/PoolWorker-2] 9
[2017-03-04 18:50:16,361: WARNING/PoolWorker-2] 10
[2017-03-04 18:50:17,363: WARNING/PoolWorker-2] 11
[2017-03-04 18:50:18,364: WARNING/PoolWorker-2] 12
[2017-03-04 18:50:19,365: WARNING/PoolWorker-2] 13
[2017-03-04 18:50:20,367: WARNING/PoolWorker-2] 14
[2017-03-04 18:50:21,368: WARNING/PoolWorker-2] 15
[2017-03-04 18:50:22,369: WARNING/PoolWorker-2] 16
[2017-03-04 18:50:23,370: WARNING/PoolWorker-2] 17
[2017-03-04 18:50:24,372: WARNING/PoolWorker-2] 18
[2017-03-04 18:50:25,374: WARNING/PoolWorker-2] 19
[2017-03-04 18:50:26,374: WARNING/PoolWorker-2] 20
[2017-03-04 18:50:27,377: WARNING/PoolWorker-2] 21
[2017-03-04 18:50:28,378: WARNING/PoolWorker-2] 22
[2017-03-04 18:50:29,379: WARNING/PoolWorker-2] 23
[2017-03-04 18:50:30,381: WARNING/PoolWorker-2] 24
[2017-03-04 18:50:31,382: WARNING/PoolWorker-2] 25
[2017-03-04 18:50:32,383: WARNING/PoolWorker-2] 26
[2017-03-04 18:50:33,385: WARNING/PoolWorker-2] 27
[2017-03-04 18:50:34,386: WARNING/PoolWorker-2] 28
[2017-03-04 18:50:35,388: WARNING/PoolWorker-2] 29
[2017-03-04 18:50:36,389: WARNING/PoolWorker-2] END

タスクを2つ連続で実行します。1個目のタスクが終了する前に2個目をenqueueします。

$ python execute.py  # (1)
$ python execute.py  # (2)
Traceback (most recent call last):
  File "execute.py", line 3, in <module>
    slow_task.delay()
  File "/var/lib/miniconda3/envs/py3.6.0/lib/python3.6/site-packages/celery/app/task.py", line 412, in delay
    return self.apply_async(args, kwargs)
  File "/var/lib/miniconda3/envs/py3.6.0/lib/python3.6/site-packages/celery_once/tasks.py", line 85, in apply_async
    raise e
  File "/var/lib/miniconda3/envs/py3.6.0/lib/python3.6/site-packages/celery_once/tasks.py", line 81, in apply_async
    self.raise_or_lock(key, once_timeout)
  File "/var/lib/miniconda3/envs/py3.6.0/lib/python3.6/site-packages/celery_once/tasks.py", line 118, in raise_or_lock
    raise self.AlreadyQueued(remaining)
celery_once.tasks.AlreadyQueued: 3598
  • (1). 1個目のタスクがenqueueされる。
  • (2). 2個目のタスクをenqueueしようとするが、 (1) のタスクが終わっていないため `celery_once.tasks.AlreadyQueued` が送出される。