Questa pagina mostra come creare un cluster Managed Service for Apache Spark che utilizza il connettore Spark Spanner per leggere e scrivere dati in Spanner utilizzando Apache Spark.
Il connettore Spanner funziona con Spark per leggere dati da e scrivere dati nel database Spanner utilizzando la libreria Java Spanner. Il connettore Spanner supporta la lettura di tabelle e grafici Spanner in DataFrame DataFrames e GraphFrame GraphFrames Spark, e la scrittura di dati DataFrame nelle tabelle Spanner.
Costi
In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:
- Managed Service for Apache Spark
- Spanner
- Cloud Storage
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il calcolatore prezzi.
Prima di iniziare
- Accedi al tuo Google Cloud account. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Concedi i ruoli richiesti.
- Configura un cluster Managed Service for Apache Spark.
- Configura un'istanza Spanner con una tabella di database Singers.
Concedi i ruoli richiesti
Per eseguire gli esempi in questa pagina sono necessari alcuni ruoli IAM. A seconda delle policy dell'organizzazione, questi ruoli potrebbero essere già stati concessi. Per verificare le concessioni dei ruoli, consulta la sezione Devi concedere i ruoli?.
Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.
Per assicurarti che il account di servizio predefinito di Compute Engine disponga delle autorizzazioni necessarie per creare un cluster Managed Service for Apache Spark, chiedi all'amministratore di concedere i seguenti ruoli IAM al account di servizio predefinito di Compute Engine sul progetto:
-
Dataproc Worker (
roles/dataproc.worker) -
Cloud Spanner Database User (
roles/spanner.databaseUser) -
Cloud Spanner Database Reader with DataBoost (
roles/spanner.databaseReaderWithDataBoost)
Configura un cluster Managed Service for Apache Spark
Crea un cluster Managed Service for Apache Spark
o utilizza un cluster Managed Service for Apache Spark esistente creato con l'immagine Managed Service for Apache Spark
2.1 o versioni successive oppure, se il
cluster è stato creato con l'immagine 2.0 o versioni precedenti, deve essere stato creato con
la proprietà scope impostata sull'ambito
cloud-platform scope.
Configura un'istanza Spanner con una tabella di database Singers
Crea un'istanza Spanner
con un database contenente una tabella Singers. Prendi nota dell'ID istanza Spanner e dell'ID database.
Utilizza il connettore Spanner con Spark
Il connettore Spanner è disponibile per le versioni di Spark 3.1+. Specifichi la
versione del
connettore
come parte della specifica del file JAR del connettore Cloud Storage quando
invii un job a un
cluster Managed Service for Apache Spark.
Esempio: invio di un job Spark con gcloud CLI con il connettore Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Sostituisci quanto segue:
CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector.
Leggi le tabelle Spanner
Puoi utilizzare Python o Scala per leggere i dati della tabella Spanner in un DataFrame Spark utilizzando l' API dell'origine dati Spark.
PySpark
Puoi eseguire il codice PySpark di esempio in questa sezione sul cluster inviando il job al servizio Managed Service for Apache Spark o eseguendo il job dal REPL spark-submit sul nodo master del cluster.
Job Managed Service for Apache Spark
- Crea un file
singers.pyutilizzando un editor di testo locale o in Cloud Shell utilizzando l'editor di testovi,vim, onanopreinstallato. - Dopo aver compilato le variabili segnaposto, incolla il seguente codice
nel file
singers.py. Tieni presente che la funzionalità Spanner Data Boost è abilitata, il che ha un impatto quasi nullo sull'istanza Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Sostituisci quanto segue:
- PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, e TABLE_NAME : consulta
Configura un'istanza Spanner con la tabella di database
Singers.
- Salva il file
singers.py. - Invia il job
a Managed Service for Apache Spark utilizzando la Google Cloud console, gcloud CLI o
l'API REST.
Esempio: invio di un job con gcloud CLI con il connettore Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jarSostituisci quanto segue:
- CLUSTER_NAME: il nome del nuovo cluster.
- REGION: una regione Compute Engine disponibile per l'esecuzione del workload.
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector.
Job spark-submit
- Connettiti al nodo master del cluster Managed Service for Apache Spark utilizzando SSH.
- Vai alla pagina Cluster di Managed Service for Apache Spark nella Google Cloud console, quindi fai clic sul nome del cluster.
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi fai clic
SSHa destra del nome del nodo master del cluster.
Si apre una finestra del browser nella directory home del nodo master.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un file
singers.pysul nodo master utilizzando l'editor di testovi,vimonanopreinstallato.- Incolla il seguente codice nel file
singers.pydopo aver compilato le variabili segnaposto nel filesingers.py. Tieni presente che la funzionalità Spanner Data Boost è abilitata, il che ha un impatto quasi nullo sull'istanza Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Sostituisci quanto segue:
- PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, e TABLE_NAME : consulta
Configura un'istanza Spanner con la tabella di database
Singers.
- Salva il file
singers.py.
- Incolla il seguente codice nel file
- Esegui
singers.pyconspark-submitper creare la tabellaSingersdi Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Sostituisci quanto segue:
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector.
L'output è:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
Scala
Per eseguire il codice Scala di esempio sul cluster:
- Connettiti al nodo master del cluster Managed Service for Apache Spark utilizzando SSH.
- Vai alla pagina Cluster di Managed Service for Apache Spark nella Google Cloud console, quindi fai clic sul nome del cluster.
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi fai clic
SSHa destra del nome del nodo master del cluster.
Si apre una finestra del browser nella directory home del nodo master.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un file
singers.scalasul nodo master utilizzando l'editor di testovi,vimonanopreinstallato.- Incolla il seguente codice nel file
singers.scala. Tieni presente che la funzionalità Spanner Data Boost è abilitata, il che ha un impatto quasi nullo sull'istanza Spanner principale.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Sostituisci quanto segue:
- PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, e TABLE_NAME : consulta
Configura un'istanza Spanner con la tabella di database
Singers.
- Salva il file
singers.scala.
- Incolla il seguente codice nel file
- Avvia il REPL
spark-shell.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Sostituisci quanto segue:
CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector. - Esegui
singers.scalacon il comando:load singers.scalaper creare la tabellaSingersdi Spanner. L'elenco di output mostra esempi dall'output di Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Leggi i grafici Spanner
Il connettore Spanner supporta l'esportazione del grafico in
DataFrame di nodi e archi
DataFrames
separati, nonché l'esportazione in
GraphFrames
direttamente.
L'esempio seguente esporta uno Spanner in un GraphFrame. Utilizza la classe Python SpannerGraphConnector, inclusa nel
file JAR del connettore Spanner, per leggere lo
Spanner Graph.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
Sostituisci quanto segue:
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector. - PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud dashboard della console.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME Inserisci gli ID istanza, database e grafico.
Per esportare nodi e archi DataFrames anziché GraphFrame, utilizza load_dfs
invece:
df_vertices, df_edges, df_id_map = connector.load_dfs()
Scrivi le tabelle Spanner
Il connettore Spanner supporta la scrittura di un DataFrame Spark in una tabella Spanner utilizzando l' API dell'origine dati Spark.
Esempio di scrittura di DataFrame nella tabella Spanner
Compila le variabili prima di salvare ed eseguire il codice.
"""Spanner PySpark write example.""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Spanner Write App').getOrCreate() columns = ['id', 'name', 'email'] data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')] df = spark.createDataFrame(data, columns) df.write.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .mode("append") \ .save()
Sostituisci quanto segue.
- PROJECT_ID: l' Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto su la Google Cloud dashboard della console.
- INSTANCE_ID, DATABASE_ID, e TABLE_NAME Inserisci gli ID istanza, database e tabella.
Libera spazio
Per evitare addebiti continui sul tuo Google Cloud account, puoi arrestare o eliminare il cluster Managed Service for Apache Spark ed eliminare l'istanza Spanner.
Passaggi successivi
- Consulta gli
pyspark.sql.DataFrameesempi. - Per il supporto linguistico di Spark DataFrame, consulta:
- Consulta il repository del connettore Spark Spanner su GitHub.
- Consulta i suggerimenti per l'ottimizzazione del job Spark.