« ^ »

MySQLのバイナリログをPythonで読み出す

所要時間: 約 5分

MySQLのバイナリログをPythonのmysqlreplicationを使って読み取る。データベースドライバにはPyMySQLを使用する。

MySQL Serverを起動する。今回はDockerを使う。

export MYSQL_ALLOW_EMPTY_PASSWORD=1

docker run -it --rm \
     --privileged=true \
     --publish="127.0.0.1:3306:3306" \
     --expose="3306" \
     --workdir="/workdir" \
     --volume "$(pwd):/workdir" \
     --volume "mysql-server-8-data:/var/lib/mysql" \
     --name="mysqld" \
     --env-file="${SCRIPT_DIR}/.env.mysqld" \
     mysql:8.0.2

MySQLにテスト用のデータベースとテーブルを作成する。

CREATE DATABASE page_1686018833;

use page_1686018833;

CREATE TABLE dept (
  id MEDIUMINT NOT NULL AUTO_INCREMENT,
  name VARCHAR(10),
  PRIMARY KEY(id)
);

依存パッケージをインストールする。

pip install mysqlreplication pymysql

MySQLに接続し、バイナリログを読み出すために、BinLogStreamReaderをインスタンス化する。接続にはMySQL Serverへの接続接続情報の他に、サーバーIDが必要になる。バイナリログには座標があり、ログファイル名(log_file)と、ログ位置(log_pos)で表現される。以前読み込んだ位置の続きから読み込みたい場合には具体的な値を指定する。Noneを指定した場合、新たに追加されたバイナリログを読み出す。 only_tables を指定すると、どのテーブルので発生したバイナリログを読み出すかを制限できる。また only_events により読み出すイベントの種類も制限できる。そして、イテレータをイテレーションする事で、バイナリログを読み出す。

バイナリログを読み出す例

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (DeleteRowsEvent, UpdateRowsEvent,
                                          TableMapEvent, WriteRowsEvent)
                                          


MYSQL_SERVER_ID = 1

# 読み出すバイナリログの位置。
# 以前読み込んだ位置の続きから読み込みたい場合に具体的な値を指定する。
log_file, log_pos = None, None

sr = BinLogStreamReader(
    connection_settings={
        "host": "localhost",
        "port": 3306,
        "user": "root",
    },
    server_id=MYSQL_SERVER_ID,
    log_file=log_file,
    log_pos=log_pos,
    only_tables=["dept"],  # deptテーブルのデータを読み出す
    only_events=[        # 読み出すイベントの種類を指定できる
        WriteRowsEvent,  # - 書き込みイベント
        UpdateRowsEvent, # - 更新イベント
        DeleteRowsEvent, # - 削除イベント
        TableMapEvent,
    ],
    blocking=True,
    resume_stream=True,
)

for event in sr:
    breakpoint()
    pass

print("finished")

バイナリログの1イベントを読むとイテレータからイベントが返される。バイナリログのイベントがない場合は blocking=True を指定した場合はブロックされる。 event.rows にはイベントに関連したレコードの値の情報が格納されている。これは次のような形式となっている。

[
  {
    "before_value": {〜省略〜},
    "after_value": {〜省略〜}
  },
  {
    "before_value": {〜省略〜},
    "after_value": {〜省略〜}
  },
]

before_valueには処理前の行の値が、after_valueには処理後の行の値が列名と値の辞書として格納されている。気を付けたい事は、この値はJSONに直列化できない値が含まれている。例えば、時刻型の列の値はdatetime.datetimeやdatetime.dateのインスタンスになっているし、decimal型の列の値はdecimal.Decimalのインスンタンスのとなっているし、バイト型の列の値はbytesのインスタンスとなっている。もしJSONにしたい場合、これらを直列化する必要がある。

import json
from datetime import date, datetime
from decimal import Decimal

def encode_json(o):
    if isinstance(o, datetime):
        return o.strftime("%Y%m%d")
    elif isinstance(o, date):
        return o.strftime("%Y%m%d")
    elif isinstance(o, Decimal):
        return float(o)
    elif isinstance(o, bytes):
        return o.decode()
    return o

# eventはバイナリログから読み出したイベント
json.dumps(event.rows, default=encode_json)
event.rowsをJSONに直列化する例

バイナリログの座標は BinLogStreamReader のインスタンスが保持しており、バイナリログを読み出した時に更新される。

# srはBinLogStreamReaderのインスタンス
sr.log_file  # バイナリログのログファイル名
sr.log_pos   # バイナリログのログ位置

イベントの種類

pymysqlreplication/row_event.pyには RowsEvent が定義されており、このクラスを継承して、各種イベントを定義している。これにより、そのログがどのような操作だったのかを識別できるように実装されている。

イベント発生する操作event.rows
DeleteRowsEventDELETE[{'values': {...}}]
WriteRowsEventINSERT[{'values': {...}}]
UpdateRowsEventUPDATE[{'before_values': {...}, 'after_values': {...}}]
TableMapEventINSERT, UPDATE, DELETE属性無し
各種イベントと発生する操作

DeleteRowsEvent

DeleteRowsEventはレコードを削除する時に発生する。

DELETE FROM `dept`;
(Pdb) p event.rows
[{'values': {'id': 1, 'name': 'bar'}}]
(Pdb) p event.event_type
32
(Pdb)

もし削除されるデータが無ければ、バイナリログとしては出力されないため、イベントも発生しない。

WriteRowsEvent

WriteRowsEventはレコードを挿入する時に発生する。

INSERT INTO `dept` (`name`) VALUES ('foo');
(Pdb) event.rows
[{'values': {'id': 1, 'name': 'foo'}}]
(Pdb) event.event_type
30
(Pdb) c

UpdateRowsEvent

WriteRowsEventはレコードを更新する時に発生する。

UPDATE `dept` SET `name` = 'bar';

DeleteRowsEventやWriteRowsEventとはrowsに含まれる形式が異なる。

(Pdb) p event.rows
[{'before_values': {'id': 1, 'name': 'foo'}, 'after_values': {'id': 1, 'name': 'bar'}}]
(Pdb) p event.event_type
31
(Pdb)

更新対象のレコードが無ければ、バイナリログとしては出力されないため、イベントも発生しない。

TableMapEvent

TableMapEventは他のイベントとは異なり、DELETE、INSERT、UPDATEのどれを実行してもイベントとして発生する。例えば、これらのイベントを全て読み取るように設定してINSERTを行うと、まずTableMapEventを受け取り、次にWriteRowsEventを受け取る。

TableMapEventイベントにrows属性はなく、データの更新前に送信される。これはdocstringに記述があった。値のまたdocstringには「ライブラリの使用者はTableMapEventを使うな」との記述がある。理由は分からないけれど、使用しない方が良いかもしれない。

class TableMapEvent(BinLogEvent):
    """This event describes the structure of a table.
    It's sent before a change happens on a table.
    An end user of the lib should have no usage of this
    """
site-packages/pymysqlreplication/row_event.py

このイベントはテーブルの構造を記述します。テーブルに変更が発生する前に送信されます。ライブラリのエンドユーザーはこれを使用してはなりません