Nesta página, descrevemos como criar um cluster do Managed Service para Apache Spark que executa o Spark. Você pode usar esse cluster para trabalhar com metadados do Knowledge Catalog (antigo Dataplex Universal Catalog) para lakes, zonas e recursos.
Visão geral
Crie um cluster depois que a instância do serviço do metastore do Dataproc for associada ao lake do Knowledge Catalog para garantir que o cluster possa usar o endpoint do metastore do Hive e acessar os metadados do Knowledge Catalog.
Os metadados gerenciados no Knowledge Catalog podem ser acessados usando interfaces padrão, como o metastore do Hive, para impulsionar consultas do Spark. As consultas são executadas no cluster do Managed Service for Apache Spark.
Para dados do Parquet, defina a propriedade do Spark spark.sql.hive.convertMetastoreParquet como false para evitar erros de execução. Para mais informações, consulte
Conversão de tabelas Parquet do metastore do Hive.
Criar um cluster do Managed Service for Apache Spark
Execute os comandos a seguir para criar um cluster do serviço gerenciado para Apache Spark, especificando o serviço do metastore do Dataproc associado ao data lake do Knowledge Catalog:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Explorar metadados
Execute consultas DQL para explorar os metadados e execute consultas do Spark para consultar dados.
Antes de começar
Abra uma sessão SSH no nó principal do cluster do Managed Service for Apache Spark.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONENo prompt de comando do nó principal, abra um novo REPL do Python.
python3
Listar bancos de dados
Cada zona do Knowledge Catalog no lake é mapeada para um banco de dados do metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Listar tabelas
Liste as tabelas em uma das zonas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Consultar dados
Consulte os dados em uma das tabelas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Criar tabelas e partições em metadados
Execute consultas DDL para criar tabelas e partições nos metadados do Knowledge Catalog usando o Apache Spark.
Para mais informações sobre os tipos de dados, formatos de arquivo e formatos de linha aceitos, consulte Valores aceitos.
Antes de começar
Antes de criar uma tabela, crie um recurso do Knowledge Catalog que mapeie o bucket do Cloud Storage com os dados subjacentes. Para mais informações, consulte Adicionar um recurso.
Criar uma tabela
As tabelas Parquet, ORC, AVRO, CSV e JSON são compatíveis.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Alterar uma tabela
O catálogo de dados não permite alterar o local de uma tabela nem editar as colunas de partição dela. Alterar uma tabela não define automaticamente
userManaged
como true.
No Spark SQL, é possível renomear uma tabela, adicionar colunas e definir o formato de arquivo de uma tabela.
Renomear uma tabela
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Adicionar colunas
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Definir o formato do arquivo
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Remover uma tabela
Remover uma tabela da API de metadados do Dataplex não exclui os dados subjacentes no Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Adicionar uma partição
O Knowledge Catalog não permite alterar uma partição depois que ela é criada. No entanto, a partição pode ser descartada.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN=VALUE1) PARTITION (COLUMN=VALUE2)")
df.show()
É possível adicionar várias partições da mesma chave de partição e valores de partição diferentes, conforme mostrado no exemplo anterior.
Soltar uma partição
Para descartar uma partição, execute o seguinte comando:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Consultar tabelas Iceberg
É possível consultar tabelas do Iceberg usando o Apache Spark.
Antes de começar
Configure uma sessão do Spark SQL com o Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Criar uma tabela do Iceberg
Para criar uma tabela do Iceberg, execute o seguinte comando:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Analisar o snapshot e o histórico do Iceberg
É possível receber snapshots e o histórico de tabelas do Iceberg usando o Apache Spark.
Antes de começar
Configure uma sessão do PySpark com suporte ao Iceberg:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Receber o histórico de tabelas do Iceberg
Para acessar o histórico de uma tabela do Iceberg, execute o seguinte comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Receber snapshots de tabelas Iceberg
Para receber um snapshot de uma tabela do Iceberg, execute o seguinte comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Tipos de dados e formatos de arquivo aceitos
Os tipos de dados compatíveis são definidos da seguinte maneira:
| Tipo de dado | Valores |
|---|---|
| Primário |
|
| Matriz | ARRAY < DATA_TYPE > |
| Estrutura | STRUCT < COLUMN : DATA_TYPE > |
Estes são os formatos de arquivo compatíveis:
TEXTFILEORCPARQUETAVROJSONFILE
Para mais informações sobre os formatos de arquivo, consulte Formatos de armazenamento.
Estes são os formatos de linha compatíveis:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
A seguir
- Saiba mais sobre como gerenciar metadados de data lakes, zonas e recursos.