Usar o catálogo de tempo de execução do Lakehouse com tabelas no BigQuery

Este documento mostra um exemplo de como usar o catálogo de tempo de execução do Lakehouse com tabelas do BigQuery e o Serviço Gerenciado para Apache Spark.

Com o catálogo de tempo de execução do Lakehouse, é possível criar e usar tabelas padrão (integradas), tabelas do Apache Iceberg gerenciadas pelo BigQuery e tabelas externas do Apache Iceberg no BigQuery.

Formatos de tabela compatíveis

Somente tabelas do Apache Iceberg V2 são aceitas. As tabelas do Iceberg V1 não são compatíveis. Se você tiver tabelas do Iceberg V1, faça upgrade para a V2 (por exemplo, executando ALTER TABLE catalog.schema.table SET TBLPROPERTIES ('format-version'='2'); ou usando operações de mecanismo semelhantes) antes de usá-las com o catálogo de tempo de execução do Lakehouse.

Antes de começar

  1. Ative o faturamento para o projeto do Google Cloud . Saiba como verificar se o faturamento está ativado em um projeto.
  2. Ative as APIs BigQuery e Dataproc.

    Ativar as APIs

Funções exigidas

Para receber as permissões necessárias para usar o Serviço gerenciado para Apache Spark com o catálogo de tempo de execução do Lakehouse como um repositório de metadados, peça ao administrador para conceder a você os seguintes papéis do IAM:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.

Conectar-se a uma tabela

  1. Crie um conjunto de dados no console do Google Cloud .

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    Substitua:

    • PROJECT_ID: o ID do projeto Google Cloud para criar o conjunto de dados.
    • DATASET_NAME: um nome para o conjunto de dados.
  2. Crie uma conexão de recursos do Cloud.

  3. Crie uma tabela padrão do BigQuery.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    Substitua:

    • TABLE_NAME: um nome para a tabela.
  4. Insira dados na tabela padrão do BigQuery.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. Crie uma tabela do Apache Iceberg gerenciada pelo BigQuery.

    Por exemplo, para criar uma tabela, execute a seguinte instrução CREATE.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    Substitua:

    • ICEBERG_TABLE_NAME: um nome para sua tabela gerenciada do Apache Iceberg. Por exemplo, iceberg_managed_table.
    • CONNECTION_NAME: o nome da conexão. Você criou isso na etapa anterior. Por exemplo, myproject.us.myconnection.
    • STORAGE_URI: um URI do Cloud Storage totalmente qualificado. Por exemplo, gs://mybucket/table.
  6. Insira dados na tabela gerenciada do Apache Iceberg do BigQuery.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. Crie uma tabela externa do Apache Iceberg.

    Por exemplo, para criar uma tabela externa do Apache Iceberg, execute a seguinte instrução CREATE.

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    Substitua:

    • READONLY_ICEBERG_TABLE_NAME: um nome para sua tabela somente leitura.
    • BUCKET_PATH: o caminho para o bucket do Cloud Storage que contém os dados da tabela externa, no formato ['gs://bucket_name/[folder_name/]file_name'].
  8. No Apache Spark, consulte a tabela padrão, a tabela do Apache Iceberg gerenciada pelo BigQuery e a tabela externa do Apache Iceberg.

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("Lakehouse runtime catalog Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the Lakehouse runtime catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query the tables
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    Substitua:

    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage conectada à tabela gerenciada do Apache Iceberg do BigQuery e à tabela externa do Apache Iceberg.
    • CATALOG_NAME: o nome do catálogo que você está usando.
    • MATERIALIZATION_NAMESPACE: o namespace para armazenar resultados temporários.
  9. Execute o script do Apache Spark usando o Serviço Gerenciado para Apache Spark.

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \

    Substitua:

    • SCRIPT_PATH: o caminho para o script usado pelo job em lote.
    • PROJECT_ID: o ID do Google Cloud projeto em que o job em lote será executado.
    • REGION: a região em que sua carga de trabalho é executada.
    • YOUR_BUCKET: o local do bucket do Cloud Storage para fazer upload das dependências da carga de trabalho. O prefixo de URI gs:// do bucket não é necessário. É possível especificar o caminho ou o nome do bucket, por exemplo, mybucketname1.

A seguir