搭配 Apache Spark 使用 Cloud Storage 連接器

本教學課程說明如何執行搭配使用 Apache SparkCloud Storage 連接器的範例程式碼。

Lightning Engine 可提升與 Cloud Storage 的連線能力,進而提高原生引擎的效能。改良後的 Cloud Storage 連接器可減少中繼資料作業,進而降低費用,而最佳化的檔案輸出提交工具則可提升 Spark 工作負載的效能和可靠性。填寫 搶先體驗 表單,申請搶先體驗這項私人預先發布功能。

目標

使用 Java、Scala 或 Python 編寫簡易的字數計算 Spark 工作,然後在 Managed Service for Apache Spark 叢集上執行。

費用

在本文件中,您會使用下列 Google Cloud的計費元件:

  • Compute Engine
  • Managed Service for Apache Spark
  • Cloud Storage

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用期資格。

事前準備

請按照下列步驟,為在本教學課程中執行程式碼做好準備。

  1. 設定專案。 請視需要設定專案,並啟用 Managed Service for Apache Spark、Compute Engine 和 Cloud Storage API,然後在本機安裝 Google Cloud CLI。

    1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Verify that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    5. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    8. 安裝 Google Cloud CLI。

    9. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

    10. 執行下列指令,初始化 gcloud CLI:

      gcloud init
    11. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    12. Verify that billing is enabled for your Google Cloud project.

    13. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    14. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    15. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    16. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    17. 安裝 Google Cloud CLI。

    18. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

    19. 執行下列指令,初始化 gcloud CLI:

      gcloud init

  2. 建立 Cloud Storage bucket。 您需要 Cloud Storage 來保存教學課程資料。如果沒有可用的 bucket,請在專案中建立新 bucket。

    1. 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。

      前往「Buckets」(值區) 頁面

    2. 點選 「Create」(建立)
    3. 在「建立 bucket」頁面中,輸入 bucket 資訊。如要前往下一個步驟,請按「繼續」
      1. 在「開始使用」部分,執行下列操作:
        • 輸入符合值區命名規定的全域不重複名稱。
        • 如要新增值區標籤,請展開「標籤」部分 (),按一下 「新增標籤」,然後為標籤指定 keyvalue
      2. 在「Choose where to store your data」(選擇資料的儲存位置) 專區中,執行下列操作:
        1. 選取「位置類型」
        2. 從「位置類型」下拉式選單中,選擇要永久儲存 bucket 資料的位置。
          • 如果您選取「雙區域」位置類型,也可以使用相關核取方塊啟用強化型複製
        3. 如要設定跨值區複製,請選取「透過 Storage 移轉服務新增跨值區複製作業」,然後按照下列步驟操作:

          設定跨 bucket 複製作業

          1. 在「Bucket」選單中選取 bucket。
          2. 在「複製設定」部分,按一下「設定」,設定複製作業的設定。

            系統隨即會顯示「設定跨 bucket 複製作業」窗格。

            • 如要依物件名稱前置字串篩選要複製的物件,請輸入要納入或排除物件的前置字串,然後按一下「新增前置字串」
            • 如要為複製的物件設定儲存空間級別,請從「儲存空間級別」選單中選取儲存空間級別。如果略過這個步驟,複製的物件預設會使用目標值區的儲存空間級別。
            • 按一下 [完成]
      3. 在「選擇資料儲存方式」部分,執行下列操作:
        1. 選取 bucket 的預設儲存空間級別,或選取「Autoclass」,讓系統自動管理 bucket 資料的儲存空間級別。
        2. 如要啟用階層命名空間,請在「為資料密集型工作負載提供最理想的儲存空間」部分,選取「為這個值區啟用階層命名空間」
      4. 在「選取如何控制物件的存取權」部分,選取 bucket 是否要強制執行禁止公開存取,並為 bucket 的物件選取存取控管方法
      5. 在「選擇保護物件資料的方式」部分,執行下列操作:
        • 選取「資料保護」下方的任何選項,為 bucket 設定所需項目。
          • 如要啟用虛刪除,請按一下「虛刪除政策 (用於資料復原)」核取方塊,並指定要保留物件的天數 (刪除後)。
          • 如要設定「物件版本管理」,請按一下「物件版本管理 (用於版本管控)」核取方塊,並指定每個物件的版本數量上限,以及非現行版本失效的天數。
          • 如要為物件和 bucket 啟用資料保留政策,請勾選「保留 (符合法規)」核取方塊,然後執行下列操作:
            • 如要啟用 Object Retention Lock,請按一下「啟用物件保留功能」核取方塊。
            • 如要啟用 Bucket Lock,請勾選「Set bucket retention policy」(設定值區資料保留政策) 核取方塊,然後選擇保留期限的時間單位和長度。
        • 如要選擇物件資料的加密方式,請展開「資料加密」部分 (),然後選取「資料加密」方法
    4. 點選「建立」

  3. 設定本機環境變數。請設定本機電腦的環境變數。設定您要在本教學課程中使用的 Google Cloud 專案 ID 及 Cloud Storage bucket 名稱,也請輸入現有或新設 Managed Service for Apache Spark 叢集的名稱和區域。您可以在下一步驟建立本教學課程要用的叢集。

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. 建立 Managed Service for Apache Spark 叢集。執行下列指令,在指定的 Compute Engine 區域中建立單一節點 Managed Service for Apache Spark 叢集。

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. 將資料複製到 Cloud Storage bucket。從公開資料中擷取莎士比亞作品文字片段,並複製到 Cloud Storage bucket 的 input 資料夾:

    gcloud storage cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. 設定 Java (Apache Maven)Scala (SBT)Python 開發環境。

準備 Spark 字數統計工作

選取下列分頁,按照步驟準備要提交至叢集的工作套件或檔案。您可以準備下列其中一種工作類型:

Java

  1. pom.xml 檔案複製到本機電腦。 下列 pom.xml 檔案會指定 Scala 和 Spark 程式庫依附元件,這些元件設有給定的 provided 範圍,表示 Managed Service for Apache Spark 叢集會在執行階段提供這些程式庫。pom.xml 檔案不會指定 Cloud Storage 依附元件,因為連接器會實作標準 HDFS 介面。當 Spark 工作存取 Cloud Storage 叢集檔案 (URI 開頭為 gs:// 的檔案) 時,系統會自動使用 Cloud Storage 連接器存取 Cloud Storage 中的檔案。
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. 將下方列出的 WordCount.java 程式碼複製到本機電腦。
    1. 使用 src/main/java/dataproc/codelab: 路徑建立一組目錄
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java 複製到本機電腦的 src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java 是一項 Java Spark 工作,可從 Cloud Storage 讀取文字檔案、計算字數,然後將文字檔案統計結果寫入 Cloud Storage。

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. 建構套件。
    mvn clean package
    
    如果建構成功,系統會建立 target/word-count-1.0.jar
  4. 將套件暫存至 Cloud Storage。
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt 檔案複製到本機電腦。 下列 build.sbt 檔案會指定 Scala 和 Spark 程式庫依附元件,這些元件設有給定的 provided 範圍,表示 Managed Service for Apache Spark 叢集會在執行階段提供這些程式庫。build.sbt 檔案不會指定 Cloud Storage 依附元件,因為連接器會實作標準 HDFS 介面。當 Spark 工作存取 Cloud Storage 叢集檔案 (URI 開頭為 gs:// 的檔案) 時,系統會自動使用 Cloud Storage 連接器存取 Cloud Storage 中的檔案。
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. word-count.scala 複製到本機電腦。這是一項 Java Spark 工作,可從 Cloud Storage 讀取文字檔案、計算字數,然後將文字檔案統計結果寫入 Cloud Storage。
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. 建構套件。
    sbt clean package
    
    如果建構成功,系統會建立 target/scala-2.11/word-count_2.11-1.0.jar
  4. 將套件暫存至 Cloud Storage。
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py 複製到本機電腦。這是一項使用 PySpark 的 Python Spark 工作,可從 Cloud Storage 讀取文字檔案、計算字數,然後將文字檔案統計結果寫入 Cloud Storage。
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

提交工作

執行下列 gcloud 指令,將字數計算工作提交至 Managed Service for Apache Spark 叢集。

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

查看輸出內容

工作完成後,請執行下列 gcloud CLI 指令,以查看字數統計輸出內容。

gcloud storage cat gs://${BUCKET_NAME}/output/*

畫面會顯示如下的字數計算輸出內容:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

清除所用資源

完成教學課程後,您可以清除所建立的資源,這樣資源就不會繼續使用配額,也不會產生費用。下列各節將說明如何刪除或關閉這些資源。

刪除專案

如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。

刪除專案的方法如下:

  1. 前往 Google Cloud 控制台的「Manage resources」(管理資源) 頁面。

    前往「Manage resources」(管理資源)

  2. 在專案清單中選取要刪除的專案,然後點選「Delete」(刪除)
  3. 在對話方塊中輸入專案 ID,然後按一下 [Shut down] (關閉) 以刪除專案。

刪除 Managed Service for Apache Spark 叢集

您可以考慮只刪除專案中的叢集,而非刪除整個專案。

刪除 Cloud Storage 值區

Google Cloud 控制台

  1. 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。

    前往「Buckets」(值區) 頁面

  2. 按一下要刪除的值區旁的核取方塊。
  3. 如要刪除值區,請依序點選 「Delete」(刪除),然後按照指示操作。

指令列

    刪除 bucket:
    gcloud storage buckets delete BUCKET_NAME

後續步驟