Cloud Function で Bigqueryにデータ挿入
データフロー
Bigqueryへデータを転送するとき一度GCSにデータを置きます。理由はエラー発生時にデータ再転送しなくて済むのとBigqueryのデータ更新回数1日1000回の制限超過を防ぐためです。しかしGCSにデータを置いた場合GCSからBigqueryへデータ転送バッチを作成しなければならず面倒です。そこでCloud Functionを使用してGCSにデータを置いたら自動でBiguqeryへデータ転送するよう設定します。
データフロー
fluentd ---> GCS ---> Clound Function ----> Bigquery
(データ転送はfluentdを想定)
Cloud Functionのソースコード
Cloud FunctionのトリガーをCloud Strage
、イベントをファイル作成時
に設定します。バケットはデータ配置バケットに設定します。
注意しなければいけないのはCloud Functionでデータを転送した後もバケットにファイルが残ってしまうことです。Cloud FunctionのソースコードでBigqueryに転送できたファイルは削除するよう処理します。
ここでは、本番テーブルに直接ファイルを挿入しません。一旦tmpテーブルにデータを挿入します。tmpテーブルが作成されたらデータを加工して本番テーブルに加工データを挿入します。最後に転送済GCSデータの削除とtmpテーブルの削除を行います。
データフロー
fluentd ---> GCS ---> Clound Function ----> Bigquery(tmpテーブル) ---> ELT ----> Bigquery(本番テーブル)
ソースコードは以下のようになります。
データセットIDはhoge_dataset_id
です。
テーブル名はバケットがgs://backet_name/AAAA/BBBB/CCCC.gz
ならばCCCC
がテーブル名です。
from google.cloud import bigquery, storage def load_data(data, context): bucket_name = data['bucket'] file_name = data['name'] uri = 'gs://{}/{}'.format(bucket_name, file_name) tmp_dataset_id = 'tmp_hoge_dataset_id' tmp_table_id = f"tmp_{file_name.split('/')[-1].split('.')[0]}" bq_client = bigquery.Client() dataset_ref = bq_client.dataset(tmp_dataset_id) job_config = bigquery.LoadJobConfig() job_config.field_delimiter = ',' job_config.source_format = bigquery.SourceFormat.CSV job_config.write_disposition = 'WRITE_TRUNCATE' job_config.schema = [bigquery.SchemaField("schema", "STRING")] # スキーマ # tmpテーブル作成 load_job = bq_client.load_table_from_uri( uri, dataset_ref.table(tmp_table_id), job_config=job_config ) load_job.result() dataset_id = 'hoge_dataset_id' table_id = f"{file_name.split('/')[-1].split('.')[0]}" job_config = bigquery.QueryJobConfig() dst_table_ref = bq_client.dataset(dataset_id).table(table_id) job_config.destination = dst_table_ref job_config.write_disposition = "WRITE_APPEND" sql = f""" # ここにtmpテーブルの加工処理を書く """ # tmpテーブル加工して本番テーブルに挿入 job = bq_client.query( sql, location='US', job_config=job_config) job.result() # tmp テーブル削除 table_ref = bq_client.dataset(tmp_dataset_id).table(tmp_table_id) tmp_table = bq_client.get_table(table_ref) bq_client.delete_table(tmp_table) # 転送したファイル削除 storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(file_name) blob.delete()
BigqueryのELTがめちゃくちゃ便利だった
ELTとは
ELTとはExtract/Load/Transformの頭文字を取っています。
(データ収集/データ転送/データ加工)
pythonで機械学習をしたい時に大容量データを扱うことがあります。このデータを使用するには加工が必須になりますが、自分の環境上で大規模データを加工しようとすると「加工処理が終わらない」なんてことがしばしばあります。C言語でデータ加工をしても良いですが、学習コストが高い & 複雑な処理のコードを書くのに多くの時間が取られてしまう可能性があります。そこで、時間がかかる加工処理を自分の環境上で行うのではなくクラウド上でデータを加工する方法を試してみました。
ちなみに加工後に転送はETLと言います。ELTとETLのT(加工)とL(転送)の順番が違います。
- ELTとは (大規模データ加工でおすすめ)
データ収集 --> クラウド上にデータ転送 ---> クラウド上でデータ加工
- ETLとは (従来の手法)
データ収集 ---> ローカルでデータ加工 ---> クラウド上にデータ転送
GCPのBigquery
GCPのBigQueryで 大規模データを加工を試みます。MySQLとBiguqeryのUDFでデータ加工をします。MySQLとBiguqeryのUDFは学習コストかなり低いです。
UDFでカテゴリ関数を作成し、カテゴリ変数を0か1で返す例
CREATE TEMP FUNCTION category(x STRING, y STRING)
RETURNS INT64
LANGUAGE js AS """
if (x == y) {
return 1;
} else {
return 0;
}
""";
select
r.dataid
, category(r.nj, '初日') AS nj_1
, category(r.nj, '2日目') AS nj_2
, category(r.nj, '3日目') AS nj_3
, category(r.nj, '4日目') AS nj_4
, category(r.nj, '5日目') AS nj_5
, category(r.nj, '6日目') AS nj_6
, category(r.nj, '最終日') AS nj_7
全体で約5GBを処理してみましたが、5分程度で処理が終了しました。ここの時間はデータ量とUDFが複雑度に依存しそう。
ローカルでは何百倍も処理時間がかかったので大規模なデータはBigQueryで処理した方が良いかもしれないです。
DataFlowのETLでも良いのでは?
ETLとELTの使い分けは必要かなと思います。「生ログがネストしている」、「欠損を表現する文字がN/Aだったり空白だったりバラバラ」な場合はELTを採用してます。
FaceNet Triplet loss メモ (途中)
サンプルの組みごとにlossを計算する。
GoogleのFaceNetをベースにした
GitHub - davidsandberg/facenet: Face recognition using Tensorflow
で書かれているTriplet lossを確認してみた。
def triplet_loss(anchor, positive, negative, alpha): """Calculate the triplet loss according to the FaceNet paper Args: anchor: the embeddings for the anchor images. positive: the embeddings for the positive images. negative: the embeddings for the negative images. Returns: the triplet loss according to the FaceNet paper as a float tensor. """ with tf.variable_scope('triplet_loss'): pos_dist = tf.reduce_sum(tf.square(tf.subtract(anchor, positive)), 1) neg_dist = tf.reduce_sum(tf.square(tf.subtract(anchor, negative)), 1) basic_loss = tf.add(tf.subtract(pos_dist,neg_dist), alpha) loss = tf.reduce_mean(tf.maximum(basic_loss, 0.0), 0) return loss
https://github.com/davidsandberg/facenet/blob/master/src/facenet.py#L44を参照
論文ではlossの計算でbasic_lossの総和を取っていたがここでは平均になっている。しかし、sumでもmeanでも結果は変わらないと思う。
最後のtf.maximumではbasic_lossか0を取っていた。basic_lossが下がりすぎると過学習するので、0を設定していると思う。
これにより、他のサンプルの組のbasic_lossを下げようとするため汎用的なmodelが作成できるはず(予想)。
参考
https://arxiv.org/pdf/1503.03832.pdf FaceNet 論文
future-architect.github.io
github.com
検証データを含めないでTarget encoding
注意してtarget encodingしないと、leakageが発生するらしい。
よくやるのが、検証データを含めてtarget encodingをしてしまうこと。
この状態でデータを学習してしまうと、leakageを起こす。
何故ならば、target encodingした特徴量に検証データ(未来のデータ)が含まれているためである。
そのため、訓練データと検証データを分離してからtarget encodingを行う必要がある。
sklearn.model_selectionのKFoldを使用して、target encodingしてみた。
平均を求めるとき、検証データを使用しないように気をつける。
参考