Usa el conector de Spark Spanner

En esta página, se muestra cómo crear un clúster de Managed Service para Apache Spark que usa el conector de Spanner para Spark para leer y escribir datos en Spanner con Apache Spark.

El conector de Spanner funciona con Spark para leer datos de y escribir datos en la base de datos de Spanner con la biblioteca de Java de Spanner. El conector de Spanner admite la lectura de tablas y gráficos de Spanner en DataFrames DataFrames y GraphFrames de Spark, y la escritura de datos de DataFrame en tablas de Spanner.

Costos

En este documento, se usan los siguientes componentes facturables de Google Cloud:

  • Managed Service for Apache Spark
  • Spanner
  • Cloud Storage

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.

Es posible que los usuarios Google Cloud nuevos decumplan con los requisitos para acceder a una prueba gratuita.

Antes de comenzar

  1. Accede a tu Google Cloud cuenta de. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Otorga los roles necesarios.
  9. Configura un clúster de Managed Service para Apache Spark.
  10. Configura una instancia de Spanner con una tabla de base de datos de Singers.

Otorga roles necesarios

Se requieren ciertos roles de IAM para ejecutar los ejemplos de esta página. Según las políticas de la organización, es posible que ya se hayan otorgado estos roles. Para verificar las concesiones de roles, consulta ¿Necesitas otorgar roles?.

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Para garantizar que la cuenta de servicio predeterminada de Compute Engine tenga los permisos necesarios para crear un clúster de Managed Service para Apache Spark, pídele a tu administrador que otorgue los siguientes roles de IAM a la cuenta de servicio predeterminada de Compute Engine en el proyecto:

Configura un clúster de Managed Service para Apache Spark

Crea un clúster de Managed Service para Apache Spark o usa uno existente que se haya creado con la 2.1 o posterior imagen de Managed Service para Apache Spark. Si el clúster se creó con la 2.0 o anterior imagen, debe haberse creado con la scope propiedad establecida en el cloud-platform alcance.

Configura una instancia de Spanner con una tabla de base de datos de Singers

Crea una instancia de Spanner con una base de datos que contenga una tabla Singers. Toma nota del ID de la instancia de Spanner y el ID de la base de datos.

Usa el conector de Spanner con Spark

El conector de Spanner está disponible para las versiones 3.1+ de Spark. Especificas la versión del conector como parte de la especificación del archivo JAR del conector de Cloud Storage cuando envías un trabajo a un clúster de Managed Service para Apache Spark.

Ejemplo: Envío de trabajos de Spark de gcloud CLI con el conector de Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Reemplaza lo siguiente:

CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

Lee tablas de Spanner

Puedes usar Python o Scala para leer datos de la tabla de Spanner en un DataFrame de Spark con la API de fuente de datos de Spark.

PySpark

Puedes ejecutar el código de PySpark de ejemplo en esta sección en tu clúster. Para ello, envía el trabajo al servicio de Managed Service para Apache Spark o ejecuta el trabajo desde el REPL de spark-submit en la instancia principal del clúster.

Trabajo de Managed Service para Apache Spark

  1. Crea un archivo singers.py con un editor de texto local o en Cloud Shell con el editor de texto vi, vim o nano preinstalado.
    1. Después de propagar las variables de marcador de posición, pega el siguiente código en el archivo singers.py. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Reemplaza lo siguiente:

      1. PROJECT_ID: Es el ID del Google Cloud proyecto. Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
      2. INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta Configura una instancia de Spanner con una tabla de base de datos de Singers.
    2. Guarda el archivo singers.py.
  2. Envía el trabajo a Managed Service para Apache Spark con la Google Cloud consola, gcloud CLI o la API de REST.

    Ejemplo: Envío de trabajos de la gcloud CLI con el conector de Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Reemplaza lo siguiente:

    1. CLUSTER_NAME: El nombre del clúster nuevo.
    2. REGION: Una región de Compute Engine disponible para ejecutar la carga de trabajo.
    3. CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

Trabajo de spark-submit

  1. Conéctate a la instancia principal del clúster de Managed Service para Apache Spark con SSH.
    1. Ve a la página Clústeres de Managed Service para Apache Spark en la Google Cloud consola de y, luego, haz clic en el nombre de tu clúster.
    2. En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre de la instancia principal del clúster.
      Captura de pantalla de la página de detalles del clúster de Dataproc en la consola Google Cloud , en la que se muestra el botón SSH que se usa para conectarse a la instancia principal del clúster.

      Se abrirá una ventana del navegador en tu directorio principal del nodo.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un archivo singers.py en el nodo principal con el editor de texto vi, vim o nano preinstalado.
    1. Pega el siguiente código en el archivo singers.py después de propagar las variables de marcador de posición en el archivo singers.py. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Reemplaza lo siguiente:

      1. PROJECT_ID: Es el ID del Google Cloud proyecto. Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
      2. INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta Configura una instancia de Spanner con una tabla de base de datos de Singers.
    2. Guarda el archivo singers.py.
  3. Ejecuta singers.py con spark-submit para crear la tabla Singers de Spanner.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Reemplaza lo siguiente:

    1. CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

    El resultado es el siguiente:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Para ejecutar el código de Scala de ejemplo en tu clúster, completa los siguientes pasos:

  1. Conéctate a la instancia principal del clúster de Managed Service para Apache Spark con SSH.
    1. Ve a la página Clústeres de Managed Service para Apache Spark en la Google Cloud consola de y, luego, haz clic en el nombre de tu clúster.
    2. En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre de la instancia principal del clúster. Página de detalles del clúster de Dataproc en la consola de Google Cloud .

      Se abrirá una ventana del navegador en tu directorio principal del nodo.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un archivo singers.scala en el nodo principal con el editor de texto vi, vim o nano preinstalado.
    1. Pega el siguiente código en el archivo singers.scala. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Reemplaza lo siguiente:

      1. PROJECT_ID: Es el ID del Google Cloud proyecto. Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
      2. INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta Configura una instancia de Spanner con una tabla de base de datos de Singers.
    2. Guarda el archivo singers.scala.
  3. Inicia el REPL de spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Reemplaza lo siguiente:

    CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Ejecuta singers.scala con el comando :load singers.scala para crear la tabla Singers de Spanner. La lista de salida muestra ejemplosde la salida de Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Lee gráficos de Spanner

El conector de Spanner admite la exportación del gráfico a nodos separados y bordes DataFrames , así como la exportación directa a GraphFrames.

En el siguiente ejemplo, se exporta un Spanner a un GraphFrame. Usa la clase SpannerGraphConnector de Python, incluida en el jar del conector de Spanner, para leer el gráfico de Spanner.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

Reemplaza lo siguiente:

  • CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub GoogleCloudDataproc/spark-spanner-connector.
  • PROJECT_ID: Es el ID del Google Cloud proyecto. Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
  • INSTANCE_ID, DATABASE_ID y TABLE_NAME: Inserta los IDs de la instancia, la base de datos y el gráfico.

Para exportar nodos y bordes DataFrames en lugar de GraphFrames, usa load_dfs en su lugar:

df_vertices, df_edges, df_id_map = connector.load_dfs()

Escribe tablas de Spanner

El conector de Spanner admite la escritura de un DataFrame de Spark en una tabla de Spanner con la API de fuente de datos de Spark.

Ejemplo de escritura de DataFrame en una tabla de Spanner

Propaga las variables antes de guardar y ejecutar el código.

"""Spanner PySpark write example."""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Write App').getOrCreate()

columns = ['id', 'name', 'email']
data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')]
df = spark.createDataFrame(data, columns)

df.write.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID")
    .option("instanceId", "INSTANCE_ID")
    .option("databaseId", "DATABASE_ID")
    .option("table", "TABLE_NAME")
    .mode("append") \
    .save()

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
  • INSTANCE_ID, DATABASE_ID y TABLE_NAME: Inserta los IDs de la instancia, la base de datos y la tabla.

Limpia

Para evitar cargos continuos en tu Google Cloud cuenta, puedes detener o borrar tu clúster de Managed Service para Apache Spark y borrar tu instancia de Spanner.

¿Qué sigue?