LLM in einer Streamingpipeline ausführen

In dieser Anleitung erfahren Sie, wie Sie mithilfe der Apache Beam RunInference API ein großes Sprachmodell (LLM) in einer Dataflow-Streamingpipeline ausführen.

Weitere Informationen zur RunInference API finden Sie in der Apache Beam-Dokumentation unter Beam ML.

Der Beispielcode ist auf GitHub verfügbar.

Ziele

  • Pub/Sub-Themen und -Abos für die Eingaben und Antworten des Modells erstellen.
  • Das Modell mit einem benutzerdefinierten Vertex AI-Job in Cloud Storage laden.
  • Pipeline ausführen.
  • Dem Modell eine Frage stellen und eine Antwort erhalten.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.

Neuen Nutzern von Google Cloud steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweis

Führen Sie diese Anleitung auf einem Computer aus, auf dem mindestens 5 GB freier Speicherplatz zur Installation der Abhängigkeiten verfügbar sind.

  1. Melden Sie sich in Ihrem Google Cloud -Konto an. Wenn Sie mit Google Cloudnoch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.

  3. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  4. Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  5. Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.

    Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind

    • Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
    • Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (roles/resourcemanager.projectCreator), die die Berechtigung resourcemanager.projects.create enthält. Weitere Informationen zum Zuweisen von Rollen
    • So erstellen Sie ein Google Cloud -Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud -Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Projekts in Google Cloud .

  6. Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.

  7. Aktivieren Sie die Dataflow API, die Compute Engine API, die Cloud Storage API, die Pub/Sub API und die Vertex AI API:

    Rollen, die zum Aktivieren von APIs erforderlich sind

    Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. Wenn Sie eine lokale Shell verwenden, erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Nutzerkonto:

    gcloud auth application-default login

    Wenn Sie Cloud Shell verwenden, müssen Sie das nicht tun.

    Wenn ein Authentifizierungsfehler zurückgegeben wird und Sie einen externen Identitätsanbieter (IdP) verwenden, prüfen Sie, ob Sie sich mit Ihrer föderierten Identität in der gcloud CLI angemeldet haben.

  9. Weisen Sie Ihrem Nutzerkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • USER_IDENTIFIER: Die Kennung für Ihr Nutzerkonto . Beispiel: myemail@example.com
    • ROLE: Die IAM-Rolle, die Sie Ihrem Nutzerkonto zuweisen.
  10. Installieren Sie die Google Cloud CLI.

  11. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  12. Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  13. Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.

    Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind

    • Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
    • Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (roles/resourcemanager.projectCreator), die die Berechtigung resourcemanager.projects.create enthält. Weitere Informationen zum Zuweisen von Rollen
    • So erstellen Sie ein Google Cloud -Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud -Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Projekts in Google Cloud .

  14. Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.

  15. Aktivieren Sie die Dataflow API, die Compute Engine API, die Cloud Storage API, die Pub/Sub API und die Vertex AI API:

    Rollen, die zum Aktivieren von APIs erforderlich sind

    Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  16. Wenn Sie eine lokale Shell verwenden, erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Nutzerkonto:

    gcloud auth application-default login

    Wenn Sie Cloud Shell verwenden, müssen Sie das nicht tun.

    Wenn ein Authentifizierungsfehler zurückgegeben wird und Sie einen externen Identitätsanbieter (IdP) verwenden, prüfen Sie, ob Sie sich mit Ihrer föderierten Identität in der gcloud CLI angemeldet haben.

  17. Weisen Sie Ihrem Nutzerkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • USER_IDENTIFIER: Die Kennung für Ihr Nutzerkonto . Beispiel: myemail@example.com
    • ROLE: Die IAM-Rolle, die Sie Ihrem Nutzerkonto zuweisen.
  18. Weisen Sie Ihrem Compute Engine-Standarddienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Ersetzen Sie dabei Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • PROJECT_NUMBER: Ihre Projektnummer. Verwenden Sie den Befehl gcloud projects describe, um Ihre Projektnummer zu ermitteln.
    • SERVICE_ACCOUNT_ROLE: Jede einzelne Rolle.
  19. Kopieren Sie die Google Cloud Projekt-ID. Sie benötigen diesen Wert später in dieser Anleitung.

Google Cloud -Ressourcen erstellen

In diesem Abschnitt werden die folgenden Ressourcen erstellt:

  • Ein Cloud Storage-Bucket, der als temporärer Speicherort verwendet werden soll
  • Ein Pub/Sub-Thema für die Prompts des Modells
  • Ein Pub/Sub-Thema und ‑Abo für die Antworten des Modells

Cloud Storage-Bucket erstellen

Erstellen Sie einen Cloud Storage-Bucket mit der gcloud CLI. Dieser Bucket wird von der Dataflow-Pipeline als temporärer Speicherort verwendet.

Verwenden Sie den Befehl gcloud storage buckets create, um den Bucket zu erstellen:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Ersetzen Sie dabei Folgendes:

Kopieren Sie den Bucket-Namen. Sie benötigen diesen Wert später in dieser Anleitung.

Pub/Sub-Themen und -Abos erstellen

Erstellen Sie zwei Pub/Sub-Themen und ein Abo. Ein Thema ist für die Prompts, die Sie an das Modell senden. Das andere Thema und das zugehörige Abo wird für die Antworten des Modells genutzt.

  1. Führen Sie zum Erstellen der Themen den gcloud pubsub topics create-Befehl zweimal aus, einmal für jedes Thema:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    Ersetzen Sie dabei Folgendes:

    • PROMPTS_TOPIC_ID: Die Themen-ID für die Prompts, die an das Modell gesendet werden sollen, z. B. prompts
    • RESPONSES_TOPIC_ID: Die Themen-ID für die Antworten des Modells, z. B. responses
  2. Verwenden Sie den Befehl gcloud pubsub subscriptions create, um das Abo zu erstellen und an Ihr Thema für Antworten anzuhängen:

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    Ersetzen Sie RESPONSES_SUBSCRIPTION_ID durch die Abo-ID für die Antworten des Modells, z. B. responses-subscription.

Kopieren Sie die Themen-IDs und die Abo-ID. Sie benötigen diese Werte später in dieser Anleitung.

Umgebung vorbereiten

Laden Sie die Codebeispiele herunter und richten Sie dann Ihre Umgebung ein, um die Anleitung auszuführen.

Die Codebeispiele im GitHub-Repository python-docs-samples enthalten den Code, den Sie zum Ausführen dieser Pipeline benötigen. Wenn Sie Ihre eigene Pipeline erstellen möchten, können Sie diesen Beispielcode als Vorlage verwenden.

Sie erstellen eine isolierte virtuelle Python-Umgebung, um Ihr Pipeline-Projekt mit venv auszuführen. In einer virtuellen Umgebung können Sie die Abhängigkeiten eines Projekts von den Abhängigkeiten anderer Projekte isolieren. Weitere Informationen zum Installieren von Python und zum Erstellen einer virtuellen Umgebung finden Sie unter Python-Entwicklungsumgebung einrichten.

  1. Klonen Sie das GitHub-Repository mit dem Befehl git clone:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. Rufen Sie das Verzeichnis run-inference auf:

    cd python-docs-samples/dataflow/run-inference
    
  3. Wenn Sie eine Eingabeaufforderung verwenden, prüfen Sie, ob Python 3 und pip in Ihrem System ausgeführt werden:

    python --version
    python -m pip --version
    

    Installieren Sie Python 3, falls erforderlich.

    Wenn Sie Cloud Shell verwenden, können Sie diesen Schritt überspringen, da Python bereits in Cloud Shell installiert ist.

  4. Erstellen Sie eine virtuelle Python-Umgebung::

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. Installieren Sie die Abhängigkeiten:

    pip install -r requirements.txt --no-cache-dir
    

Beispielcode zum Laden von Modellen

Der Modellladecode in dieser Anleitung startet einen benutzerdefinierten Vertex AI-Job, der das state_dict-Objekt des Modells in Cloud Storage lädt.

Die Startdatei sieht so aus:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

Beispiel für Pipelinecode

Der Pipelinecode in dieser Anleitung stellt eine Dataflow-Pipeline bereit, die Folgendes ausführt:

  • Liest einen Prompt aus Pub/Sub und codiert den Text in Token-Tensoren.
  • Führt die Transformation RunInference aus.
  • Decodiert die Ausgabetoken-Tensoren in Text und schreibt die Antwort in Pub/Sub.

Die Startdatei sieht so aus:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

Modell laden

LLMs können sehr große Modelle sein. Größere Modelle, die mit mehr Parametern trainiert werden, liefern in der Regel bessere Ergebnisse. Größere Modelle benötigen jedoch einen größeren Rechner und mehr Arbeitsspeicher. Größere Modelle können auf CPUs auch langsamer ausgeführt werden.

Bevor Sie ein PyTorch-Modell in Dataflow ausführen können, müssen Sie das state_dict-Objekt des Modells laden. Im state_dict-Objekt eines Modells werden die Gewichte für das Modell gespeichert.

In einer Dataflow-Pipeline, in der die Apache Beam-Transformation RunInference verwendet wird, muss das state_dict-Objekt des Modells in Cloud Storage geladen werden. Der Rechner, den Sie zum Laden des state_dict-Objekts in Cloud Storage verwenden, muss genügend Arbeitsspeicher haben, um das Modell zu laden. Der Rechner benötigt außerdem eine schnelle Internetverbindung, um die Gewichtungen herunterzuladen und in Cloud Storage hochzuladen.

In der folgenden Tabelle sehen Sie die Anzahl der Parameter für die einzelnen Modelle und den Mindestspeicher, der zum Laden der jeweiligen Modelle erforderlich ist.

Modell Parameter Erforderlicher Arbeitsspeicher
google/flan-t5-small 80 Millionen > 320 MB
google/flan-t5-base 250 Millionen > 1 GB
google/flan-t5-large 780 Millionen > 3,2 GB
google/flan-t5-xl 3 Milliarden > 12 GB
google/flan-t5-xxl 11 Milliarden > 44 GB
google/flan-ul2 20 Milliarden > 80 GB

Sie können zwar ein kleineres Modell lokal laden, in dieser Anleitung wird jedoch gezeigt, wie Sie einen benutzerdefinierten Vertex AI-Job starten, der das Modell mit einer VM mit angemessener Größe lädt.

Da LLMs so groß sein können, wird das state_dict-Objekt im Beispiel in diesem Tutorial im float16-Format anstelle des Standardformats float32 gespeichert. Bei dieser Konfiguration verwendet jeder Parameter 16 Bits anstelle von 32 Bits, sodass das state_dict-Objekt nur halb so groß ist. Eine kleinere Größe minimiert die Zeit, die zum Laden des Modells benötigt wird. Die Konvertierung des Formats bedeutet jedoch, dass die VM sowohl das Modell als auch das state_dict-Objekt in ihren Speicher aufnehmen muss.

In der folgenden Tabelle sind die Mindestanforderungen für das Laden eines Modells aufgeführt, nachdem das state_dict-Objekt im float16-Format gespeichert wurde. Die Tabelle enthält auch die vorgeschlagenen Rechnertypen zum Laden eines Modells mithilfe von Vertex AI. Die minimale (und standardmäßige) Laufwerkgröße für Vertex AI beträgt 100 GB. Bei einigen Modellen ist jedoch möglicherweise ein größeres Laufwerk erforderlich.

Modellname Erforderlicher Arbeitsspeicher Maschinentyp VM-Arbeitsspeicher VM-Laufwerk
google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
google/flan-t5-base > 1,5 GB e2-standard-4 16 GB 100 GB
google/flan-t5-large > 4,8 GB e2-standard-4 16 GB 100 GB
google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

Laden Sie das state_dict-Objekt des Modells mit einem benutzerdefinierten Vertex AI-Job in Cloud Storage:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

Ersetzen Sie dabei Folgendes:

  • MODEL_NAME: Der Name des Modells, z. B. google/flan-t5-xl.
  • VERTEX_AI_MACHINE_TYPE: Der Maschinentyp, auf dem der benutzerdefinierte Vertex AI-Job ausgeführt werden soll, z. B. e2-highmem-4.
  • DISK_SIZE_GB: Die Laufwerksgröße für die VM in GB. Die Mindestgröße beträgt 100 GB.

Je nach Größe des Modells kann es einige Minuten dauern, bis es geladen ist. Den Status können Sie auf der Seite Benutzerdefinierte Jobs von Vertex AI aufrufen.

Benutzerdefinierte Jobs aufrufen

Pipeline ausführen

Nachdem Sie das Modell geladen haben, führen Sie die Dataflow-Pipeline aus. Zum Ausführen der Pipeline müssen sowohl das Modell als auch der von den verschiedenen Workers verwendete Arbeitsspeicher in den Speicher passen.

In der folgenden Tabelle sind die empfohlenen Maschinentypen für die Ausführung einer Inferenzpipeline aufgeführt.

Modellname Maschinentyp VM-Arbeitsspeicher
google/flan-t5-small n2-highmem-2 16 GB
google/flan-t5-base n2-highmem-2 16 GB
google/flan-t5-large n2-highmem-4 32 GB
google/flan-t5-xl n2-highmem-4 32 GB
google/flan-t5-xxl n2-highmem-8 64 GB
google/flan-ul2 n2-highmem-16 128 GB

Führen Sie die Pipeline aus:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die Projekt-ID
  • PROMPTS_TOPIC_ID: die Themen-ID für die Prompts, die an das Modell gesendet werden sollen
  • RESPONSES_TOPIC_ID: Die Themen-ID für die Antworten des Modells
  • MODEL_NAME: der Name des Modells, z. B. google/flan-t5-xl.
  • BUCKET_NAME: Der Name des Buckets
  • REGION: die Region, in der der Job bereitgestellt werden soll, z. B. us-central1
  • DATAFLOW_MACHINE_TYPE: die VM, auf der die Pipeline ausgeführt werden soll, z. B. n2-highmem-4

Damit das Modell nur einmal pro Worker geladen wird und genügend Arbeitsspeicher zur Verfügung steht, konfigurieren Sie die Worker zur Verwendung einer einzelnen Methode. Dazu legen Sie die Pipelineoption --experiments=no_use_multiple_sdk_containers fest. Sie müssen die Anzahl der Threads nicht begrenzen, da die RunInference-Transformation dasselbe Modell mit mehreren Threads teilt.

Die Pipeline in diesem Beispiel wird mit CPUs ausgeführt. Bei einem größeren Modell ist mehr Zeit für die Verarbeitung jeder Anfrage erforderlich. Sie können GPUs aktivieren, wenn Sie schnellere Antworten benötigen.

Rufen Sie die Dataflow-Seite Jobs auf, um den Status der Pipeline aufzurufen.

ZU JOBS

Dem Modell eine Frage stellen

Nachdem die Pipeline ausgeführt wird, übergeben Sie dem Modell einen Prompt und erhalten eine Antwort.

  1. Senden Sie eine Nachricht an Pub/Sub, um Ihren Prompt zu senden. Führen Sie den Befehl gcloud pubsub topics publish aus:

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    Ersetzen Sie PROMPT_TEXT durch einen String, der den Prompt enthält, den Sie bereitstellen möchten. Setzen Sie den Prompt in Anführungszeichen.

    Verwenden Sie einen eigenen Prompt oder versuchen Sie es mit einem der folgenden Beispiele:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. Verwenden Sie den Befehl gcloud pubsub subscriptions pull, um die Antwort abzurufen.

    Je nach Größe des Modells kann es einige Minuten dauern, bis eine Antwort generiert wird. Bei größeren Modellen dauern Bereitstellung und Generation einer Antwort länger.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    Ersetzen Sie RESPONSES_SUBSCRIPTION_ID durch die Abo-ID für die Antworten des Modells.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

Projekt löschen

    Google Cloud -Projekt löschen:

    gcloud projects delete PROJECT_ID

Einzelne Ressourcen löschen

  1. Beenden Sie die virtuelle Python-Umgebung:

    deactivate
  2. Beenden Sie die Pipeline:

    1. Listen Sie die Job-IDs der Dataflow-Jobs auf, die derzeit ausgeführt werden, und notieren Sie sich die Job-ID des Jobs für das Tutorial:

      gcloud dataflow jobs list --region=REGION --status=active
    2. Job abbrechen:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. Löschen Sie den Bucket und seinen Inhalt:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. Löschen Sie die Themen und das Abo:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Widerrufen Sie die Rollen, die Sie dem Compute Engine-Standarddienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. Optional: Löschen Sie Rollen aus Ihrem Google-Konto.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. Optional: Widerrufen Sie die von Ihnen erstellten Anmeldedaten für die Authentifizierung und löschen Sie die lokale Datei mit den Anmeldedaten:

    gcloud auth application-default revoke
  8. Optional: Widerrufen Sie Anmeldedaten von der gcloud-CLI.

    gcloud auth revoke

Nächste Schritte