« ^ »

Go+Ginで実装したAPIにバックグランドタスクを追加する

所要時間: 約 3分

Ginで実装しているAPIにバックグラウンドで実行するタスクを追加したくなったので、 小さな構成を作成して、挙動を確認することにした。

仕様

要求

  1. TCP 8000でポートを開きHTTPサーバを稼動させる。
  2. HTTPサーバは GET / に対して特定の文字列を返す。
  3. 2)で返すBODYはS3互換オブジェクトストレージのexampleバケットのobject.txtキーで取得できるファイルの中身とする。
  4. 2)で返すデータは1分間キャッシュする。またキャッシュはアプリケーションで行う。
  5. HTTPサーバはSIGTERMを受信すると5秒以内に停止する。

制限

  1. 言語はGolang 1.17を使用する。
  2. HTTPサーバーのためのアプリケーションフレームワークとしてGinを使用する。

設計

システム構成

https://res.cloudinary.com/symdon/image/upload/v1652413591/blog.symdon.info/1652408244/sysmte-architecture_dhtaaf.png

各種コンポーネント

https://res.cloudinary.com/symdon/image/upload/v1652413591/blog.symdon.info/1652408244/component-architecture_fbsb4b.png

アクティビティ

HTTP Serverが起動した

  1. MinIOのexampleバケットからobject.txtを取得する。
  2. 取得したobject.txtのデータをCached Valueにコピーする。
  3. バックグラウンドジョブを開始する。
  4. APIサーバーを設定する。
  5. APIサーバーを起動する。

clientが GET / を受信した

  1. リクエストを受信する。
  2. Cached Valueを用いてResponseを生成する。
  3. 生成したResponseを返す。

HTTP Serverのバックグラウンドジョブが起動してから1分立った

  1. 1分立ったイベントを受信する。
  2. MinIOのexampleバケットからobject.txtを取得する。
  3. 取得したobject.txtのデータをCached Valueにコピーする。

HTTP Serverのバックグラウンドジョブ中にエラーが発生した

  1. エラーの発生をメインスレッドに通知する。
  2. メインスレッドを異常終了する。

HTTP ServerがSIG TERMを受信した

  1. バックグラウンドジョブを停止する。
  2. メインスレッドを停止する。

実装

main.go::

package main

import (
	"time"
	"io"
	"io/ioutil"
	"bufio"
	// "bytes"
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	aws_session "github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/gin-gonic/gin"
	"github.com/rs/zerolog/log"
)

var CachedValue string


func FetchNewCachedValueFromS3 (input *s3.GetObjectInput, awsConfig *aws.Config) (string, error) {
	fp, err := ioutil.TempFile("", "example")
	defer fp.Close()
	sess := aws_session.New(awsConfig)
	downloader := s3manager.NewDownloader(sess)
	downloadedByteSize, err := downloader.Download(fp, input)
	if err != nil {
		panic(err)
	}
	log.Debug().Int64("downloadedByteSize", downloadedByteSize).Msg("Downloaded response data from S3")
	readSeeker := io.ReadSeeker(fp)
	offset, err := readSeeker.Seek(0, 0)
	if err != nil {
		panic(err)
	}
	log.Debug().Int64("offset", offset).Msg("Seeked data")
	if offset != 0 {
		log.Fatal().Msg("Failed to seek")
		panic("Seek error")
	}
	reader := bufio.NewReader(fp)
	line, _, _ := reader.ReadLine()
	return string(line), nil 
}


func main() {
	awsRegion := "ap-northeast-1"
	awsBucketName := "example"
	awsObjectKey := "testing/foo/bar"
	awsOrigin := "http://localhost:9000"

	fmt.Println("Ok")

	awsConfig := &aws.Config{
		Region: aws.String(awsRegion),
	}
	if awsOrigin != "" {
		awsConfig.Endpoint = aws.String(awsOrigin)
		awsConfig.S3ForcePathStyle = aws.Bool(true)
	}
	input := &s3.GetObjectInput{
		Bucket: aws.String(awsBucketName),
		Key:    aws.String(awsObjectKey),
	}
	newValue, err := FetchNewCachedValueFromS3(input, awsConfig)
	if err != nil {
		panic("Failed to fetch data")
	}
	CachedValue = newValue

	r := gin.Default()
	r.GET("/", func (c *gin.Context) {
		c.String(200, CachedValue)
		return
	})

	ticker := time.NewTicker(3 * time.Second)
	stop := make(chan bool)
	go func() {
	loop:
		for {
			select {
			case t := <-ticker.C:
				fmt.Println("Tick at", t)
				newValue, err := FetchNewCachedValueFromS3(input, awsConfig)
				if err != nil {
					panic("Failed to fetch data")
				}
				CachedValue = newValue
			case <-stop:
				break loop
			}
		}
		fmt.Println("Reachable!")
	}()	
	r.Run("127.0.0.1:8000")
	
}

go.mod::

module github.com/TakesxiSximada/symdon/pages/posts/1652408244

go 1.17

require (
	github.com/aws/aws-sdk-go v1.44.13
	github.com/gin-gonic/gin v1.9.1
	github.com/rs/zerolog v1.26.1
)

require (
	github.com/bytedance/sonic v1.9.1 // indirect
	github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
	github.com/gabriel-vasile/mimetype v1.4.2 // indirect
	github.com/gin-contrib/sse v0.1.0 // indirect
	github.com/go-playground/locales v0.14.1 // indirect
	github.com/go-playground/universal-translator v0.18.1 // indirect
	github.com/go-playground/validator/v10 v10.14.0 // indirect
	github.com/goccy/go-json v0.10.2 // indirect
	github.com/jmespath/go-jmespath v0.4.0 // indirect
	github.com/json-iterator/go v1.1.12 // indirect
	github.com/klauspost/cpuid/v2 v2.2.4 // indirect
	github.com/leodido/go-urn v1.2.4 // indirect
	github.com/mattn/go-isatty v0.0.19 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.2 // indirect
	github.com/pelletier/go-toml/v2 v2.0.8 // indirect
	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
	github.com/ugorji/go/codec v1.2.11 // indirect
	golang.org/x/arch v0.3.0 // indirect
	golang.org/x/crypto v0.9.0 // indirect
	golang.org/x/net v0.10.0 // indirect
	golang.org/x/sys v0.8.0 // indirect
	golang.org/x/text v0.9.0 // indirect
	google.golang.org/protobuf v1.30.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
)