Cette page explique comment créer un cluster Managed Service pour Apache Spark exécutant Spark. Vous pouvez utiliser ce cluster pour travailler avec les métadonnées Dataplex Universal Catalog pour les lacs, les zones et les éléments.
Présentation
Vous créez un cluster après que l'instance de service Dataproc Metastore est associée au lac Dataplex Universal Catalog pour vous assurer que le cluster peut s'appuyer sur le point de terminaison Hive Metastore afin d'accéder aux métadonnées Dataplex Universal Catalog.
Les métadonnées gérées dans Dataplex Universal Catalog sont accessibles à l'aide d'interfaces standards, telles que Hive Metastore, pour alimenter les requêtes Spark. Les requêtes s'exécutent sur le cluster Managed Service pour Apache Spark.
Pour les données Parquet, définissez la propriété Spark spark.sql.hive.convertMetastoreParquet sur false afin d'éviter les erreurs d'exécution. Pour en savoir plus, consultez la section
Conversion de tables Parquet dans Hive Metastore.
Créer un cluster Managed Service pour Apache Spark
Exécutez les commandes suivantes pour créer un cluster Managed Service pour Apache Spark, en spécifiant le service Dataproc Metastore associé au lac Dataplex Universal Catalog :
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Explorer les métadonnées
Exécutez des requêtes DQL pour explorer les métadonnées et des requêtes Spark pour interroger les données.
Avant de commencer
Ouvrez une session SSH sur le nœud principal du cluster Managed Service pour Apache Spark.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONEÀ l'invite de commande du nœud principal, ouvrez une nouvelle boucle REPL Python.
python3
Répertorier des bases de données
Chaque zone Dataplex Universal Catalog du lac est mappée à une base de données metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Répertorier des tables
Répertoriez les tables dans l'une des zones.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Interroger les données
Interrogez les données dans l'une des tables.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Créer des tables et des partitions dans les métadonnées
Exécutez des requêtes DDL pour créer des tables et des partitions dans les métadonnées Dataplex Universal Catalog à l'aide d'Apache Spark.
Pour en savoir plus sur les types de données, les formats de fichiers et les formats de lignes compatibles, consultez la section Valeurs compatibles.
Avant de commencer
Avant de créer une table, créez un élément Dataplex Universal Catalog qui mappe le bucket Cloud Storage contenant les données sous-jacentes. Pour en savoir plus, consultez la section Ajouter un élément.
Créer une table
Les tables Parquet, ORC, AVRO, CSV et JSON sont compatibles.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Modifier une table
Dataplex Universal Catalog ne vous permet pas de modifier l'emplacement d'une table ni de modifier les colonnes de partition d'une table. La modification d'une table ne définit pas automatiquement
set
userManaged
sur true.
Dans Spark SQL, vous pouvez renommer une table, ajouter des colonnes, et définir le format de fichier d'une table.
Renommer une table
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Ajouter des colonnes
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Définir le format de fichier
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Supprimer une table
La suppression d'une table de l'API de métadonnées Dataplex Universal Catalog ne supprime pas les données sous-jacentes dans Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Ajouter une partition
Dataplex Universal Catalog ne permet pas de modifier une partition une fois qu'elle a été créée. Toutefois, la partition peut être supprimée.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN=VALUE1) PARTITION (COLUMN=VALUE2)")
df.show()
Vous pouvez ajouter plusieurs partitions de la même clé de partition et différentes valeurs de partition, comme illustré dans l'exemple précédent.
Supprimer une partition
Pour supprimer une partition, exécutez la commande suivante :
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Interroger des tables Iceberg
Vous pouvez interroger des tables Iceberg à l'aide d'Apache Spark.
Avant de commencer
Configurez une session Spark SQL avec Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Créer une table Iceberg
Pour créer une table Iceberg, exécutez la commande suivante :
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Explorer l'instantané et l'historique Iceberg
Vous pouvez obtenir des instantanés et l'historique des tables Iceberg à l'aide d'Apache Spark.
Avant de commencer
Configurez une session PySpark avec la compatibilité Iceberg :
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Obtenir l'historique des tables Iceberg
Pour obtenir l'historique d'une table Iceberg, exécutez la commande suivante :
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Obtenir des instantanés des tables Iceberg
Pour obtenir un instantané d'une table Iceberg, exécutez la commande suivante :
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Types de données et formats de fichiers compatibles
Les types de données compatibles sont définis comme suit :
| Type de données | Valeurs |
|---|---|
| Primitif |
|
| Tableau | ARRAY < DATA_TYPE > |
| Structure | STRUCT < COLUMN : DATA_TYPE > |
Voici les formats de fichiers compatibles :
TEXTFILEORCPARQUETAVROJSONFILE
Pour en savoir plus sur les formats de fichiers, consultez la section Formats de stockage.
Voici les formats de lignes compatibles :
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
Étape suivante
- Découvrez-en plus sur la gestion des métadonnées pour les lacs, les zones et les éléments.