Connessione alle origini dati

Managed Service per Apache Spark si integra con database, data warehouse e servizi di streaming per elaborare i dati provenienti da più sistemi. Questo documento mostra come connettere i cluster Managed Service per Apache Spark alle origini dati all'interno e all'esterno di Google Cloud.

Prima di iniziare

  1. Accedi al tuo Google Cloud account. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Crea un cluster Managed Service per Apache Spark.
  11. Crea un bucket Cloud Storage.
  12. Assicurati di disporre delle autorizzazioni necessarie per accedere alle origini dati.

Ruoli obbligatori

Per eseguire gli esempi in questa pagina sono necessari alcuni ruoli IAM. A seconda delle policy dell'organizzazione, questi ruoli potrebbero essere già stati concessi. Per verificare le concessioni dei ruoli, consulta la sezione Devi concedere i ruoli?.

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Ruoli utente

Per ottenere le autorizzazioni necessarie per creare un cluster Managed Service per Apache Spark, chiedi all'amministratore di concederti i seguenti ruoli IAM:

Ruolo dell'account di servizio

Per assicurarti che il account di servizio predefinito di Compute Engine disponga delle autorizzazioni necessarie per creare un cluster Managed Service per Apache Spark, chiedi all'amministratore di concedere il ruolo IAM Dataproc Worker (roles/dataproc.worker) al account di servizio predefinito di Compute Engine sul progetto.

Connessione ai Google Cloud servizi

Questa sezione mostra come connettere Managed Service per Apache Spark a Google Cloud e alle origini dati esterne.

Connessione a Cloud Storage

Managed Service per Apache Spark utilizza Cloud Storage come file system distribuito predefinito. Per leggere e scrivere dati da e verso i bucket Cloud Storage, utilizza il prefisso URI gs://. Non occorrono ulteriori configurazioni.

L'esempio seguente mostra come leggere un file CSV da un bucket Cloud Storage e quindi scrivere un file Parquet nel bucket.

# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://BUCKET_NAME/path/to/data.csv")

df.show()

# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
    .mode("overwrite") \
    .save("gs://BUCKET_NAME/path/to/output.parquet")

Connessione a BigQuery

Il connettore BigQuery è preinstallato nelle versioni dell'immagine Managed Service per Apache Spark 2.1 e successive. Il connettore consente il trasferimento di dati su larga scala tra Managed Service per Apache Spark e BigQuery.

L'esempio seguente mostra come leggere una tabella BigQuery in un DataFrame Spark e quindi eseguire una query.

# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")

# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .load()

df.createOrReplaceTempView("shakespeare")

# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()

Connessione a database esterni tramite JDBC

Puoi connettere Managed Service per Apache Spark a un database che fornisce un driver JDBC (Java Database Connectivity), come PostgreSQL o MySQL, utilizzando l'origine dati JDBC integrata di Spark.

Rendere disponibile il driver JDBC

Per connetterti a un database con JDBC, rendi disponibile il file JAR del driver nel classpath di Spark.

Per job

Archivia il file JAR del driver in un bucket Cloud Storage e fai riferimento a questo file con il flag --jars quando invii un job. Spark distribuisce il file JAR ai nodi necessari per il job. Questo è l'approccio consigliato per le dipendenze specifiche del job.

A livello di cluster

Se tutti i job di un cluster richiedono un driver specifico, utilizza un'azione di inizializzazione quando crei il cluster per copiare il file JAR del driver da Cloud Storage nella directory dei file JAR di Spark su ogni nodo.

  1. Crea uno script di azione di inizializzazione.

    #!/bin/bash
    set -e -x
    gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/
    
  2. Crea il cluster e fai riferimento allo script.

    gcloud dataproc clusters create my-jdbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
    

Lettura da un database PostgreSQL

Questo esempio utilizza il metodo per job per connettersi a un database PostgreSQL.

  1. Scarica il file JAR del driver JDBC PostgreSQL e caricalo in un bucket Cloud Storage.

  2. Invia il job PySpark e fai riferimento al percorso di Cloud Storage del driver con il flag --jars.

    gcloud dataproc jobs submit pyspark my_job.py \
        --cluster=MY_CLUSTER \
        --region=REGION \
        --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar
    
  3. Utilizza il seguente codice nel file job.py per leggere e scrivere nel database.

    from pyspark.sql import SparkSession
    
    def main():
        # WARNING: Do not hardcode credentials. Use a service like Secret Manager
        # to handle sensitive credentials.
        db_properties = {
            "user": "USERNAME",
            "password": "PASSWORD",
            "driver": "org.postgresql.Driver"
        }
    
        jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE"
    
        spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate()
    
        # Read data from a PostgreSQL table
        df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties)
    
        df.printSchema()
        df.show()
    
        # Write data to a new PostgreSQL table
        (df.write
            .jdbc(url=jdbc_url,
                  table="public.new_table",
                  mode="overwrite",
                  properties=db_properties))
    
    if __name__ == "__main__":
        main()
    

Connessione a database esterni tramite ODBC

Per connetterti alle origini dati in cui non è disponibile un driver JDBC, utilizza un driver ODBC (Open Database Connectivity). Utilizza un'azione di inizializzazione per installare il driver e le relative dipendenze su ogni nodo quando crei il cluster.

Questo esempio mostra come connettersi a un'istanza di Microsoft SQL Server.

  1. Crea lo script di azione di inizializzazione.

    1. Crea uno script che installa il driver ODBC di Microsoft SQL Server per Debian.
        #!/bin/bash
        # Initialization action for installing MS SQL ODBC driver on the cluster.
    
        set -e -x
    
        # Install dependencies for the driver.
        apt-get update
        apt-get install -y --no-install-recommends curl gnupg unixodbc-dev
    
        # Add Microsoft's official repository.
        curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
        curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
    
        # Install the driver.
        apt-get update
        ACCEPT_EULA=Y apt-get install -y msodbcsql18
    
        # Install the pyodbc library for Python.
        pip install pyodbc
        ```
    
    1.  Upload the script to a Cloud Storage bucket.
    
    ```shell
        gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/
        ```
    
  2. Crea un cluster Managed Service per Apache Spark che esegue l'azione di inizializzazione.

    Quando crei il cluster, fai riferimento allo script di azione di inizializzazione in Cloud Storage.

    gcloud dataproc clusters create my-odbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.sh
    
  3. Connessione ed esecuzione di query tramite PySpark

    Una volta che il cluster è in esecuzione, il driver ODBC e la libreria pyodbc sono disponibili. Il seguente codice utilizza pyodbc sul nodo del driver per recuperare i dati in un DataFrame Pandas e quindi li converte in un DataFrame Spark per l'elaborazione distribuita.

    import pyodbc
    import pandas as pd
    from pyspark.sql import SparkSession
    
    def get_sql_data(connection_string, query):
        """
        Connects to the database using pyodbc, executes a query,
        and returns the result as a Pandas DataFrame.
        """
        cnxn = pyodbc.connect(connection_string)
        pdf = pd.read_sql(query, cnxn)
        cnxn.close()
        return pdf
    
    def main():
        spark = SparkSession.builder.appName("ODBC Example").getOrCreate()
    
        # WARNING: Do not hardcode credentials. Use Secret Manager.
        server = 'SERVER.database.windows.net'
        database = 'DATABASE'
        username = 'USERNAME'
        password = 'PASSWORD'
        driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver.
    
        # Create the connection string.
        connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'
    
        sql_query = "SELECT * FROM Sales.Customer"
    
        # Fetch data as a Pandas DataFrame.
        pandas_df = get_sql_data(connection_string, sql_query)
    
        # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing.
        spark_df = spark.createDataFrame(pandas_df)
    
        print("Successfully read data from SQL Server:")
        spark_df.printSchema()
        spark_df.show(5)
    
        # You can now perform distributed operations on the Spark DataFrame.
        print(f"Total number of customers: {spark_df.count()}")
    
    if __name__ == "__main__":
        main()
    

Best practice

Questa sezione descrive le best practice per la gestione delle credenziali e delle dipendenze.

Gestione delle credenziali

Non codificare le password o altri secret nel codice. Utilizza Secret Manager per archiviare le credenziali in modo sicuro e accedervi dai job Managed Service per Apache Spark.

Gestione delle dipendenze

Utilizza le azioni di inizializzazione per installare driver e librerie quando crei un cluster. Per ambienti complessi, valuta la possibilità di creare un'immagine Managed Service per Apache Spark personalizzata con tutte le dipendenze preinstallate. Un'immagine personalizzata riduce il tempo di avvio del cluster e garantisce la coerenza.

Copia gli script di azione di inizializzazione da origini pubbliche nel tuo bucket Cloud Storage con controllo delle versioni prima di utilizzarli in produzione. Non fare riferimento direttamente agli script pubblici, perché possono cambiare senza preavviso.

Passaggi successivi