ML Diagnostics SDK 使用入门

ML Diagnostics Python SDK 可与机器学习工作负载集成,以收集和 管理模型指标、配置和性能分析文件 Google Cloud。使用该 SDK 进行程序化性能分析 并查看模型指标。如果您在不使用该 SDK 的情况下使用按需性能分析,则无法获取模型指标。

本指南将向您展示如何创建机器学习运行作业、收集和管理工作负载指标和配置、部署托管 XProf 资源,以及启用程序化和按需性能分析文件捕获。

如需详细了解如何使用 ML Diagnostics SDK,请参阅 google-cloud-mldiagnostics 代码库

安装 ML Diagnostics SDK

安装 google-cloud-mldiagnostics

pip install google-cloud-mldiagnostics

在机器学习工作负载代码中导入以下软件包:

from google_cloud_mldiagnostics import machinelearning_run
from google_cloud_mldiagnostics import metrics
from google_cloud_mldiagnostics import xprof

启用 Cloud Logging

该 SDK 使用标准 Python logging 模块输出指标和配置信息。如需将这些日志路由到 Cloud Logging,请安装并配置 google-cloud-logging 库。这样,您就可以在控制台中查看 SDK 日志、记录的指标以及 您自己的应用日志。 Google Cloud

安装 google-cloud-logging 库:

pip install google-cloud-logging

通过将 Cloud Logging 处理程序连接到 Python 根日志记录器,在脚本中配置日志记录。将以下行添加到 Python 脚本的开头:

  import logging
  import google.cloud.logging

  # Instantiate a Cloud Logging client
  logging_client = google.cloud.logging.Client()

  # Attach the Cloud Logging handler to the Python root logger
  logging_client.setup_logging()

  # Standard logging calls will go to Cloud Logging
  logging.info("SDK logs and application logs will appear in Cloud Logging.")

启用详细日志记录

默认情况下,日志记录级别设置为 INFO。如需从 SDK 接收更详细的日志(例如机器学习运行作业详细信息),请在调用 setup_logging() 后将日志记录级别设置为 DEBUG

import logging
import google.cloud.logging

logging_client = google.cloud.logging.Client()
logging_client.setup_logging()
logging.getLogger().setLevel(logging.DEBUG) # Enable DEBUG level logs

logging.debug("This is a debug message.")
logging.info("This is an info message.")

启用 DEBUG 后,您会在 Cloud Logging 中收到其他 SDK 诊断信息。例如:

DEBUG:google_cloud_mldiagnostics.core.global_manager:current run details:
{'name': 'projects/my-gcp-project/locations/us-central1/mlRuns/my-run-12345',
'gcs_path': 'gs://my-bucket/profiles', ...}

创建机器学习运行作业

如需使用 ML Diagnostics 平台,您需要先创建机器学习运行作业。这需要使用 SDK 检测机器学习工作负载,以执行日志记录、收集指标和启用性能分析跟踪。

以下是一个基本示例,用于初始化 Cloud Logging、创建机器学习运行作业 (MLRun)、记录指标和捕获性能分析文件:

import logging
import os
import google.cloud.logging
from google_cloud_mldiagnostics import machinelearning_run, metrics, xprof, metric_types

# 1. Set up Cloud Logging
# Make sure to pip install google-cloud-logging
logging_client = google.cloud.logging.Client()
logging_client.setup_logging()
# Optional: Set logging level to DEBUG for more detailed SDK logs
logging.getLogger().setLevel(logging.DEBUG)

# 2. Define and start machinelearning run
try:
    run = machinelearning_run(
          name="<run_name>",
          run_group="<run_group>",
          configs={ "epochs": 100, "batch_size": 32 },
          project="<some_project>",
          region="<some_zone>",
          gcs_path="gs://<some_bucket>",
          on_demand_xprof=True,
        )
    logging.info(f"MLRun created: {run.name}")

    # 3. Collect metrics during your run
    metrics.record(metric_types.MetricType.LOSS, 0.123, step=1)
    logging.info("Loss metric recorded.")

    # 4. Capture profiles programmatically
    with xprof():
        # ... your code to profile here ...
        pass
    logging.info("Profile captured.")

except Exception as e:
    logging.error(f"Error during MLRun: {e}", exc_info=True)

该代码示例使用以下变量:

变量 要求 说明
name 必需 特定运行作业的标识符。SDK 会自动创建 machine-learning-run-id,以确保运行 名称是唯一的。
run_group 可选 一个标识符,可帮助对属于同一实验的多个运行作业进行分组。例如,与 TPU 切片大小 扫描相关的所有运行作业都可以属于同一组。
project 可选 如果未指定,则从 Google Cloud CLI 中提取项目。
region 必需 us-east5 之外,所有 Cluster Director 位置 均受支持。您可以通过每个命令的实参或使用以下命令设置此标志: gcloud config set compute/region
configs 可选 包含运行作业的配置参数的键值对。如果未定义配置,则会显示默认软件和系统配置,但不会显示机器学习工作负载配置。
gcs_path 必需(有条件) 用于保存所有性能分析文件的 Google Cloud 存储位置。 例如:gs://my-bucketgs://my-bucket/folder1。 仅当 SDK 用于捕获性能分析文件时,才需要提供此参数。
on-demand-xprof 可选 在端口 9999 上启动 xprofz daemon 以启用按需 性能分析。您可以在同一代码中同时启用按需性能分析和程序化性能分析 ,只要它们不是同时发生即可。

以下配置由 SDK 自动收集,无需在 machinelearning_run 中指定:

  • 软件配置:框架、框架版本、XLA 标志。
  • 系统配置:设备类型、切片数、切片大小、主机数。

项目和区域信息存储为机器学习运行作业元数据。机器学习运行作业使用的区域不必与工作负载运行作业使用的区域一致。

写入配置

许多工作负载包含的配置过多,无法直接在 machinelearning_run 定义中定义。在这些情况下,您可以使用 JSON 或 YAML 将配置写入运行作业。

import yaml
import json

# Read the YAML file
with open('config.yaml', 'r') as yaml_file:
  # Parse YAML into a Python dictionary
  yaml_data = yaml.safe_load(yaml_file)

# Define machinelearning run
machinelearning_run(
  name="RUN_NAME",
  run_group="GROUP_NAME",
  configs=yaml_data,
  project="PROJECT_NAME",
  region="ZONE",
  gcs_path="gs://BUCKET_NAME",
)

收集指标

您可以使用 SDK 收集模型指标、模型性能指标和系统指标。您可以将这些指标创建为平均值,并使用时序图表直观呈现这些指标。

SDK 提供了两个用于记录指标的函数:metrics.record() 用于捕获单个数据点,metrics.record_metrics() 用于在单个批次中记录多个指标。这两个函数都会将指标写入 Cloud Logging,从而实现可视化和分析。

如需记录单个指标:

# Record a metric only with time as the x-axis
metrics.record(metric_types.MetricType.LOSS, 0.123)

# Record a metric with time and step as the x-axis
metrics.record(metric_types.MetricType.LOSS, 0.123, step=1)

如需记录多个指标:

from google_cloud_mldiagnostics import metric_types
# User codes
# machinelearning_run should be called
# ......

for step in range(num_steps):
  if (step + 1) % 10 == 0:
    metrics.record_metrics([
        # Model quality metrics
        {"metric_name": metric_types.MetricType.LEARNING_RATE, "value": step_size},
        {"metric_name": metric_types.MetricType.LOSS, "value": loss},
        {"metric_name": metric_types.MetricType.GRADIENT_NORM, "value": gradient},
        {"metric_name": metric_types.MetricType.TOTAL_WEIGHTS, "value": total_weights},
        # Model performance metrics
        {"metric_name": metric_types.MetricType.STEP_TIME, "value": step_time},
        {"metric_name": metric_types.MetricType.THROUGHPUT, "value": throughput},
        {"metric_name": metric_types.MetricType.LATENCY, "value": latency},
        {"metric_name": metric_types.MetricType.TFLOPS, "value": tflops},
        {"metric_name": metric_types.MetricType.MFU, "value": mfu},
    ], step=step+1)

以下系统指标由 SDK 从 libTPU、psutil 和 JAX 库自动收集:

  • TPU TensorCore 利用率
  • TPU 占空比
  • HBM 利用率
  • 主机 CPU 利用率
  • 主机内存利用率

您无需手动指定这些指标。这些系统指标的默认 x 轴为“时间”

如果分配了以下预定义指标键,它们将自动显示在 Google Cloud 控制台中。这些指标不会自动计算;它们是您可以为其分配值的预定义键。

  • 模型质量指标键LEARNING_RATELOSSGRADIENT_NORMTOTAL_WEIGHTS
  • 模型效果指标键STEP_TIMETHROUGHPUTLATENCYMFUTFLOPS

预定义指标以及其他用户定义的指标可以记录为 x 轴为 time,或者同时记录为 timestep。您可以记录工作负载中的任何自定义指标。

以下示例捕获工作负载的单个指标,您可以在特定机器学习运行作业的模型指标 标签页中查看该指标:

metrics.record("custom_metrics_1", step_size, step=step + 1)

如需在一个调用中记录多个指标,请使用 record_metrics 方法。例如:

metrics.record_metrics([
        # Model quality metrics
        {"metric_name": metric_types.MetricType.LEARNING_RATE, "value": step_size},
        {"metric_name": metric_types.MetricType.LOSS, "value": loss},
        {"metric_name": metric_types.MetricType.GRADIENT_NORM, "value": gradient},
        {"metric_name": metric_types.MetricType.TOTAL_WEIGHTS, "value": total_weights},
        # Model performance metrics
        {"metric_name": metric_types.MetricType.STEP_TIME, "value": step_time},
        {"metric_name": metric_types.MetricType.THROUGHPUT, "value": throughput},
        {"metric_name": metric_types.MetricType.LATENCY, "value": latency},
        {"metric_name": metric_types.MetricType.TFLOPS, "value": tflops},
        {"metric_name": metric_types.MetricType.MFU, "value": mfu},
     # Custom metrics
     {"custom_metrics_1", "value":<value>},
     {"custom_metrics_2", "value":<value>},
     {"avg_mtp_acceptance_rate_percent", "value":<value>},
     {"dpo_reward_accuracy", "value":<value>},
    ], step=step+1)

捕获性能分析文件

您可以使用XProf来捕获机器学习工作负载的性能分析文件 ,并使用程序化捕获或按需捕获(手动捕获)。程序化捕获涉及将性能分析命令直接嵌入到机器学习代码中,并明确说明何时开始和停止记录数据。 按需捕获是实时进行的,即在工作负载已在积极运行的情况下触发性能分析器。

用于捕获性能分析文件的 SDK 命令与框架无关,因为所有框架级性能分析命令都会自动集成到 ML Diagnostics 性能分析命令中。这意味着您的性能分析代码不依赖于您使用的框架。

程序化性能分析文件捕获

程序化捕获需要您对模型代码进行注释,并指定要在何处捕获性能分析文件。通常,您会捕获几个训练步骤的性能分析文件,或者对模型中的特定代码块进行性能分析。

您可以使用 ML Diagnostics SDK 通过以下方式执行程序化性能分析文件捕获:

  • 基于 API 的收集:使用 start()stop() 方法控制性能分析。
  • 基于装饰器的收集:使用 @xprof(run) 为函数添加注解,以进行 自动性能分析。
  • 上下文管理器:与 xprof() 结合使用,以进行基于范围的性能分析,该性能分析会 自动处理 start()stop() 操作。

您可以在所有框架中使用相同的性能分析文件捕获代码。所有性能分析会话都会捕获到机器学习运行作业中定义的 Cloud Storage 存储桶中。

# Support collection via APIs
prof = xprof()  # Updates metadata and starts xprofz collector
prof.start()  # Collects traces to bucket
# ..... Your code execution here
# ....
prof.stop()

# Also supports collection via decorators
@xprof()
def abc(self):
    # does something
    pass

# Use xprof as a context manager to automatically start and stop collection
with xprof() as prof:
    # Your training or execution code here
    train_model()
    evaluate_model()

多主机(进程)性能分析

在程序化性能分析期间,SDK 会在执行机器学习工作负载代码的每个主机(进程)上开始性能分析。如果未提供节点列表,则会包含所有主机。

# starts profiling on all nodes
prof = xprof()
prof.start()
# ...
prof.stop()

默认情况下,在多个主机上调用不带 session_id 实参的 prof.start() 方法会导致单独的跟踪会话,每个主机对应一个会话。如需将来自不同主机的跟踪记录分组到 XProf 中的单个统一多主机会话中,请确保在所有参与主机上使用相同的 session_id 实参调用 prof.start() 方法。 例如:

# Use the same session_id on all hosts to group traces
prof = xprof()
prof.start(session_id="profiling_session")
# ...
prof.stop()

如需为特定主机启用性能分析,请执行以下操作:

# starts profiling on node with index 0 and 2
prof = xprof(process_index_list=[0,2])
prof.start()
# ...
prof.stop()

按需性能分析文件捕获

如果您想临时捕获性能分析文件,或者尚未启用程序化性能分析文件捕获,可以使用按需性能分析文件捕获。如果在运行期间模型性能出现问题,并且您想在这些时刻捕获性能分析文件以诊断问题,那么按需捕获会很有用。

使用按需性能分析时,无法获取模型指标(由 SDK 发出)。TPU 系统指标仍可在 GKE TPU 信息中心中使用。

如需启用按需性能分析文件捕获,请配置运行作业以支持按需捕获:

# Define machinelearning run
machinelearning_run(
    name="<run_name>",
    # specify where profiling data is stored
    gcs_path="gs://<bucket>",
    ...
    # enable on demand profiling, starts xprofz daemon on port 9999
    on_demand_xprof=True
)

您可以在所有框架中使用相同的性能分析文件捕获代码。所有性能分析会话都会捕获到机器学习运行作业中定义的 Cloud Storage 存储桶中。

如需在 GKE 上进行按需性能分析,请将 GKE connection-operatorinjection-webhook 部署到 GKE 集群中。这样可确保机器学习运行作业可以找到其运行所在的 GKE 节点,并且按需捕获下拉列表可以自动填充这些节点。如需了解详情,请参阅配置 GKE 集群

为 GKE 打包工作负载

您可以使用 Dockerfile 打包使用 ML Diagnostics SDK 的应用。安装 google-cloud-logging 软件包以进行 Cloud Logging 集成。例如:

# Base image (user's choice, e.g., python:3.10-slim, or a base with ML frameworks)
FROM python:3.11-slim

# Install base utilities
RUN pip install --no-cache-dir --upgrade pip

# Install SDK and Logging client
# psutil is installed as a dependency of google-cloud-mldiagnostics
RUN pip install --no-cache-dir \
    google-cloud-mldiagnostics \
    google-cloud-logging

# Optional: For JAX/TPU workloads
# RUN pip install --no-cache-dir "jax[tpu]" -f https://storage.googleapis.com/jax-releases/libtpu_releases.html &&
#     pip install --no-cache-dir libtpu xprof

# Add your application code
COPY ./app /app
WORKDIR /app

# Run your script
CMD ["python", "your_train_script.py"]

部署工作负载

将 SDK 与工作负载集成后,将工作负载打包到映像中,并使用指定的映像创建 YAML 文件。对于程序化性能分析,请在 YAML 文件中使用 managed-mldiagnostics-gke=true 标记工作负载。 按需性能分析不需要此标签。

对于 GKE:

kubectl apply -f YAML_FILE_NAME

对于 Compute Engine,请使用 SSH 连接到虚拟机,然后运行工作负载的 Python 代码:

source venv/bin/activate
python3.11 WORKLOAD_FILE_NAME

部署工作负载后,通过搜索工作负载命名空间来查找作业名称:

kubectl get job -n YOUR_NAMESPACE

您可以通过传递作业名称和命名空间在 kubectl 日志中找到运行作业名称和链接。您必须指定工作负载容器(例如:-c workload),因为 ML Diagnostics Sidecar 会处理自己的日志记录。

kubectl logs jobs/s5-tpu-slice-0 -n YOUR_NAMESPACE -c workload