« ^ »

uvicornとasyncioとmultiprocessing

所要時間: 約 3分

uvicornはPython製のWebアプリケーションサーバで非同期IOに対応している。簡単なスクリプトを書いてその起動方法について考えてみる。

uvicornのインストール

まずはuvicornのインストールを行う。

pip install uvicorn

https://pypi.org/project/uvicorn/

サンプルアプリケーション

uvicornにサンプルとなるようなシンプルなアプリケーションが掲載されている。今回はそれをそのまま使うことにする。

async def app(scope, receive, send):
    assert scope['type'] == 'http'

    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [
            [b'content-type', b'text/plain'],
        ],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Hello, world!',
    })
example.py

https://www.uvicorn.org/#quickstart

CLIで起動

Installするとuvicornコマンドが使えるようになる。通常はCLIコマンドから起動する。CLIでの起動方法は公式ドキュメントに示されている。

uvicorn example:app

https://www.uvicorn.org/#quickstart

スクリプト内から起動する

uvicornをCLIコマンドで起動するのはなくPythonのスクリプト内から起動する

import uvicorn

uvicorn.run("example:app", host="127.0.0.1", port=5000, log_level="info")
main.py

公式のドキュメントにも記載がある。

https://www.uvicorn.org/#running-programmatically

今回は起動方法を考えるのでアプリケーションのファイルと起動用のスクリプトを分割した。アプリケーションが定義されているファイルはexample.py、起動のスクリプトファイルはmain.pyとした。

子プロセスとして起動する

ここからが今回考えたかったこと。最近は特にコンテナ技術の普及もあり1プロセスの機能を絞って実行することが望まれることが多いように思う。ただ1つのプロセス内で複数の独立した機能を実行したいこともある。例えばAPIと非同期WorkerとCronのように定期的に実行されいるバッチ処理などだ。

今回はその起動方法について考える。次のような構成を前提とする。

  • HTTP Server

    • Web API
  • Cron

    • 1秒に1回処理が実行される。
    • 外部のデータストアにデータを入れる。
    • 今回は外部のデータストアにRedisを用いる(LPUSH)。
  • Worker

    • 外部のデータストアからデータを取得して処理する。

図解すると次のようになる。


+---------------------------------------------------------+
|                                                         |
|  親プロセス                                             |
|                                                         |
|    子プロセス                                           |
|    +-------------+                                      |
|    | HTTP Server |                                      |
|    |   Web API   |                                      |
|    |   uvicorn   |                                      |
|    +-------------+                                      |
|                                                         |
|    子プロセス                   子プロセス              |
|    +------------------+         +------------------+    |
|    | Cron             |         | Worker           |    |
|    |   1秒ごとに      |         |   Redisから      |    |
|    |   Redisに        |         |   データを取得し |    |
|    |   データを詰める |         |   処理する       |    |
|    +---------o--------+         +---------o--------+    |
|              |                            |             |
+--------------|----------------------------|-------------+
               |                            |
               +----------------------------+
               |
           +---o---+
           |{s}    |
           | Redis |
           +-------+

Redisへのアクセスはaioredisを用いることにしたのでインストールする。

pip install aioredis

ファイル構成は先程と同様にアプリケーションが定義されているファイルはexample.py、起動のスクリプトファイルはmain.pyとする。

example.pyに以下の関数を定義する

  • Web API用のASGIアプリケーション
  • Cronとして起動する関数
  • Workerとして起動する関数
import asyncio
import logging
import time

import aioredis
import uvicorn

logger = logging.getLogger(__name__)

REDIS_URL = "redis://localhost"
REDIS_KEY = "TESTING"


async def app(scope, receive, send):
    """Web API"""
    assert scope["type"] == "http"

    await send(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [
                [b"content-type", b"text/plain"],
            ],
        }
    )
    await send(
        {
            "type": "http.response.body",
            "body": b"Hello, world!",
        }
    )


async def beat():
    """This is cron function. Like a celery beat."""
    redis = aioredis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)

    while True:
        await asyncio.sleep(1)
        async with redis.client() as conn:
            await conn.lpush(REDIS_KEY, "OK: {time.time()}")
            print("PUSH")

async def worker():
    """This is worker function."""
    redis = aioredis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)

    while True:
        async with redis as conn:
            val = await conn.brpop(REDIS_KEY)
            print(f"WORKING: {val}")


def start_app():
    uvicorn.run("example:app", host="127.0.0.1", port=5000, log_level="info")


def start_beat():
    awaitable = beat()
    loop = asyncio.new_event_loop()
    loop.run_until_complete(awaitable)


def start_worker():
    awaitable = worker()
    loop = asyncio.new_event_loop()
    loop.run_until_complete(awaitable)
example.py

一方、起動する側はmultiprocessingを使って子プロセスとして別のプロセスとして起動するようにした。

import multiprocessing as mp

import example


def main():
    p0 = mp.Process(target=example.start_app)
    p1 = mp.Process(target=example.start_beat)
    p2 = mp.Process(target=example.start_worker)

    p0.start()
    p1.start()
    p2.start()

    p0.join()
    p1.join()
    p2.join()


if __name__ == "__main__":
    main()
main.py

試しに実装してみたが、これはsupervisorの超劣化版を自分で実装していることに気がついた。ただこういう実装は、できても良いかもしれないとも思う。supervisorに既にある実装を、車輪を再発明するがごとく自分で実装しないといけない代わりに、各プロセスをどのようにコントロールするのかは自由に操作できる。