将 Cloud Storage 连接器与 Apache Spark 搭配使用

本教程介绍了如何执行将Cloud Storage 连接器Apache Spark搭配使用的示例代码。

Lightning Engine 可增强与 Cloud Storage 的连接,以优化其原生引擎的 性能。改进后的 Cloud Storage 连接器可最大限度地减少元数据操作,从而降低费用,而优化的文件输出提交器可为 Spark 工作负载解锁性能和可靠性。填写 抢先体验 表单,申请抢先体验此非公开预览版功能。

目标

使用 Java、Scala 或 Python 编写简单的 wordcount Spark 作业,然后在 Managed Service for Apache Spark 集群上运行该作业。

费用

在本文档中,您将使用的以下收费组件: Google Cloud

  • Compute Engine
  • Managed Service for Apache Spark
  • Cloud Storage

您可使用 价格计算器 根据您的预计使用情况来估算费用。

新 Google Cloud 用户可能有资格申请免费试用

准备工作

运行以下步骤为运行本教程中的代码做准备。

  1. 设置项目。如有必要,请设置一个启用了 Managed Service for Apache Spark、Compute Engine 和 Cloud Storage API 并在本地机器上安装了 Google Cloud CLI 的项目。

    1. 登录您的 Google Cloud 账号。如果您是新手 Google Cloud, 请创建一个账号来评估我们的产品在 实际场景中的表现。新客户还可获享 $300 赠金,用于 运行、测试和部署工作负载。
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Verify that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    5. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    8. 安装 Google Cloud CLI。

    9. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

    10. 如需初始化 gcloud CLI,请运行以下命令:

      gcloud init
    11. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    12. Verify that billing is enabled for your Google Cloud project.

    13. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    14. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    15. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    16. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    17. 安装 Google Cloud CLI。

    18. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

    19. 如需初始化 gcloud CLI,请运行以下命令:

      gcloud init

  2. 创建 Cloud Storage 存储桶。您需要使用 Cloud Storage 来保存教程数据。如果您没有可用的 Cloud Storage 存储桶,请在项目中创建新存储桶。

    1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区 页面。

      进入“存储分区”

    2. 点击 创建
    3. 创建存储桶 页面上,输入您的存储桶信息。要转到下一步 ,请点击继续
      1. 开始使用 部分中,执行以下操作:
        • 输入符合 存储桶命名要求的全局唯一的名称。
        • 如需添加 存储桶标签, 请展开 标签 部分 (), 点击 添加标签,并为标签指定 keyvalue
      2. 选择存储数据的位置 部分中,执行以下操作:
        1. 选择位置类型
        2. 位置类型下拉菜单中选择一个位置,用于永久存储存储桶的数据。
        3. 如需设置 跨存储桶复制,请选择 通过 Storage Transfer Service 添加跨存储桶复制 ,然后 按照以下步骤操作:

          设置跨存储桶复制

          1. 存储桶 菜单中,选择一个存储桶。
          2. 复制设置 部分中, 点击配置 以配置 复制作业的设置。

            系统会显示配置跨存储桶复制 窗格 显示。

            • 如需按对象名称前缀过滤要复制的对象, 请输入要用于包含或排除对象的前缀,然后点击 添加前缀
            • 如需为复制的对象设置存储类别, 请从存储类别菜单中选择一个存储类别。 如果您跳过此步骤,则复制的对象会默认使用 目标存储桶的存储类别。
            • 点击完成
      3. 选择存储数据的方式 部分中,执行以下操作:
        1. 为存储桶选择默认存储类别,或者选择Autoclass对存储桶数据进行自动存储类别管理。
        2. 如需启用 分层命名空间,请在 针对数据密集型工作负载优化存储 部分中,选择 在此存储桶上启用分层命名空间
      4. 选择如何控制对对象的访问权限 部分中,选择 存储桶是否强制执行禁止公开访问, 然后为存储桶对象选择访问权限控制方法
      5. 选择如何保护对象数据 部分中,执行以下操作:
        • 数据保护 下,选择您要为存储桶设置的任何选项。
          • 如需启用 软删除,请点击 软删除政策(用于数据恢复) 复选框, 然后指定您希望在删除对象后保留对象的天数。
          • 如需设置 对象版本控制,请点击 对象版本控制(用于版本控制) 复选框, 然后指定每个对象的最大版本数以及非当前版本过期前的天数。
          • 如需对对象和存储分区启用保留政策,请点击保留(用于合规性) 复选框,然后执行以下操作:
            • 如需启用 对象保留锁定,请点击 启用对象保留 复选框。
            • 如需启用 存储桶锁定,请点击 设置存储桶保留政策 复选框,然后为保留期限选择时间单位和时间长度。
        • 如需选择对象数据的加密方式,请展开 数据加密 部分 (),然后选择 数据加密 方法
    4. 点击创建

  3. 设置本地环境变量。在本地机器上设置环境变量。设置您的 Google Cloud 项目 ID 以及您将在本教程中使用的 Cloud Storage 存储桶的名称。此外,还要提供现有或新 Managed Service for Apache Spark 集群的 名称和区域 。 您可以在下一步中创建要用于本教程的集群。

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. 创建 Managed Service for Apache Spark 集群。运行以下命令,以 在指定的 Compute Engine 地区中创建 单节点 Managed Service for Apache Spark 集群。

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. 将公共数据复制到 Cloud Storage 存储分区。将公开数据莎士比亚作品文本片段复制到 Cloud Storage 存储桶的 input 文件夹中:

    gcloud storage cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. 设置 Java (Apache Maven)Scala (SBT)Python 开发环境。

准备 Spark wordcount 作业

在下面选择一个标签页,按照步骤准备作业软件包或文件以提交到集群。您可以准备以下作业类型之一:

Java

  1. pom.xml 文件复制到本地机器。 以下 pom.xml 文件指定 Scala 和 Spark 库 依赖项,它们具有 provided 范围,指示 Managed Service for Apache Spark 集群将在运行时提供这些 库。pom.xml 文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为 gs:// 的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. 将下面列出的 WordCount.java 代码复制到您的本地机器
    1. 创建一组路径为 src/main/java/managed-spark/codelab 的目录:
      mkdir -p src/main/java/managed-spark/codelab
      
    2. WordCount.java 复制到您的本地机器的 src/main/java/managed-spark/codelab
      cp WordCount.java src/main/java/managed-spark/codelab
      

    WordCount.java 是 Java 中一个 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. 构建软件包
    mvn clean package
    
    如果构建成功,则系统会创建 target/word-count-1.0.jar
  4. 将软件包暂存到 Cloud Storage
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt 文件复制到本地机器。 以下 build.sbt 文件指定 Scala 和 Spark 库 依赖项,它们具有 provided 范围,以 指示 Managed Service for Apache Spark 集群将在运行时提供这些 库。build.sbt 文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为 gs:// 的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. word-count.scala 复制到本地机器。 这是 Java 中一个 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. 构建软件包
    sbt clean package
    
    如果构建成功,则系统会创建 target/scala-2.11/word-count_2.11-1.0.jar
  4. 将软件包暂存到 Cloud Storage
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py 复制到本地机器。 这是 Python 中一个使用 PySpark 的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

提交作业

运行以下 gcloud 命令,将 Wordcount 作业提交到 Managed Service for Apache Spark 集群。

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

查看输出

作业完成后,运行以下 gcloud CLI 命令以查看字数统计输出。

gcloud storage cat gs://${BUCKET_NAME}/output/*

字数统计输出应类似于以下内容:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

清理

完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往 管理资源 页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击 关闭以删除项目。

删除 Managed Service for Apache Spark 集群

您可能希望仅删除项目中的集群,而不是删除项目。

删除 Cloud Storage 存储桶

Google Cloud 控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区 页面。

    进入“存储分区”

  2. 点击要删除的存储分区对应的复选框。
  3. 如需删除存储桶, 请点击 删除,然后按照 说明操作。

命令行

    删除存储桶:
    gcloud storage buckets delete BUCKET_NAME

后续步骤