Questa pagina descrive come creare un Managed Service per Apache Spark cluster che esegue Spark. Puoi utilizzare questo cluster per lavorare con i metadati di Dataplex Universal Catalog per lake, zone e asset.
Panoramica
Crea un cluster dopo che l'istanza del servizio Dataproc Metastore è associata al lake Dataplex Universal Catalog per assicurarti che il cluster possa fare affidamento sull'endpoint Hive Metastore per accedere ai metadati di Dataplex Universal Catalog.
È possibile accedere ai metadati gestiti in Dataplex Universal Catalog utilizzando interfacce standard, come Hive Metastore, per alimentare le query Spark. Le query vengono eseguite sul cluster Managed Service per Apache Spark.
Per i dati Parquet, imposta la proprietà Spark spark.sql.hive.convertMetastoreParquet su false per evitare errori di esecuzione. Per ulteriori informazioni, consulta
Conversione della tabella Parquet del metastore Hive.
Crea un cluster Managed Service per Apache Spark
Esegui i seguenti comandi per creare un cluster Managed Service per Apache Spark, specificando il servizio Dataproc Metastore associato al lake Dataplex Universal Catalog:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Esplora i metadati
Esegui query DQL per esplorare i metadati ed esegui query Spark per eseguire query sui dati.
Prima di iniziare
Apri una sessione SSH sul nodo principale del cluster Managed Service per Apache Spark.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONEAl prompt dei comandi del nodo principale, apri un nuovo REPL Python.
python3
Elenco database
Ogni zona Dataplex Universal Catalog all'interno del lake esegue il mapping a un database metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Elenca tabelle
Elenca le tabelle in una delle zone.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Esegui query sui dati
Esegui query sui dati in una delle tabelle.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Crea tabelle e partizioni nei metadati
Esegui query DDL per creare tabelle e partizioni nei metadati di Dataplex Universal Catalog utilizzando Apache Spark.
Per ulteriori informazioni sui tipi di dati, i formati di file e i formati di riga supportati, consulta Valori supportati.
Prima di iniziare
Prima di creare una tabella, crea un asset Dataplex Universal Catalog che esegue il mapping al bucket Cloud Storage contenente i dati sottostanti. Per ulteriori informazioni, consulta Aggiungere un asset.
Crea una tabella
Sono supportate le tabelle Parquet, ORC, AVRO, CSV e JSON.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Modifica una tabella
Dataplex Universal Catalog non consente di modificare la posizione di una tabella o di modificare le colonne di partizione per una tabella. La modifica di una tabella non imposta automaticamente
set
userManaged
su true.
In Spark SQL, puoi rinominare una tabella, aggiungere colonne, e impostare il formato file di una tabella.
Rinomina una tabella
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Aggiungi colonne
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Imposta il formato file
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Elimina una tabella
L'eliminazione di una tabella dall'API dei metadati di Dataplex Universal Catalog non elimina i dati sottostanti in Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Aggiungi una partizione
Dataplex Universal Catalog non consente di modificare una partizione una volta creata. Tuttavia, la partizione può essere eliminata.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN=VALUE1) PARTITION (COLUMN=VALUE2)")
df.show()
Puoi aggiungere più partizioni della stessa chiave di partizione e valori di partizione diversi, come mostrato nell'esempio precedente.
Elimina una partizione
Per eliminare una partizione, esegui il comando seguente:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Esegui query sulle tabelle Iceberg
Puoi eseguire query sulle tabelle Iceberg utilizzando Apache Spark.
Prima di iniziare
Configura una sessione Spark SQL con Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Crea una tabella Iceberg
Per creare una tabella Iceberg, esegui il comando seguente:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Esplora lo snapshot e la cronologia di Iceberg
Puoi ottenere snapshot e cronologia delle tabelle Iceberg utilizzando Apache Spark.
Prima di iniziare
Configura una sessione PySpark con il supporto di Iceberg:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Ottieni la cronologia delle tabelle Iceberg
Per ottenere la cronologia di una tabella Iceberg, esegui il comando seguente:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Ottieni snapshot delle tabelle Iceberg
Per ottenere uno snapshot di una tabella Iceberg, esegui il comando seguente:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Tipi di dati e formati di file supportati
I tipi di dati supportati sono definiti come segue:
| Tipo di dati | Valori |
|---|---|
| originario |
|
| Array | ARRAY < DATA_TYPE > |
| Strutturazione | STRUCT < COLUMN : DATA_TYPE > |
Di seguito sono riportati i formati di file supportati:
TEXTFILEORCPARQUETAVROJSONFILE
Per ulteriori informazioni sui formati di file, consulta Formati di archiviazione.
Di seguito sono riportati i formati di riga supportati:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]