連結至資料來源

Managed Service for Apache Spark 可與資料庫、資料倉儲和串流服務整合,處理多個系統的資料。本文說明如何將 Managed Service for Apache Spark 叢集連線至 Google Cloud內外的資料來源。

事前準備

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. 建立 Managed Service for Apache Spark 叢集
  11. 建立 Cloud Storage 值區
  12. 確認您具備存取資料來源的必要權限。

必要的角色

如要執行本頁的範例,您必須具備特定 IAM 角色。視組織政策而定,系統可能已授予這些角色。如要檢查角色授予情形,請參閱「是否需要授予角色?」一節。

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

使用者角色

如要取得建立 Managed Service for Apache Spark 叢集所需的權限,請要求管理員授予您下列 IAM 角色:

  • 專案的 Dataproc 編輯者 (roles/dataproc.editor)
  • Compute Engine 預設服務帳戶的「服務帳戶使用者」 (roles/iam.serviceAccountUser)

服務帳戶角色

為確保 Compute Engine 預設服務帳戶具備建立 Managed Service for Apache Spark 叢集的必要權限,請要求管理員在專案中,將 Dataproc Worker (roles/dataproc.worker) IAM 角色授予 Compute Engine 預設服務帳戶。

連結 Google Cloud 服務

本節說明如何將 Managed Service for Apache Spark 連線至 Google Cloud和外部資料來源。

連線至 Cloud Storage

Managed Service for Apache Spark 使用 Cloud Storage 做為預設分散式檔案系統。如要從 Cloud Storage bucket 讀取及寫入資料,請使用 gs:// URI 前置字元。您不需要採取進一步的設定。

下列範例說明如何從 Cloud Storage bucket 讀取 CSV 檔案,然後將 Parquet 檔案寫入 bucket。

# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://BUCKET_NAME/path/to/data.csv")

df.show()

# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
    .mode("overwrite") \
    .save("gs://BUCKET_NAME/path/to/output.parquet")

連結至 BigQuery

在 Managed Service for Apache Spark 2.1 以上映像檔版本中,系統會預先安裝 BigQuery 連接器。這個連接器可在 Managed Service for Apache Spark 和 BigQuery 之間大規模轉移資料。

以下範例說明如何將 BigQuery 資料表讀取到 Spark DataFrame,然後執行查詢。

# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")

# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .load()

df.createOrReplaceTempView("shakespeare")

# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()

使用 JDBC 連線至外部資料庫

您可以使用 Spark 內建的 JDBC 資料來源,將 Managed Service for Apache Spark 連線至提供 Java Database Connectivity (JDBC) 驅動程式的資料庫,例如 PostgreSQL 或 MySQL。

提供 JDBC 驅動程式

如要透過 JDBC 連線至資料庫,請在 Spark 類路徑中提供驅動程式的 JAR 檔案。

每項工作

將驅動程式 JAR 儲存在 Cloud Storage bucket 中,並在提交工作時使用 --jars 旗標參照該檔案。Spark 會將 JAR 檔案分配給工作所需的節點。建議您使用這種方法處理工作專屬的依附元件。

整個叢集

如果叢集上的所有工作都需要特定驅動程式,請在建立叢集時使用初始化動作,將驅動程式 JAR 從 Cloud Storage 複製到每個節點的 Spark JAR 目錄。

  1. 建立初始化動作指令碼。

    #!/bin/bash
    set -e -x
    gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/
    
  2. 建立叢集並參照指令碼。

    gcloud dataproc clusters create my-jdbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
    

從 PostgreSQL 資料庫讀取資料

這個範例使用每個工作的方法連線至 PostgreSQL 資料庫。

  1. 下載 PostgreSQL JDBC 驅動程式 JAR,然後上傳至 Cloud Storage 值區。

  2. 提交 PySpark 工作,並使用 --jars 旗標參照驅動程式的 Cloud Storage 路徑。

    gcloud dataproc jobs submit pyspark my_job.py \
        --cluster=MY_CLUSTER \
        --region=REGION \
        --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar
    
  3. job.py 檔案中使用下列程式碼,即可讀取及寫入資料庫。

    from pyspark.sql import SparkSession
    
    def main():
        # WARNING: Do not hardcode credentials. Use a service like Secret Manager
        # to handle sensitive credentials.
        db_properties = {
            "user": "USERNAME",
            "password": "PASSWORD",
            "driver": "org.postgresql.Driver"
        }
    
        jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE"
    
        spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate()
    
        # Read data from a PostgreSQL table
        df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties)
    
        df.printSchema()
        df.show()
    
        # Write data to a new PostgreSQL table
        (df.write
            .jdbc(url=jdbc_url,
                  table="public.new_table",
                  mode="overwrite",
                  properties=db_properties))
    
    if __name__ == "__main__":
        main()
    

使用 ODBC 連線至外部資料庫

如要連線至沒有 JDBC 驅動程式的資料來源,請使用 Open Database Connectivity (ODBC) 驅動程式。建立叢集時,使用初始化動作在每個節點上安裝驅動程式及其依附元件。

這個範例說明如何連線至 Microsoft SQL Server 執行個體。

  1. 建立初始化動作指令碼。

    1. 建立指令碼,安裝適用於 Debian 的 Microsoft SQL Server ODBC 驅動程式。
        #!/bin/bash
        # Initialization action for installing MS SQL ODBC driver on the cluster.
    
        set -e -x
    
        # Install dependencies for the driver.
        apt-get update
        apt-get install -y --no-install-recommends curl gnupg unixodbc-dev
    
        # Add Microsoft's official repository.
        curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
        curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
    
        # Install the driver.
        apt-get update
        ACCEPT_EULA=Y apt-get install -y msodbcsql18
    
        # Install the pyodbc library for Python.
        pip install pyodbc
        ```
    
    1.  Upload the script to a Cloud Storage bucket.
    
    ```shell
        gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/
        ```
    
  2. 建立 Managed Service for Apache Spark 叢集,執行初始化動作。

    建立叢集時,請指向 Cloud Storage 中的初始化動作指令碼。

    gcloud dataproc clusters create my-odbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.sh
    
  3. 使用 PySpark 連線及查詢

    叢集執行後,即可使用 ODBC 驅動程式和 pyodbc 程式庫。下列程式碼會在驅動程式節點上使用 pyodbc,將資料擷取到 Pandas DataFrame 中,然後轉換為 Spark DataFrame 以進行分散式處理。

    import pyodbc
    import pandas as pd
    from pyspark.sql import SparkSession
    
    def get_sql_data(connection_string, query):
        """
        Connects to the database using pyodbc, executes a query,
        and returns the result as a Pandas DataFrame.
        """
        cnxn = pyodbc.connect(connection_string)
        pdf = pd.read_sql(query, cnxn)
        cnxn.close()
        return pdf
    
    def main():
        spark = SparkSession.builder.appName("ODBC Example").getOrCreate()
    
        # WARNING: Do not hardcode credentials. Use Secret Manager.
        server = 'SERVER.database.windows.net'
        database = 'DATABASE'
        username = 'USERNAME'
        password = 'PASSWORD'
        driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver.
    
        # Create the connection string.
        connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'
    
        sql_query = "SELECT * FROM Sales.Customer"
    
        # Fetch data as a Pandas DataFrame.
        pandas_df = get_sql_data(connection_string, sql_query)
    
        # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing.
        spark_df = spark.createDataFrame(pandas_df)
    
        print("Successfully read data from SQL Server:")
        spark_df.printSchema()
        spark_df.show(5)
    
        # You can now perform distributed operations on the Spark DataFrame.
        print(f"Total number of customers: {spark_df.count()}")
    
    if __name__ == "__main__":
        main()
    

最佳做法

本節說明管理憑證和依附元件的最佳做法。

憑證管理

請勿在程式碼中以硬式編碼方式加入密碼或其他私密資訊。使用 Secret Manager 安全地儲存憑證,並從 Managed Service for Apache Spark 工作存取憑證。

依附元件管理

建立叢集時,可以使用初始化動作安裝驅動程式和程式庫。如果是複雜環境,建議您建立自訂的 Managed Service for Apache Spark 映像檔,並預先安裝所有依附元件。自訂映像檔可縮短叢集啟動時間,並確保一致性。

請先將初始化動作指令碼從公開來源複製到您自己的已設定版本 Cloud Storage bucket,再用於正式環境。請勿直接參照公開指令碼,因為這類指令碼可能會在未經通知的情況下變更。

後續步驟