BigQuery-Tabellen abfragen
In diesem Dokument wird erläutert, wie Sie Spark SQL und die Spark DataFrame API in Managed Service for Apache Spark-Arbeitslasten verwenden können, um BigQuery-Tabellen abzufragen.
Hinweis
Aktivieren Sie die APIs und weisen Sie bei Bedarf Identity and Access Management-Rollen zu.
APIs aktivieren
- Melden Sie sich in Ihrem Google Cloud Konto an. Wenn Sie noch kein Konto bei Google Cloudhaben, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
Identity and Access Management-Rollen zuweisen
Für die Ausführung der Beispiele auf dieser Seite sind Rollenzuweisungen für Managed Service for Apache Spark und BigQuery erforderlich. Je nach Organisationsrichtlinie wurden diese Rollen möglicherweise bereits zugewiesen. Informationen zum Prüfen von Rollenzuweisungen finden Sie unter Müssen Sie Rollen zuweisen?.
Managed Service for Apache Spark-Rollen
Standardmäßig werden Batches und Sitzungen als das Compute Engine-Standarddienstkonto ausgeführt, es sei denn, für die Arbeitslast oder Sitzung ist ein benutzerdefiniertes Dienstkonto angegeben.
Rolle „Dienstkontonutzer“
Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle „Dienstkontonutzer “ (roles/iam.serviceAccountUser) für das Compute Engine-Standarddienstkonto zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Senden einer Batch-Arbeitslast benötigen.
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
Rolle „Dataproc-Worker“
Bitten Sie Ihren Administrator, dem Compute Engine-Standarddienstkonto die IAM-Rolle „Dataproc-Worker “ (roles/dataproc.worker) für das Projekt zuzuweisen, damit das Compute Engine-Standarddienstkonto die erforderlichen Berechtigungen zum Senden einer Batch-Arbeitslast hat.
Ihr Administrator kann dem Compute Engine-Standarddienstkonto die erforderlichen Berechtigungen möglicherweise auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen zuweisen.
BigQuery-Rollen
Dem Dienstkonto, das zum Ausführen einer Managed Service for Apache Spark-Batch-Arbeitslast oder einer interaktiven Sitzung verwendet wird, müssen die folgenden IAM-Rollen für die folgenden Ressourcen zugewiesen werden:
BigQuery-Datenbetrachter (
roles/bigquery.dataViewer) zum Lesen von Daten aus Tabellen, wie folgt:- Lesen aus „bigquery.DATASET_ID.SOURCE_TABLE “ in den Spark SQL-Beispielen für SELECT und INSERT INTO.
- Lesen aus „INFORMATION_SCHEMA“ im Beispiel für die DataFrame API.
BigQuery-Nutzer (
roles/bigquery.user), damit Spark Jobs ausführen kann, die mit BigQuery interagieren.BigQuery-Datenbearbeiter (
roles/bigquery.dataEditor) zum Schreiben von Daten oder Metadaten, wie folgt:- Für das Spark SQL-Beispiel für INSERT INTO zum Schreiben in „bigquery.DATASET_ID.DESTINATION_TABLE“.
- Für das Beispiel für die DataFrame API, in dem „INFORMATION_SCHEMA“ abgefragt wird, ist diese Rolle
für die DATASET_ID in
.option('materializationDataset', ...)erforderlich, damit der Connector temporäre Tabellen für die Ergebnisse erstellen kann.
Spark-Batch-Arbeitslast senden
Sie können die Google Cloud console, die Google Cloud CLI oder die Managed Service for Apache Spark API verwenden, um eine Managed Service for Apache Spark-Batch-Arbeitslast zu senden.
Spark SQL nutzen
Sie können den Spark BigQuery-Katalog verwenden, um Standard-BigQuery-Tabellen direkt aus Batch-Arbeitslasten oder interaktiven Sitzungen abzufragen. Mit dieser Methode können Sie die Standard-GoogleSQL-Syntax verwenden, um mit BigQuery-Daten in spark-sql-Jobs zu interagieren, ohne PySpark-Code schreiben oder temporäre Ansichten mit der DataFrame API erstellen zu müssen.
BigQuery-Katalog konfigurieren
Wenn Sie den BigQuery-Katalog aktivieren möchten, geben Sie die folgenden Spark-Eigenschaften für Ihre Spark SQL-Batch-Arbeitslast oder interaktive Sitzung an:
dataproc.sparkBqConnector.version=CONNECTOR_VERSION: Gibt die Version des Spark BigQuery-Connectors an.spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog: (Optional) Registriert denbigquery-Katalog als Spark SQL-Katalog.
Google Cloud CLI-Beispiel:
gcloud dataproc batches submit spark-sql \
--project=PROJECT_ID \
--region=REGION \
--version=RUNTIME_VERSION \
--subnet=SUBNET \
--service-account=SERVICE_ACCOUNT \
--properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
gs://BUCKET/my_query.sql
Ersetzen Sie Folgendes:
PROJECT_ID: Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Google Cloud console Dashboard aufgeführt.REGION: Region, in der der Batch ausgeführt wirdRUNTIME_VERSION: Optional. Managed Service for Apache Spark-Laufzeitversion. Wenn nichts angegeben ist, wird die aktuelle Standardlaufzeitversion ausgewählt.CONNECTOR_VERSION: Version des Spark BigQuery-Connectors. Eine mit derRUNTIME_VERSIONkompatible Connectorversion finden Sie unter Managed Service for Apache Spark-Laufzeitversionen. Wenn der Connector nicht vorinstalliert ist, finden Sie die verfügbaren Versionen auf der GitHub-Seite mit den Releases.SUBNET: Optional. Subnetz, das für die Batch-Arbeitslast verwendet werden soll. Wenn nichts angegeben ist, wird dasdefault-Subnetz verwendet.SERVICE_ACCOUNT: Optional. Dienstkonto, mit dem der Batch-Job ausgeführt wird. Wenn nichts angegeben ist, wird das Compute Engine-Standarddienstkonto verwendet.BUCKET: Cloud Storage-Bucket mit der SQL-Datei.
BigQuery-Tabellen abfragen
Nachdem Sie den Katalog konfiguriert haben, können Sie mit dem folgenden
Format in einem SQL-Script auf
BigQuery-Tabellen verweisen: bigquery.DATASET_ID.TABLE_ID.
SQL-Beispielabfrage:
-- Query data from a BigQuery table.
SELECT
column_a,
SUM(column_b)
FROM
bigquery.DATASET_ID.SOURCE_TABLE
WHERE
partition_date = CURRENT_DATE()
GROUP BY column_a;
-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';
Ersetzen Sie Folgendes:
DATASET_ID: BigQuery-Dataset-ID.SOURCE_TABLE: ID der abzufragenden Tabelle.DESTINATION_TABLE: ID der Tabelle, in die Daten eingefügt werden sollen.
DataFrame API verwenden
Die DataFrame API ist erforderlich, um auf INFORMATION_SCHEMA-Ansichten zuzugreifen.
So fragen Sie
INFORMATION_SCHEMAab:- Legen Sie
spark.conf.set('viewsEnabled', 'true')fest. - Geben Sie
.option('materializationDataset', 'DATASET_ID')an, damit der Connector temporäre Ergebnisse schreiben kann.
- Legen Sie
PySpark-Beispielabfrage:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()
# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')
# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
.option('project', 'PROJECT_ID') \
.option('materializationDataset', 'DATASET_ID') \
.load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)
Ersetzen Sie Folgendes:
PROJECT_ID: Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Google Cloud console Dashboard aufgeführt.DATASET_ID: BigQuery-Dataset-ID , in die der Spark-BigQuery-Connector temporäre Daten schreiben kann.
Unter Batch-Arbeitslast für die Wortzählung mit PySpark senden finden Sie ein PySpark-Beispiel, in dem Daten aus einer Standard-BigQuery Tabelle gelesen und die Ergebnisse dann in eine Ausgabetabelle geschrieben werden.
Nächste Schritte
- Informationen zum Spark BigQuery-Connector.
- Kontingente für Managed Service for Apache Spark ansehen