En esta página, se muestra cómo crear un clúster de Managed Service para Apache Spark que usa el conector de Spanner para Spark para leer y escribir datos en Spanner con Apache Spark.
El conector de Spanner funciona con Spark para leer datos de y escribir datos en la base de datos de Spanner con la biblioteca de Java de Spanner. El conector de Spanner admite la lectura de tablas y gráficos de Spanner en DataFrames DataFrames y GraphFrames de Spark, y la escritura de datos de DataFrame en tablas de Spanner.
Costos
En este documento, se usan los siguientes componentes facturables de Google Cloud:
- Managed Service for Apache Spark
- Spanner
- Cloud Storage
Para generar una estimación de costos en función del uso previsto,
usa la calculadora de precios.
Antes de comenzar
- Accede a tu Google Cloud cuenta de. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Otorga los roles necesarios.
- Configura un clúster de Managed Service para Apache Spark.
- Configura una instancia de Spanner con una tabla de base de datos de Singers.
Otorga roles necesarios
Se requieren ciertos roles de IAM para ejecutar los ejemplos de esta página. Según las políticas de la organización, es posible que ya se hayan otorgado estos roles. Para verificar las concesiones de roles, consulta ¿Necesitas otorgar roles?.
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
Para garantizar que la cuenta de servicio predeterminada de Compute Engine tenga los permisos necesarios para crear un clúster de Managed Service para Apache Spark, pídele a tu administrador que otorgue los siguientes roles de IAM a la cuenta de servicio predeterminada de Compute Engine en el proyecto:
-
Trabajador de Dataproc (
roles/dataproc.worker) -
Usuario de base de datos de Cloud Spanner (
roles/spanner.databaseUser) -
Lector de base de datos de Cloud Spanner con DataBoost (
roles/spanner.databaseReaderWithDataBoost)
Configura un clúster de Managed Service para Apache Spark
Crea un clúster de Managed Service para Apache Spark
o usa uno existente que se haya creado con la
2.1 o posterior imagen de Managed Service para Apache Spark. Si el
clúster se creó con la 2.0 o anterior imagen, debe haberse creado con
la scope propiedad establecida en el
cloud-platform alcance.
Configura una instancia de Spanner con una tabla de base de datos de Singers
Crea una instancia de Spanner
con una base de datos que contenga una tabla Singers. Toma nota del ID de la instancia de Spanner y el ID de la base de datos.
Usa el conector de Spanner con Spark
El conector de Spanner está disponible para las versiones 3.1+ de Spark. Especificas la
versión del conector
como parte de la especificación del archivo JAR del conector de Cloud Storage cuando
envías un trabajo a un
clúster de Managed Service para Apache Spark.
Ejemplo: Envío de trabajos de Spark de gcloud CLI con el conector de Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Reemplaza lo siguiente:
CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector.
Lee tablas de Spanner
Puedes usar Python o Scala para leer datos de la tabla de Spanner en un DataFrame de Spark con la API de fuente de datos de Spark.
PySpark
Puedes ejecutar el código de PySpark de ejemplo en esta sección en tu clúster. Para ello, envía el trabajo al servicio de Managed Service para Apache Spark o ejecuta el trabajo desde el REPL de spark-submit en la instancia principal del clúster.
Trabajo de Managed Service para Apache Spark
- Crea un archivo
singers.pycon un editor de texto local o en Cloud Shell con el editor de textovi,vimonanopreinstalado. - Después de propagar las variables de marcador de posición, pega el siguiente código
en el archivo
singers.py. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID del Google Cloud proyecto. Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta
Configura una instancia de Spanner con una tabla de base de datos de
Singers.
- Guarda el archivo
singers.py. - Envía el trabajo
a Managed Service para Apache Spark con la Google Cloud consola, gcloud CLI o la
API de REST.
Ejemplo: Envío de trabajos de la gcloud CLI con el conector de Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jarReemplaza lo siguiente:
- CLUSTER_NAME: El nombre del clúster nuevo.
- REGION: Una región de Compute Engine disponible para ejecutar la carga de trabajo.
- CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector.
Trabajo de spark-submit
- Conéctate a la instancia principal del clúster de Managed Service para Apache Spark con SSH.
- Ve a la página Clústeres de Managed Service para Apache Spark en la Google Cloud consola de y, luego, haz clic en el nombre de tu clúster.
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSHa la derecha del nombre de la instancia principal del clúster.
Se abrirá una ventana del navegador en tu directorio principal del nodo.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un archivo
singers.pyen el nodo principal con el editor de textovi,vimonanopreinstalado.- Pega el siguiente código en el archivo
singers.pydespués de propagar las variables de marcador de posición en el archivosingers.py. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID del Google Cloud proyecto. Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta
Configura una instancia de Spanner con una tabla de base de datos de
Singers.
- Guarda el archivo
singers.py.
- Pega el siguiente código en el archivo
- Ejecuta
singers.pyconspark-submitpara crear la tablaSingersde Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Reemplaza lo siguiente:
- CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector.
El resultado es el siguiente:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub
Scala
Para ejecutar el código de Scala de ejemplo en tu clúster, completa los siguientes pasos:
- Conéctate a la instancia principal del clúster de Managed Service para Apache Spark con SSH.
- Ve a la página Clústeres de Managed Service para Apache Spark en la Google Cloud consola de y, luego, haz clic en el nombre de tu clúster.
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSHa la derecha del nombre de la instancia principal del clúster.
Se abrirá una ventana del navegador en tu directorio principal del nodo.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un archivo
singers.scalaen el nodo principal con el editor de textovi,vimonanopreinstalado.- Pega el siguiente código en el archivo
singers.scala. Ten en cuenta que la función Data Boost de Spanner está habilitada, lo que tiene un impacto casi nulo en la instancia principal de Spanner.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID del Google Cloud proyecto. Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME : Consulta
Configura una instancia de Spanner con una tabla de base de datos de
Singers.
- Guarda el archivo
singers.scala.
- Pega el siguiente código en el archivo
- Inicia el REPL de
spark-shell.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Reemplaza lo siguiente:
CONNECTOR_VERSION: Versión del conector de Spanner. Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector. - Ejecuta
singers.scalacon el comando:load singers.scalapara crear la tablaSingersde Spanner. La lista de salida muestra ejemplosde la salida de Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Lee gráficos de Spanner
El conector de Spanner admite la exportación del gráfico a
nodos separados y bordes
DataFrames
, así como la exportación directa a
GraphFrames.
En el siguiente ejemplo, se exporta un Spanner a un GraphFrame. Usa la clase SpannerGraphConnector de Python, incluida en el jar del conector de Spanner, para leer el gráfico de Spanner.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
Reemplaza lo siguiente:
- CONNECTOR_VERSION: Versión del conector de Spanner.
Elige la versión del conector de Spanner de la lista de versiones en el repositorio de GitHub
GoogleCloudDataproc/spark-spanner-connector. - PROJECT_ID: Es el ID del Google Cloud proyecto. Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME: Inserta los IDs de la instancia, la base de datos y el gráfico.
Para exportar nodos y bordes DataFrames en lugar de GraphFrames, usa load_dfs en su lugar:
df_vertices, df_edges, df_id_map = connector.load_dfs()
Escribe tablas de Spanner
El conector de Spanner admite la escritura de un DataFrame de Spark en una tabla de Spanner con la API de fuente de datos de Spark.
Ejemplo de escritura de DataFrame en una tabla de Spanner
Propaga las variables antes de guardar y ejecutar el código.
"""Spanner PySpark write example.""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Spanner Write App').getOrCreate() columns = ['id', 'name', 'email'] data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')] df = spark.createDataFrame(data, columns) df.write.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .mode("append") \ .save()
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID del proyecto de Google Cloud . Los IDs del proyecto se enumeran en la sección Información del proyecto en el Google Cloud panel de la consola.
- INSTANCE_ID, DATABASE_ID y TABLE_NAME: Inserta los IDs de la instancia, la base de datos y la tabla.
Limpia
Para evitar cargos continuos en tu Google Cloud cuenta, puedes detener o borrar tu clúster de Managed Service para Apache Spark y borrar tu instancia de Spanner.
¿Qué sigue?
- Consulta los
pyspark.sql.DataFrameejemplos. - Para obtener información sobre la compatibilidad de idiomas de DataFrame de Spark, consulta lo siguiente:
- Consulta el repositorio del conector de Spanner para Spark en GitHub.
- Consulta las sugerencias de ajuste de trabajo de Spark.