Cloud Function で Bigqueryにデータ挿入

データフロー

Bigqueryへデータを転送するとき一度GCSにデータを置きます。理由はエラー発生時にデータ再転送しなくて済むのとBigqueryのデータ更新回数1日1000回の制限超過を防ぐためです。しかしGCSにデータを置いた場合GCSからBigqueryへデータ転送バッチを作成しなければならず面倒です。そこでCloud Functionを使用してGCSにデータを置いたら自動でBiguqeryへデータ転送するよう設定します。

データフロー

fluentd ---> GCS ---> Clound Function ----> Bigquery

(データ転送はfluentdを想定)

Cloud Functionのソースコード

Cloud FunctionのトリガーをCloud Strage、イベントをファイル作成時に設定します。バケットはデータ配置バケットに設定します。 注意しなければいけないのはCloud Functionでデータを転送した後もバケットにファイルが残ってしまうことです。Cloud FunctionのソースコードでBigqueryに転送できたファイルは削除するよう処理します。 ここでは、本番テーブルに直接ファイルを挿入しません。一旦tmpテーブルにデータを挿入します。tmpテーブルが作成されたらデータを加工して本番テーブルに加工データを挿入します。最後に転送済GCSデータの削除とtmpテーブルの削除を行います。

データフロー

fluentd ---> GCS ---> Clound Function ----> Bigquery(tmpテーブル) ---> ELT ----> Bigquery(本番テーブル)

ソースコードは以下のようになります。 データセットIDはhoge_dataset_idです。 テーブル名はバケットgs://backet_name/AAAA/BBBB/CCCC.gzならばCCCCがテーブル名です。

from google.cloud import bigquery, storage
def load_data(data, context):
    bucket_name = data['bucket']
    file_name = data['name']
    uri = 'gs://{}/{}'.format(bucket_name, file_name)
    tmp_dataset_id = 'tmp_hoge_dataset_id'
    tmp_table_id = f"tmp_{file_name.split('/')[-1].split('.')[0]}"
    bq_client = bigquery.Client()
    dataset_ref = bq_client.dataset(tmp_dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.field_delimiter = ','
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job_config.schema = [bigquery.SchemaField("schema", "STRING")] # スキーマ
    # tmpテーブル作成
    load_job = bq_client.load_table_from_uri(
        uri, dataset_ref.table(tmp_table_id), job_config=job_config
    )
    load_job.result()
    dataset_id = 'hoge_dataset_id'
    table_id = f"{file_name.split('/')[-1].split('.')[0]}"
    job_config = bigquery.QueryJobConfig()
    dst_table_ref = bq_client.dataset(dataset_id).table(table_id)
    job_config.destination = dst_table_ref
    job_config.write_disposition = "WRITE_APPEND"
    sql = f"""
    # ここにtmpテーブルの加工処理を書く
"""
    # tmpテーブル加工して本番テーブルに挿入
    job = bq_client.query(
        sql,
        location='US',
        job_config=job_config)
    job.result()
    # tmp テーブル削除
    table_ref = bq_client.dataset(tmp_dataset_id).table(tmp_table_id)
    tmp_table = bq_client.get_table(table_ref)
    bq_client.delete_table(tmp_table)
    # 転送したファイル削除
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    blob.delete()