Dataflow ジョブビルダーのブループリントを使用すると、クラウドベースのストレージ(Cloud Storage または Amazon S3)から Lakehouse の Apache Iceberg テーブルに既存の Apache Parquet ファイルを追加できます。
このプロセスでは、IcebergAddFiles 変換を使用します。Parquet ファイルが Cloud Storage にある場合、この変換では、基盤となるデータを移動または書き換えることなく、Lakehouse にファイルを登録します。ファイルが Amazon S3 などの外部ストレージ システムにある場合は、Lakehouse を介して高速にクエリを実行できるように Cloud Storage にコピーされてから登録されます。
次の接続の詳細を使用して、クラウドベースのストレージから Lakehouse の Apache Iceberg テーブルに Parquet ファイルを追加します。
始める前に
Dataflow API、BigQuery API、Lakehouse API を有効にします。
リソースの作成に必要な権限を取得するには、プロジェクトに必要な Identity and Access Management(IAM)ロールを付与するよう管理者に依頼してください。
データをインポートする Lakehouse for Apache Iceberg カタログ、Namespace、テーブルを作成します。
クラウドベースのストレージ バケット(Cloud Storage または Amazon S3)を作成し、Parquet ファイルをバケットにアップロードします。
使用しているクラウドベースのストレージ バケットが Google の Cloud Storage ではない場合は、ジョブのエラーログを保存する Cloud Storage バケットを作成します。
サポートと制限事項
Dataflow を使用してクラウドベースのストレージにある Parquet ファイルを Lakehouse for Apache Iceberg にインポートする場合、次の制限があります。
- ソースデータは Apache Parquet 形式で、Cloud Storage または Amazon S3 に保存されている必要があります。
- この機能はバッチ パイプラインのみをサポートしています。
Parquet ファイルを Lakehouse にインポートする
次の手順に沿って、Dataflow ジョブビルダー UI を使用して、クラウドベースのストレージから Lakehouse の Iceberg テーブルに Parquet ファイルをインポートします。
Google Cloud コンソールで、[Lakehouse for Apache Iceberg] ページに移動します。
データをインポートするカタログ、名前空間、テーブルを選択します。
[テーブルの詳細] ページで、 [テーブルをインポート] をクリックします。
[インポート構成] ダイアログで、[Apache Parquet ファイルから Lakehouse にテーブルをインポート(バッチ)] を選択します。
Dataflow の [ジョブビルダー] ページが開きます。
[ソース] セクションで、次の操作を行います。
すでに作成されている CreateGlobalInput ソース エントリを開きます。
[YAML ソース構成] エディタ セクションで、
elementsシーケンスに Parquet ファイルの 1 つ以上のパスを入力します。インポートの効率を高めるには、多数のファイルを登録する際に、複数のファイルセット(glob)を指定します。次に例を示します。
reshuffle: true elements: - gs://BUCKET_NAME/restaurant-data/2023/*.parquet - gs://BUCKET_NAME/restaurant-data/2024/*.parquet[完了] をクリックします。
[変換] セクションで、次の操作を行います。
[IcebergAddFiles] 変換セクションをクリックして開きます。
[Iceberg table] フィールドに、Namespace とテーブル名を入力します。例: NAMESPACE .TABLE_NAME
[カタログ プロパティ] で、次の項目を構成します。
warehouse: カタログの Cloud Storage のロケーション。例:
gs://CATALOG_PATHheader.x-goog-user-project: Google Cloud プロジェクト ID: PROJECT_ID。
[完了] をクリックします。
[シンク] セクションで、次の操作を行います。
[Write results] シンクをクリックして開きます。
[JSON の場所] フィールドで、エラー結果を書き込む Cloud Storage の場所とファイル名を指定します。次に例を示します。
gs://BUCKET_NAME/errors/errors.json[完了] をクリックします。
[Dataflow オプション] セクションで、[ジョブを実行] をクリックします。
Parquet ファイルの登録に使用される Dataflow パイプラインをさらにカスタマイズする必要がある場合は、ジョブビルダー フォームまたは YAML エディタを使用してカスタマイズできます。
ジョブの出力を調べる
ジョブが完了したら、BigQuery でクエリを実行して、データが Iceberg テーブルに登録されたことを確認できます。
Dataflow ジョブリストで、ジョブのステータスが [成功] であることを確認します。
ジョブが失敗した場合やエラーが発生した場合は、Cloud Storage の JSON エラーログ ファイルで詳細を確認します。
Google Cloud コンソールで、BigQuery Studio ページに移動します。
クエリエディタで、テーブルを検査する SQL クエリを入力します。
PROJECT_ID.CATALOG>NAMESPACE.TABLE_NAME規約を使用してクエリできます。SELECT * FROM `PROJECT_ID.CATALOG>NAMESPACE.TABLE_NAME` LIMIT 10(実行)をクリックします。
クエリ結果を確認して、データが正しく処理されたことを確認します。
次のステップ
- 詳細については、Lakehouse ランタイム カタログについてをご覧ください。
- 詳細については、Dataflow ジョブビルダー UI の概要をご覧ください。