import json
from mitmproxy import http
def request(flow: http.HTTPFlow) -> None:
# Google authentication
if flow.request.path == "/token":
flow.response = http.Response.make(
200,
headers=http.Headers(content_type="application/json; charset=utf-8"),
content=json.dumps(
{
"access_token": "XXXXXXXX",
"expires_in": 1667406302,
"token_type": "Bearer",
}
).encode(),
)
return
# Firebase push
if flow.request.path == "/batch":
flow.response = http.Response.make(
200,
headers=http.Headers(content_type='multipart/mixed; boundary="----xxxx"'),
content=r'''----xxxx\r
Content-Type: text/plain\r
\r
OK\r
----xxxx--\r\n'''.encode()
)
return
Batch APIのレスポンスはmultipart/mixedであることに注意する。正しく返さないと、クライアント側でエラーが発生する。
firebase-adminを用いて送信する場合、以下のようなコードでメッセージを送信できる。
import json
from mitmproxy import http
def request(flow: http.HTTPFlow) -> None:
# Google authentication
if flow.request.path == "/token":
flow.response = http.Response.make(
200,
headers=http.Headers(content_type="application/json; charset=utf-8"),
content=json.dumps(
{
"access_token": "XXXXXXXX",
"expires_in": 1667406302,
"token_type": "Bearer",
}
).encode(),
)
return
# Firebase push
if flow.request.path == "/batch":
flow.response = http.Response.make(
200,
headers=http.Headers(content_type='multipart/mixed; boundary="----xxxx"'),
content=r'''----xxxx\r
Content-Type: text/plain\r
\r
OK\r
----xxxx--\r\n'''.encode()
)
return
firebase-adminを用い、更にダミーサーバーを使用する場合、エンドポイントを動的に変更するような方法が提供されていない。そのため、むりやりモンキーパッチする必要がある。ここではcustom_patch.pyとして外部から読み込むようにしている。
import firebase_admin
firebase_admin.messaging._MessagingService.FCM_BATCH_URL = "http://localhost:8080/batch"
またクレデンシャルについても手を入れる必要がある。クレデンシャルのJSONの中にURLっぽい形式の文字列がある。これらはローカルに向くように書き換える。
from firebase_admin import credentials, initialize_app, messaging
cert = {
"type": "service_account",
"project_id": "testing",
"private_key_id": "0000000000000000000000000000000000000000",
"private_key": "-----BEGIN RSA PRIVATE KEY-----\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n-----END RSA PRIVATE KEY-----\n",
"client_email": "[email protected]",
"client_id": "000000000000000000000",
"auth_uri": "http://localhost:8080/o/oauth2/auth",
"token_uri": "http://localhost:8080/token",
"auth_provider_x509_cert_url": "http://localhost:8080/oauth2/v1/certs",
"client_x509_cert_url": "http://localhost:8080/robot/v1/metadata/x509/firebase-adminsdk%40testing.iam.gserviceaccount.com",
}
credential = credentials.Certificate(cert)
initialize_app(credential, {"databaseURL": "http://localhost:8080"})