tag:["google-workflows", "google-bigquery", "google-cloud-spanner"]
データベースにCloud Spannerを採用しているプロジェクトでは、ユーザ利用分析や問題発生時の原因特定など、Cloud SpannerにあるデータをBigQueryにとりあえず全部持ってきていろいろ分析したいことも出てくるのではないでしょうか。 ただそのためには、一般的にSpannerからデータを取得する処理やデータをBigQueryにロードする処理を用意してApache Airflow等のワークフローエンジンで実行することになり、実行しようとするとやや腰が重いのではないかと思います。 そこでこのノウハウではフルマネージドなワークフローエンジンであるWorkflowsを利用することで手軽に指定したSpannerのdatabaseの全テーブルをBigQueryに定期コピーする例を紹介します。
この例ではSpannerからのデータ取得にあたって、Googleが提供するDataflow Templateの一つである、Cloud_Spanner_to_GCS_Avro を利用します。 このTemplateは指定したSpannerのdatabase配下の全テーブルのデータをテーブル別に区別されたAvroファイルとして保存することができます。 BigQueryではこのAvro形式のファイルのデータロードをサポートしているので、Workflowsを使ってこの公式Templateを起動してSpannerデータをAvroファイルとして出力し、テーブルごとにBigQueryにAvroファイルをロードするJobを実行するワークフローを作成しCloud Schedulerから定期実行します。
なお、この例ではGoogle公式Templateが以下の仕様を満たすことを前提とします。
- テーブルデータのAvroファイルは以下の命名規則により生成されるディレクトリ配下に出力される
{指定したGCSパス}/{spannerInstanceID}-{spannerDatabaseID}-{dataflowJobID}/
- 各テーブルのAvroファイルが上記ディレクトリ直下に以下命名規則により出力される
{tableName}.avro-*
- 全テーブル情報のリストを以下形式で含むファイルが上記ディレクトリ直下にspanner-export.jsonとして出力される
{"tables": [{"name": tableName, ... }...]}
Workflowsの設定は spanner-to-bigquery.yaml の通りです。 steps項目で指定したstepが基本的には上から順に実行されます。 この例では以下のstepが順に実行されます。
- init: 定数や変数を定義する
- check_existing_export: 既存のSpanner export結果があるかチェックする
- export_spanner: Dataflow Templateを実行してSpannerからデータをexportする
- set_export_directory: DataflowのjobIdからexport先のディレクトリ名を生成して変数として設定する
- get_spanner_export: 出力されたspanner-export.jsonファイルからテーブル一覧を取得する
- load_bigquery: テーブル一覧からテーブルごとにAvroファイルをロードする
以下個別にstepの中身を確認していきます。
最初のstepでは以下の定数を定義しています。以降のstepの定義でこれら変数を参照することができます。
- 処理を実行するGCPプロジェクトID
- データ移動対象とするSpannerのInstanceID、DatabaseID、
- Avroファイルを保存するGCSのバケット
- Dataflowを起動するリージョン、Google提供のSpannerからGCSにAvroで保存するTemplateのパス
- BigQueryの保存先Dataset名、ロケーション
- ロードが完了したテーブル名の格納変数
main:
params: [args]
steps:
- init:
assign:
- PROJECT_ID: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- SPANNER_INSTANCE: xxxx
- SPANNER_DATABASE: xxxx
- GCS_BUCKET_BACKUP: ${PROJECT_ID+"-spanner-backup"}
- DATAFLOW_LOCATION: us-central1
- DATAFLOW_TEMPLATE: gs://dataflow-templates/2022-06-06-00_RC00/Cloud_Spanner_to_GCS_Avro
- BQ_LOCATION: US
- BQ_DATASET: xxxx
- succeededTables: []
Workflowsを実行しているGCPプロジェクトは環境変数として設定されているのでsys.get_env関数を使って取得します。 この例ではSpannerやDataflow、BigQueryを同じGCPプロジェクトで動かしています。
Avroを出力するGCSのバケットの定義では先に取得したGCPプロジェクト名を使って後ろに-spanner-backup
を付けています。
このバケットはテーブルのロード実行にあたりBigQueryのデータセットと同じリージョンに設定する必要があるので注意ください。
なおこのバケットはデータ移動に伴う一時的なファイル置き場なので、ライフサイクル設定で一定時間経過したファイルを削除するようにするとコスト削減になり良いと思われます。
最後に定義しているsucceededTables
は後のBigQueryのロードしたテーブル名を集めて出力するために参照するための変数です。
また定数のかわりに実行時変数を定義してWorkflowを起動時に指定することもできます。
params
で変数を定義しておけば、実行時にこの変数にアクセスしてパラメータやフロー制御に用いることができます。
2つめのstepは、過去のSpanner export実行済みのAvroファイルを再利用するように実行時変数でexport出力先ディレクトリが設定されていた場合に、Dataflow TemplateによるSpanner exportの実行をスキップしてAvroファイルのBigQueryへのテーブルロード処理を行うように分岐処理を定義したものです。
実行時に設定した変数はargsのプロパティとして格納されます。ここではexportDirectoryという実行時変数にSpanner exportファイルが出力されたディレクトリが指定されていた場合に、後続のDataflow TemplateのJob実行をスキップするようフロー定義を記述しています。
main:
params: [args]
steps:
...
- check_existing_export:
switch:
- condition: ${map.get(args, "exportDirectory")!=null}
steps:
- set_existing_export_directory:
assign:
- exportDirectory: ${args.exportDirectory}
- go_get_spanner_export:
next: get_spanner_export
3番目のstepではGoogle公式TemplateからDataflow Jobを起動してSpannerからデータをexportしてAvroファイルを指定したGCSパス配下に保存します。
ここではdataflow Jobを起動して完了を待つ処理をsubworkflowsとしてlaunch_dataflow_job_and_wait
という名前で定義してそれを呼び出しています。
処理の意味的にまとまったステップをsubworkflowsとして定義して呼び出すようにするとmainのstepsの見通しが良くなります。
main:
params: [args]
steps:
...
- export_spanner:
call: launch_dataflow_job_and_wait
args:
projectId: ${PROJECT_ID}
location: ${DATAFLOW_LOCATION}
template: ${DATAFLOW_TEMPLATE}
instance: ${SPANNER_INSTANCE}
database: ${SPANNER_DATABASE}
bucket: ${GCS_BUCKET_BACKUP}
result: launchResult
...
launch_dataflow_job_and_wait:
params: [projectId, location, template, instance, database, bucket]
steps:
- assing_wait_seconds:
assign:
- wait_seconds: 0
- launch_dataflow_job:
call: googleapis.dataflow.v1b3.projects.locations.templates.launch
args:
projectId: ${projectId}
location: ${location}
gcsPath: ${template}
body:
jobName: spanner-backup
parameters:
instanceId: ${instance}
databaseId: ${database}
spannerProjectId: ${projectId}
outputDir: ${"gs://"+bucket+"/"}
spannerPriority: LOW
shouldExportTimestampAsLogicalType: "true"
avroTempDirectory: ${"gs://"+bucket+"/temp/"}
validateOnly: false
result: launchResult
- wait_for_job_completion_1st:
call: sys.sleep
args:
seconds: 300
- get_dataflow_job:
call: googleapis.dataflow.v1b3.projects.locations.jobs.get
args:
jobId: ${launchResult.job.id}
location: ${location}
projectId: ${projectId}
result: jobResult
- check_dataflow_job_done:
switch:
- condition: ${jobResult.currentState=="JOB_STATE_DONE"}
steps:
- done:
return: ${launchResult}
- condition: ${jobResult.currentState=="JOB_STATE_FAILED"}
steps:
- failed:
raise: ${"Failed to launch dataflow job for spanner export"}
- condition: ${wait_seconds>3600}
steps:
- timeout:
raise: ${"Timeout dataflow job for spanner export"}
- wait_for_job_completion:
call: sys.sleep
args:
seconds: 20
- repeat_check_job:
assign:
- wait_seconds: ${wait_seconds + 20}
next: get_dataflow_job
launch_dataflow_job_and_wait
の定義ではparams
を定義して呼び出し元から引数として変数を渡せるようにしています。
ここではDataflowの実行リージョン、テンプレートのGCSパス、SpannerのインスタンスIDとデータベースID、Avroファイルを保存するバケット名を引数として定義しています。
subworkflowであるlaunch_dataflow_job_and_wait
のstepsも上から順次実行されます。
最初のstepではTimeout時に例外を発生させるために、DataflowのJobの実行時間を保持する変数を定義して0で初期化しています。
後のstepでこの変数に待ち時間を追加していき一定時間を超えたらTimeoutとして失敗させるようにしています。
2番目のstepではDataflow TemplateからDataflow Jobを起動するDataflow Template Launch APIを指定しています。
Jobのパラメータとして、読み込むSpannerのGCPプロジェクトID、インスタンスID、データベースIDを指定しています。
またAvroファイルを出力するGCSのパスを指定しており、これらの値は引数として受け取った変数を利用しています。
その他のパラメータとしてspannerPriority
やshouldExportTimestampAsLogicalType
を指定しています。
spannerPriority
はSpannerからのデータ取得にあたってクエリの優先度を設定するもので、Spannerインスタンスへの負荷の影響を極力小さくするためにLOWを指定しています。
shouldExportTimestampAsLogicalType
は、このTemplateはデフォルトではTimestamp型を文字列として出力するため、これをBigQueryがTimestamp型と認識できるようにtrueに設定しています(trueは文字列として指定)
result
にはcallで実行した結果がlaunchResult
という変数に格納することを指示しており、次以降のstepでこの変数を参照することができます。
このstepはDataflow Jobの完了を待つことなく終了し次のstepに推移します。Jobの完了を待つためにこの後のstepでこの変数を参照します。
3番目のstepではsys.sleep関数を使ってJob起動後に最初にJob完了を確認するまでの待ちを入れています。 取得対象のSpannerにデータ量の多いTableが存在する場合はJobが完了するのにまとまった時間が掛かるためJob完了のチェック回数を減らすためのもので、Jobが平均的に完了するのに要する秒数を指定します。
4番目のstepではDataflowのJobの実行状態などを含む情報を取得するDataflow Job Get APIを呼び出しており、Job起動時にレスポンスとして受け取ったJobIdを指定して結果はjobResultという変数に代入しています。
5番目のstepではjobResultの中身を確認してJobが完了していたらsubworkflowを完了して処理を抜け出すよう定義しています。
またDataflow Jobが失敗していた場合は例外を投げてworkflowの処理が失敗となるように定義しています。
先に定義した待ち時間保持用の変数wait_seconds
が一定時間(ここでは1時間を設定)を超えたらTimeoutとして例外を投げて処理を失敗させています。
Jobが終わっていなかった場合は次のstepに遷移してインターバルとして20秒待つ組み込み関数を実行しています。
20秒待ったあとは最後のsteprepeat_check_job
にて変数wait_seconds
の値を20増やしてget_dataflow_job
のstepに遷移して繰り返しJobの状態を確認するよう定義します。
この20秒という値は実際のJobの実行時間などを考慮して適宜変更ください。
5番目のstepでは前のstepで完了したDataflow JobによりSpannerからexportされたファイルが格納されたディレクトリをDataflowのJob情報から組み立てて変数として設定しています。
main:
params: [args]
steps:
...
- set_export_directory:
assign:
- exportDirectory: ${SPANNER_INSTANCE+"-"+SPANNER_DATABASE+"-"+launchResult.job.id}
5番目となるstepではDataflow Jobにより出力されたSpannerのexportファイルを確認してAvroファイルや対応するテーブル名の情報を取得します。
ここでもCloud StorageにあるファイルをダウンロードしてJSONとして取得する処理をsubworkflowとしてdownload_gcs_object_as_json
という名前で定義してそれを呼び出しています。
main:
params: [args]
steps:
...
- get_spanner_export:
call: download_gcs_object_as_json
args:
bucket: ${GCS_BUCKET_BACKUP}
object: ${exportDirectory+"%2F"+"spanner-export.json"}
result: spannerExport
...
download_gcs_object_as_json:
params: [bucket, object]
steps:
- get_object:
call: googleapis.storage.v1.objects.get
args:
bucket: ${bucket}
object: ${object}
result: objectInfo
- download_object:
call: http.request
args:
url: ${objectInfo.mediaLink}
method: GET
auth:
type: OAuth2
result: response
- as_json:
return: ${json.decode(response.body)}
download_gcs_object_as_json
では指定されたパスのGCSのObject情報を取得して、ファイルをダウンロード、JSONファイルにデコードという順で処理を行います。
Cloud_Spanner_to_GCS_AvroはSpannerからのexport内容を記載したファイルを{指定したGCSパス}/{spannerInstanceID}-{spannerDatabaseID}-{dataflowJobID}/spanner-export.json
に出力します。
このファイルを取得するためにまずCloud Storage Object Get APIでファイル内容を取得します。
引数として取得したいファイルのあるGCS bucketとobjectをパラメータとして指定します(objectはURLエンコーディングする必要があるのに注意)。
このAPIで取得した情報にはファイルの中身は含まないため、次のstepで取得したファイル情報からダウンロードリンクを取得してhttpリクエストでファイルを取得します。 その際にはworkflowsのサービスアカウントがファイルにアクセスできるようにauthでOAuth2を指定します。
httpリクエストで取得したJSONファイルの内容はbodyフィールドにバイト列として格納されているため最後のstepで組み込み関数を使ってJSONとしてデコードします。
6番目のstepでは前のstepで取得したAvroファイルや対応するテーブル名の情報を参照して各Avroファイルを対応するBigQueryのテーブルにロードしていきます。
ここでも指定したGCSのAvroファイルからBigQueryにテーブルロードして正常完了まで待つ処理をsubworkflowとしてload_bigquery_and_wait
という名前で定義してそれを呼び出しています。
先に取得したspanner-export.jsonファイルに記載のテーブル一覧からロード処理の定義を組み立ててsubworkflowを実行し、全ての処理が完了した後にロードしたテーブル名一覧をworkflowの最終結果として出力しています。
main:
params: [args]
steps:
...
- load_bigquery:
parallel:
shared: [succeededTables]
for:
value: table
in: ${spannerExport.tables}
steps:
- load_bigquery_table:
call: load_bigquery_and_wait
args:
projectId: ${PROJECT_ID}
location: ${BQ_LOCATION}
dataset: ${BQ_DATASET}
table: ${table.name}
sourceUri: ${"gs://"+GCS_BUCKET_BACKUP+"/"+exportDirectory+"/"+table.name+".avro*"}
result: bigqueryLoadJob
- add_succeeded_tables:
assign:
- succeededTables: ${list.concat(succeededTables, bigqueryLoadJob.configuration.load.destinationTable.tableId)}
- the_end:
return: ${succeededTables}
...
load_bigquery_and_wait:
params: [projectId, location, dataset, table, sourceUri]
steps:
- assing_wait_seconds:
assign:
- wait_seconds: 0
- load_bigquery_table:
call: googleapis.bigquery.v2.jobs.insert
args:
projectId: ${projectId}
body:
configuration:
load:
createDisposition: CREATE_IF_NEEDED
writeDisposition: WRITE_TRUNCATE
destinationTable:
projectId: ${projectId}
datasetId: ${dataset}
tableId: ${table}
sourceFormat: AVRO
useAvroLogicalTypes: true
sourceUris:
- ${sourceUri}
result: bigqueryLoadJob
- get_bigquery_job:
call: googleapis.bigquery.v2.jobs.get
args:
jobId: ${bigqueryLoadJob.jobReference.jobId}
location: ${location}
projectId: ${projectId}
result: jobResult
- check_bigquery_job_done:
switch:
- condition: ${jobResult.status.state=="DONE" AND map.get(jobResult.status, "errorResult")==null}
steps:
- succeeded:
return: ${jobResult}
- condition: ${jobResult.status.state=="DONE" AND map.get(jobResult.status, "errorResult")!=null AND map.get(jobResult.status.errorResult, "reason")=="backendError"}
steps:
- backenderror:
next: load_bigquery_table
- condition: ${jobResult.status.state=="DONE" AND map.get(jobResult.status, "errorResult")!=null}
steps:
- failed:
raise: ${"Failed to load table "+table+" errorResult "+jobResult.status.errorResult.message}
- condition: ${wait_seconds>3600}
steps:
- timeout:
raise: ${"Timeout to load table "+table}
- wait_for_job_completion:
call: sys.sleep
args:
seconds: 10
- repeat_check_job:
assign:
- wait_seconds: ${wait_seconds + 10}
next: get_bigquery_job
各テーブルのロード処理は依存関係が無いため並行に実行可能なので、parallel句を指定しています。
このparallel句配下のstepsで定義された処理がin
句で指定した配列ごとに並行に実行されます。
in
句で指定した配列の中身はvalue
句で定義した変数に格納されます。
先に取得したspanner-export.jsonファイルに記載のテーブル一覧をin
句に指定してテーブルごとにload_bigquery_and_wait
を並行実行します。
Spannerから出力されたAvroファイルは{指定したGCSパス}/{spannerInstanceID}-{spannerDatabaseID}-{dataflowJobID}/{tableName}.avro-xxxx
として保存されています。
このテーブル名からAvroファイルのパスを組み立ててAvroファイルから同名のテーブルをBigQueryにロードします。
load_bigquery_and_wait
では指定されたパスのGCSのAvroファイルから指定されたBigQueryのテーブルにデータをロード、正常完了を待つという順で処理を行います。
最初のstepassing_wait_seconds
ではlaunch_dataflow_job_and_wait
と同じようにテーブルのロードに時間が掛かりすぎた場合にTimeoutでWorkflowを失敗させるための処理時間を保持する変数wait_seconds
を定義しています。
2番目のstepload_bigquery_table
では指定されたGCSパスのAvroファイルからデータをテーブルをロードするBigQueryのJobを実行するBigQuery Jobs Insert APIを実行します。
ここではAvroのTimestampやDateなどの型をBigQueryで同じ型として取り込むためにuseAvroLogicalTypes
でtrueを指定します。
3番目のstepget_bigquery_job
ではBigQuery Jobs Get APIを使ってBigQueryのテーブルロードのJob情報を取得しています。
4番目のstepcheck_bigquery_job_done
では取得したJobの状態をチェックして、正常に完了するとJob情報をsubworkflowの呼び出し元に返して処理を抜けます。
テーブルロードのJobが失敗した場合は例外を発生させて処理を異常終了させますが、backendErrorが原因の場合はリトライで正常に処理できる場合が多いため、load_bigquery_table
に遷移してテーブルロード処理をやり直します。
待ち時間保持用の変数wait_seconds
が一定時間(ここでは1時間を設定)を超えたらTimeoutとして例外を投げて処理を失敗させています。
Jobが終わっていなかった場合は次のstepwait_for_job_completion
に遷移してインターバルとして10秒待つ組み込み関数を実行しています。
10秒待ったあとは最後のsteprepeat_check_job
にて変数wait_seconds
の値を10増やしてget_bigquery_job
のstepに遷移して繰り返しJobの状態を確認するよう定義します。
この10秒という値は実際のJobの実行時間などを考慮して適宜変更ください。
全てのテーブルロードのJobが完了すると、workflowは正常に処理を終了します。 (一つでもテーブルロードJobが失敗した場合はworkflow全体が失敗したとみなされます)
なおここではsubworkflowが処理を正常に終えた際に取得したJob情報からテーブル名を抽出して最初に定義したsucceededTables
変数に追加していき、最後にworkflowの結果として表示しています。
ここで定義したYAML定義からコンソール画面やgcloudコマンド、APIからWorkflowを作成します。 workflowのtriggerとしてCloud SchedulerのJobを作成・連携することができ、ここで定義した処理内容を定期実行するよう簡単に設定できます。
今回利用したWorkflowsは、タスクをPython等のプログラムで定義可能な既存のワークフローエンジンと比べて自由度は落ちるものの、フルマネージドで運用負担やコストが小さいため、ちょっとしたタスクに順序依存関係のあるワークフローを手軽に実行するのにとても便利だと思いました。 またWorkflowsではGCPサービスの操作は組み込み関数として多数提供されており、サービスアカウントを利用した認証も連携しやすく、GCPのサービスに関するワークフローを実行するのに特に便利だと感じました。 Cloud Composerを使うにはオーバースペックに思えるようなシンプルなワークフローであればWorkflowsへの置き換えを検討しても良いかもしれません。