Cloud Storage Text to BigQuery (Stream) with Python UDF テンプレート

Cloud Storage Text to BigQuery パイプラインは、Cloud Storage に格納されているテキスト ファイルをストリーミングして、指定された Python ユーザー定義関数(UDF)を使用して変換し、結果を BigQuery に追加するストリーミング パイプラインです。

パイプラインは無期限に実行され、ドレインではなくキャンセルによって手動で終了させる必要があります。これは、分割可能な DoFn で、ドレインをサポートしていない Watch 変換を使用しているためです。

パイプラインの要件

  • BigQuery の出力テーブルのスキーマを記述する JSON ファイルを作成します。

    fields というタイトルになっているトップレベルの JSON 配列があり、その内容が {"name": "COLUMN_NAME", "type": "DATA_TYPE"} のパターンに従っていることを確認します。次に例を示します。

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • Python(.py)ファイルを作成し、このファイルに、テキスト行の変換ロジックを提供する UDF 関数を記述します。この関数は JSON 文字列を返します。

    次の例では、CSV ファイルの各行を分割し、値を含む JSON オブジェクトを作成して、JSON 文字列を返します。

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

テンプレートのパラメータ

必須パラメータ

  • inputFilePattern: 処理の対象となる Cloud Storage 内のテキストへの gs:// パス。例: gs://your-bucket/your-file.txt
  • JSONPath: Cloud Storage に格納されている BigQuery スキーマを定義する JSON ファイルへの gs:// パス。例: gs://your-bucket/your-schema.json
  • outputTable: 処理されたデータを保存するために使用する BigQuery テーブルの場所。既存のテーブルは、再利用すると上書きされます。例: <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
  • bigQueryLoadingTemporaryDirectory: BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://your-bucket/your-files/temp-dir

オプション パラメータ

  • outputDeadletterTable: 出力テーブルに到達できなかったメッセージが記載されたテーブル。テーブルが存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は <outputTableSpec>_error_records が使用されます。例: <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
  • useStorageWriteApiAtLeastOnce: このパラメータは、Use BigQuery Storage Write API が有効になっている場合にのみ有効になります。有効になっている場合は、Storage Write API に「1 回以上」のセマンティクスが使用され、有効でなければ「正確に 1 回」のセマンティクスが使用されます。デフォルトは false です。
  • useStorageWriteApi: true の場合、パイプラインでは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。
  • pythonExternalTextTransformGcsPath: ユーザー定義関数を含む Python コードの Cloud Storage パスパターン。例: gs://your-bucket/your-function.py
  • pythonExternalTextTransformFunctionName: Python ファイルから呼び出す関数の名前。使用できるのは英字、数字、アンダースコアのみです例: 'transform' or 'transform_udf1'

ユーザー定義関数

このテンプレートには、パイプライン要件で説明されているように、入力ファイルを解析する UDF が必要です。このテンプレートでは、各入力ファイルのテキストごとに UDF を呼び出します。UDF の作成の詳細については、Dataflow テンプレートにユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: 入力ファイルの 1 行のテキスト。
  • 出力: BigQuery 宛先テーブルのスキーマに一致する JSON 文字列。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Storage Text to BigQuery (Stream) with Python UDF template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • PYTHON_FUNCTION: 使用する Python ユーザー定義関数(UDF)の名前。
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_PYTHON_UDF_FILE: 使用するユーザー定義関数(UDF)を定義する Python コードファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA: テキスト データセットの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • PYTHON_FUNCTION: 使用する Python ユーザー定義関数(UDF)の名前。
  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_PYTHON_UDF_FILE: 使用するユーザー定義関数(UDF)を定義する Python コードファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA: テキスト データセットの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • BIGQUERY_UNPROCESSED_TABLE: 未処理のメッセージ用の BigQuery テーブルの名前
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス

次のステップ