Usar o conector do Spark Spanner

Nesta página, mostramos como criar um cluster do Serviço Gerenciado para Apache Spark que usa o conector do Spark Spanner para ler e gravar dados no Spanner usando o Apache Spark.

O conector do Spanner funciona com o Spark para ler e gravar dados no banco de dados do Spanner usando a biblioteca Java do Spanner. O conector do Spanner permite ler tabelas e gráficos do Spanner em DataFrames e GraphFrames do Spark, além de gravar dados de DataFrame em tabelas do Spanner.

Custos

Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:

  • Managed Service for Apache Spark
  • Spanner
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.

Novos usuários do Google Cloud podem estar qualificados para um teste sem custo financeiro.

Antes de começar

  1. Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Conceda os papéis necessários.
  9. Configure um cluster do Serviço Gerenciado para Apache Spark.
  10. Configure uma instância do Spanner com uma tabela de banco de dados "Singers".

Conceder os papéis necessários

Alguns papéis do IAM são necessários para executar os exemplos nesta página. Dependendo das políticas da organização, esses papéis podem já ter sido concedidos. Para verificar as concessões de papéis, consulte Você precisa conceder papéis?.

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Para garantir que a conta de serviço padrão do Compute Engine tenha as permissões necessárias para criar um cluster do Managed Service for Apache Spark, peça ao administrador para conceder os seguintes papéis do IAM à conta de serviço padrão do Compute Engine no projeto:

Configurar um cluster do Serviço Gerenciado para Apache Spark

Crie um cluster do Serviço Gerenciado para Apache Spark ou use um cluster existente criado com a imagem 2.1 ou mais recente do Serviço Gerenciado para Apache Spark. Se o cluster tiver sido criado com a imagem 2.0 ou anterior, ele precisará ter sido criado com a propriedade scope definida como escopo cloud-platform.

Configurar uma instância do Spanner com uma tabela de banco de dados Singers

Crie uma instância do Spanner com um banco de dados que contenha uma tabela Singers. Anote o ID da instância e do banco de dados do Spanner.

Usar o conector do Spanner com o Spark

O conector do Spanner está disponível para as versões 3.1+ do Spark. Você especifica a versão do conector como parte da especificação do arquivo JAR do conector do Cloud Storage ao enviar um job para um cluster do Serviço gerenciado para Apache Spark.

Exemplo:envio de job do Spark da CLI gcloud com o conector do Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Substitua:

CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector do Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.

Ler tabelas do Spanner

É possível usar Python ou Scala para ler dados de tabelas do Spanner em um DataFrame do Spark usando a API de fonte de dados do Spark.

PySpark

Você pode executar o exemplo de código PySpark nesta seção no cluster enviando o job para o Serviço Gerenciado para Apache Spark ou executando o job no REPL spark-submit no nó mestre do cluster.

Job do Serviço Gerenciado para Apache Spark

  1. Crie um arquivo singers.py usando um editor de texto local ou no Cloud Shell com o editor de texto pré-instalado vi, vim ou nano.
    1. Depois de preencher as variáveis de marcador de posição, cole o código a seguir no arquivo singers.py. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Substitua:

      1. PROJECT_ID: o ID do projeto do Google Cloud . Os IDs do projeto estão listados na seção Informações do projeto no painel do console do Google Cloud .
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configurar uma instância do Spanner com a tabela de banco de dados Singers.
    2. Salve o arquivo singers.py.
  2. Envie o job para o Serviço Gerenciado para Apache Spark usando o console Google Cloud , a CLI gcloud ou a API REST.

    Exemplo:envio de jobs da CLI gcloud com o conector do Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Substitua:

    1. CLUSTER_NAME: o nome do novo cluster.
    2. REGION: uma região do Compute Engine disponível para executar a carga de trabalho.
    3. CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector do Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.

Job spark-submit

  1. Conecte-se ao nó mestre do cluster do Serviço Gerenciado para Apache Spark usando SSH.
    1. Acesse a página Clusters do Serviço Gerenciado para Apache Spark no console Google Cloud e clique no nome do cluster.
    2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster.
      Captura de tela da página de detalhes do cluster do Dataproc no console Google Cloud , mostrando o botão SSH usado para se conectar ao nó mestre do cluster.

      Uma janela de navegador é aberta no diretório inicial do nó mestre.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crie um arquivo singers.py no nó mestre usando o editor de texto vi, vim ou nano pré-instalado.
    1. Cole o código a seguir no arquivo singers.py depois de preencher as variáveis de marcador de posição no arquivo singers.py. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Substitua:

      1. PROJECT_ID: o ID do projeto do Google Cloud . Os IDs do projeto estão listados na seção Informações do projeto no painel do console do Google Cloud .
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configurar uma instância do Spanner com a tabela de banco de dados Singers.
    2. Salve o arquivo singers.py.
  3. Execute singers.py com spark-submit para criar a tabela Singers do Spanner.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Substitua:

    1. CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector do Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.

    A resposta é:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Para executar o exemplo de código Scala no cluster, siga estas etapas:

  1. Conecte-se ao nó mestre do cluster do Serviço Gerenciado para Apache Spark usando SSH.
    1. Acesse a página Clusters do Serviço Gerenciado para Apache Spark no console Google Cloud e clique no nome do cluster.
    2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster. Página de detalhes do cluster do Dataproc no console do Google Cloud .

      Uma janela de navegador é aberta no diretório inicial do nó mestre.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crie um arquivo singers.scala no nó mestre usando o editor de texto pré-instalado vi, vim ou nano.
    1. Cole o código a seguir no arquivo singers.scala. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Substitua:

      1. PROJECT_ID: o ID do projeto do Google Cloud . Os IDs do projeto estão listados na seção Informações do projeto no painel do console do Google Cloud .
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configurar uma instância do Spanner com a tabela de banco de dados Singers.
    2. Salve o arquivo singers.scala.
  3. Inicie o REPL spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Substitua:

    CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector do Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.

  4. Execute singers.scala com o comando :load singers.scala para criar a tabela Singers do Spanner. A listagem de saída mostra exemplos da saída "Singers".
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Ler gráficos do Spanner

O conector do Spanner permite exportar o gráfico para DataFrames de nós e arestas separados, além de exportar diretamente para GraphFrames.

O exemplo a seguir exporta um Spanner para um GraphFrame. Ele usa a classe SpannerGraphConnector do Python, incluída no jar do conector do Spanner, para ler o Spanner Graph.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

Substitua:

  • CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector do Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.
  • PROJECT_ID: o ID do projeto do Google Cloud . Os IDs do projeto estão listados na seção Informações do projeto no painel do console Google Cloud .
  • INSTANCE_ID, DATABASE_ID e TABLE_NAME Insira os IDs da instância, do banco de dados e do gráfico.

Para exportar DataFrames de nós e arestas em vez de GraphFrames, use load_dfs:

df_vertices, df_edges, df_id_map = connector.load_dfs()

Gravar tabelas do Spanner

O conector do Spanner permite gravar um DataFrame do Spark em uma tabela do Spanner usando a API de fonte de dados do Spark.

Exemplo de gravação de DataFrame em uma tabela do Spanner

Preencha as variáveis antes de salvar e executar o código.

"""Spanner PySpark write example."""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Write App').getOrCreate()

columns = ['id', 'name', 'email']
data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')]
df = spark.createDataFrame(data, columns)

df.write.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID")
    .option("instanceId", "INSTANCE_ID")
    .option("databaseId", "DATABASE_ID")
    .option("table", "TABLE_NAME")
    .mode("append") \
    .save()

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto Google Cloud . Os IDs de projeto estão listados na seção Informações do projeto do painel do console Google Cloud .
  • INSTANCE_ID, DATABASE_ID e TABLE_NAME. Insira os IDs da instância, do banco de dados e da tabela.

Limpar

Para evitar cobranças contínuas na sua conta do Google Cloud , você pode parar ou excluir seu cluster do Serviço Gerenciado para Apache Spark e excluir sua instância do Spanner.

A seguir