Conectarse a fuentes de datos
Managed Service para Apache Spark se integra con bases de datos, almacenes de datos y servicios de transmisión para procesar datos de varios sistemas. En este documento, se muestra cómo conectar clústeres de Managed Service para Apache Spark a fuentes de datos dentro y fuera de Google Cloud.
Antes de comenzar
- Accede a tu Google Cloud cuenta de. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Crea un clúster de Managed Service para Apache Spark.
- Crea buckets de Cloud Storage.
- Asegúrate de tener los permisos necesarios para acceder a las fuentes de datos.
Roles obligatorios
Se requieren ciertos roles de IAM para ejecutar los ejemplos de esta página. Según las políticas de la organización, es posible que ya se hayan otorgado estos roles. Para verificar las concesiones de roles, consulta ¿Necesitas otorgar roles?.
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos,carpetas y organizaciones.
Funciones de usuario
Para obtener los permisos que necesitas para crear un clúster de Managed Service para Apache Spark, pídele a tu administrador que te otorgue los siguientes roles de IAM:
-
Editor de Dataproc (
roles/dataproc.editor) en el proyecto -
Usuario de cuenta de servicio (
roles/iam.serviceAccountUser) en la cuenta de servicio predeterminada de Compute Engine
Función de cuenta de servicio
Para asegurarte de que la cuenta de servicio predeterminada de Compute Engine tenga los permisos necesarios para crear un clúster de Managed Service para Apache Spark, pídele a tu administrador que otorgue el rol de IAM de Trabajador de Dataproc (roles/dataproc.worker) a la cuenta de servicio predeterminada de Compute Engine en el proyecto.
Conéctate a Google Cloud servicios
En esta sección, se muestra cómo conectar Managed Service para Apache Spark a Google Cloud y fuentes de datos externas.
Conecta a Cloud Storage
Managed Service para Apache Spark usa Cloud Storage como su sistema de archivos distribuido predeterminado. Para leer y escribir datos desde y hacia buckets de Cloud Storage, usa el prefijo de URI gs://. No se requiere configuración adicional.
En el siguiente ejemplo, se muestra cómo leer un archivo CSV de un bucket de Cloud Storage y, luego, escribir un archivo Parquet en el bucket.
# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("gs://BUCKET_NAME/path/to/data.csv")
df.show()
# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
.mode("overwrite") \
.save("gs://BUCKET_NAME/path/to/output.parquet")
Conéctese a BigQuery
El conector de BigQuery está preinstalado en Managed Service para Apache Spark 2.1 y versiones posteriores de la imagen. El conector permite la transferencia de datos a gran escala entre Managed Service para Apache Spark y BigQuery.
En el siguiente ejemplo, se muestra cómo leer una tabla de BigQuery en un DataFrame de Spark y, luego, realizar una consulta.
# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")
# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
.option("table", "bigquery-public-data.samples.shakespeare") \
.load()
df.createOrReplaceTempView("shakespeare")
# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()
Conéctate a bases de datos externas con JDBC
Puedes conectar Managed Service para Apache Spark a una base de datos que proporcione un controlador de Conectividad a base de datos de Java (JDBC), como PostgreSQL o MySQL, con la fuente de datos JDBC integrada de Spark.
Haz que el controlador JDBC esté disponible
Para conectarte a una base de datos con JDBC, haz que el archivo JAR del controlador esté disponible en la ruta de clase de Spark.
Por trabajo
Almacena el JAR del controlador en un bucket de Cloud Storage y haz referencia a él con la marca --jars cuando envíes un trabajo. Spark distribuye el archivo JAR a los nodos necesarios para el trabajo. Este es el método recomendado para las dependencias específicas del trabajo.
En todo el clúster
Si todos los trabajos de un clúster requieren un controlador específico, usa una acción de inicialización cuando crees el clúster para copiar el JAR del controlador de Cloud Storage en el directorio de JAR de Spark en cada nodo.
Crea una secuencia de comandos de acción de inicialización.
#!/bin/bash set -e -x gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/Crea el clúster y haz referencia a la secuencia de comandos.
gcloud dataproc clusters create my-jdbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
Realiza operaciones de lectura desde una base de datos PostgreSQL
En este ejemplo, se usa el método por trabajo para conectarse a una base de datos PostgreSQL.
Descarga el JAR del controlador JDBC de PostgreSQL y súbelo a un bucket de Cloud Storage.
Envía tu trabajo de PySpark y haz referencia a la ruta de acceso de Cloud Storage del controlador con la marca
--jars.gcloud dataproc jobs submit pyspark my_job.py \ --cluster=MY_CLUSTER \ --region=REGION \ --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jarUsa el siguiente código en tu archivo
job.pypara leer y escribir en la base de datos.from pyspark.sql import SparkSession def main(): # WARNING: Do not hardcode credentials. Use a service like Secret Manager # to handle sensitive credentials. db_properties = { "user": "USERNAME", "password": "PASSWORD", "driver": "org.postgresql.Driver" } jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE" spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate() # Read data from a PostgreSQL table df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties) df.printSchema() df.show() # Write data to a new PostgreSQL table (df.write .jdbc(url=jdbc_url, table="public.new_table", mode="overwrite", properties=db_properties)) if __name__ == "__main__": main()
Conéctate a bases de datos externas con ODBC
Para conectarte a fuentes de datos en las que no hay un controlador JDBC disponible, usa un controlador de Conectividad a base de datos abierta (ODBC). Usa una acción de inicialización para instalar el controlador y sus dependencias en cada nodo cuando crees el clúster.
En este ejemplo, se muestra cómo conectarse a una instancia de Microsoft SQL Server.
Crea la secuencia de comandos de acción de inicialización.
- Crea una secuencia de comandos que instale el controlador ODBC de Microsoft SQL Server para Debian.
#!/bin/bash # Initialization action for installing MS SQL ODBC driver on the cluster. set -e -x # Install dependencies for the driver. apt-get update apt-get install -y --no-install-recommends curl gnupg unixodbc-dev # Add Microsoft's official repository. curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list # Install the driver. apt-get update ACCEPT_EULA=Y apt-get install -y msodbcsql18 # Install the pyodbc library for Python. pip install pyodbc ``` 1. Upload the script to a Cloud Storage bucket. ```shell gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/ ```Crea un clúster de Managed Service para Apache Spark que ejecute la acción de inicialización.
Cuando crees el clúster, apunta a la secuencia de comandos de acción de inicialización en Cloud Storage.
gcloud dataproc clusters create my-odbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.shConéctate y realiza consultas con PySpark
Una vez que el clúster esté en funcionamiento, el controlador ODBC y la biblioteca
pyodbcestarán disponibles. El siguiente código usapyodbcen el nodo del controlador para recuperar datos en un DataFrame de Pandas y, luego, lo convierte en un DataFrame de Spark para el procesamiento distribuido.import pyodbc import pandas as pd from pyspark.sql import SparkSession def get_sql_data(connection_string, query): """ Connects to the database using pyodbc, executes a query, and returns the result as a Pandas DataFrame. """ cnxn = pyodbc.connect(connection_string) pdf = pd.read_sql(query, cnxn) cnxn.close() return pdf def main(): spark = SparkSession.builder.appName("ODBC Example").getOrCreate() # WARNING: Do not hardcode credentials. Use Secret Manager. server = 'SERVER.database.windows.net' database = 'DATABASE' username = 'USERNAME' password = 'PASSWORD' driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver. # Create the connection string. connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;' sql_query = "SELECT * FROM Sales.Customer" # Fetch data as a Pandas DataFrame. pandas_df = get_sql_data(connection_string, sql_query) # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing. spark_df = spark.createDataFrame(pandas_df) print("Successfully read data from SQL Server:") spark_df.printSchema() spark_df.show(5) # You can now perform distributed operations on the Spark DataFrame. print(f"Total number of customers: {spark_df.count()}") if __name__ == "__main__": main()
Prácticas recomendadas
En esta sección, se describen las prácticas recomendadas para administrar credenciales y dependencias.
Administración de credenciales
No codifiques contraseñas ni otros secretos en tu código. Usa Secret Manager para almacenar credenciales de forma segura y acceder a ellas desde tus trabajos de Managed Service para Apache Spark.
Administración de dependencias
Usa acciones de inicialización para instalar controladores y bibliotecas cuando crees un clúster. Para entornos complejos, considera compilar una imagen personalizada de Managed Service para Apache Spark con todas las dependencias preinstaladas. Una imagen personalizada reduce el tiempo de inicio del clúster y garantiza la coherencia.
Copia las secuencias de comandos de acción de inicialización de fuentes públicas a tu propio bucket de Cloud Storage con versiones antes de usarlas en producción. No hagas referencia a secuencias de comandos públicas directamente, ya que pueden cambiar sin previo aviso.
¿Qué sigue?
- Obtener más información sobre el administrador de secretos.
- Obtén información para crear una imagen personalizada.
- Obtén más información sobre las acciones de inicialización.