Auf dieser Seite wird beschrieben, wie Sie einen Managed Service for Apache Spark Cluster erstellen, auf dem Spark ausgeführt wird. Sie können diesen Cluster verwenden, um mit Dataplex Universal Catalog-Metadaten für Lakes, Zonen und Assets zu arbeiten.
Übersicht
Sie erstellen einen Cluster, nachdem die Dataproc Metastore-Dienstinstanz mit dem Dataplex Universal Catalog-Lake verknüpft wurde. So kann der Cluster den Hive Metastore-Endpunkt verwenden, um auf Dataplex Universal Catalog-Metadaten zuzugreifen.
Auf Metadaten, die in Dataplex Universal Catalog verwaltet werden, kann über Standardschnittstellen wie Hive Metastore zugegriffen werden, um Spark-Abfragen zu ermöglichen. Die Abfragen werden im Managed Service for Apache Spark-Cluster ausgeführt.
Legen Sie für Parquet-Daten die Spark-Eigenschaft spark.sql.hive.convertMetastoreParquet auf false fest, um Ausführungsfehler zu vermeiden. Weitere Informationen finden Sie unter
Hive Metastore-Parquet-Tabellenkonvertierung.
Managed Service for Apache Spark-Cluster erstellen
Führen Sie die folgenden Befehle aus, um einen Managed Service for Apache Spark-Cluster zu erstellen. Geben Sie dabei den Dataproc Metastore-Dienst an, der mit dem Dataplex Universal Catalog-Lake verknüpft ist:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Metadaten untersuchen
Führen Sie DQL-Abfragen aus, um die Metadaten zu untersuchen, und führen Sie Spark-Abfragen aus, um Daten abzufragen.
Hinweis
Öffnen Sie eine SSH-Sitzung auf dem primären Knoten des Managed Service for Apache Spark-Clusters.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONEÖffnen Sie in der Eingabeaufforderung des primären Knotens eine neue Python-REPL.
python3
Datenbanken auflisten
Jede Dataplex Universal Catalog-Zone im Lake wird einer Metastore-Datenbank zugeordnet.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Tabellen auflisten
Listen Sie Tabellen in einer der Zonen auf.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Daten abfragen
Fragen Sie die Daten in einer der Tabellen ab.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Tabellen und Partitionen in Metadaten erstellen
Führen Sie DDL-Abfragen aus, um mit Apache Spark Tabellen und Partitionen in Dataplex Universal Catalog-Metadaten zu erstellen.
Weitere Informationen zu den unterstützten Datentypen, Dateiformaten und Zeilen formaten finden Sie unter Unterstützte Werte.
Hinweis
Bevor Sie eine Tabelle erstellen, erstellen Sie ein Dataplex Universal Catalog-Asset, das dem Cloud Storage-Bucket mit den zugrunde liegenden Daten zugeordnet ist. Weitere Informationen finden Sie unter Asset hinzufügen.
Tabelle erstellen
Parquet-, ORC-, AVRO-, CSV- und JSON-Tabellen werden unterstützt.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Tabelle ändern
Mit Dataplex Universal Catalog können Sie den Speicherort einer Tabelle nicht ändern oder die Partitionspalten für eine Tabelle bearbeiten. Wenn Sie eine Tabelle ändern, wird
userManaged
nicht automatisch auf true gesetzt.
In Spark SQL können Sie eine Tabelle umbenennen, Spalten hinzufügen, und das Dateiformat einer Tabelle festlegen.
Tabelle umbenennen
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Spalten hinzufügen
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Dateiformat festlegen
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Tabelle löschen
Wenn Sie eine Tabelle über die Dataplex Universal Catalog-Metadaten-API löschen, werden die zugrunde liegenden Daten in Cloud Storage nicht gelöscht.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Partition hinzufügen
In Dataplex Universal Catalog können Sie eine Partition nach dem Erstellen nicht mehr ändern. Die Partition kann jedoch gelöscht werden.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN=VALUE1) PARTITION (COLUMN=VALUE2)")
df.show()
Sie können mehrere Partitionen mit demselben Partitionsschlüssel und unterschiedlichen Partitionswerten hinzufügen, wie im vorherigen Beispiel gezeigt.
Partition löschen
Führen Sie den folgenden Befehl aus, um eine Partition zu löschen:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Iceberg-Tabellen abfragen
Sie können Iceberg-Tabellen mit Apache Spark abfragen.
Hinweis
Richten Sie eine Spark SQL-Sitzung mit Iceberg ein.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Iceberg-Tabelle erstellen
Führen Sie den folgenden Befehl aus, um eine Iceberg-Tabelle zu erstellen:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Iceberg-Snapshot und -Verlauf untersuchen
Sie können Snapshots und den Verlauf von Iceberg-Tabellen mit Apache Spark abrufen.
Hinweis
Richten Sie eine PySpark-Sitzung mit Iceberg-Unterstützung ein:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Verlauf von Iceberg-Tabellen abrufen
Führen Sie den folgenden Befehl aus, um den Verlauf einer Iceberg-Tabelle abzurufen:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Snapshots von Iceberg-Tabellen abrufen
Führen Sie den folgenden Befehl aus, um einen Snapshot einer Iceberg-Tabelle abzurufen:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Unterstützte Datentypen und Dateiformate
Die unterstützten Datentypen sind so definiert:
| Datentyp | Werte |
|---|---|
| Primitiv |
|
| Array | ARRAY < DATA_TYPE > |
| Struktur | STRUCT < COLUMN : DATA_TYPE > |
Folgende Dateiformate werden unterstützt:
TEXTFILEORCPARQUETAVROJSONFILE
Weitere Informationen zu den Dateiformaten finden Sie unter Speicherformate.
Folgende Zeilenformate werden unterstützt:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
Nächste Schritte
- Weitere Informationen zum Verwalten von Metadaten für Lakes, Zonen und Assets.