将 GPU 与 Google Cloud Serverless for Apache Spark 结合使用

您可以将 GPU 加速器挂接到您的 Google Cloud Serverless for Apache Spark 批量工作负载,以实现以下结果:

  • 加快大规模数据分析工作负载的处理速度。

  • 使用 GPU 机器学习库加速对大型数据集的模型训练。

  • 执行高级数据分析,例如视频或自然语言处理。

所有受支持的 Serverless for Apache Spark Spark 运行时 都会将Spark RAPIDS 库添加到每个工作负载节点。 Serverless for Apache Spark Spark 运行时 版本 1.1 还会将 XGBoost 库 添加到工作负载节点。这些库提供了强大的数据转换和机器学习工具,您可以在 GPU 加速的工作负载中使用这些工具。

GPU 的优势

将 GPU 与 Serverless for Apache Spark 工作负载搭配使用时,可以获得以下优势:

  • 性能提升: GPU 加速可以显著提升 Spark 工作负载的性能,尤其是在计算密集型任务(例如机器学习和深度学习、图处理和复杂分析)方面。

  • 更快的模型训练: 对于机器学习任务,挂接 GPU 可以大幅缩短训练模型所需的时间,使数据科学家和工程师能够快速迭代和实验。

  • 可伸缩性: 客户可以向节点添加更多 GPU 节点或更强大的 GPU,以处理日益复杂的处理需求。

  • 经济高效: 虽然 GPU 需要初始投资,但由于处理时间缩短且资源利用率更高,因此随着时间的推移,您可以节省成本。

  • 增强的数据分析: GPU 加速可让您对大型数据集执行高级分析,例如图片和视频分析以及自然语言处理。

  • 改进的产品: 更快的处理速度有助于更快地做出决策,并使应用更具响应能力。

限制和注意事项

价格

GPU 加速器适用于 高级定价层级。如需了解加速器价格信息,请参阅 Serverless for Apache Spark 价格

准备工作

创建挂接 GPU 加速器的无服务器批处理工作负载之前, 请执行以下操作:

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud新手, 请创建一个账号来评估我们的产品在 实际场景中的表现。新客户还可获享 $300 赠金,用于 运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. 安装 Google Cloud CLI。

  6. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

  7. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. 安装 Google Cloud CLI。

  12. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

  13. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  14. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区 页面。

    进入“存储分区”

  15. 点击 创建
  16. 创建存储桶 页面上,输入您的存储桶信息。要转到下一步 ,请点击继续
    1. 开始使用 部分中,执行以下操作:
      • 输入符合 存储桶命名要求的全局唯一的名称。
      • 如需添加 存储桶标签, 请展开 标签 部分 (), 点击 添加标签,并为标签指定 keyvalue
    2. 选择存储数据的位置 部分中,执行以下操作:
      1. 选择位置类型
      2. 位置类型下拉菜单中选择一个位置,用于永久存储存储桶的数据。
      3. 如需设置 跨存储桶复制,请选择 通过 Storage Transfer Service 添加跨存储桶复制 ,然后 按照以下步骤操作:

        设置跨存储桶复制

        1. 存储桶 菜单中,选择一个存储桶。
        2. 复制设置 部分中, 点击配置 以配置 复制作业的设置。

          系统会显示配置跨存储桶复制 窗格 显示。

          • 如需按对象名称前缀过滤要复制的对象, 请输入要用于包含或排除对象的前缀,然后点击 添加前缀
          • 如需为复制的对象设置存储类别, 请从存储类别菜单中选择一个存储类别。 如果您跳过此步骤,则复制的对象会默认使用 目标存储桶的存储类别。
          • 点击完成
    3. 选择数据存储方式 部分中,执行以下操作:
      1. 为存储桶选择默认存储类别,或者选择Autoclass对存储桶数据进行自动存储类别管理。
      2. 如需启用 分层命名空间,请在 针对数据密集型工作负载优化存储 部分中,选择 在此存储桶上启用分层命名空间
    4. 选择如何控制对对象的访问权限 部分中,选择 存储桶是否强制执行禁止公开访问, 然后为存储桶对象选择访问权限控制方法
    5. 选择如何保护对象数据 部分中,执行以下操作:
      • 数据保护 下,选择您要为存储桶设置的任何选项。
        • 如需启用 软删除,请点击 软删除政策(用于数据恢复) 复选框, 然后指定您希望在删除对象后保留对象的天数。
        • 如需设置 对象版本控制,请点击 对象版本控制(用于版本控制) 复选框, 然后指定每个对象的最大版本数以及非当前版本过期的天数。
        • 如需在对象和存储分区上启用保留政策,请点击保留(用于合规性) 复选框,然后执行以下操作:
          • 如需启用 对象保留锁定,请点击 启用对象保留 复选框。
          • 如需启用存储桶锁,请点击设置存储桶保留政策 复选框,然后为保留期限选择时间单位和时间长度。
      • 如需选择对象数据的加密方式,请展开 数据加密 部分 (),然后选择 数据加密 方法
  17. 点击创建

创建使用 GPU 加速器的无服务器批处理工作负载

提交使用 NVIDIA L4 GPU 的 Serverless for Apache Spark 批处理工作负载,以运行并行化的 PySpark 任务。使用 gcloud CLI,按照以下步骤进行操作:

  1. 点击展开我 ,然后使用文本编辑器或代码编辑器在本地机器上创建一个 test-py-spark-gpu.py文件,并将列出的 PySpark 代码保存到该文件中。

    #!/usr/bin/env python
    
    """S8s Accelerators Example."""
    
    import subprocess
    from typing import Any
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import IntegerType
    from pyspark.sql.types import StructField
    from pyspark.sql.types import StructType
    
    spark = SparkSession.builder.appName("joindemo").getOrCreate()
    
    
    def get_num_gpus(_: Any) -> int:
      """Returns the number of GPUs."""
      p_nvidia_smi = subprocess.Popen(
          ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE
      )
      p_wc = subprocess.Popen(
          ["wc", "-l"],
          stdin=p_nvidia_smi.stdout,
          stdout=subprocess.PIPE,
          stderr=subprocess.PIPE,
          universal_newlines=True,
      )
      [out, _] = p_wc.communicate()
      return int(out)
    
    
    num_workers = 5
    result = (
        spark.sparkContext.range(0, num_workers, 1, num_workers)
        .map(get_num_gpus)
        .collect()
    )
    num_gpus = sum(result)
    print(f"Total accelerators: {num_gpus}")
    
    # Run the join example
    schema = StructType([StructField("value", IntegerType(), True)])
    df = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    df2 = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    joined_df = (
        df.select(col("value").alias("a"))
        .join(df2.select(col("value").alias("b")), col("a") == col("b"))
        .explain()
    )
  2. 在本地机器上使用 gcloud CLI 提交包含五个工作器的 Serverless for Apache Spark 无服务器批量作业,每个工作器都使用 L4 GPU 加速:

    gcloud dataproc batches submit pyspark test-py-spark-gpu.py \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_NAME \
        --version=1.1 \
        --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
    

注意:

  • PROJECT_ID:您的 Google Cloud 项目 ID。
  • REGION:用于运行工作负载的可用 Compute Engine 区域
  • BUCKET_NAME:Cloud Storage 存储桶的名称。Spark 会先将工作负载依赖项上传到此存储桶中的 /dependencies 文件夹,然后再运行批处理工作负载。
  • --version: 所有 受支持的 Google Cloud Serverless for Apache Spark 运行时 都会将 RAPIDS 库添加到 GPU 加速的工作负载的每个节点。只有运行时版本 1.1 会将 XGBoost 库添加到 GPU 加速的工作负载的每个节点。
  • --properties (请参阅 Spark 资源分配属性):

    • spark.dataproc.driverEnv.LANG=C.UTF-8spark.executorEnv.LANG=C.UTF-8(使用早于 2.2 的运行时版本时是必需的):这些属性会将默认字符集设置为 C.UTF-8。
    • spark.dataproc.executor.compute.tier=premium (必需):GPU 加速的工作负载按高级 数据计算单元 (DCU) 计费。请参阅 Serverless for Apache Spark 加速器价格

    • spark.dataproc.executor.disk.tier=premium (必需):具有 A100-40、A100-80 或 L4 加速器的节点必须使用高级磁盘层级。

    • spark.dataproc.executor.resource.accelerator.type=l4(必需):必须仅指定一种 GPU 类型。示例作业选择 L4 GPU。可以使用以下参数名称指定以下加速器类型:

      GPU 类型 参数名称
      A100 40GB a100-40
      A100 80GB a100-80

    • spark.executor.instances=5(必需):必须至少为 2。在此示例中设置为 5。

    • spark.executor.cores (可选):您可以设置此属性来指定核心 vCPU 的数量。L4 GPU 的有效值为 4(默认值)或 81216244896。A100 GPU 的唯一有效值和默认值为 12。具有 L4 GPU 和 244896 个核心的配置为每个执行器挂接了 248 个 GPU。所有其他配置都挂接了 1 个 GPU。

    • spark.dataproc.executor.disk.size (必需):L4 GPU 的固定磁盘大小为 375 GB,但具有 244896 个核心的配置除外,这些配置的磁盘大小分别为 7501,5003,000 GB。如果您在提交 L4 加速的工作负载时将此属性设置为其他值,则会发生错误。如果您选择 A100 40 或 A100 80 GPU,则有效大小为 375g、750g、1500g、3000g、6000g 和 9000g。

    • spark.executor.memory(可选)和 spark.executor.memoryOverhead(受限):您可以设置内存,但 不能设置 memoryOverhead。未被设置的属性占用的可用内存量将应用于未设置的属性。 spark.executor.memoryOverhead 对于 PySpark 批处理工作负载,设置为可用内存的 40%,对于其他工作负载,则设置为 10%(请参阅 Spark 资源分配属性)。

      下表显示了可以为不同的 A100 和 L4 GPU 配置设置的最大内存量。任一属性的最小值均为 1024 MB。

      A100 (40 GB) A100 (80 GB) L4(4 个核心) L4(8 个核心) L4(12 个核心) L4(16 个核心) L4(24 个核心) L4(48 个核心) L4(96 个核心)
      最大总内存 (MB) 78040 165080 13384 26768 40152 53536 113072 160608 321216
    • Spark RAPIDS 属性(可选):默认情况下,Serverless for Apache Spark 会设置以下 Spark RAPIDS 属性值:

      • spark.plugins=com.nvidia.spark.SQLPlugin
      • spark.executor.resource.gpu.amount=1
      • spark.task.resource.gpu.amount=1/$spark_executor_cores
      • spark.shuffle.manager=''. 默认情况下,此属性处于未设置状态。NVIDIA 建议在使用 GPU 时启用 RAPIDS shuffle 管理器 ,以提高性能。为此,请在提交工作负载时设置 spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
      • spark.rapids.sql.concurrentGpuTasks= minimum of (gpuMemoryinMB / 8, 4)
      • spark.rapids.shuffle.multiThreaded.writer.threads= minimum of (cpu cores in the VM / gpu count per VM, 32)
      • spark.rapids.shuffle.multiThreaded.reader.threads= minimum of (cpu cores in the VM / gpu count per VM, 32)

      如需设置 Spark RAPIDS 属性,请参阅 RAPIDS Accelerator for Apache Spark 配置;如需设置 Spark 高级属性,请参阅 RAPIDS Accelerator for Apache Spark 高级配置