準備
今回は localstack
というダミーサーバを起動し、SQSの代わりとして利用する。また localstack
はDockerを用いて起動する事にする。
docker run -it --rm \
--publish="127.0.0.1:4566:4566" \
--publish="127.0.0.1:4571:4571" \
--publish="127.0.0.1:8080:8080" \
--expose="4566" \
--expose="4571" \
--expose="8080" \
--workdir="/workdir" \
--volume "$(pwd):/workdir" \
--name="localstack" \
localstack/localstack:0.12.6
SQSの料金はとても安いため、動作確認時においてもAWSにリソースを作った方が、効率的である事が多い。ただし例えばunittest用など諸般の事情でローカルにダミーのSQSを用意して、動作確認したい事もある。 localstack
はAWSのダミーサーバで、AWSが提供しているマネージドサービスの検証用のサーバとして使用できる。
キューの種類
標準キューとFIFOキューの2種類がある。標準キューは緩い配信処理で、順序は保証されず、重複して配信してしまう事もある。その代わり、APIの呼び出し制限がほぼ無い。FIFOキューはそれよりも厳しい配信処理で、必ず1回だけ配信され、また先に登録したメッセージが先に配信される。その代わり、APIの呼び出し制限がそこそこある1。
実行方法
AWSのリソースの操作は幾つかの方法がある。ここではAWS CLIを使用し --generate-cli-skeleton
によって作成したJSON形式のCLIスケルトンファイルを、AWS CLIに読み込ませる方法で実行する。
aws sqs create-queue --generate-cli-skeleton
生成したCLIスケルトンファイルを保存し、必要な情報をCLIスケルトンファイルに書き込んだ後 --cli-input-json
とCLIスケルトンファイルを指定してコマンドを実行する。
aws sqs create-queue --cli-input-json file://queue.json
各種操作
キューを作る
キューを作るには作成要求を送信する。
{
"QueueName": "testing_q",
"Attributes": {
"KeyName": ""
},
"tags": {
"KeyName": ""
}
}
キューの名前などを指定する。成功するとキューにアクセスするためのURLが払い出される。
メッセージをキューに登録する
キューにメッセージを登録するには送信要求を送信する。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"MessageBody": "AAAA"
}
MessageBody
には登録するメッセージを指定する。バイナリデータの場合はbase64で符号化する等して、文字列で表現可能な形式になるよう直列化し登録する。
キューに入っているメッセージを1つ取得する
キューからメッセージを取得するには受信要求を送信する。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"VisibilityTimeout": 1,
"WaitTimeSeconds": 2
}
登録したメッセージを取得できる。取得したメッセージは VisibilityTimeout
で指定した秒数の間は再取得できなくなる。この時間が経過した後は、再度取得できる。またレシートハンドルという値も同時に取得できる。レシートハンドルはメッセージを削除する時に使用する。
取得したメッセージをキューから削除する
メッセージは取得しただけでは削除されない。削除したければ、このメッセージを削除するという要求を送る必要がある。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"ReceiptHandle": "czeonzsmfezmchcoiuxnpmqsfwzmjcqdohsijrgqycamujngnqgapvvfckrjlkejpmsorqlrcvxvuqheyumnkzsjlelkxilukzdppjyrxrjcjaepxzhoduowdlnqfdlpfmddluuojpjpxqadmrsubijjtydeuezeddvpbugjmbcpvvclxmgbqwecn"
}
メッセージを取得した時に、レシートハンドルという値も同時に取得できる。この値はメッセージと紐付いており、レシートハンドルを指定して削除要求を送信する事でメッセージを削除できる。レシートハンドルは不変なIDというわけではなく、同一のメッセージを再度取得した場合、メッセージが同じであっても、レシートハンドルは別の値となる。
キューの属性
キューにはそれぞれ属性がある。設定された属性の値により、キューの挙動が変化する。
属性を取得する
キューに設定されている属性を取得する。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q"
}
{ "Attributes": { "ApproximateNumberOfMessages": "0", "ApproximateNumberOfMessagesDelayed": "0", "ApproximateNumberOfMessagesNotVisible": "0", "CreatedTimestamp": "1684372206.490537", "DelaySeconds": "0", "LastModifiedTimestamp": "1684382380.375464", "MaximumMessageSize": "262144", "MessageRetentionPeriod": "345600", "QueueArn": "arn:aws:sqs:us-east-1:000000000000:testing_q", "ReceiveMessageWaitTimeSeconds": "0", "VisibilityTimeout": "30" } }
属性を設定する
キューに属性を設定する。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"Attributes": {
"VisibilityTimeout": 50
}
}
設定できる属性名は、自由に決められるわけではなく、予め決められている。また値は文字列でなければならない。例えば VisibilityTimeout
に設定する値は秒数なので数値になるはずだが、値を設定する場合は "50"
のように文字列として指定する。
デッドレターキューを設定する
プログラムは、キューに登録されたメッセージを取得し、目的の処理を行う。その目的の処理を終えると、レシートハンドルに基いてキューからメッセージを削除する。その途中でメッセージを削除する前にエラーが発生した場合、メッセージはキューの中に残り続ける。そのメッセージが絶対に処理できない状態であった場合、メッセージを取得し、処理しようとするがエラーし削除できない。そしてまたメッセージが復活するという循環から抜け出せなくなってしまう。もし、このようなメッセージがどんどん増えて1万件になったならどうだろう。絶対に嫌だ。
このような状態を避けるために、プログラムがメッセージを受信した回数に応じて、ある回数を越えたメッセージは処理不能と判断し、別のキューにメッセージを登録し、元のキューからはメッセージを削除する。この別のキューのことをデッドレターキューと言う。
デッドレターキューはキューの RedrivePolicy
属性として登録される2。 RedrivePolicy
には、デッドレターキューのARNを deadLetterTargetArn
に、受信の最大回数を maxReceiveCount
に設定する。つまり maxReceiveCount
を超えたメッセージは処理不能として deadLetterTargetArn
に設定したキューに移される。
まずはデッドレターキュー自体が無いと、デッドレターキューの設定はできないので、デッドレターキューを aws sqs create-queue
で作成する。
{
"QueueName": "dl_q"
}
作成されたキューのARNを使い、既存のキューにデッドレターキューを設定する。設定は aws sqs set-queue-attributes
で行う。
{
"QueueUrl": "http://localhost:4566/000000000000/testing_q",
"Attributes": {
"RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:000000000000:dl_q\",\"maxReceiveCount\":\"2\"}"
}
}
この設定では2回目のメッセージを受信した後、可視性タイムアウトの時間切れまでにメッセージの削除が行なわれない場合、そのメッセージはデッドレターキューに移される。