Menghubungkan ke sumber data

Managed Service untuk Apache Spark terintegrasi dengan database, data warehouse, dan layanan streaming untuk memproses data dari beberapa sistem. Dokumen ini menunjukkan cara menghubungkan cluster Managed Service untuk Apache Spark ke sumber data di dalam dan di luar Google Cloud.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Buat cluster Managed Service for Apache Spark.
  11. Buat bucket Cloud Storage.
  12. Pastikan Anda memiliki izin yang diperlukan untuk mengakses sumber data.

Peran yang diperlukan

Peran IAM tertentu diperlukan untuk menjalankan contoh di halaman ini. Bergantung pada kebijakan organisasi, peran ini mungkin sudah diberikan. Untuk memeriksa pemberian peran, lihat Apakah Anda perlu memberikan peran?.

Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project,folder, dan organisasi.

Peran pengguna

Untuk mendapatkan izin yang Anda perlukan untuk membuat cluster Managed Service for Apache Spark, minta administrator untuk memberi Anda peran IAM berikut:

Peran akun layanan

Untuk memastikan bahwa akun layanan default Compute Engine memiliki izin yang diperlukan untuk membuat cluster Managed Service for Apache Spark, minta administrator Anda untuk memberikan peran IAM Dataproc Worker (roles/dataproc.worker) kepada akun layanan default Compute Engine di project.

Menghubungkan ke Google Cloud layanan

Bagian ini menunjukkan cara menghubungkan Managed Service untuk Apache Spark ke Google Cloud dan sumber data eksternal.

Menghubungkan ke Cloud Storage

Managed Service for Apache Spark menggunakan Cloud Storage sebagai sistem file terdistribusi default-nya. Untuk membaca dan menulis data dari dan ke bucket Cloud Storage, gunakan awalan URI gs://. Tidak diperlukan konfigurasi tambahan.

Contoh berikut menunjukkan cara membaca file CSV dari bucket Cloud Storage, lalu menulis file Parquet ke bucket.

# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://BUCKET_NAME/path/to/data.csv")

df.show()

# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
    .mode("overwrite") \
    .save("gs://BUCKET_NAME/path/to/output.parquet")

Menghubungkan ke BigQuery

Konektor BigQuery telah diinstal sebelumnya di Managed Service for Apache Spark versi image 2.1 dan yang lebih baru. Konektor ini memungkinkan transfer data skala besar antara Managed Service for Apache Spark dan BigQuery.

Contoh berikut menunjukkan cara membaca tabel BigQuery ke dalam DataFrame Spark, lalu menjalankan kueri.

# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")

# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .load()

df.createOrReplaceTempView("shakespeare")

# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()

Menghubungkan ke database eksternal menggunakan JDBC

Anda dapat menghubungkan Managed Service for Apache Spark ke database yang menyediakan driver Java Database Connectivity (JDBC), seperti PostgreSQL atau MySQL, menggunakan sumber data JDBC bawaan Spark.

Menyediakan driver JDBC

Untuk terhubung ke database dengan JDBC, sediakan file JAR driver di classpath Spark.

Per tugas

Simpan JAR driver di bucket Cloud Storage, dan rujuk dengan tanda --jars saat Anda mengirimkan tugas. Spark mendistribusikan file JAR ke node yang diperlukan untuk tugas. Ini adalah pendekatan yang direkomendasikan untuk dependensi khusus tugas.

Di seluruh cluster

Jika semua tugas di cluster memerlukan driver tertentu, gunakan tindakan inisialisasi saat Anda membuat cluster untuk menyalin JAR driver dari Cloud Storage ke direktori JAR Spark di setiap node.

  1. Buat skrip tindakan inisialisasi.

    #!/bin/bash
    set -e -x
    gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/
    
  2. Buat cluster dan rujuk skrip.

    gcloud dataproc clusters create my-jdbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
    

Membaca dari database PostgreSQL

Contoh ini menggunakan metode per tugas untuk terhubung ke database PostgreSQL.

  1. Download JAR driver JDBC PostgreSQL, lalu upload ke bucket Cloud Storage.

  2. Kirimkan tugas PySpark Anda dan referensikan jalur Cloud Storage driver dengan flag --jars.

    gcloud dataproc jobs submit pyspark my_job.py \
        --cluster=MY_CLUSTER \
        --region=REGION \
        --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar
    
  3. Gunakan kode berikut dalam file job.py Anda untuk membaca dan menulis ke database.

    from pyspark.sql import SparkSession
    
    def main():
        # WARNING: Do not hardcode credentials. Use a service like Secret Manager
        # to handle sensitive credentials.
        db_properties = {
            "user": "USERNAME",
            "password": "PASSWORD",
            "driver": "org.postgresql.Driver"
        }
    
        jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE"
    
        spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate()
    
        # Read data from a PostgreSQL table
        df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties)
    
        df.printSchema()
        df.show()
    
        # Write data to a new PostgreSQL table
        (df.write
            .jdbc(url=jdbc_url,
                  table="public.new_table",
                  mode="overwrite",
                  properties=db_properties))
    
    if __name__ == "__main__":
        main()
    

Menghubungkan ke database eksternal menggunakan ODBC

Untuk terhubung ke sumber data yang tidak memiliki driver JDBC, gunakan driver Open Database Connectivity (ODBC). Gunakan tindakan inisialisasi untuk menginstal driver dan dependensinya di setiap node saat Anda membuat cluster.

Contoh ini menunjukkan cara terhubung ke instance Microsoft SQL Server.

  1. Buat skrip tindakan inisialisasi.

    1. Buat skrip yang menginstal driver ODBC Microsoft SQL Server untuk Debian.
        #!/bin/bash
        # Initialization action for installing MS SQL ODBC driver on the cluster.
    
        set -e -x
    
        # Install dependencies for the driver.
        apt-get update
        apt-get install -y --no-install-recommends curl gnupg unixodbc-dev
    
        # Add Microsoft's official repository.
        curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
        curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
    
        # Install the driver.
        apt-get update
        ACCEPT_EULA=Y apt-get install -y msodbcsql18
    
        # Install the pyodbc library for Python.
        pip install pyodbc
        ```
    
    1.  Upload the script to a Cloud Storage bucket.
    
    ```shell
        gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/
        ```
    
  2. Buat cluster Managed Service for Apache Spark yang menjalankan tindakan inisialisasi.

    Saat membuat cluster, tunjuk skrip tindakan inisialisasi di Cloud Storage.

    gcloud dataproc clusters create my-odbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.sh
    
  3. Menghubungkan dan membuat kueri menggunakan PySpark

    Setelah cluster berjalan, driver ODBC dan library pyodbc tersedia. Kode berikut menggunakan pyodbc di node driver untuk mengambil data ke dalam DataFrame Pandas, lalu mengonversinya menjadi DataFrame Spark untuk pemrosesan terdistribusi.

    import pyodbc
    import pandas as pd
    from pyspark.sql import SparkSession
    
    def get_sql_data(connection_string, query):
        """
        Connects to the database using pyodbc, executes a query,
        and returns the result as a Pandas DataFrame.
        """
        cnxn = pyodbc.connect(connection_string)
        pdf = pd.read_sql(query, cnxn)
        cnxn.close()
        return pdf
    
    def main():
        spark = SparkSession.builder.appName("ODBC Example").getOrCreate()
    
        # WARNING: Do not hardcode credentials. Use Secret Manager.
        server = 'SERVER.database.windows.net'
        database = 'DATABASE'
        username = 'USERNAME'
        password = 'PASSWORD'
        driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver.
    
        # Create the connection string.
        connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'
    
        sql_query = "SELECT * FROM Sales.Customer"
    
        # Fetch data as a Pandas DataFrame.
        pandas_df = get_sql_data(connection_string, sql_query)
    
        # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing.
        spark_df = spark.createDataFrame(pandas_df)
    
        print("Successfully read data from SQL Server:")
        spark_df.printSchema()
        spark_df.show(5)
    
        # You can now perform distributed operations on the Spark DataFrame.
        print(f"Total number of customers: {spark_df.count()}")
    
    if __name__ == "__main__":
        main()
    

Praktik terbaik

Bagian ini menjelaskan praktik terbaik untuk mengelola kredensial dan dependensi.

Pengelolaan kredensial

Jangan melakukan hard code sandi atau rahasia lainnya dalam kode Anda. Gunakan Secret Manager untuk menyimpan kredensial dengan aman dan mengaksesnya dari tugas Managed Service for Apache Spark.

Pengelolaan dependensi

Gunakan tindakan inisialisasi untuk menginstal driver dan library saat Anda membuat kluster. Untuk lingkungan yang kompleks, pertimbangkan untuk membuat image Managed Service for Apache Spark kustom dengan semua dependensi yang telah diinstal sebelumnya. Image kustom mengurangi waktu mulai cluster dan memastikan konsistensi.

Salin skrip tindakan inisialisasi dari sumber publik ke bucket Cloud Storage versi Anda sendiri sebelum menggunakannya dalam produksi. Jangan merujuk skrip publik secara langsung, karena skrip tersebut dapat berubah tanpa pemberitahuan.

Langkah berikutnya