MySQLのバイナリログをPythonのmysqlreplicationを使って読み取る。データベースドライバにはPyMySQLを使用する。
MySQL Serverを起動する。今回はDockerを使う。
export MYSQL_ALLOW_EMPTY_PASSWORD=1
docker run -it --rm \
--privileged=true \
--publish="127.0.0.1:3306:3306" \
--expose="3306" \
--workdir="/workdir" \
--volume "$(pwd):/workdir" \
--volume "mysql-server-8-data:/var/lib/mysql" \
--name="mysqld" \
--env-file="${SCRIPT_DIR}/.env.mysqld" \
mysql:8.0.2
MySQLにテスト用のデータベースとテーブルを作成する。
CREATE DATABASE page_1686018833;
use page_1686018833;
CREATE TABLE dept (
id MEDIUMINT NOT NULL AUTO_INCREMENT,
name VARCHAR(10),
PRIMARY KEY(id)
);
依存パッケージをインストールする。
pip install mysqlreplication pymysql
MySQLに接続し、バイナリログを読み出すために、BinLogStreamReaderをインスタンス化する。接続にはMySQL Serverへの接続接続情報の他に、サーバーIDが必要になる。バイナリログには座標があり、ログファイル名(log_file)と、ログ位置(log_pos)で表現される。以前読み込んだ位置の続きから読み込みたい場合には具体的な値を指定する。Noneを指定した場合、新たに追加されたバイナリログを読み出す。 only_tables
を指定すると、どのテーブルので発生したバイナリログを読み出すかを制限できる。また only_events
により読み出すイベントの種類も制限できる。そして、イテレータをイテレーションする事で、バイナリログを読み出す。
バイナリログを読み出す例
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (DeleteRowsEvent, UpdateRowsEvent,
TableMapEvent, WriteRowsEvent)
MYSQL_SERVER_ID = 1
# 読み出すバイナリログの位置。
# 以前読み込んだ位置の続きから読み込みたい場合に具体的な値を指定する。
log_file, log_pos = None, None
sr = BinLogStreamReader(
connection_settings={
"host": "localhost",
"port": 3306,
"user": "root",
},
server_id=MYSQL_SERVER_ID,
log_file=log_file,
log_pos=log_pos,
only_tables=["dept"], # deptテーブルのデータを読み出す
only_events=[ # 読み出すイベントの種類を指定できる
WriteRowsEvent, # - 書き込みイベント
UpdateRowsEvent, # - 更新イベント
DeleteRowsEvent, # - 削除イベント
TableMapEvent,
],
blocking=True,
resume_stream=True,
)
for event in sr:
breakpoint()
pass
print("finished")
バイナリログの1イベントを読むとイテレータからイベントが返される。バイナリログのイベントがない場合は blocking=True
を指定した場合はブロックされる。 event.rows
にはイベントに関連したレコードの値の情報が格納されている。これは次のような形式となっている。
[ { "before_value": {〜省略〜}, "after_value": {〜省略〜} }, { "before_value": {〜省略〜}, "after_value": {〜省略〜} }, ]
before_valueには処理前の行の値が、after_valueには処理後の行の値が列名と値の辞書として格納されている。気を付けたい事は、この値はJSONに直列化できない値が含まれている。例えば、時刻型の列の値はdatetime.datetimeやdatetime.dateのインスタンスになっているし、decimal型の列の値はdecimal.Decimalのインスンタンスのとなっているし、バイト型の列の値はbytesのインスタンスとなっている。もしJSONにしたい場合、これらを直列化する必要がある。
バイナリログの座標は BinLogStreamReader
のインスタンスが保持しており、バイナリログを読み出した時に更新される。
# srはBinLogStreamReaderのインスタンス
sr.log_file # バイナリログのログファイル名
sr.log_pos # バイナリログのログ位置
イベントの種類
pymysqlreplication/row_event.pyには RowsEvent
が定義されており、このクラスを継承して、各種イベントを定義している。これにより、そのログがどのような操作だったのかを識別できるように実装されている。
イベント | 発生する操作 | event.rows |
---|---|---|
DeleteRowsEvent | DELETE | [{'values': {...}}] |
WriteRowsEvent | INSERT | [{'values': {...}}] |
UpdateRowsEvent | UPDATE | [{'before_values': {...}, 'after_values': {...}}] |
TableMapEvent | INSERT, UPDATE, DELETE | 属性無し |
DeleteRowsEvent
DeleteRowsEventはレコードを削除する時に発生する。
DELETE FROM `dept`;
(Pdb) p event.rows
[{'values': {'id': 1, 'name': 'bar'}}]
(Pdb) p event.event_type
32
(Pdb)
もし削除されるデータが無ければ、バイナリログとしては出力されないため、イベントも発生しない。
WriteRowsEvent
WriteRowsEventはレコードを挿入する時に発生する。
INSERT INTO `dept` (`name`) VALUES ('foo');
(Pdb) event.rows
[{'values': {'id': 1, 'name': 'foo'}}]
(Pdb) event.event_type
30
(Pdb) c
UpdateRowsEvent
WriteRowsEventはレコードを更新する時に発生する。
UPDATE `dept` SET `name` = 'bar';
DeleteRowsEventやWriteRowsEventとはrowsに含まれる形式が異なる。
(Pdb) p event.rows
[{'before_values': {'id': 1, 'name': 'foo'}, 'after_values': {'id': 1, 'name': 'bar'}}]
(Pdb) p event.event_type
31
(Pdb)
更新対象のレコードが無ければ、バイナリログとしては出力されないため、イベントも発生しない。
TableMapEvent
TableMapEventは他のイベントとは異なり、DELETE、INSERT、UPDATEのどれを実行してもイベントとして発生する。例えば、これらのイベントを全て読み取るように設定してINSERTを行うと、まずTableMapEventを受け取り、次にWriteRowsEventを受け取る。
TableMapEventイベントにrows属性はなく、データの更新前に送信される。これはdocstringに記述があった。値のまたdocstringには「ライブラリの使用者はTableMapEventを使うな」との記述がある。理由は分からないけれど、使用しない方が良いかもしれない。
このイベントはテーブルの構造を記述します。テーブルに変更が発生する前に送信されます。ライブラリのエンドユーザーはこれを使用してはなりません