Conectarse a fuentes de datos

Managed Service para Apache Spark se integra con bases de datos, almacenes de datos y servicios de transmisión para procesar datos de varios sistemas. En este documento, se muestra cómo conectar clústeres de Managed Service para Apache Spark a fuentes de datos dentro y fuera de Google Cloud.

Antes de comenzar

  1. Accede a tu Google Cloud cuenta de. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Crea un clúster de Managed Service para Apache Spark.
  11. Crea buckets de Cloud Storage.
  12. Asegúrate de tener los permisos necesarios para acceder a las fuentes de datos.

Roles obligatorios

Se requieren ciertos roles de IAM para ejecutar los ejemplos de esta página. Según las políticas de la organización, es posible que ya se hayan otorgado estos roles. Para verificar las concesiones de roles, consulta ¿Necesitas otorgar roles?.

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos,carpetas y organizaciones.

Funciones de usuario

Para obtener los permisos que necesitas para crear un clúster de Managed Service para Apache Spark, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Función de cuenta de servicio

Para asegurarte de que la cuenta de servicio predeterminada de Compute Engine tenga los permisos necesarios para crear un clúster de Managed Service para Apache Spark, pídele a tu administrador que otorgue el rol de IAM de Trabajador de Dataproc (roles/dataproc.worker) a la cuenta de servicio predeterminada de Compute Engine en el proyecto.

Conéctate a Google Cloud servicios

En esta sección, se muestra cómo conectar Managed Service para Apache Spark a Google Cloud y fuentes de datos externas.

Conecta a Cloud Storage

Managed Service para Apache Spark usa Cloud Storage como su sistema de archivos distribuido predeterminado. Para leer y escribir datos desde y hacia buckets de Cloud Storage, usa el prefijo de URI gs://. No se requiere configuración adicional.

En el siguiente ejemplo, se muestra cómo leer un archivo CSV de un bucket de Cloud Storage y, luego, escribir un archivo Parquet en el bucket.

# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://BUCKET_NAME/path/to/data.csv")

df.show()

# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
    .mode("overwrite") \
    .save("gs://BUCKET_NAME/path/to/output.parquet")

Conéctese a BigQuery

El conector de BigQuery está preinstalado en Managed Service para Apache Spark 2.1 y versiones posteriores de la imagen. El conector permite la transferencia de datos a gran escala entre Managed Service para Apache Spark y BigQuery.

En el siguiente ejemplo, se muestra cómo leer una tabla de BigQuery en un DataFrame de Spark y, luego, realizar una consulta.

# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")

# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .load()

df.createOrReplaceTempView("shakespeare")

# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()

Conéctate a bases de datos externas con JDBC

Puedes conectar Managed Service para Apache Spark a una base de datos que proporcione un controlador de Conectividad a base de datos de Java (JDBC), como PostgreSQL o MySQL, con la fuente de datos JDBC integrada de Spark.

Haz que el controlador JDBC esté disponible

Para conectarte a una base de datos con JDBC, haz que el archivo JAR del controlador esté disponible en la ruta de clase de Spark.

Por trabajo

Almacena el JAR del controlador en un bucket de Cloud Storage y haz referencia a él con la marca --jars cuando envíes un trabajo. Spark distribuye el archivo JAR a los nodos necesarios para el trabajo. Este es el método recomendado para las dependencias específicas del trabajo.

En todo el clúster

Si todos los trabajos de un clúster requieren un controlador específico, usa una acción de inicialización cuando crees el clúster para copiar el JAR del controlador de Cloud Storage en el directorio de JAR de Spark en cada nodo.

  1. Crea una secuencia de comandos de acción de inicialización.

    #!/bin/bash
    set -e -x
    gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/
    
  2. Crea el clúster y haz referencia a la secuencia de comandos.

    gcloud dataproc clusters create my-jdbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
    

Realiza operaciones de lectura desde una base de datos PostgreSQL

En este ejemplo, se usa el método por trabajo para conectarse a una base de datos PostgreSQL.

  1. Descarga el JAR del controlador JDBC de PostgreSQL y súbelo a un bucket de Cloud Storage.

  2. Envía tu trabajo de PySpark y haz referencia a la ruta de acceso de Cloud Storage del controlador con la marca --jars.

    gcloud dataproc jobs submit pyspark my_job.py \
        --cluster=MY_CLUSTER \
        --region=REGION \
        --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar
    
  3. Usa el siguiente código en tu archivo job.py para leer y escribir en la base de datos.

    from pyspark.sql import SparkSession
    
    def main():
        # WARNING: Do not hardcode credentials. Use a service like Secret Manager
        # to handle sensitive credentials.
        db_properties = {
            "user": "USERNAME",
            "password": "PASSWORD",
            "driver": "org.postgresql.Driver"
        }
    
        jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE"
    
        spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate()
    
        # Read data from a PostgreSQL table
        df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties)
    
        df.printSchema()
        df.show()
    
        # Write data to a new PostgreSQL table
        (df.write
            .jdbc(url=jdbc_url,
                  table="public.new_table",
                  mode="overwrite",
                  properties=db_properties))
    
    if __name__ == "__main__":
        main()
    

Conéctate a bases de datos externas con ODBC

Para conectarte a fuentes de datos en las que no hay un controlador JDBC disponible, usa un controlador de Conectividad a base de datos abierta (ODBC). Usa una acción de inicialización para instalar el controlador y sus dependencias en cada nodo cuando crees el clúster.

En este ejemplo, se muestra cómo conectarse a una instancia de Microsoft SQL Server.

  1. Crea la secuencia de comandos de acción de inicialización.

    1. Crea una secuencia de comandos que instale el controlador ODBC de Microsoft SQL Server para Debian.
        #!/bin/bash
        # Initialization action for installing MS SQL ODBC driver on the cluster.
    
        set -e -x
    
        # Install dependencies for the driver.
        apt-get update
        apt-get install -y --no-install-recommends curl gnupg unixodbc-dev
    
        # Add Microsoft's official repository.
        curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
        curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
    
        # Install the driver.
        apt-get update
        ACCEPT_EULA=Y apt-get install -y msodbcsql18
    
        # Install the pyodbc library for Python.
        pip install pyodbc
        ```
    
    1.  Upload the script to a Cloud Storage bucket.
    
    ```shell
        gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/
        ```
    
  2. Crea un clúster de Managed Service para Apache Spark que ejecute la acción de inicialización.

    Cuando crees el clúster, apunta a la secuencia de comandos de acción de inicialización en Cloud Storage.

    gcloud dataproc clusters create my-odbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.sh
    
  3. Conéctate y realiza consultas con PySpark

    Una vez que el clúster esté en funcionamiento, el controlador ODBC y la biblioteca pyodbc estarán disponibles. El siguiente código usa pyodbc en el nodo del controlador para recuperar datos en un DataFrame de Pandas y, luego, lo convierte en un DataFrame de Spark para el procesamiento distribuido.

    import pyodbc
    import pandas as pd
    from pyspark.sql import SparkSession
    
    def get_sql_data(connection_string, query):
        """
        Connects to the database using pyodbc, executes a query,
        and returns the result as a Pandas DataFrame.
        """
        cnxn = pyodbc.connect(connection_string)
        pdf = pd.read_sql(query, cnxn)
        cnxn.close()
        return pdf
    
    def main():
        spark = SparkSession.builder.appName("ODBC Example").getOrCreate()
    
        # WARNING: Do not hardcode credentials. Use Secret Manager.
        server = 'SERVER.database.windows.net'
        database = 'DATABASE'
        username = 'USERNAME'
        password = 'PASSWORD'
        driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver.
    
        # Create the connection string.
        connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'
    
        sql_query = "SELECT * FROM Sales.Customer"
    
        # Fetch data as a Pandas DataFrame.
        pandas_df = get_sql_data(connection_string, sql_query)
    
        # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing.
        spark_df = spark.createDataFrame(pandas_df)
    
        print("Successfully read data from SQL Server:")
        spark_df.printSchema()
        spark_df.show(5)
    
        # You can now perform distributed operations on the Spark DataFrame.
        print(f"Total number of customers: {spark_df.count()}")
    
    if __name__ == "__main__":
        main()
    

Prácticas recomendadas

En esta sección, se describen las prácticas recomendadas para administrar credenciales y dependencias.

Administración de credenciales

No codifiques contraseñas ni otros secretos en tu código. Usa Secret Manager para almacenar credenciales de forma segura y acceder a ellas desde tus trabajos de Managed Service para Apache Spark.

Administración de dependencias

Usa acciones de inicialización para instalar controladores y bibliotecas cuando crees un clúster. Para entornos complejos, considera compilar una imagen personalizada de Managed Service para Apache Spark con todas las dependencias preinstaladas. Una imagen personalizada reduce el tiempo de inicio del clúster y garantiza la coherencia.

Copia las secuencias de comandos de acción de inicialización de fuentes públicas a tu propio bucket de Cloud Storage con versiones antes de usarlas en producción. No hagas referencia a secuencias de comandos públicas directamente, ya que pueden cambiar sin previo aviso.

¿Qué sigue?