データフロー
Bigqueryへデータを転送するとき一度GCSにデータを置きます。理由はエラー発生時にデータ再転送しなくて済むのとBigqueryのデータ更新回数1日1000回の制限超過を防ぐためです。しかしGCSにデータを置いた場合GCSからBigqueryへデータ転送バッチを作成しなければならず面倒です。そこでCloud Functionを使用してGCSにデータを置いたら自動でBiguqeryへデータ転送するよう設定します。
データフロー
fluentd ---> GCS ---> Clound Function ----> Bigquery
(データ転送はfluentdを想定)
Cloud Functionのソースコード
Cloud FunctionのトリガーをCloud Strage
、イベントをファイル作成時
に設定します。バケットはデータ配置バケットに設定します。
注意しなければいけないのはCloud Functionでデータを転送した後もバケットにファイルが残ってしまうことです。Cloud FunctionのソースコードでBigqueryに転送できたファイルは削除するよう処理します。
ここでは、本番テーブルに直接ファイルを挿入しません。一旦tmpテーブルにデータを挿入します。tmpテーブルが作成されたらデータを加工して本番テーブルに加工データを挿入します。最後に転送済GCSデータの削除とtmpテーブルの削除を行います。
データフロー
fluentd ---> GCS ---> Clound Function ----> Bigquery(tmpテーブル) ---> ELT ----> Bigquery(本番テーブル)
ソースコードは以下のようになります。
データセットIDはhoge_dataset_id
です。
テーブル名はバケットがgs://backet_name/AAAA/BBBB/CCCC.gz
ならばCCCC
がテーブル名です。
from google.cloud import bigquery, storage def load_data(data, context): bucket_name = data['bucket'] file_name = data['name'] uri = 'gs://{}/{}'.format(bucket_name, file_name) tmp_dataset_id = 'tmp_hoge_dataset_id' tmp_table_id = f"tmp_{file_name.split('/')[-1].split('.')[0]}" bq_client = bigquery.Client() dataset_ref = bq_client.dataset(tmp_dataset_id) job_config = bigquery.LoadJobConfig() job_config.field_delimiter = ',' job_config.source_format = bigquery.SourceFormat.CSV job_config.write_disposition = 'WRITE_TRUNCATE' job_config.schema = [bigquery.SchemaField("schema", "STRING")] # スキーマ # tmpテーブル作成 load_job = bq_client.load_table_from_uri( uri, dataset_ref.table(tmp_table_id), job_config=job_config ) load_job.result() dataset_id = 'hoge_dataset_id' table_id = f"{file_name.split('/')[-1].split('.')[0]}" job_config = bigquery.QueryJobConfig() dst_table_ref = bq_client.dataset(dataset_id).table(table_id) job_config.destination = dst_table_ref job_config.write_disposition = "WRITE_APPEND" sql = f""" # ここにtmpテーブルの加工処理を書く """ # tmpテーブル加工して本番テーブルに挿入 job = bq_client.query( sql, location='US', job_config=job_config) job.result() # tmp テーブル削除 table_ref = bq_client.dataset(tmp_dataset_id).table(tmp_table_id) tmp_table = bq_client.get_table(table_ref) bq_client.delete_table(tmp_table) # 転送したファイル削除 storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(file_name) blob.delete()