準備
今回は localstack
というダミーサーバを起動し、SQSの代わりとして利用する。また localstack
はDockerを用いて起動する事にする。
docker run -it --rm \
--publish="127.0.0.1:4566:4566" \
--publish="127.0.0.1:4571:4571" \
--publish="127.0.0.1:8080:8080" \
--expose="4566" \
--expose="4571" \
--expose="8080" \
--workdir="/workdir" \
--volume "$(pwd):/workdir" \
--name="localstack" \
localstack/localstack:0.12.6
SQSの料金はとても安いため、動作確認時においてもAWSにリソースを作った方が、効率的である事が多い。ただし例えばunittest用など諸般の事情でローカルにダミーのSQSを用意して、動作確認したい事もある。 localstack
はAWSのダミーサーバで、AWSが提供しているマネージドサービスの検証用のサーバとして使用できる。
キューの種類
標準キューとFIFOキューの2種類がある。標準キューは緩い配信処理で、順序は保証されず、重複して配信してしまう事もある。その代わり、APIの呼び出し制限がほぼ無い。FIFOキューはそれよりも厳しい配信処理で、必ず1回だけ配信され、また先に登録したメッセージが先に配信される。その代わり、APIの呼び出し制限がそこそこある1。
実行方法
AWSのリソースの操作は幾つかの方法がある。ここではAWS CLIを使用し --generate-cli-skeleton
によって作成したJSON形式のCLIスケルトンファイルを、AWS CLIに読み込ませる方法で実行する。
生成したCLIスケルトンファイルを保存し、必要な情報をCLIスケルトンファイルに書き込んだ後 --cli-input-json
とCLIスケルトンファイルを指定してコマンドを実行する。
各種操作
キューを作る
キューを作るには作成要求を送信する。
{
"QueueName": "testing_q",
"Attributes": {
"KeyName": ""
},
"tags": {
"KeyName": ""
}
}
キューの名前などを指定する。成功するとキューにアクセスするためのURLが払い出される。
メッセージをキューに登録する
キューにメッセージを登録するには送信要求を送信する。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"MessageBody": "AAAA"
}
MessageBody
には登録するメッセージを指定する。バイナリデータの場合はbase64で符号化する等して、文字列で表現可能な形式になるよう直列化し登録する。
キューに入っているメッセージを1つ取得する
キューからメッセージを取得するには受信要求を送信する。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"VisibilityTimeout": 1,
"WaitTimeSeconds": 2
}
登録したメッセージを取得できる。取得したメッセージは VisibilityTimeout
で指定した秒数の間は再取得できなくなる。この時間が経過した後は、再度取得できる。またレシートハンドルという値も同時に取得できる。レシートハンドルはメッセージを削除する時に使用する。
取得したメッセージをキューから削除する
メッセージは取得しただけでは削除されない。削除したければ、このメッセージを削除するという要求を送る必要がある。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"ReceiptHandle": "czeonzsmfezmchcoiuxnpmqsfwzmjcqdohsijrgqycamujngnqgapvvfckrjlkejpmsorqlrcvxvuqheyumnkzsjlelkxilukzdppjyrxrjcjaepxzhoduowdlnqfdlpfmddluuojpjpxqadmrsubijjtydeuezeddvpbugjmbcpvvclxmgbqwecn"
}
メッセージを取得した時に、レシートハンドルという値も同時に取得できる。この値はメッセージと紐付いており、レシートハンドルを指定して削除要求を送信する事でメッセージを削除できる。レシートハンドルは不変なIDというわけではなく、同一のメッセージを再度取得した場合、メッセージが同じであっても、レシートハンドルは別の値となる。
キューの属性
キューにはそれぞれ属性がある。設定された属性の値により、キューの挙動が変化する。
属性を取得する
キューに設定されている属性を取得する。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q"
}
属性を設定する
キューに属性を設定する。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"Attributes": {
"VisibilityTimeout": 50
}
}
設定できる属性名は、自由に決められるわけではなく、予め決められている。また値は文字列でなければならない。例えば VisibilityTimeout
に設定する値は秒数なので数値になるはずだが、値を設定する場合は "50"
のように文字列として指定する。
デッドレターキューを設定する
プログラムは、キューに登録されたメッセージを取得し、目的の処理を行う。その目的の処理を終えると、レシートハンドルに基いてキューからメッセージを削除する。その途中でメッセージを削除する前にエラーが発生した場合、メッセージはキューの中に残り続ける。そのメッセージが絶対に処理できない状態であった場合、メッセージを取得し、処理しようとするがエラーし削除できない。そしてまたメッセージが復活するという循環から抜け出せなくなってしまう。もし、このようなメッセージがどんどん増えて1万件になったならどうだろう。絶対に嫌だ。
このような状態を避けるために、プログラムがメッセージを受信した回数に応じて、ある回数を越えたメッセージは処理不能と判断し、別のキューにメッセージを登録し、元のキューからはメッセージを削除する。この別のキューのことをデッドレターキューと言う。
デッドレターキューはキューの RedrivePolicy
属性として登録される2。 RedrivePolicy
には、デッドレターキューのARNを deadLetterTargetArn
に、受信の最大回数を maxReceiveCount
に設定する。つまり maxReceiveCount
を超えたメッセージは処理不能として deadLetterTargetArn
に設定したキューに移される。
まずはデッドレターキュー自体が無いと、デッドレターキューの設定はできないので、デッドレターキューを aws sqs create-queue
で作成する。
{
"QueueName": "dl_q"
}
作成されたキューのARNを使い、既存のキューにデッドレターキューを設定する。設定は aws sqs set-queue-attributes
で行う。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"Attributes": {
"RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:000000000000:dl_q\",\"maxReceiveCount\":\"2\"}"
}
}
この設定では2回目のメッセージを受信した後、可視性タイムアウトの時間切れまでにメッセージの削除が行なわれない場合、そのメッセージはデッドレターキューに移される。
コンテンツベースの重複削除
SQSには、同一と見做す事ができるメッセージの重複を削除できる。通常、重複しているかどうかは、DedupulcationIdが同一かどうかで見分ける。ただし設定で、メッセージBodyがどういつかどうかで重複と見分けるようにする事もできる。
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html
フライト中に同一メッセージを送信するとどうなる?
フライト中(つまりReceiveMessageされたけれど、まだDeleteMessageされていない)のメッセージと同一と判定されるメッセージを送信した場合はどうなるか、気になったので調べた。
前提
設定によって挙動も変わるため、今回は以下の条件で挙動がどうなるかを確認する。
- FIFOキュー
- コンテンツベースの重複排除
- メッセージグループ単位での重複排除
メッセージはSQSに保存されAvailableとして扱われるが、フライト中のメッセージが除外されるまで、メッセージを取得することはできない。
先に送信したフライト中のメッセージをメッセージA、メッセージAがフライト中の状態で送信したメッセージをメッセージBとする。メッセージBは、メッセージAと、同じ内容、メッセージグループとする。
メッセージAがフライト中にメッセージBを送信すると、メッセージBはSQSに保存され、Availableとして扱われる。ただしFIFOキューの場合、メッセージBはメッセージAが除外されるまで、取得できない。