Se connecter à des sources de données

Managed Service pour Apache Spark s'intègre aux bases de données, aux entrepôts de données et aux services de streaming pour traiter les données de plusieurs systèmes. Ce document explique comment connecter des clusters Managed Service pour Apache Spark à des sources de données à l'intérieur et à l'extérieur de Google Cloud.

Avant de commencer

  1. Connectez-vous à votre Google Cloud compte. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $de crédits sans frais pour exécuter, tester et déployer des charges de travail.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Créez un cluster Managed Service pour Apache Spark.
  11. Créez un bucket Cloud Storage.
  12. Assurez-vous de disposer des autorisations requises pour accéder aux sources de données.

Rôles requis

Certains rôles IAM sont requis pour exécuter les exemples de cette page. En fonction des règles d'administration, ces rôles peuvent déjà avoir été attribués. Pour vérifier les attributions de rôles, consultez la section Devez-vous attribuer des rôles ?.

Pour en savoir plus sur l'attribution de rôles, consultez la section Gérer l'accès aux projets, aux dossiers et aux organisations.

Rôles utilisateur

Pour obtenir les autorisations nécessaires pour créer un cluster Managed Service pour Apache Spark, demandez à votre administrateur de vous accorder les rôles IAM suivants :

Rôle du compte de service

Pour vous assurer que le compte de service Compute Engine par défaut dispose des autorisations nécessaires pour créer un cluster Managed Service pour Apache Spark, demandez à votre administrateur d'accorder le rôle IAM Nœud de calcul Dataproc (roles/dataproc.worker) au compte de service Compute Engine par défaut sur le projet.

Se connecter à des Google Cloud services

Cette section explique comment connecter Managed Service pour Apache Spark à Google Cloud et à des sources de données externes.

Se connecter à Cloud Storage

Managed Service pour Apache Spark utilise Cloud Storage comme système de fichiers distribué par défaut. Pour lire et écrire des données dans des buckets Cloud Storage, utilisez le préfixe d'URI gs://. Aucune configuration supplémentaire n'est requise.

L'exemple suivant montre comment lire un fichier CSV à partir d'un bucket Cloud Storage, puis écrire un fichier Parquet dans le bucket.

# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://BUCKET_NAME/path/to/data.csv")

df.show()

# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
    .mode("overwrite") \
    .save("gs://BUCKET_NAME/path/to/output.parquet")

Se connecter à BigQuery

Le connecteur BigQuery est préinstallé sur les versions d'image 2.1 et ultérieures de Managed Service pour Apache Spark. Il permet le transfert de données à grande échelle entre Managed Service pour Apache Spark et BigQuery.

L'exemple suivant montre comment lire une table BigQuery dans un DataFrame Spark, puis exécuter une requête.

# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")

# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .load()

df.createOrReplaceTempView("shakespeare")

# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()

Se connecter à des bases de données externes à l'aide de JDBC

Vous pouvez connecter Managed Service pour Apache Spark à une base de données qui fournit un pilote Java Database Connectivity (JDBC), tel que PostgreSQL ou MySQL, à l'aide de la source de données JDBC intégrée à Spark.

Rendre le pilote JDBC disponible

Pour vous connecter à une base de données avec JDBC, mettez le fichier JAR du pilote à disposition sur le chemin de classe Spark.

Par tâche

Stockez le fichier JAR du pilote dans un bucket Cloud Storage et référencez-le avec l'option --jars lorsque vous envoyez une tâche. Spark distribue le fichier JAR aux nœuds nécessaires pour la tâche. Il s'agit de l'approche recommandée pour les dépendances spécifiques à une tâche.

À l'échelle du cluster

Si toutes les tâches d'un cluster nécessitent un pilote spécifique, utilisez une action d'initialisation lorsque vous créez le cluster pour copier le fichier JAR du pilote de Cloud Storage dans le répertoire de fichiers JAR Spark de chaque nœud.

  1. Créez un script d'action d'initialisation.

    #!/bin/bash
    set -e -x
    gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/
    
  2. Créez le cluster et référencez le script.

    gcloud dataproc clusters create my-jdbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
    

Lire dans une base de données PostgreSQL

Cet exemple utilise la méthode par tâche pour se connecter à une base de données PostgreSQL.

  1. Téléchargez le fichier JAR du pilote JDBC PostgreSQL et importez-le dans un bucket Cloud Storage.

  2. Envoyez votre tâche PySpark et référencez le chemin d'accès Cloud Storage du pilote avec l'option --jars.

    gcloud dataproc jobs submit pyspark my_job.py \
        --cluster=MY_CLUSTER \
        --region=REGION \
        --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar
    
  3. Utilisez le code suivant dans votre fichier job.py pour lire et écrire dans la base de données.

    from pyspark.sql import SparkSession
    
    def main():
        # WARNING: Do not hardcode credentials. Use a service like Secret Manager
        # to handle sensitive credentials.
        db_properties = {
            "user": "USERNAME",
            "password": "PASSWORD",
            "driver": "org.postgresql.Driver"
        }
    
        jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE"
    
        spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate()
    
        # Read data from a PostgreSQL table
        df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties)
    
        df.printSchema()
        df.show()
    
        # Write data to a new PostgreSQL table
        (df.write
            .jdbc(url=jdbc_url,
                  table="public.new_table",
                  mode="overwrite",
                  properties=db_properties))
    
    if __name__ == "__main__":
        main()
    

Se connecter à des bases de données externes à l'aide d'ODBC

Pour vous connecter à des sources de données où aucun pilote JDBC n'est disponible, utilisez un pilote Open Database Connectivity (ODBC). Utilisez une action d'initialisation pour installer le pilote et ses dépendances sur chaque nœud lorsque vous créez le cluster.

Cet exemple montre comment se connecter à une instance Microsoft SQL Server.

  1. Créez le script d'action d'initialisation.

    1. Créez un script qui installe le pilote ODBC Microsoft SQL Server pour Debian.
        #!/bin/bash
        # Initialization action for installing MS SQL ODBC driver on the cluster.
    
        set -e -x
    
        # Install dependencies for the driver.
        apt-get update
        apt-get install -y --no-install-recommends curl gnupg unixodbc-dev
    
        # Add Microsoft's official repository.
        curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
        curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
    
        # Install the driver.
        apt-get update
        ACCEPT_EULA=Y apt-get install -y msodbcsql18
    
        # Install the pyodbc library for Python.
        pip install pyodbc
        ```
    
    1.  Upload the script to a Cloud Storage bucket.
    
    ```shell
        gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/
        ```
    
  2. Créez un cluster Managed Service pour Apache Spark qui exécute l'action d'initialisation.

    Lorsque vous créez votre cluster, pointez vers le script d'action d'initialisation dans Cloud Storage.

    gcloud dataproc clusters create my-odbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.sh
    
  3. Connectez-vous et interrogez à l'aide de PySpark

    Une fois le cluster en cours d'exécution, le pilote ODBC et la bibliothèque pyodbc sont disponibles. Le code suivant utilise pyodbc sur le nœud du pilote pour extraire des données dans un DataFrame Pandas, puis les convertit en DataFrame Spark pour le traitement distribué.

    import pyodbc
    import pandas as pd
    from pyspark.sql import SparkSession
    
    def get_sql_data(connection_string, query):
        """
        Connects to the database using pyodbc, executes a query,
        and returns the result as a Pandas DataFrame.
        """
        cnxn = pyodbc.connect(connection_string)
        pdf = pd.read_sql(query, cnxn)
        cnxn.close()
        return pdf
    
    def main():
        spark = SparkSession.builder.appName("ODBC Example").getOrCreate()
    
        # WARNING: Do not hardcode credentials. Use Secret Manager.
        server = 'SERVER.database.windows.net'
        database = 'DATABASE'
        username = 'USERNAME'
        password = 'PASSWORD'
        driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver.
    
        # Create the connection string.
        connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'
    
        sql_query = "SELECT * FROM Sales.Customer"
    
        # Fetch data as a Pandas DataFrame.
        pandas_df = get_sql_data(connection_string, sql_query)
    
        # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing.
        spark_df = spark.createDataFrame(pandas_df)
    
        print("Successfully read data from SQL Server:")
        spark_df.printSchema()
        spark_df.show(5)
    
        # You can now perform distributed operations on the Spark DataFrame.
        print(f"Total number of customers: {spark_df.count()}")
    
    if __name__ == "__main__":
        main()
    

Bonnes pratiques

Cette section décrit les bonnes pratiques pour gérer les identifiants et les dépendances.

Gestion des identifiants

Ne codez pas en dur les mots de passe ni d'autres secrets dans votre code. Utilisez Secret Manager pour stocker les identifiants de manière sécurisée et y accéder à partir de vos tâches Managed Service pour Apache Spark.

Gestion des dépendances

Utilisez des actions d'initialisation pour installer des pilotes et des bibliothèques lorsque vous créez un cluster. Pour les environnements complexes, envisagez de créer une image Managed Service pour Apache Spark personnalisée avec toutes les dépendances préinstallées. Une image personnalisée réduit le temps de démarrage du cluster et garantit la cohérence.

Copiez les scripts d'action d'initialisation à partir de sources publiques dans votre propre bucket Cloud Storage versionné avant de les utiliser en production. Ne référencez pas directement les scripts publics, car ils peuvent être modifiés sans préavis.

Étape suivante