« ^ »

SQSを使う

所要時間: 約 6分

準備

今回は localstack というダミーサーバを起動し、SQSの代わりとして利用する。また localstack はDockerを用いて起動する事にする。

docker run -it --rm \
     --publish="127.0.0.1:4566:4566" \
     --publish="127.0.0.1:4571:4571" \
     --publish="127.0.0.1:8080:8080" \
     --expose="4566" \
     --expose="4571" \
     --expose="8080" \
     --workdir="/workdir" \
     --volume "$(pwd):/workdir" \
     --name="localstack" \
     localstack/localstack:0.12.6

SQSの料金はとても安いため、動作確認時においてもAWSにリソースを作った方が、効率的である事が多い。ただし例えばunittest用など諸般の事情でローカルにダミーのSQSを用意して、動作確認したい事もある。 localstack はAWSのダミーサーバで、AWSが提供しているマネージドサービスの検証用のサーバとして使用できる。

キューの種類

標準キューとFIFOキューの2種類がある。標準キューは緩い配信処理で、順序は保証されず、重複して配信してしまう事もある。その代わり、APIの呼び出し制限がほぼ無い。FIFOキューはそれよりも厳しい配信処理で、必ず1回だけ配信され、また先に登録したメッセージが先に配信される。その代わり、APIの呼び出し制限がそこそこある1

実行方法

AWSのリソースの操作は幾つかの方法がある。ここではAWS CLIを使用し --generate-cli-skeleton によって作成したJSON形式のCLIスケルトンファイルを、AWS CLIに読み込ませる方法で実行する。

aws sqs create-queue --generate-cli-skeleton
aws sqs create-queueコマンドのCLIスケルトンファイルを生成する例

生成したCLIスケルトンファイルを保存し、必要な情報をCLIスケルトンファイルに書き込んだ後 --cli-input-json とCLIスケルトンファイルを指定してコマンドを実行する。

aws sqs create-queue --cli-input-json file://queue.json
CLIスケルトンファイルを使用してaws sqs create-queueを実行する例

各種操作

キューを作る

キューを作るには作成要求を送信する。

{
    "QueueName": "testing_q",
    "Attributes": {
        "KeyName": ""
    },
    "tags": {
        "KeyName": ""
    }
}

キューの名前などを指定する。成功するとキューにアクセスするためのURLが払い出される。

メッセージをキューに登録する

キューにメッセージを登録するには送信要求を送信する。

{
    "QueueUrl": "http://localhost:4566/000000000000/testing_q",
    "MessageBody": "AAAA"
}

MessageBody には登録するメッセージを指定する。バイナリデータの場合はbase64で符号化する等して、文字列で表現可能な形式になるよう直列化し登録する。

キューに入っているメッセージを1つ取得する

キューからメッセージを取得するには受信要求を送信する。

{
    "QueueUrl": "http://localhost:4566/000000000000/testing_q",
    "VisibilityTimeout": 1,
    "WaitTimeSeconds": 2
}

登録したメッセージを取得できる。取得したメッセージは VisibilityTimeout で指定した秒数の間は再取得できなくなる。この時間が経過した後は、再度取得できる。またレシートハンドルという値も同時に取得できる。レシートハンドルはメッセージを削除する時に使用する。

取得したメッセージをキューから削除する

メッセージは取得しただけでは削除されない。削除したければ、このメッセージを削除するという要求を送る必要がある。

{
    "QueueUrl": "http://localhost:4566/000000000000/testing_q",
    "ReceiptHandle": "czeonzsmfezmchcoiuxnpmqsfwzmjcqdohsijrgqycamujngnqgapvvfckrjlkejpmsorqlrcvxvuqheyumnkzsjlelkxilukzdppjyrxrjcjaepxzhoduowdlnqfdlpfmddluuojpjpxqadmrsubijjtydeuezeddvpbugjmbcpvvclxmgbqwecn"
}

メッセージを取得した時に、レシートハンドルという値も同時に取得できる。この値はメッセージと紐付いており、レシートハンドルを指定して削除要求を送信する事でメッセージを削除できる。レシートハンドルは不変なIDというわけではなく、同一のメッセージを再度取得した場合、メッセージが同じであっても、レシートハンドルは別の値となる。

キューの属性

キューにはそれぞれ属性がある。設定された属性の値により、キューの挙動が変化する。

属性を取得する

キューに設定されている属性を取得する。

{
    "QueueUrl": "http://localhost:4566/000000000000/testing_q"
}
{
    "Attributes": {
        "ApproximateNumberOfMessages": "0",
        "ApproximateNumberOfMessagesDelayed": "0",
        "ApproximateNumberOfMessagesNotVisible": "0",
        "CreatedTimestamp": "1684372206.490537",
        "DelaySeconds": "0",
        "LastModifiedTimestamp": "1684382380.375464",
        "MaximumMessageSize": "262144",
        "MessageRetentionPeriod": "345600",
        "QueueArn": "arn:aws:sqs:us-east-1:000000000000:testing_q",
        "ReceiveMessageWaitTimeSeconds": "0",
        "VisibilityTimeout": "30"
    }
}
取得できる属性の例

属性を設定する

キューに属性を設定する。

{
    "QueueUrl": "http://localhost:4566/000000000000/testing_q",
    "Attributes": {
        "VisibilityTimeout": 50
    }
}

設定できる属性名は、自由に決められるわけではなく、予め決められている。また値は文字列でなければならない。例えば VisibilityTimeout に設定する値は秒数なので数値になるはずだが、値を設定する場合は "50" のように文字列として指定する。

デッドレターキューを設定する

プログラムは、キューに登録されたメッセージを取得し、目的の処理を行う。その目的の処理を終えると、レシートハンドルに基いてキューからメッセージを削除する。その途中でメッセージを削除する前にエラーが発生した場合、メッセージはキューの中に残り続ける。そのメッセージが絶対に処理できない状態であった場合、メッセージを取得し、処理しようとするがエラーし削除できない。そしてまたメッセージが復活するという循環から抜け出せなくなってしまう。もし、このようなメッセージがどんどん増えて1万件になったならどうだろう。絶対に嫌だ。

このような状態を避けるために、プログラムがメッセージを受信した回数に応じて、ある回数を越えたメッセージは処理不能と判断し、別のキューにメッセージを登録し、元のキューからはメッセージを削除する。この別のキューのことをデッドレターキューと言う。

デッドレターキューはキューの RedrivePolicy 属性として登録される2RedrivePolicy には、デッドレターキューのARNを deadLetterTargetArn に、受信の最大回数を maxReceiveCount に設定する。つまり maxReceiveCount を超えたメッセージは処理不能として deadLetterTargetArn に設定したキューに移される。

まずはデッドレターキュー自体が無いと、デッドレターキューの設定はできないので、デッドレターキューを aws sqs create-queue で作成する。

{
    "QueueName": "dl_q"
}

作成されたキューのARNを使い、既存のキューにデッドレターキューを設定する。設定は aws sqs set-queue-attributes で行う。

{
    "QueueUrl": "http://localhost:4566/000000000000/testing_q",
    "Attributes": {
        "RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:000000000000:dl_q\",\"maxReceiveCount\":\"2\"}"
    }
}

この設定では2回目のメッセージを受信した後、可視性タイムアウトの時間切れまでにメッセージの削除が行なわれない場合、そのメッセージはデッドレターキューに移される。

コンテンツベースの重複削除

SQSには、同一と見做す事ができるメッセージの重複を削除できる。通常、重複しているかどうかは、DedupulcationIdが同一かどうかで見分ける。ただし設定で、メッセージBodyがどういつかどうかで重複と見分けるようにする事もできる。

https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html

フライト中に同一メッセージを送信するとどうなる?

フライト中(つまりReceiveMessageされたけれど、まだDeleteMessageされていない)のメッセージと同一と判定されるメッセージを送信した場合はどうなるか、気になったので調べた。

前提

設定によって挙動も変わるため、今回は以下の条件で挙動がどうなるかを確認する。

  • FIFOキュー
  • コンテンツベースの重複排除
  • メッセージグループ単位での重複排除

メッセージはSQSに保存されAvailableとして扱われるが、フライト中のメッセージが除外されるまで、メッセージを取得することはできない。

先に送信したフライト中のメッセージをメッセージA、メッセージAがフライト中の状態で送信したメッセージをメッセージBとする。メッセージBは、メッセージAと、同じ内容、メッセージグループとする。

メッセージAがフライト中にメッセージBを送信すると、メッセージBはSQSに保存され、Availableとして扱われる。ただしFIFOキューの場合、メッセージBはメッセージAが除外されるまで、取得できない。