« ^ »

Firebase Cloud MessagingのAPIのダミーサーバーを実装する

2022/11/1 更新
約 1分 で読める
import json

from mitmproxy import http


def _batch_response_body(boundary: str = "----xxxx") -> bytes:
    """Build a single-part multipart/mixed body delimited by *boundary*.

    Each delimiter line is ``--`` followed by the boundary value, the closing
    delimiter carries an extra trailing ``--``, and every line is terminated
    by a real CRLF (RFC 2046, section 5.1.1).
    """
    lines = (
        f"--{boundary}",
        "Content-Type: text/plain",
        "",
        "OK",
        f"--{boundary}--",
        "",  # so the body ends with CRLF after the closing delimiter
    )
    return "\r\n".join(lines).encode()


def request(flow: "http.HTTPFlow") -> None:
    """Serve canned responses for the dummy ``/token`` and ``/batch`` endpoints.

    Sets ``flow.response`` when the request path matches one of the two
    endpoints; any other path leaves the flow untouched.

    Args:
        flow: The HTTP flow whose request path is inspected.
    """
    # Google authentication
    if flow.request.path == "/token":
        # NOTE(review): per RFC 6749 ``expires_in`` is a lifetime in seconds;
        # this value looks like an epoch timestamp. Kept as-is for the dummy.
        flow.response = http.Response.make(
            200,
            headers=http.Headers(content_type="application/json; charset=utf-8"),
            content=json.dumps(
                {
                    "access_token": "XXXXXXXX",
                    "expires_in": 1667406302,
                    "token_type": "Bearer",
                }
            ).encode(),
        )
        return

    # Firebase push
    if flow.request.path == "/batch":
        boundary = "----xxxx"
        # NOTE(review): batch clients may additionally expect a Content-ID
        # header and an embedded HTTP response in each part -- confirm against
        # the client library in use.
        flow.response = http.Response.make(
            200,
            headers=http.Headers(
                content_type=f'multipart/mixed; boundary="{boundary}"'
            ),
            content=_batch_response_body(boundary),
        )
        return

Batch APIのレスポンスはmultipart/mixedであることに注意する。正しく返さないと、クライアント側でエラーが発生する。

firebase-adminを用いて送信する場合、以下のようなコードでメッセージを送信できる。

import json

from mitmproxy import http


def _batch_response_body(boundary: str = "----xxxx") -> bytes:
    """Build a single-part multipart/mixed body delimited by *boundary*.

    Each delimiter line is ``--`` followed by the boundary value, the closing
    delimiter carries an extra trailing ``--``, and every line is terminated
    by a real CRLF (RFC 2046, section 5.1.1).
    """
    lines = (
        f"--{boundary}",
        "Content-Type: text/plain",
        "",
        "OK",
        f"--{boundary}--",
        "",  # so the body ends with CRLF after the closing delimiter
    )
    return "\r\n".join(lines).encode()


def request(flow: "http.HTTPFlow") -> None:
    """Serve canned responses for the dummy ``/token`` and ``/batch`` endpoints.

    Sets ``flow.response`` when the request path matches one of the two
    endpoints; any other path leaves the flow untouched.

    Args:
        flow: The HTTP flow whose request path is inspected.
    """
    # Google authentication
    if flow.request.path == "/token":
        # NOTE(review): per RFC 6749 ``expires_in`` is a lifetime in seconds;
        # this value looks like an epoch timestamp. Kept as-is for the dummy.
        flow.response = http.Response.make(
            200,
            headers=http.Headers(content_type="application/json; charset=utf-8"),
            content=json.dumps(
                {
                    "access_token": "XXXXXXXX",
                    "expires_in": 1667406302,
                    "token_type": "Bearer",
                }
            ).encode(),
        )
        return

    # Firebase push
    if flow.request.path == "/batch":
        boundary = "----xxxx"
        # NOTE(review): batch clients may additionally expect a Content-ID
        # header and an embedded HTTP response in each part -- confirm against
        # the client library in use.
        flow.response = http.Response.make(
            200,
            headers=http.Headers(
                content_type=f'multipart/mixed; boundary="{boundary}"'
            ),
            content=_batch_response_body(boundary),
        )
        return

firebase-adminでダミーサーバーを使用する場合、送信先のエンドポイントを動的に変更する方法は提供されていない。そのため、モンキーパッチで無理やり書き換える必要がある。ここではcustom_patch.pyとして外部から読み込むようにしている。

import firebase_admin
# Import the submodule explicitly: importing only the package is not
# guaranteed to load ``firebase_admin.messaging``, in which case the attribute
# access below would raise AttributeError.
import firebase_admin.messaging

# Redirect batch sends to the local dummy server by overriding the class-level
# batch URL. ``_MessagingService`` is a private class, so this patch may break
# when firebase-admin is upgraded.
firebase_admin.messaging._MessagingService.FCM_BATCH_URL = "http://localhost:8080/batch"

またクレデンシャルについても手を入れる必要がある。クレデンシャルのJSONにはURL形式の値（auth_uri、token_uri など）が含まれているので、これらがローカルのダミーサーバーを指すように書き換える。

from firebase_admin import credentials, initialize_app, messaging

# Service-account info whose URL fields all point at the local dummy server
# (http://localhost:8080) instead of Google's endpoints.
cert = {
    "type": "service_account",
    "project_id": "testing",
    "private_key_id": "0000000000000000000000000000000000000000",
    # Placeholder key material (redacted); NOTE(review): presumably has to be
    # replaced with a real PEM-encoded RSA key before this can load -- confirm.
    "private_key": "-----BEGIN RSA PRIVATE KEY-----\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n-----END RSA PRIVATE KEY-----\n",
    # Restored from the e-mail-obfuscation artifact "[email protected]"; the
    # address matches the one URL-encoded in client_x509_cert_url below.
    "client_email": "firebase-adminsdk@testing.iam.gserviceaccount.com",
    "client_id": "000000000000000000000",
    "auth_uri": "http://localhost:8080/o/oauth2/auth",
    # Path served by the dummy server's "/token" handler.
    "token_uri": "http://localhost:8080/token",
    "auth_provider_x509_cert_url": "http://localhost:8080/oauth2/v1/certs",
    "client_x509_cert_url": "http://localhost:8080/robot/v1/metadata/x509/firebase-adminsdk%40testing.iam.gserviceaccount.com",
}

# Build the credential from the in-memory dict and initialise the default app
# with the database URL also pointed at the local server.
credential = credentials.Certificate(cert)
initialize_app(credential, {"databaseURL": "http://localhost:8080"})

しむどん三度無視 により 2022/11/1 に投稿、2022/11/1 に最終更新
« ^ »