Mit Datenquellen verbinden

Managed Service for Apache Spark lässt sich in Datenbanken, Data Warehouses und Streamingdienste einbinden, um Daten aus mehreren Systemen zu verarbeiten. In diesem Dokument wird beschrieben, wie Sie Managed Service for Apache Spark-Cluster mit Datenquellen innerhalb und außerhalb von Google Cloudverbinden.

Hinweis

  1. Melden Sie sich in Ihrem Google Cloud -Konto an. Wenn Sie mit Google Cloudnoch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Managed Service for Apache Spark-Cluster erstellen
  11. Cloud Storage-Bucket erstellen
  12. Sie müssen die erforderlichen Berechtigungen für den Zugriff auf Datenquellen haben.

Erforderliche Rollen

Für die Ausführung der Beispiele auf dieser Seite sind bestimmte IAM-Rollen erforderlich. Abhängig von den Organisationsrichtlinien wurden diese Rollen möglicherweise bereits gewährt. Informationen zum Prüfen von Rollenzuweisungen finden Sie unter Müssen Sie Rollen zuweisen?.

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Nutzerrollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Managed Service for Apache Spark-Clusters benötigen:

Dienstkontorolle

Bitten Sie Ihren Administrator, dem Compute Engine-Standarddienstkonto die IAM-Rolle Dataproc-Worker (roles/dataproc.worker) für das Projekt zuzuweisen, damit das Compute Engine-Standarddienstkonto die erforderlichen Berechtigungen zum Erstellen eines Managed Service for Apache Spark-Clusters hat.

Verbindung zu Google Cloud -Diensten herstellen

In diesem Abschnitt wird beschrieben, wie Sie Managed Service for Apache Spark mit Google Cloudund externen Datenquellen verbinden.

Mit Cloud Storage verbinden

Managed Service for Apache Spark verwendet Cloud Storage als standardmäßiges verteiltes Dateisystem. Wenn Sie Daten aus Cloud Storage-Buckets lesen und in Cloud Storage-Buckets schreiben möchten, verwenden Sie das URI-Präfix gs://. Es ist keine zusätzliche Konfiguration erforderlich.

Im folgenden Beispiel wird gezeigt, wie eine CSV-Datei aus einem Cloud Storage-Bucket gelesen und dann eine Parquet-Datei in den Bucket geschrieben wird.

# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://BUCKET_NAME/path/to/data.csv")

df.show()

# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
    .mode("overwrite") \
    .save("gs://BUCKET_NAME/path/to/output.parquet")

Mit BigQuery verbinden

Der BigQuery-Connector ist in Managed Service for Apache Spark-Image-Versionen 2.1 und höher vorinstalliert. Der Connector ermöglicht die Übertragung großer Datenmengen zwischen Managed Service for Apache Spark und BigQuery.

Im folgenden Beispiel wird gezeigt, wie eine BigQuery-Tabelle in einen Spark-DataFrame eingelesen und dann eine Abfrage ausgeführt wird.

# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")

# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .load()

df.createOrReplaceTempView("shakespeare")

# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()

Mit JDBC eine Verbindung zu externen Datenbanken herstellen

Sie können Managed Service for Apache Spark mit einer Datenbank verbinden, die einen JDBC-Treiber (Java Database Connectivity) bereitstellt, z. B. PostgreSQL oder MySQL. Verwenden Sie dazu die integrierte JDBC-Datenquelle von Spark.

JDBC-Treiber verfügbar machen

Wenn Sie eine Verbindung zu einer Datenbank mit JDBC herstellen möchten, müssen Sie die JAR-Datei des Treibers im Spark-Klassenpfad verfügbar machen.

Pro Job

Speichern Sie die Treiber-JAR-Datei in einem Cloud Storage-Bucket und verweisen Sie mit dem Flag --jars darauf, wenn Sie einen Job senden. Spark verteilt die JAR-Datei auf die für den Job erforderlichen Knoten. Dies ist der empfohlene Ansatz für jobspezifische Abhängigkeiten.

Clusterweit

Wenn für alle Jobs in einem Cluster ein bestimmter Treiber erforderlich ist, verwenden Sie beim Erstellen des Clusters eine Initialisierungsaktion, um die Treiber-JAR-Datei aus Cloud Storage in das Spark-JAR-Verzeichnis auf jedem Knoten zu kopieren.

  1. Erstellen Sie ein Initialisierungsskript.

    #!/bin/bash
    set -e -x
    gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/
    
  2. Erstellen Sie den Cluster und verweisen Sie auf das Skript.

    gcloud dataproc clusters create my-jdbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
    

Aus einer PostgreSQL-Datenbank lesen

In diesem Beispiel wird die Methode „pro Job“ verwendet, um eine Verbindung zu einer PostgreSQL-Datenbank herzustellen.

  1. Laden Sie die JAR-Datei des PostgreSQL-JDBC-Treibers herunter und in einen Cloud Storage-Bucket hoch.

  2. Senden Sie Ihren PySpark-Job und verweisen Sie mit dem Flag --jars auf den Cloud Storage-Pfad des Treibers.

    gcloud dataproc jobs submit pyspark my_job.py \
        --cluster=MY_CLUSTER \
        --region=REGION \
        --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar
    
  3. Verwenden Sie den folgenden Code in Ihrer job.py-Datei, um Daten aus der Datenbank zu lesen und in die Datenbank zu schreiben.

    from pyspark.sql import SparkSession
    
    def main():
        # WARNING: Do not hardcode credentials. Use a service like Secret Manager
        # to handle sensitive credentials.
        db_properties = {
            "user": "USERNAME",
            "password": "PASSWORD",
            "driver": "org.postgresql.Driver"
        }
    
        jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE"
    
        spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate()
    
        # Read data from a PostgreSQL table
        df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties)
    
        df.printSchema()
        df.show()
    
        # Write data to a new PostgreSQL table
        (df.write
            .jdbc(url=jdbc_url,
                  table="public.new_table",
                  mode="overwrite",
                  properties=db_properties))
    
    if __name__ == "__main__":
        main()
    

Über ODBC eine Verbindung zu externen Datenbanken herstellen

Wenn Sie eine Verbindung zu Datenquellen herstellen möchten, für die kein JDBC-Treiber verfügbar ist, verwenden Sie einen ODBC-Treiber (Open Database Connectivity). Verwenden Sie eine Initialisierungsaktion, um den Treiber und seine Abhängigkeiten beim Erstellen des Clusters auf jedem Knoten zu installieren.

In diesem Beispiel wird gezeigt, wie Sie eine Verbindung zu einer Microsoft SQL Server-Instanz herstellen.

  1. Erstellen Sie das Initialisierungsaktionsskript.

    1. Erstellen Sie ein Skript, mit dem der Microsoft SQL Server ODBC-Treiber für Debian installiert wird.
        #!/bin/bash
        # Initialization action for installing MS SQL ODBC driver on the cluster.
    
        set -e -x
    
        # Install dependencies for the driver.
        apt-get update
        apt-get install -y --no-install-recommends curl gnupg unixodbc-dev
    
        # Add Microsoft's official repository.
        curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
        curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
    
        # Install the driver.
        apt-get update
        ACCEPT_EULA=Y apt-get install -y msodbcsql18
    
        # Install the pyodbc library for Python.
        pip install pyodbc
        ```
    
    1.  Upload the script to a Cloud Storage bucket.
    
    ```shell
        gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/
        ```
    
  2. Erstellen Sie einen Managed Service for Apache Spark-Cluster, in dem die Initialisierungsaktion ausgeführt wird.

    Wenn Sie den Cluster erstellen, verweisen Sie auf das Initialisierungsaktionsskript in Cloud Storage.

    gcloud dataproc clusters create my-odbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.sh
    
  3. Mit PySpark verbinden und Abfragen ausführen

    Nachdem der Cluster ausgeführt wird, sind der ODBC-Treiber und die pyodbc-Bibliothek verfügbar. Im folgenden Code wird pyodbc auf dem Driver-Knoten verwendet, um Daten in einen Pandas DataFrame abzurufen. Anschließend werden die Daten für die verteilte Verarbeitung in einen Spark DataFrame konvertiert.

    import pyodbc
    import pandas as pd
    from pyspark.sql import SparkSession
    
    def get_sql_data(connection_string, query):
        """
        Connects to the database using pyodbc, executes a query,
        and returns the result as a Pandas DataFrame.
        """
        cnxn = pyodbc.connect(connection_string)
        pdf = pd.read_sql(query, cnxn)
        cnxn.close()
        return pdf
    
    def main():
        spark = SparkSession.builder.appName("ODBC Example").getOrCreate()
    
        # WARNING: Do not hardcode credentials. Use Secret Manager.
        server = 'SERVER.database.windows.net'
        database = 'DATABASE'
        username = 'USERNAME'
        password = 'PASSWORD'
        driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver.
    
        # Create the connection string.
        connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'
    
        sql_query = "SELECT * FROM Sales.Customer"
    
        # Fetch data as a Pandas DataFrame.
        pandas_df = get_sql_data(connection_string, sql_query)
    
        # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing.
        spark_df = spark.createDataFrame(pandas_df)
    
        print("Successfully read data from SQL Server:")
        spark_df.printSchema()
        spark_df.show(5)
    
        # You can now perform distributed operations on the Spark DataFrame.
        print(f"Total number of customers: {spark_df.count()}")
    
    if __name__ == "__main__":
        main()
    

Best Practices

In diesem Abschnitt werden Best Practices für die Verwaltung von Anmeldedaten und Abhängigkeiten beschrieben.

Anmeldedatenverwaltung

Hartcodieren Sie keine Passwörter oder andere Secrets in Ihrem Code. Verwenden Sie Secret Manager, um Anmeldedaten sicher zu speichern und über Ihre Managed Service for Apache Spark-Jobs darauf zuzugreifen.

Abhängigkeitsverwaltung

Verwenden Sie Initialisierungsaktionen, um beim Erstellen eines Clusters Treiber und Bibliotheken zu installieren. In komplexen Umgebungen sollten Sie ein benutzerdefiniertes Managed Service for Apache Spark-Image erstellen, in dem alle Abhängigkeiten vorinstalliert sind. Ein benutzerdefiniertes Image verkürzt die Startzeit des Clusters und sorgt für Konsistenz.

Kopieren Sie Initialisierungsaktionsskripts aus öffentlichen Quellen in Ihren eigenen versionierten Cloud Storage-Bucket, bevor Sie sie in der Produktion verwenden. Verweisen Sie nicht direkt auf öffentliche Skripts, da diese ohne Vorankündigung geändert werden können.

Nächste Schritte