Utilizzare il connettore Spark Spanner

Questa pagina mostra come creare un cluster Managed Service for Apache Spark che utilizza il connettore Spark Spanner per leggere e scrivere dati in Spanner utilizzando Apache Spark.

Il connettore Spanner funziona con Spark per leggere dati da e scrivere dati nel database Spanner utilizzando la libreria Java Spanner. Il connettore Spanner supporta la lettura di tabelle e grafici Spanner in DataFrame DataFrames e GraphFrame GraphFrames Spark, e la scrittura di dati DataFrame nelle tabelle Spanner.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Managed Service for Apache Spark
  • Spanner
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi Google Cloud utenti potrebbero avere diritto a una prova senza costi.

Prima di iniziare

  1. Accedi al tuo Google Cloud account. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Concedi i ruoli richiesti.
  9. Configura un cluster Managed Service for Apache Spark.
  10. Configura un'istanza Spanner con una tabella di database Singers.

Concedi i ruoli richiesti

Per eseguire gli esempi in questa pagina sono necessari alcuni ruoli IAM. A seconda delle policy dell'organizzazione, questi ruoli potrebbero essere già stati concessi. Per verificare le concessioni dei ruoli, consulta la sezione Devi concedere i ruoli?.

Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Per assicurarti che il account di servizio predefinito di Compute Engine disponga delle autorizzazioni necessarie per creare un cluster Managed Service for Apache Spark, chiedi all'amministratore di concedere i seguenti ruoli IAM al account di servizio predefinito di Compute Engine sul progetto:

Configura un cluster Managed Service for Apache Spark

Crea un cluster Managed Service for Apache Spark o utilizza un cluster Managed Service for Apache Spark esistente creato con l'immagine Managed Service for Apache Spark 2.1 o versioni successive oppure, se il cluster è stato creato con l'immagine 2.0 o versioni precedenti, deve essere stato creato con la proprietà scope impostata sull'ambito cloud-platform scope.

Configura un'istanza Spanner con una tabella di database Singers

Crea un'istanza Spanner con un database contenente una tabella Singers. Prendi nota dell'ID istanza Spanner e dell'ID database.

Utilizza il connettore Spanner con Spark

Il connettore Spanner è disponibile per le versioni di Spark 3.1+. Specifichi la versione del connettore come parte della specifica del file JAR del connettore Cloud Storage quando invii un job a un cluster Managed Service for Apache Spark.

Esempio: invio di un job Spark con gcloud CLI con il connettore Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Sostituisci quanto segue:

CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

Leggi le tabelle Spanner

Puoi utilizzare Python o Scala per leggere i dati della tabella Spanner in un DataFrame Spark utilizzando l' API dell'origine dati Spark.

PySpark

Puoi eseguire il codice PySpark di esempio in questa sezione sul cluster inviando il job al servizio Managed Service for Apache Spark o eseguendo il job dal REPL spark-submit sul nodo master del cluster.

Job Managed Service for Apache Spark

  1. Crea un file singers.py utilizzando un editor di testo locale o in Cloud Shell utilizzando l'editor di testo vi, vim, o nano preinstallato.
    1. Dopo aver compilato le variabili segnaposto, incolla il seguente codice nel file singers.py. Tieni presente che la funzionalità Spanner Data Boost è abilitata, il che ha un impatto quasi nullo sull'istanza Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Sostituisci quanto segue:

      1. PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud console Dashboard.
      2. INSTANCE_ID, DATABASE_ID, e TABLE_NAME : consulta Configura un'istanza Spanner con la tabella di database Singers.
    2. Salva il file singers.py.
  2. Invia il job a Managed Service for Apache Spark utilizzando la Google Cloud console, gcloud CLI o l'API REST.

    Esempio: invio di un job con gcloud CLI con il connettore Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Sostituisci quanto segue:

    1. CLUSTER_NAME: il nome del nuovo cluster.
    2. REGION: una regione Compute Engine disponibile per l'esecuzione del workload.
    3. CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

Job spark-submit

  1. Connettiti al nodo master del cluster Managed Service for Apache Spark utilizzando SSH.
    1. Vai alla pagina Cluster di Managed Service for Apache Spark nella Google Cloud console, quindi fai clic sul nome del cluster.
    2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi fai clic SSH a destra del nome del nodo master del cluster.
      Screenshot della pagina dei dettagli del cluster Dataproc nella console Google Cloud , che mostra il pulsante SSH utilizzato per connettersi al nodo master del cluster.

      Si apre una finestra del browser nella directory home del nodo master.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un file singers.py sul nodo master utilizzando l'editor di testo vi, vim o nano preinstallato.
    1. Incolla il seguente codice nel file singers.py dopo aver compilato le variabili segnaposto nel file singers.py. Tieni presente che la funzionalità Spanner Data Boost è abilitata, il che ha un impatto quasi nullo sull'istanza Spanner principale.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Sostituisci quanto segue:

      1. PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud console Dashboard.
      2. INSTANCE_ID, DATABASE_ID, e TABLE_NAME : consulta Configura un'istanza Spanner con la tabella di database Singers.
    2. Salva il file singers.py.
  3. Esegui singers.py con spark-submit per creare la tabella Singers di Spanner.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Sostituisci quanto segue:

    1. CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

    L'output è:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Per eseguire il codice Scala di esempio sul cluster:

  1. Connettiti al nodo master del cluster Managed Service for Apache Spark utilizzando SSH.
    1. Vai alla pagina Cluster di Managed Service for Apache Spark nella Google Cloud console, quindi fai clic sul nome del cluster.
    2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi fai clic SSH a destra del nome del nodo master del cluster. Pagina dei dettagli del cluster Dataproc nella console Google Cloud .

      Si apre una finestra del browser nella directory home del nodo master.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea un file singers.scala sul nodo master utilizzando l'editor di testo vi, vim o nano preinstallato.
    1. Incolla il seguente codice nel file singers.scala. Tieni presente che la funzionalità Spanner Data Boost è abilitata, il che ha un impatto quasi nullo sull'istanza Spanner principale.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Sostituisci quanto segue:

      1. PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud console Dashboard.
      2. INSTANCE_ID, DATABASE_ID, e TABLE_NAME : consulta Configura un'istanza Spanner con la tabella di database Singers.
    2. Salva il file singers.scala.
  3. Avvia il REPL spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Sostituisci quanto segue:

    CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Esegui singers.scala con il comando :load singers.scala per creare la tabella Singers di Spanner. L'elenco di output mostra esempi dall'output di Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Leggi i grafici Spanner

Il connettore Spanner supporta l'esportazione del grafico in DataFrame di nodi e archi DataFrames separati, nonché l'esportazione in GraphFrames direttamente.

L'esempio seguente esporta uno Spanner in un GraphFrame. Utilizza la classe Python SpannerGraphConnector, inclusa nel file JAR del connettore Spanner, per leggere lo Spanner Graph.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

Sostituisci quanto segue:

  • CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub GoogleCloudDataproc/spark-spanner-connector.
  • PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud dashboard della console.
  • INSTANCE_ID, DATABASE_ID e TABLE_NAME Inserisci gli ID istanza, database e grafico.

Per esportare nodi e archi DataFrames anziché GraphFrame, utilizza load_dfs invece:

df_vertices, df_edges, df_id_map = connector.load_dfs()

Scrivi le tabelle Spanner

Il connettore Spanner supporta la scrittura di un DataFrame Spark in una tabella Spanner utilizzando l' API dell'origine dati Spark.

Esempio di scrittura di DataFrame nella tabella Spanner

Compila le variabili prima di salvare ed eseguire il codice.

"""Spanner PySpark write example."""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Write App').getOrCreate()

columns = ['id', 'name', 'email']
data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')]
df = spark.createDataFrame(data, columns)

df.write.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID")
    .option("instanceId", "INSTANCE_ID")
    .option("databaseId", "DATABASE_ID")
    .option("table", "TABLE_NAME")
    .mode("append") \
    .save()

Sostituisci quanto segue.

  • PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud dashboard della console.
  • INSTANCE_ID, DATABASE_ID, e TABLE_NAME Inserisci gli ID istanza, database e tabella.

Libera spazio

Per evitare addebiti continui sul tuo Google Cloud account, puoi arrestare o eliminare il cluster Managed Service for Apache Spark ed eliminare l'istanza Spanner.

Passaggi successivi