在串流管道中執行 LLM

本教學課程說明如何使用 Apache Beam RunInference API,在串流 Dataflow 管道中執行大型語言模型 (LLM)。

如要進一步瞭解 RunInference API,請參閱 Apache Beam 說明文件中的「關於 Beam ML」。

您可以在 GitHub 取得範例程式碼

目標

  • 為模型的輸入內容和回覆建立 Pub/Sub 主題和訂閱項目。
  • 使用 Vertex AI 自訂工作,將模型載入 Cloud Storage。
  • 執行管道。
  • 向模型提問並取得回覆。

費用

在本文件中,您會使用下列 Google Cloud的計費元件:

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用期資格。

完成本文所述工作後,您可以刪除建立的資源,避免繼續計費,詳情請參閱「清除所用資源」。

事前準備

請在可用磁碟空間至少有 5 GB 的電腦上執行本教學課程,以便安裝依附元件。

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. 安裝 Google Cloud CLI。

  3. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  4. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  5. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  6. 確認專案已啟用計費功能 Google Cloud

  7. 啟用 Dataflow、Compute Engine、Cloud Storage、Pub/Sub 和 Vertex AI API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. 如果您使用本機殼層,請為使用者帳戶建立本機驗證憑證:

    gcloud auth application-default login

    如果您使用 Cloud Shell,則不需要執行這項操作。

    如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI

  9. 將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    更改下列內容:

    • PROJECT_ID:專案 ID。
    • USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com
    • ROLE:授予使用者帳戶的 IAM 角色。
  10. 安裝 Google Cloud CLI。

  11. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  12. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  13. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  14. 確認專案已啟用計費功能 Google Cloud

  15. 啟用 Dataflow、Compute Engine、Cloud Storage、Pub/Sub 和 Vertex AI API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  16. 如果您使用本機殼層,請為使用者帳戶建立本機驗證憑證:

    gcloud auth application-default login

    如果您使用 Cloud Shell,則不需要執行這項操作。

    如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI

  17. 將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    更改下列內容:

    • PROJECT_ID:專案 ID。
    • USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com
    • ROLE:授予使用者帳戶的 IAM 角色。
  18. 將角色授予 Compute Engine 預設服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • PROJECT_NUMBER:您的專案編號。 如要找出專案編號,請使用 gcloud projects describe 指令
    • SERVICE_ACCOUNT_ROLE:每個角色。
  19. 複製 Google Cloud 專案 ID。本教學課程稍後會用到這個值。

建立 Google Cloud 資源

本節說明如何建立下列資源:

  • Cloud Storage bucket,做為暫時儲存位置
  • 模型提示的 Pub/Sub 主題
  • 模型回覆的 Pub/Sub 主題和訂閱項目

建立 Cloud Storage 值區

使用 gcloud CLI 建立 Cloud Storage bucket。 這個 bucket 是 Dataflow 管道的暫時儲存位置。

如要建立 bucket,請使用 gcloud storage buckets create 指令

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

更改下列內容:

  • BUCKET_NAME:Cloud Storage bucket 的名稱,必須符合 bucket 命名規定。Cloud Storage bucket 名稱不得重複。
  • LOCATION:值區的位置

複製 bucket 名稱。本教學課程稍後會用到這個值。

建立 Pub/Sub 主題和訂閱項目

建立兩個 Pub/Sub 主題和一個訂閱項目。其中一個主題是您傳送給模型的輸入提示。另一個主題及其附加訂閱項目則用於模型回覆。

  1. 如要建立主題,請執行 gcloud pubsub topics create 指令兩次,每個主題執行一次:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    更改下列內容:

    • PROMPTS_TOPIC_ID:要傳送至模型的主題 ID,例如 prompts
    • RESPONSES_TOPIC_ID:模型回應的主題 ID,例如 responses
  2. 如要建立訂閱並附加至回應主題,請使用 gcloud pubsub subscriptions create 指令

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    請將 RESPONSES_SUBSCRIPTION_ID 替換為模型回應的訂閱 ID,例如 responses-subscription

複製主題 ID 和訂閱 ID。您會在後續步驟中用到這些值。

準備環境

下載程式碼範例,然後設定環境來執行本教學課程。

python-docs-samples GitHub 存放區中的程式碼範例提供執行這項管道所需的程式碼。準備好建構自己的管道時,您可以將這段程式碼範例做為範本。

您可以使用 venv 建立獨立的 Python 虛擬環境,以便執行管道專案。虛擬環境可讓您將一個專案的依附元件與其他專案的依附元件隔開。如要進一步瞭解如何安裝 Python 及建立虛擬環境,請參閱「設定 Python 開發環境」。

  1. 使用 git clone 指令複製 GitHub 存放區:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. 請前往 run-inference 目錄:

    cd python-docs-samples/dataflow/run-inference
    
  3. 如果您使用命令提示字元,請確認系統中已執行 Python 3 和 pip

    python --version
    python -m pip --version
    

    視需要安裝 Python 3

    如果您使用 Cloud Shell,可以略過這個步驟,因為 Cloud Shell 已安裝 Python。

  4. 建立 Python 虛擬環境

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. 安裝依附元件:

    pip install -r requirements.txt --no-cache-dir
    

模型載入程式碼範例

本教學課程中的模型載入程式碼會啟動 Vertex AI 自訂工作,將模型的 state_dict 物件載入 Cloud Storage。

範例檔案如下所示:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

管道程式碼範例

本教學課程中的管道程式碼會部署 Dataflow 管道,執行下列操作:

  • 從 Pub/Sub 讀取提示詞,並將文字編碼為詞元張量。
  • 執行 RunInference 轉換。
  • 將輸出權杖張量解碼為文字,並將回覆寫入 Pub/Sub。

範例檔案如下所示:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

載入模型

LLM 可能是非常龐大的模型。以更多參數訓練的大型模型通常能提供更優異的結果。不過,模型越大,執行時就需要更大的機器和更多記憶體。在 CPU 上執行大型模型時,速度也可能較慢。

在 Dataflow 上執行 PyTorch 模型之前,您需要載入模型的 state_dict 物件。模型的state_dict 物件會儲存模型的權重。

在使用 Apache Beam RunInference 轉換的 Dataflow 管道中,模型的 state_dict 物件必須載入 Cloud Storage。用來將 state_dict 物件載入至 Cloud Storage 的機器,必須有足夠的記憶體來載入模型。機器也需要快速的網際網路連線,才能下載權重並上傳至 Cloud Storage。

下表顯示每個模型的參數數量,以及載入每個模型所需的最低記憶體。

型號 參數 所需記憶體
google/flan-t5-small 8,000 萬 > 320 MB
google/flan-t5-base 2.5 億 > 1 GB
google/flan-t5-large 7.8 億 > 3.2 GB
google/flan-t5-xl 30 億次 > 12 GB
google/flan-t5-xxl 110 億次 > 44 GB
google/flan-ul2 200 億次 > 80 GB

雖然您可以在本機載入較小的模型,但本教學課程會說明如何啟動 Vertex AI 自訂工作,並使用適當大小的 VM 載入模型。

由於 LLM 可能非常龐大,本教學課程中的範例會將 state_dict 物件儲存為 float16 格式,而非預設的 float32 格式。完成這項設定後,每個參數會使用 16 位元而非 32 位元,因此 state_dict 物件的大小會減半。模型越小,載入所需的時間就越短。不過,轉換格式表示 VM 必須將模型和 state_dict 物件都放入記憶體。

下表列出將 state_dict 物件儲存為 float16 格式後,載入模型的最低需求。表格也會顯示建議的機器類型,方便您使用 Vertex AI 載入模型。Vertex AI 的磁碟大小下限 (和預設值) 為 100 GB,但部分模型可能需要更大的磁碟。

模型名稱 所需記憶體 機型 VM 記憶體 VM 磁碟
google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
google/flan-t5-base > 1.5 GB e2-standard-4 16 GB 100 GB
google/flan-t5-large > 4.8 GB e2-standard-4 16 GB 100 GB
google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

使用 Vertex AI 自訂工作,將模型的 state_dict 物件載入 Cloud Storage:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

更改下列內容:

  • MODEL_NAME:模型名稱,例如 google/flan-t5-xl
  • VERTEX_AI_MACHINE_TYPE:用於執行 Vertex AI 自訂作業的機器類型,例如 e2-highmem-4
  • DISK_SIZE_GB:VM 的磁碟大小 (單位為 GB)。大小下限為 100 GB。

視模型大小而定,載入模型可能需要幾分鐘。如要查看狀態,請前往 Vertex AI 的「自訂工作」頁面。

前往「自訂工作」

執行管道

載入模型後,請執行 Dataflow 管道。如要執行管道,模型和每個工作站使用的記憶體都必須符合記憶體容量。

下表列出建議用來執行推論管道的機型。

模型名稱 機型 VM 記憶體
google/flan-t5-small n2-highmem-2 16 GB
google/flan-t5-base n2-highmem-2 16 GB
google/flan-t5-large n2-highmem-4 32 GB
google/flan-t5-xl n2-highmem-4 32 GB
google/flan-t5-xxl n2-highmem-8 64 GB
google/flan-ul2 n2-highmem-16 128 GB

執行管道:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

更改下列內容:

  • PROJECT_ID:專案 ID
  • PROMPTS_TOPIC_ID:要傳送至模型的輸入提示主題 ID
  • RESPONSES_TOPIC_ID:模型回應的主題 ID
  • MODEL_NAME:模型名稱,例如 google/flan-t5-xl
  • BUCKET_NAME:bucket 的名稱
  • REGION:要部署作業的區域,例如 us-central1
  • DATAFLOW_MACHINE_TYPE:要執行管道的 VM,例如 n2-highmem-4

如要確保每個工作站只載入一次模型,且不會耗盡記憶體,請設定管道選項 --experiments=no_use_multiple_sdk_containers,讓工作站使用單一程序。您不必限制執行緒數量,因為 RunInference 轉換會與多個執行緒共用同一個模型。

本範例中的管道會使用 CPU 執行。模型越大,處理每項要求所需的時間就越長。如需更快速的回覆,可以啟用 GPU

如要查看管道狀態,請前往 Dataflow 的「Jobs」(工作) 頁面。

前往工作

向模型提問

pipeline 開始執行後,您就可以向模型提供提示並接收回覆。

  1. 將訊息發布至 Pub/Sub,傳送提示。使用 gcloud pubsub topics publish 指令:

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    PROMPT_TEXT 替換為包含您要提供提示的字串。在提示詞前後加上引號。

    使用自己的提示,或試試下列任一範例:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. 如要取得回應,請使用 gcloud pubsub subscriptions pull 指令

    視模型大小而定,模型可能需要幾分鐘才能生成回覆。模型越大,部署和生成回覆的時間就越長。

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    請將 RESPONSES_SUBSCRIPTION_ID 替換為模型回覆的訂閱 ID。

清除所用資源

為避免因為本教學課程所用資源,導致系統向 Google Cloud 收取費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。

刪除專案

    刪除 Google Cloud 專案:

    gcloud projects delete PROJECT_ID

刪除個別資源

  1. 結束 Python 虛擬環境:

    deactivate
  2. 停止管道:

    1. 列出正在執行的 Dataflow 工作 ID,然後記下本教學課程工作的 ID:

      gcloud dataflow jobs list --region=REGION --status=active
    2. 取消工作:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. 刪除 bucket 和當中內容:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. 刪除主題和訂閱項目:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. 撤銷您授予 Compute Engine 預設服務帳戶的角色。針對下列每個 IAM 角色,執行一次下列指令:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. 選用:從 Google 帳戶撤銷角色。

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. 選用:撤銷您建立的驗證憑證,並刪除本機憑證檔案。

    gcloud auth application-default revoke
  8. 選用:從 gcloud CLI 撤銷憑證。

    gcloud auth revoke

後續步驟