BigQuery テーブルのクエリ

このドキュメントでは、Managed Service for Apache Spark ワークロードで Spark SQL と Spark DataFrame API を使用して BigQuery テーブルにクエリを実行する方法について説明します。

始める前に

API を有効にして、必要に応じて Identity and Access Management ロールを付与します。

API を有効にする

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Identity and Access Management のロールを付与する

このページの例を実行するには、Managed Service for Apache Spark と BigQuery のロール付与が必要です。組織のポリシーによっては、これらのロールがすでに付与されている場合があります。ロール付与を確認するには、ロールを付与する必要がありますか?をご覧ください。

Managed Service for Apache Spark のロール

デフォルトでは、ワークロードまたはセッションにカスタム サービス アカウントが指定されていない限り、バッチとセッションは Compute Engine のデフォルトのサービス アカウントとして実行されます。

サービス アカウント ユーザーのロール

バッチ ワークロードを送信するために必要な権限を取得するには、Compute Engine のデフォルト サービス アカウントに対するサービス アカウント ユーザー roles/iam.serviceAccountUser)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

Dataproc ワーカーのロール

Compute Engine のデフォルト サービス アカウントにバッチ ワークロードを送信するために必要な権限を付与するには、プロジェクトに対する Dataproc ワーカー roles/dataproc.worker)IAM ロールを Compute Engine のデフォルト サービス アカウントに付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

管理者は、カスタムロールや他の事前定義ロールを使用して、Compute Engine のデフォルト サービス アカウントに必要な権限を付与することもできます。

BigQuery のロール

Managed Service for Apache Spark バッチ ワークロードまたはインタラクティブ セッションの実行に使用されるサービス アカウントには、次のリソースに対する次の IAM ロールが付与されている必要があります。

  • テーブルからデータを読み取るための BigQuery データ閲覧者(roles/bigquery.dataViewer:

    • Spark SQL の SELECT と INSERT INTO の例で bigquery.DATASET_ID.SOURCE_TABLE から読み取ります。
    • DataFrame API の例で INFORMATION_SCHEMA から読み取る。
  • BigQuery ユーザー(roles/bigquery.user。Spark が BigQuery とやり取りするジョブを実行できるようにします。

  • データを書き込むための BigQuery データ編集者(roles/bigquery.dataEditor:

    • Spark SQL の INSERT INTO の例では、bigquery.DATASET_ID.DESTINATION_TABLE に書き込みます。
    • DataFrame API の例で INFORMATION_SCHEMA をクエリする場合、このロールは .option('materializationDataset', ...) で提供される DATASET_ID で必要です。これにより、コネクタは結果の一時テーブルを作成できます。

Spark バッチ ワークロードの送信

Google Cloud コンソール、Google Cloud CLI、または Managed Service for Apache Spark API を使用して、Managed Service for Apache Spark バッチ ワークロードを送信できます。

Spark SQL を使用する

Spark BigQuery カタログを使用すると、バッチ ワークロードまたはインタラクティブ セッションから標準の BigQuery テーブルに直接クエリを実行できます。この方法では、標準の GoogleSQL 構文を使用して spark-sql ジョブ内の BigQuery データを操作できます。PySpark コードを記述したり、DataFrame API を使用して一時ビューを作成する必要はありません。

BigQuery カタログを構成する

BigQuery カタログを有効にするには、次の Spark プロパティを Spark SQL バッチ ワークロードまたはインタラクティブ セッションに指定します。

  • dataproc.sparkBqConnector.version=CONNECTOR_VERSION: Spark BigQuery コネクタのバージョンを指定します。
  • spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog: (省略可) bigquery カタログを Spark SQL カタログとして登録します。

Google Cloud CLI の例:

gcloud dataproc batches submit spark-sql \
    --project=PROJECT_ID \
    --region=REGION \
    --version=RUNTIME_VERSION \
    --subnet=SUBNET \
    --service-account=SERVICE_ACCOUNT \
    --properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
    gs://BUCKET/my_query.sql

次のように置き換えます。

BigQuery テーブルのクエリ

カタログを構成したら、次の形式 bigquery.DATASET_ID.TABLE_ID を使用して SQL スクリプトで BigQuery テーブルを参照できます。

サンプル SQL クエリ:

-- Query data from a BigQuery table.
SELECT
  column_a,
  SUM(column_b)
FROM
  bigquery.DATASET_ID.SOURCE_TABLE
WHERE
  partition_date = CURRENT_DATE()
GROUP BY column_a;

-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';

次のように置き換えます。

  • DATASET_ID: BigQuery データセット ID。
  • SOURCE_TABLE: クエリするテーブルの ID。
  • DESTINATION_TABLE: データを挿入するテーブルの ID。

DataFrame API を使用する

INFORMATION_SCHEMA ビューにアクセスするには、DataFrame API が必要です。

  • INFORMATION_SCHEMA をクエリするには:

    • spark.conf.set('viewsEnabled', 'true') を設定します。
    • コネクタが一時的な結果を書き込むための .option('materializationDataset', 'DATASET_ID') を指定します。

PySpark クエリの例:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()

# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')

# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
  .option('project', 'PROJECT_ID') \
  .option('materializationDataset', 'DATASET_ID') \
  .load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
  • DATASET_ID: SparkvBigQuery コネクタが一時データを書き込むことができる BigQuery データセット ID。

標準の BigQuery テーブルからデータを読み取り、結果を出力テーブルに書き込む PySpark の例については、PySpark ワードカウント バッチ ワークロードを送信するをご覧ください。

次のステップ