概要 🔗
aws-sdk-go-v2 の s3 upload manager は goroutine セーフなので、並列にならべた goroutine から同時に呼べる。
It is safe to call Upload() on this structure for multiple objects and across concurrent goroutines.
cf: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/s3manager/#Uploader
ざっくりとした実装イメージは以下のようになる。
func upload(files []io.Reader) {
uploader := manager.NewUploader(s3.NewFromConfig(cfg))
for _, f := range files {
go func(file io.Reader) {
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: "<bucket name>",
Key: aws.String("<filename>"),
Body: file,
})
// errorgroup などで handle error。
}(f)
}
// errogroup などで wait
}
ところが、files が多い場合、以下のようなエラーを踏む。
operation error S3: PutObject, failed to get rate limit token, retry quota exceeded, 1 available, 5 requested
原因 🔗
AWS SDK クライアントには
Retryer
という interface が用意されていて、これにリトライするエラーの判定やリトライ間隔、最大試行回数などを設定できる。
ここで注意すべきなのが、Retryerは内部に
RateLimiter
なるものを持っており、サービスクライアント横断的にリトライの同時実行数を制御することができる作りになっている。
具体的には、TokenBucket というバケツの中に「リトライ予算」みたいなものが入っていて、リトライするたびにこのバケツからトークンを取り出し、成功したら返す。
リトライしようとした時にトークンが不足していると、
QuotaExceededError
が返る。
上述のエラーの状況は、だいたいこんな雰囲気。
対応1: リトライにかかるコストを0にする 🔗
利用者側で、「リトライ時にバケットから取り出すコスト」に介入することができる。
これを 0 に指定することで、リトライの rate limit を無効化することができる。
func upload(files []io.Reader) {
cfg, _ := config.LoadDefaultConfig(
context.TODO(),
config.WithRetryer(func() aws.Retryer {
return retry.NewStandard(func(so *retry.StandardOptions) {
// ここでリトライにかかるコストを0にする。
so.RetryCost = 0
})
}),
)
uploader := manager.NewUploader(s3.NewFromConfig(cfg))
for _, f := range files {
go func(file io.Reader) {
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: "<bucket name>",
Key: aws.String("<filename>"),
Body: file,
})
// errorgroup などで handle error。
}(f)
}
// errogroup などで wait
}
ただ、不安定な状況に追い打ちをかけることになる懸念があり、対応としてはアグレシッブ。
使えるかどうかは状況に応じて見極めが必要だろう。
対応2: QuotaExceededError をハンドリングする。 🔗
QuotaExceededError が返ってくることはわかっているので、これを拾いに行く。
retryer の層はあくまで AWS サービスが返してくるエラーに応じた判定をするところなので、Upload() の呼び出し側で制御する。
雰囲気としてはこんな感じ。
func upload(files []io.Reader) {
uploader := manager.NewUploader(s3.NewFromConfig(cfg))
for _, f := range files {
go func(file io.Reader) {
var err error
for i := 0; i < maxRetryAttempt; i++ {
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: "<bucket name>",
Key: aws.String("<filename>"),
Body: file,
})
if err == nil {
break
}
if errors.As(err, ratelimit.QuotaExceededError) {
// exponential backoff
}
}
if err != nil {
// errorgroup などで handle error。
}
}(f)
}
// errogroup などで wait
}
対応3: 並列数を制御する 🔗
semaphore なんかを使って、並列数を制御する。
func upload(files []io.Reader) {
uploader := manager.NewUploader(s3.NewFromConfig(cfg))
sem := semaphore.NewWeighted(int64(concurrency))
for _, f := range files {
if err := sem.Acquire(ctx, 1); err != nil {
// handle error
}
go func(file io.Reader) {
defer sem.Release(1)
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: "<bucket name>",
Key: aws.String("<filename>"),
Body: file,
})
// errorgroup などで handle error。
}(f)
}
// errogroup などで wait
}
ちなみに、uploader.Concurrency は
The number of goroutines to spin up in parallel per call to Upload
とのことなので、「内部で分割してアップロードする時の goroutine の数」なので、Upload() 自体を待ったなしで呼ぶような最初の実装の時は、並列数の制御が効いてないので注意。
例えば CLI の実装のように、使う側である程度制御可能なのであれば「対応1」がリーズナブル。
一方でサーバープロセスなどサーキットブレーカー的な層がないと被害が拡大していくような場合は「対応3」できちんと並列数を制御した上で、「対応2」でリトライできるようになるまでお行儀よく待つ、みたいな対応がよいのではなかろうか。