概要 🔗
Google Cloud の Cloud Spanner で特定のテーブルの中身を毎日(あるいは定期的に)テキストファイルとして出力したい、という要件がありました。
最終的に terraform module として構築したので、実装内容を記載します。
アウトプットのイメージ 🔗
要件としては、以下の通りです。
- Cloud Spanner の指定したテーブルが毎日 GCS に出力されること
- 出力される場所は
gs://<bucket>/<YYYY-MM-DD>/のように日付ごとに分けたい - データフォーマットは何でも良い
- terraform で管理されている
出力部分 🔗
基本的な発想としては、google cloud が提供してくれている Cloud Spanner to Cloud Storage Text という dataflow テンプレートがあるので、これを定期実行する仕組みを構築します。
現状、テーブル指定での出力に対応しているのはこのテンプレートだけで、データフォーマットは CSV しか指定できないようだったので、今回は CSV で出力することとしました。(もし JSONL や avro など、他のデータフォーマットが必要な場合は、独自の dataflow テンプレートを作成するか、 この辺り が使えるかもしれません。)
定期実行部分 🔗
Dataflow 自体は定期実行の仕組みを持っておらず、 よくある質問 によれば、
パイプラインの実行を自動化するには、次の操作を行います。
- Cloud Scheduler を使用する 。
- Cloud Composer ワークフロー で、Apache Airflow の Dataflow Operator (いずれか 1 つの Google Cloud Operator )を使用する。
- Compute Engine 上でカスタム(cron)ジョブプロセスを実行する。
とのことだったので、定期実行は cloud scheduler にお任せする形にします。
cloud scheduler から直接 dataflow を kick する場合には、
community tutorial
によると dataflow のエンドポイントに直接リクエストを投げるような cloud scheduler を用意するのですが、これだと 2. 出力される場所は "gs://<bucket>/<YYYY-MM-DD>/" のように日付ごとに分けたい という要件を満たすのが難しいです(出力先は textWritePrefix で指定する形になるのですが、cloud scheduler のリクエストが固定値であるため)。
そこで、間に cloud functions を挟んで、そこから起動するような形にします(*1)。
*1: cloud scheduler は HTTP リクエストを定期実行できるので直接 cloud functions の HTTP トリガーで起動することも検討しましたが、 公式のチュートリアル には cloud pub/sub 経由で起動する内容が紹介されていたのでそれに倣う形で実装しました。
実装 🔗
では、実装内容を紹介していきます。
dataflow の job を実行するサービスアカウント 🔗
dataflow の job を実行するためには roles/dataflow.worker ロールが必要なので、サービスアカウントを作成し、ロールを割り当てます (dataflow のアクセス権限については
公式ドキュメント
を参照してください)。
resource "google_service_account" "runner" {
account_id = "{任意のID}"
display_name = "{任意の表示名}"
}
resource "google_project_iam_member" "df-table-dump-runner-iam" {
role = "roles/dataflow.worker"
member = "serviceAccount:${google_service_account.runner.email}"
}
タスクキューを管理する pubsub トピック 🔗
pubsub のトピックを作ります。
resource "google_pubsub_topic" "pubsub-schedule" {
name = "{任意のトピック名}"
}
dataflow job をキックする cloud function 🔗
ここで少しコードを記述する必要があります。 cloud functions で実行する関数の書き方は 公式のドキュメント に詳しく書かれています。
package kicker
import (
"context"
"encoding/json"
"fmt"
"time"
dataflow "google.golang.org/api/dataflow/v1b3"
)
const dfTemplate = "gs://dataflow-templates/2021-06-21-00_RC00/Spanner_to_GCS_Text"
type kickRequest struct {
JobName string
ProjectID string
SpannerDatabaseID string
SpannerInstanceID string
SpannerTable string
OutBucketName string
WorkerRegion string
}
type PubSubMessage struct {
Data []byte `json:"data"`
}
func Kick(ctx context.Context, m PubSubMessage) error {
svc, err := dataflow.NewService(ctx)
if err != nil {
return err
}
req := new(kickRequest)
if err := json.Unmarshal(m.Data, req); err != nil {
return err
}
params := &dataflow.LaunchTemplateParameters{
Environment: &dataflow.RuntimeEnvironment{
MaxWorkers: 1,
NumWorkers: 1,
TempLocation: tempLocation(req.OutBucketName),
},
JobName: req.JobName,
Parameters: map[string]string{
"spannerProjectId": req.ProjectID,
"spannerDatabaseId": req.SpannerDatabaseID,
"spannerInstanceId": req.SpannerInstanceID,
"spannerTable": req.SpannerTable,
"textWritePrefix": textWritePrefix(req.OutBucketName, req.SpannerTable),
},
}
_, err = svc.Projects.Locations.Templates.Launch(req.ProjectID, req.WorkerRegion, params).GcsPath(dfTemplate).Do()
return err
}
func textWritePrefix(bucketName, tableName string) string {
return fmt.Sprintf("gs://%s/%s/%s", bucketName, time.Now().Format("2006-01-02"), tableName)
}
func tempLocation(bucketName string) string {
return fmt.Sprintf("gs://%s/%s/temp", bucketName, time.Now().Format("2006-01-02"))
}
svc, err := dataflow.NewService(ctx) でサービスインスタンスを生成している箇所は、グローバルスコープに定義しておくと、コールドスタートのタイミングでのみ評価されて、2回目以降はインスタンスを再利用できるのでもしかしたらそっちの方がいいかもしれません (今回は、1日に1回しか実行されないことと、処理時間をシビアに実装する必要がなかったので、使い捨てる形にしています) 。
cloud functions に関しては、コールドスタートやシークレットの管理方法でいくつかプラクティスがあるので、機会があれば別記事を書きます。
出来上がったコードは、zip に固めて GCS に配置します。zip 内のディレクトリ構成についても縛りがあるので、 公式のドキュメント が参考になります。
結論としては、以下のように関数の実装をモジュール化して go.mod が含まれていれば大丈夫です。
.
├── function.go
├── go.mod
└── go.sum
ただし、モジュール名は kicker のような適当な名前にはできなくて、ちゃんと github.com/<owner>/<repo> のようになっている必要があります。
余談ですが、dataflow には リージョン エンドポイント という概念があり、dataflow の job をどこで実行するかを制御することができます。 go の dataflow サービスから job を実行する方法としては、以下の2通りが可能ですが、
svc.Projects.Locations.Templates.Launch(...)svc.Projects.Templates.Launch(...)リージョンエンドポイントを使うには、前者の方法で起動する必要があります。
また、google が提供しているテンプレートは ここ にまとまっているので、差し替えれば任意の template を実行できます。
参考:
- Method: projects.templates.launch | Cloud Dataflow | Google Cloud
- Method: projects.locations.templates.launch | Cloud Dataflow
これを zip で固めたら、storage object を作成します。
ここではすでに存在するバケットを、外部から terraform module に渡せる設計にしています。
resource "google_storage_bucket_object" "kicker_source" {
name = "{任意のファイル名}"
bucket = "{ソースを格納するバケット名}"
source = "${path.module}/{モジュールルートからの相対パス}"
}
これを使って、cloud functions を作ります。
resource "google_cloudfunctions_function" "kicker" {
name = "{任意の名前}"
runtime = "go113"
available_memory_mb = 128
source_archive_bucket = "{ソースのバケット名}"
source_archive_object = google_storage_bucket_object.kicker_source.name
entry_point = "Kick"
event_trigger {
event_type = "google.pubsub.topic.publish"
resource = google_pubsub_topic.pubsub-schedule.id
failure_policy {
retry = false
}
}
}
cloud scheduler を作る 🔗
最後にスケジューラーを追加して完成です。
resource "google_cloud_scheduler_job" "scheduler" {
name = "{任意のスケジューラー名}"
schedule = "{実行時間のcrontab}"
region = "{スケジューラーのリージョン}"
time_zone = "{タイムゾーン}"
pubsub_target {
topic_name = google_pubsub_topic.pubsub-schedule.id
data = base64encode(<<-EOT
{
"jobName": "{任意のjob名}",
"projectId": "{プロジェクトID}",
"spannerDatabaseId": "{spannerのデータベースID}",
"spannerInstanceId": "{spannerのインスタンスID}",
"spannerTable": "{spannerのテーブル名}",
"outBucketName": "{出力先のバケット名}",
"workerRegion": "{jobを実行するリージョン}"
}
EOT
)
}
}
参考 🔗
- terraform 関連
- google cloud 関連
- go 関連