Webサービスを開発しているとSQSのようなメッセージキューを使用したくなるシーンはとても多い。そ用途は遅延タスクの管理に用いたり、ユーザーのメッセージを処理するための一時的なデータストアにしたりと様々だ。また最近ではGoを用いてサーバーサイドの実装をするケースも多い。そこで今回はGoでSQSを操作する方法を確認する。
Queueの作成
まずはQueueを作成する。Queueの作成はGoで実施することはあまりないため1ここではAWS CLIを用いてQueueを作成する。
aws sqs create-queue --queue-name testing
{ "QueueUrl": "http://localhost:4566/000000000000/testing" }
QueueUrlはQueueにアクセスするためのURLとなっている。この例ではLocalStackというダミーサーバーを用いているため、上記のURLとなっている。以降はこのURLを使用する。
メッセージの送信
メッセージの送信にはSendMessage関数を使用する。
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/service/sqs"
)
func main() {
sqs_service := NewSQSService()
queue_url := "http://localhost:4566/000000000000/testing"
message := "test message"
send_message_input := sqs.SendMessageInput{
QueueUrl: &queue_url,
MessageBody: &message,
}
send_message_output, err := sqs_service.SendMessage(&send_message_input)
if err != nil {
panic(err.Error())
}
fmt.Println(send_message_output)
}
実行するとメッセージが送信される。
go run send.go lib.go
{ MD5OfMessageBody: "c72b9698fa1927e1dd12d3cf26ed84b2", MessageId: "9cf75318-82f2-b955-f368-add517a18301" }
SendMessageRequest関数も存在する。こちらのほうがSendMessage関数よりいろいろなことを処理してくれる。今回はその必要性がなかったためより原始的な関数であるSendMessage関数を使用した。
メッセージの受信
メッセージの受信にはReceiveMessage関数を使用する。
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/service/sqs"
)
func main() {
sqs_service := NewSQSService()
queue_url := "http://localhost:4566/000000000000/testing"
receive_message_input := sqs.ReceiveMessageInput{
QueueUrl: &queue_url,
}
receive_message_output, err := sqs_service.ReceiveMessage(&receive_message_input)
if err != nil {
panic(err.Error())
}
fmt.Println(receive_message_output)
}
実行するとメッセージを受信できる。
go run recv.go lib.go
{ Messages: [{ Attributes: { SenderId: "AIDAIT2UOQQY3AUEKVGXU", SentTimestamp: "1646379882023", ApproximateReceiveCount: "1", ApproximateFirstReceiveTimestamp: "1646379942125" }, Body: "test message", MD5OfBody: "c72b9698fa1927e1dd12d3cf26ed84b2", MessageId: "beeef071-7eac-f524-46bd-9f67c34e6043", ReceiptHandle: "urwuxudzwbvpyugssupifgqtopwnrnoiznznsboiieeeycartacsbjiglcdqavfubklwzffwrrdubnihlzjtbbknepwfbphpknclhmwgnjxarljsfjsftaufcnnuzlufqudiskoceerioquypvktmunqkivqurfowpjocalwghrjhqfzbmfpwwice" }] }
ReceiveMessageRequest関数も存在する。こちらのほうがReceiveMessage関数よりいろいろなことを処理してくれる。今回はその必要性がなかったためより原始的な関数であるSendMessage関数を使用した。このあたりの実装はSendMessageとほぼ同様の対応関係となっている。
受信したメッセージを削除する
SQSのメッセージはRedisのSUBやRPOPとは異なりメッセージの削除処理を実行しないとVisibility Timeoutの秒数分だけ見えなくなったのち復活してくる。メッセージの削除にはメッセージを受信する時に得られるReceipt Handleの値が必要となる。
先程受信したメッセージの削除を行う。
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/service/sqs"
)
func main() {
sqs_service := NewSQSService()
queue_url := "http://localhost:4566/000000000000/testing"
receipt_handle := "urwuxudzwbvpyugssupifgqtopwnrnoiznznsboiieeeycartacsbjiglcdqavfubklwzffwrrdubnihlzjtbbknepwfbphpknclhmwgnjxarljsfjsftaufcnnuzlufqudiskoceerioquypvktmunqkivqurfowpjocalwghrjhqfzbmfpwwice"
delete_message_input := sqs.DeleteMessageInput{
QueueUrl: &queue_url,
ReceiptHandle: &receipt_handle,
}
result, err := sqs_service.DeleteMessage(&delete_message_input)
if err != nil {
panic(err.Error())
}
fmt.Println(result)
}
実行するとメッセージが削除削除される。
go run delete.go lib.go
{ }
Go Module関連ファイル
使用した各種ライブラリの情報を掲載しておく。
go.mod
module example
go 1.17
require (
github.com/aws/aws-sdk-go v1.43.11 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
)
go.sum
github.com/aws/aws-sdk-go v1.43.11 h1:NebCNJ2QvsFCnsKT1ei98bfwTPEoO2qwtWT42tJ3N3Q=
github.com/aws/aws-sdk-go v1.43.11/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
SQSのキューの種類
SQSには標準キューとFIFOキューの2種類が存在する。それぞれに特徴と料金が異なる2。大まかに考えるなら、標準キューは雑だけれど大量に処理でき、FIFOキューはちゃんとしてる代わりに制限を受けるという感じだろう。また標準キューの方が安い3。ただ料金的な話であれば、SQS自体がかなり安いので、料金であまり気にしなければならないようなケースには遭遇した事はない。
標準キュー
標準キューのメッセージは必ず1度は配信されるが、同一メッセージを重複して配信するケースがある。そのため1つのメッセージを、1度しか配信してはいけないようなシステムには組み込めない。また、メッセージの順序は大体登録された順序で配信されるが、順序が入れ替わる事もある。そのため、順序が入れ替わってはいけないシステムには組み込めない。
このようにやや雑とも言えるようなな処理が行なわれる。その代わりに1秒間あたりのAPIの制限を受けない。
つまり、捌くメッセージの量が多く、重複配信されようが、順序が多少入れ替わろうが、気にしない場合には標準キューが使い所という事になる。逆にいうと、それが許容できない場合、FIFOキューを使うという事だろう。
FIFOキュー
FIFOキューでは1つのメッセージは常に1回だけ配信される。また先入れ先出し(つまりFIFO)で配信される。SQSに登録した順序でメッセージが配信されるということだ。FIFOキューでは各種SQSに対するAPI呼び出しに対し1秒あたりの制限が設定されている。例えば大量にメッセージを捌く必要がある場合には、この制限が問題になるかもしれない。そのためよく考えておく必要がある。こういったシステム的な制約が問題になった時に、それを回避する事が困難だったりするからだ。自分達が管理するシステムにどの程度の実力があるのかということは、把握しておく必要がある。システムに実力があるに越したことはないが、無制限に実力を追い求めるのは意味がない。強力に幅広く対応するという事にはコストが発生するからだ。コストは管理コストや運用コストも考えて判断しなければならないし、システムの複雑性を増すことだってコストになりえる。ちょうど良さが必要ということだ。
標準キューである程度、できること又はできないことを整理したため、だいたい想像が付いていたかもしれない。
参考
https://docs.aws.amazon.com/ja_jp/sdk-for-go/v1/developer-guide/sqs-example-create-queue.html