テンプレートを使用してデータを処理する

Dataplex Universal Catalog では Dataflow を活用したテンプレートを用意しており、データの取り込み、処理、データ ライフサイクルの管理といった一般的なデータ処理タスクを実行できます。このガイドでは、データ処理テンプレートを構成して実行する方法について説明します。

始める前に

Dataplex Universal Catalog テンプレートは、Dataflow によって提供されます。テンプレートを使用する前に、Dataflow API を有効にします。

Dataflow API を有効にする

次の点にご注意ください。

  • すべてのテンプレートは、共通の Dataflow パイプライン オプションをサポートしています。

  • Dataplex Universal Catalog は、データ パイプラインを使用して、テンプレートで定義されたタスクのスケジュールを設定します。

  • Google Cloud コンソールの [Dataplex Universal Catalog] ページには、Dataplex Universal Catalog でスケジュールしたタスクのみが表示されます。

テンプレート: 元データをキュレートされたデータに変換する

Dataplex Universal Catalog ファイル形式変換テンプレートは、Dataplex Universal Catalog Cloud Storage アセットまたは CSV / JSON 形式で保存されている Dataplex Universal Catalog エンティティのリストのデータを、別の Dataplex アセットの Parquet、もしくは Avro 形式データに変換します。変換時にパーティション レイアウトは保持されます。また、出力ファイルの圧縮もサポートしています。

テンプレートのパラメータ

パラメータ 説明
inputAssetOrEntitiesList 入力ファイルを含む Dataplex Universal Catalog アセットまたは Dataplex Universal Catalog エンティティ。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> または projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>... の形式で指定する必要があります。
outputFileFormat Cloud Storage の出力ファイル形式。このパラメータは、PARQUET または AVRO の形式で指定する必要があります。
outputAsset 出力ファイルが保存される Cloud Storage バケットを含む Dataplex Universal Catalog アセットの名前。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> の形式で指定する必要があります。outputAsset は、 Google Cloud コンソールの Dataplex Universal Catalog アセットの [Details] タブで確認できます。
outputFileCompression 省略可: 出力ファイル圧縮。このパラメータのデフォルト値は SNAPPY です。パラメータの他の値は UNCOMPRESSEDSNAPPYGZIPBZIP2 のいずれかにできます。BZIP2PARQUET ファイルではサポートされていません。
writeDisposition 省略可: 宛先ファイルがすでに存在する場合に実施するアクションを指定します。このパラメータのデフォルト値は SKIP です。これは、宛先ディレクトリに存在しないファイルのみを処理するよう指示します。パラメータのほかの値としては、OVERWRITE(既存のファイルをすべて上書きする)または FAIL(少なくとも 1 つの宛先ファイルがすでに存在する場合は何も処理せず、エラーを生成する)があります。
updateDataplexMetadata

省略可: 新しく作成されたエンティティの Dataplex Universal Catalog メタデータを更新するかどうか。このパラメータのデフォルト値は false です。

有効にすると、パイプラインによりスキーマがソースからコピー先の Dataplex エンティティに自動的にコピーされ、自動化された Dataplex Universal Catalog Discovery は Dataplex エンティティに対して実行されません。このフラグは、ソース(未加工)データのスキーマが Dataplex によって管理されている場合に使用します。

テンプレートを実行する

コンソール

  1. Google Cloud コンソールで、Dataplex Universal Catalog の [処理] ページに移動します。

    [プロセス] に移動

  2. [タスクを作成] をクリックします。

  3. [キュレートされた形式に変換する] で、[タスクを作成] をクリックします。

  4. Dataplex Universal Catalog レイクを選択します。

  5. タスク名を指定します。

  6. タスクの実行に使用するリージョンを選択します。

  7. 必要なパラメータを入力します。

  8. [続行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \
--parameters \
inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\
outputFileFormat=OUTPUT_FILE_FORMAT,\
outputAsset=OUTPUT_ASSET

次のように置き換えます。

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex Universal Catalog output asset ID

REST

HTTP POST リクエストの送信:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST",
        "outputFileFormat": "OUTPUT_FILE_FORMAT",
        "outputAsset": "OUTPUT_ASSET",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview",
 }
}

次のように置き換えます。

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex Universal Catalog output asset ID

テンプレート: BigQuery アセットから Cloud Storage アセットにデータを階層化する

Dataplex Universal Catalog BigQuery to Cloud Storage テンプレートは、Dataplex Universal Catalog 互換のレイアウトと形式で、Dataplex Universal Catalog BigQuery アセットから Dataplex Universal Catalog Cloud Storage アセットにデータをコピーします。コピーする BigQuery データセットまたは BigQuery テーブルのリストを指定できます。柔軟性を高めるために、このテンプレートでは、指定した変更日より古いデータをコピーできます。また、コピーが正常に完了した後に BigQuery からデータを削除することもできます。

パーティション分割テーブルを BigQuery から Cloud Storage にコピーする場合:

  • このテンプレートにより、Cloud Storage バケットに Hive スタイルのパーティションが作成されます。BigQuery では、Hive スタイルのパーティション キーを既存の列と同じにすることはできません。オプション enforceSamePartitionKey を使用すると、新しいパーティション キーを作成するか、同じパーティション キーを保持して既存の列の名前を変更できます。
  • Dataplex Universal Catalog Discovery は、BigQuery テーブル(および Dataproc Metastore のテーブル)を作成するときに、パーティション タイプを string として登録します。これにより、既存のパーティション フィルタに影響する可能性があります。

1 回のテンプレート実行で変換できるテーブルとパーティションの数には上限があります(約 300 個)。正確な数は、テーブル名の長さなどの要因によって異なります。

テンプレートのパラメータ

パラメータ 説明
sourceBigQueryDataset データを階層化する BigQuery データセット。このパラメータには、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 形式の Dataplex Universal Catalog アセット名、または projects/<name>/datasets/<dataset-id> 形式の BigQuery データセット ID を含める必要があります。
destinationStorageBucketAssetName データを階層化する Cloud Storage バケットの Dataplex Universal Catalog アセット名。このパラメータは、projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> の形式である必要があります。
tables 省略可: 階層化する BigQuery テーブルのカンマ区切りのリスト。リストを指定しない場合、すべてのテーブルが階層化されます。テーブルは名前のみで指定する必要があり(プロジェクトまたはデータセットの接頭辞なし)、大文字と小文字が区別されます。
exportDataModifiedBeforeDateTime 省略可: このパラメータを使用して、この日付(およびオプションの時刻)より古いデータを移動します。BigQuery のパーティション分割テーブルの場合、この日時より前に最終更新されたパーティションを移動します。パーティション分割されていないテーブルの場合、テーブルの最終更新がこの日時より前の場合は移動します。指定しない場合、すべてのテーブル / パーティションを移動します。デフォルトでは、日付と時刻はデフォルトのタイムゾーンで解析されますが、オプションのサフィックス Z+HH:mm がサポートされています。このパラメータは、YYYY-MM-DDYYYY-MM-DDTHH:mm:ssYYYY-MM-DDTHH:mm:ss+03:00 の形式である必要があります。相対的な日時もサポートされており、-PnDTnHnMn.nS の形式である必要があります(-P で始まる必要があります。これは過去の時刻を示します)。
fileFormat 省略可: Cloud Storage の出力ファイル形式。このパラメータのデフォルト値は PARQUET です。パラメータの他の値は AVRO です。
fileCompression 省略可: 出力ファイル圧縮。このパラメータのデフォルト値は SNAPPY です。パラメータの他の値は UNCOMPRESSEDSNAPPYGZIPBZIP2 のいずれかにできます。BZIP2PARQUET ファイルではサポートされていません。
deleteSourceData 省略可: エクスポートが正常に完了した後に BigQuery からソースデータを削除するかどうか。値は true または false のいずれかです。このパラメータのデフォルト値は false です。
partitionIdRegExp 省略可: この正規表現に一致するパーティション ID を持つパーティションのみを処理します。値が指定されていない場合、このパラメータはデフォルトですべてを処理します。
writeDisposition 省略可: 宛先ファイルがすでに存在する場合(1 つ以上のテーブル / パーティションがすでに事前階層化されている場合)に実施するアクションを指定します。このパラメータのデフォルト値は SKIP です。これは、まだ事前階層化されていないテーブル / パーティションのみを処理するよう指示します。パラメータのほかの値としては、OVERWRITE(既存のファイルをすべて上書きする)または FAIL(少なくとも 1 つの宛先ファイルがすでに存在する場合は何も処理せず、エラーを生成する)があります。
enforceSamePartitionKey

省略可: 同じパーティション キーを適用するかどうか。BigQuery の制限により、外部のパーティション分割テーブルのパーティション キー(ファイルパス内)はファイル内の列のいずれかと同じ名前にすることはできません。このパラメータが true(デフォルト値)の場合、ターゲット ファイルのパーティション キーは元のパーティション列名に設定され、ファイルの列は名前が変更されます。false の場合、パーティション キーの名前が変更されます。

たとえば、元のテーブルが TS および enforceSamePartitionKey=true という名前の列でパーティション分割されている場合、宛先ファイルのパスは gs://<bucket>/TS=<partition ID>/<file> になり、ファイル内の列は TS_pkey に名前が変更されます。これにより、既存のクエリを古いテーブルまたは新しいテーブルの同じパーティションに対して実行できます。

enforceSamePartitionKey=false の場合、宛先ファイルのパスは gs://<bucket>/TS_pid=<partition ID>/<file> ですが、列名はファイル内の TS のままになります。

updateDataplexMetadata

省略可: 新しく作成されたエンティティの Dataplex Universal Catalog メタデータを更新するかどうか。このパラメータのデフォルト値は false です。

有効にすると、パイプラインによりスキーマがソースからコピー先の Dataplex エンティティに自動的にコピーされ、自動化された Dataplex Universal Catalog Discovery は Dataplex エンティティに対して実行されません。このフラグは、ソース BigQuery テーブルのスキーマを管理する場合に使用します。

テンプレートを実行する

コンソール

  1. Google Cloud コンソールで、Dataplex Universal Catalog の [処理] ページに移動します。

    [プロセス] に移動

  2. [タスクを作成] をクリックします。

  3. [BQ アセットから GCS アセットに階層化する] で、[タスクを作成] をクリックします。

  4. Dataplex Universal Catalog レイクを選択します。

  5. タスク名を指定します。

  6. タスクの実行に使用するリージョンを選択します。

  7. 必要なパラメータを入力します。

  8. [続行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \
--parameters \
sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\
destinationStorageBucketAssetName=DESTINATION_ASSET_NAME

次のように置き換えます。

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex Universal Catalog asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex Universal Catalog asset name for
the destination Cloud Storage bucket

REST

HTTP POST リクエストの送信:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
 "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID",
        "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview",
 }
}

次のように置き換えます。

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex Universal Catalog asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex Universal Catalog asset name for
the destination Cloud Storage bucket
REGION_NAME: region in which to run the job

他の Google Cloud提供の Dataflow テンプレートまたはカスタム Dataflow テンプレートをスケジュール設定する

Dataplex Universal Catalog を使用すると、Google Cloud提供の Dataflow テンプレートまたはカスタム Dataflow テンプレートをコンソールでスケジュール設定してモニタリングできます。

スケジュール

コンソール

  1. Google Cloud コンソールで、Dataplex Universal Catalog の [処理] ページに移動します。

    [プロセス] に移動

  2. [タスクを作成] をクリックします。

  3. [Dataflow パイプラインの作成] で、[Dataflow パイプラインを作成] をクリックします。

  4. Dataplex Universal Catalog レイクを選択します。

  5. タスク名を指定します。

  6. タスクを実行するリージョンを選択します。

  7. Dataflow テンプレートを選択します。

  8. 必要なパラメータを入力します。

  9. [続行] をクリックします。

モニタリング

コンソール

  1. Google Cloud コンソールで、Dataplex Universal Catalog の [処理] ページに移動します。

    [プロセス] に移動

  2. [Dataflow パイプライン] をクリックします。

  3. レイク名またはパイプライン名でフィルタします。