Pythonでよく使われるJob QueueにCeleryがあります。 Celeryの定期実行タスクなどを使っていると多重起動を防止したくなったりします。 Celery Onceはそれ用のライブラリです。
import time
from celery import Celery, shared_task
from celery_once import QueueOnce
BROKER_URL = 'redis://localhost:6379/0'
app = Celery('tasks', broker=BROKER_URL)
app.conf.ONCE_REDIS_URL = 'redis://localhost:6379/1'
app.conf.ONCE_DEFAULT_TIMEOUT = 60 * 60
@shared_task(base=QueueOnce)
def slow_task():
print('START')
for ii in range(30):
print(ii)
time.sleep(1)
print('END')
return "Done!"
workerを起動します。
celery -A sample.app worker
-------------- [email protected] v4.0.2 (latentcall) ---- **** ----- --- * *** * -- Darwin-16.1.0-x86_64-i386-64bit 2017-03-04 18:50:04 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: tasks:0x107963550 - ** ---------- .> transport: redis://localhost:6379/0 - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
ついでにflowerも起動します。 flowerはCeleryのモニタリングツールです。 http://127.0.0.1:5555 でアクセスできます。
celery -A sample.app flower
```
別のターミナルでタスクをenqueueします。
workerの標準出力には次が出力されます。
[2017-03-04 18:50:06,348: WARNING/PoolWorker-2] START [2017-03-04 18:50:06,349: WARNING/PoolWorker-2] 0 [2017-03-04 18:50:07,350: WARNING/PoolWorker-2] 1 [2017-03-04 18:50:08,351: WARNING/PoolWorker-2] 2 [2017-03-04 18:50:09,352: WARNING/PoolWorker-2] 3 [2017-03-04 18:50:10,353: WARNING/PoolWorker-2] 4 [2017-03-04 18:50:11,354: WARNING/PoolWorker-2] 5 [2017-03-04 18:50:12,356: WARNING/PoolWorker-2] 6 [2017-03-04 18:50:13,357: WARNING/PoolWorker-2] 7 [2017-03-04 18:50:14,359: WARNING/PoolWorker-2] 8 [2017-03-04 18:50:15,360: WARNING/PoolWorker-2] 9 [2017-03-04 18:50:16,361: WARNING/PoolWorker-2] 10 [2017-03-04 18:50:17,363: WARNING/PoolWorker-2] 11 [2017-03-04 18:50:18,364: WARNING/PoolWorker-2] 12 [2017-03-04 18:50:19,365: WARNING/PoolWorker-2] 13 [2017-03-04 18:50:20,367: WARNING/PoolWorker-2] 14 [2017-03-04 18:50:21,368: WARNING/PoolWorker-2] 15 [2017-03-04 18:50:22,369: WARNING/PoolWorker-2] 16 [2017-03-04 18:50:23,370: WARNING/PoolWorker-2] 17 [2017-03-04 18:50:24,372: WARNING/PoolWorker-2] 18 [2017-03-04 18:50:25,374: WARNING/PoolWorker-2] 19 [2017-03-04 18:50:26,374: WARNING/PoolWorker-2] 20 [2017-03-04 18:50:27,377: WARNING/PoolWorker-2] 21 [2017-03-04 18:50:28,378: WARNING/PoolWorker-2] 22 [2017-03-04 18:50:29,379: WARNING/PoolWorker-2] 23 [2017-03-04 18:50:30,381: WARNING/PoolWorker-2] 24 [2017-03-04 18:50:31,382: WARNING/PoolWorker-2] 25 [2017-03-04 18:50:32,383: WARNING/PoolWorker-2] 26 [2017-03-04 18:50:33,385: WARNING/PoolWorker-2] 27 [2017-03-04 18:50:34,386: WARNING/PoolWorker-2] 28 [2017-03-04 18:50:35,388: WARNING/PoolWorker-2] 29 [2017-03-04 18:50:36,389: WARNING/PoolWorker-2] END
タスクを2つ連続で実行します。1個目のタスクが終了する前に2個目をenqueueします。
$ python execute.py # (1) $ python execute.py # (2) Traceback (most recent call last): File "execute.py", line 3, in <module> slow_task.delay() File "/var/lib/miniconda3/envs/py3.6.0/lib/python3.6/site-packages/celery/app/task.py", line 412, in delay return self.apply_async(args, kwargs) File "/var/lib/miniconda3/envs/py3.6.0/lib/python3.6/site-packages/celery_once/tasks.py", line 85, in apply_async raise e File "/var/lib/miniconda3/envs/py3.6.0/lib/python3.6/site-packages/celery_once/tasks.py", line 81, in apply_async self.raise_or_lock(key, once_timeout) File "/var/lib/miniconda3/envs/py3.6.0/lib/python3.6/site-packages/celery_once/tasks.py", line 118, in raise_or_lock raise self.AlreadyQueued(remaining) celery_once.tasks.AlreadyQueued: 3598
- (1). 1個目のタスクがenqueueされる。
- (2). 2個目のタスクをenqueueしようとするが、 (1) のタスクが終わっていないため `celery_once.tasks.AlreadyQueued` が送出される。