En esta página, se describe cómo crear un clúster de Managed Service for Apache Spark que ejecute Spark. Puedes usar este clúster para trabajar con los metadatos de Knowledge Catalog (anteriormente, Dataplex Universal Catalog) para lakes, zonas y recursos.
Descripción general
Creas un clúster después de que la instancia del servicio de Dataproc Metastore se asocia con el lake de Knowledge Catalog para garantizar que el clúster pueda depender del extremo de Hive Metastore para acceder a los metadatos de Knowledge Catalog.
Se puede acceder a los metadatos administrados en Knowledge Catalog a través de interfaces estándar, como Hive Metastore, para potenciar las consultas de Spark. Las consultas se ejecutan en el clúster del servicio administrado para Apache Spark.
Para los datos de Parquet, configura la propiedad spark.sql.hive.convertMetastoreParquet de Spark como false para evitar errores de ejecución. Para obtener más información, consulta Conversión de tablas de Parquet del metastore de Hive.
Crea un clúster de Managed Service para Apache Spark
Ejecuta los siguientes comandos para crear un clúster de Managed Service for Apache Spark y especifica el servicio de Dataproc Metastore asociado con el data lake de Knowledge Catalog:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Explorar metadatos
Ejecuta consultas en DQL para explorar los metadatos y consultas en Spark para consultar los datos.
Antes de comenzar
Abre una sesión de SSH en el nodo principal del clúster de Managed Service para Apache Spark.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONEEn el símbolo del sistema del nodo principal, abre un nuevo REPL de Python.
python3
Enumerar bases de datos
Cada zona de Knowledge Catalog dentro del lake se asigna a una base de datos del metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Enumerar tablas
Enumera las tablas en una de las zonas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Consulta los datos
Consulta los datos en una de las tablas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Crea tablas y particiones en los metadatos
Ejecuta consultas DDL para crear tablas y particiones en los metadatos de Knowledge Catalog con Apache Spark.
Para obtener más información sobre los tipos de datos, los formatos de archivo y los formatos de filas admitidos, consulta Valores admitidos.
Antes de comenzar
Antes de crear una tabla, crea un activo de Knowledge Catalog que se asigne al bucket de Cloud Storage que contiene los datos subyacentes. Para obtener más información, consulta Cómo agregar un activo.
Crea una tabla
Se admiten tablas en formato Parquet, ORC, AVRO, CSV y JSON.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Cómo modificar una tabla
Knowledge Catalog no te permite alterar la ubicación de una tabla ni editar las columnas de partición de una tabla. La modificación de una tabla no establece automáticamente userManaged en true.
En Spark SQL, puedes cambiar el nombre de una tabla, agregar columnas y configurar el formato de archivo de una tabla.
Cambia el nombre de una tabla
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Agrega columnas
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Cómo establecer el formato de archivo
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Cómo soltar una mesa
Si se descarta una tabla de la API de metadatos de Dataplex, no se borran los datos subyacentes en Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Agregar una partición
Knowledge Catalog no permite modificar una partición una vez creada. Sin embargo, se puede descartar la partición.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN=VALUE1) PARTITION (COLUMN=VALUE2)")
df.show()
Puedes agregar varias particiones de la misma clave de partición y diferentes valores de partición, como se muestra en el ejemplo anterior.
Cómo soltar una partición
Para descartar una partición, ejecuta el siguiente comando:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Consulta tablas de Iceberg
Puedes consultar tablas de Iceberg con Apache Spark.
Antes de comenzar
Configura una sesión de Spark SQL con Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Crea una tabla de Iceberg
Para crear una tabla de Iceberg, ejecuta el siguiente comando:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Explora el historial y la instantánea de Iceberg
Puedes obtener instantáneas y el historial de las tablas de Iceberg con Apache Spark.
Antes de comenzar
Configura una sesión de PySpark con compatibilidad con Iceberg:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Obtén el historial de las tablas de Iceberg
Para obtener el historial de una tabla de Iceberg, ejecuta el siguiente comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Obtén instantáneas de tablas de Iceberg
Para obtener una instantánea de una tabla de Iceberg, ejecuta el siguiente comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Tipos de datos y formatos de archivo admitidos
Los tipos de datos admitidos se definen de la siguiente manera:
| Tipo de datos | Valores |
|---|---|
| Básico |
|
| Arreglo | ARRAY < DATA_TYPE > |
| Estructura | STRUCT < COLUMN : DATA_TYPE > |
Estos son los formatos de archivo admitidos:
TEXTFILEORCPARQUETAVROJSONFILE
Para obtener más información sobre los formatos de archivo, consulta Formatos de almacenamiento.
Se admiten los siguientes formatos de filas:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
¿Qué sigue?
- Obtén más información para administrar los metadatos de lagos, zonas y activos.