Esegui un LLM in una pipeline di streaming

Questo tutorial mostra come eseguire un modello linguistico di grandi dimensioni (LLM) in una pipeline Dataflow di streaming utilizzando l'API RunInference di Apache Beam.

Per saperne di più sull'API RunInference, consulta la sezione Informazioni su Beam ML nella documentazione di Apache Beam.

Il codice di esempio è disponibile su GitHub.

Obiettivi

  • Crea argomenti e sottoscrizioni Pub/Sub per l'input e le risposte del modello.
  • Carica il modello in Cloud Storage utilizzando un job personalizzato di Vertex AI.
  • Esegui la pipeline.
  • Fai una domanda al modello e ricevi una risposta.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi utenti di Google Cloud potrebbero avere diritto a una prova senza costi.

Al termine delle attività descritte in questo documento, puoi evitare l'addebito di ulteriori costi eliminando le risorse che hai creato. Per saperne di più, consulta Esegui la pulizia.

Prima di iniziare

Esegui questo tutorial su una macchina con almeno 5 GB di spazio libero per installare le dipendenze.

  1. Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. Installa Google Cloud CLI.

  3. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  4. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  5. Crea o seleziona un Google Cloud progetto.

    Ruoli richiesti per selezionare o creare un progetto

    • Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
    • Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto (roles/resourcemanager.projectCreator), che contiene l'autorizzazione resourcemanager.projects.create. Scopri come concedere i ruoli.
    • Creare un progetto Google Cloud :

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del progetto Google Cloud .

  6. Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .

  7. Abilita le API Dataflow, Compute Engine, Cloud Storage, Pub/Sub e Vertex AI:

    Ruoli richiesti per abilitare le API

    Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere i ruoli.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. Se utilizzi una shell locale, crea credenziali di autenticazione locali per il tuo account utente:

    gcloud auth application-default login

    Non è necessario eseguire questa operazione se utilizzi Cloud Shell.

    Se viene restituito un errore di autenticazione e utilizzi un provider di identità (IdP) esterno, verifica di aver acceduto a gcloud CLI con la tua identità federata.

  9. Concedi ruoli al tuo account utente. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • USER_IDENTIFIER: l'identificatore del tuo account utente . Ad esempio: myemail@example.com.
    • ROLE: il ruolo IAM che concedi al tuo account utente.
  10. Installa Google Cloud CLI.

  11. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  12. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  13. Crea o seleziona un Google Cloud progetto.

    Ruoli richiesti per selezionare o creare un progetto

    • Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
    • Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto (roles/resourcemanager.projectCreator), che contiene l'autorizzazione resourcemanager.projects.create. Scopri come concedere i ruoli.
    • Creare un progetto Google Cloud :

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del progetto Google Cloud .

  14. Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .

  15. Abilita le API Dataflow, Compute Engine, Cloud Storage, Pub/Sub e Vertex AI:

    Ruoli richiesti per abilitare le API

    Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere i ruoli.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  16. Se utilizzi una shell locale, crea credenziali di autenticazione locali per il tuo account utente:

    gcloud auth application-default login

    Non è necessario eseguire questa operazione se utilizzi Cloud Shell.

    Se viene restituito un errore di autenticazione e utilizzi un provider di identità (IdP) esterno, verifica di aver acceduto a gcloud CLI con la tua identità federata.

  17. Concedi ruoli al tuo account utente. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • USER_IDENTIFIER: l'identificatore del tuo account utente . Ad esempio: myemail@example.com.
    • ROLE: il ruolo IAM che concedi al tuo account utente.
  18. Concedi ruoli al account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • PROJECT_NUMBER: il numero del progetto. Per trovare il numero di progetto, utilizza il comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: ogni singolo ruolo.
  19. Copia l'ID progetto Google Cloud . Questo valore ti servirà più avanti in questo tutorial.

Crea le Google Cloud risorse

Questa sezione spiega come creare le seguenti risorse:

  • Un bucket Cloud Storage da utilizzare come posizione di archiviazione temporanea
  • Un argomento Pub/Sub per i prompt del modello
  • Un argomento e una sottoscrizione Pub/Sub per le risposte del modello

Crea un bucket Cloud Storage

Crea un bucket Cloud Storage utilizzando gcloud CLI. Questo bucket viene utilizzato come posizione di archiviazione temporanea dalla pipeline Dataflow.

Per creare il bucket, utilizza il comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Sostituisci quanto segue:

Copia il nome del bucket. Questo valore ti servirà più avanti in questo tutorial.

Crea argomenti e sottoscrizioni Pub/Sub

Crea due argomenti Pub/Sub e una sottoscrizione. Un argomento riguarda i prompt di input che invii al modello. L'altro argomento e il relativo abbonamento sono per le risposte del modello.

  1. Per creare gli argomenti, esegui il comando gcloud pubsub topics create due volte, una per ogni argomento:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID
    

    Sostituisci quanto segue:

    • PROMPTS_TOPIC_ID: l'ID argomento per i prompt di input da inviare al modello, ad esempio prompts
    • RESPONSES_TOPIC_ID: l'ID argomento per le risposte del modello, ad esempio responses
  2. Per creare la sottoscrizione e collegarla all'argomento delle risposte, utilizza il comando gcloud pubsub subscriptions create:

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID
    

    Sostituisci RESPONSES_SUBSCRIPTION_ID con l'ID abbonamento per le risposte del modello, ad esempio responses-subscription.

Copia gli ID argomento e l'ID abbonamento. Questi valori ti serviranno più avanti in questo tutorial.

prepara l'ambiente

Scarica gli esempi di codice e configura l'ambiente per eseguire il tutorial.

Gli esempi di codice nel repository GitHub python-docs-samples forniscono il codice necessario per eseguire questa pipeline. Quando sarai pronto a creare la tua pipeline, potrai utilizzare questo codice campione come modello.

Crea un ambiente virtuale Python isolato per eseguire il progetto della pipeline utilizzando venv. Un ambiente virtuale consente di isolare le dipendenze di un progetto da quelle di altri progetti. Per saperne di più su come installare Python e creare un ambiente virtuale, vedi Configurazione di un ambiente di sviluppo Python.

  1. Utilizza il comando git clone per clonare il repository GitHub:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. Vai alla directory run-inference:

    cd python-docs-samples/dataflow/run-inference
    
  3. Se utilizzi un prompt dei comandi, verifica che nel sistema siano in esecuzione Python 3 e pip:

    python --version
    python -m pip --version
    

    Se necessario, installa Python 3.

    Se utilizzi Cloud Shell, puoi saltare questo passaggio perché Python è già installato.

  4. Crea un ambiente virtuale Python:

    python -m venv /tmp/env
    source /tmp/env/bin/activate
    
  5. Installa le dipendenze:

    pip install -r requirements.txt --no-cache-dir
    

Esempio di codice di caricamento del modello

Il codice di caricamento del modello in questo tutorial avvia un job personalizzato di Vertex AI che carica l'oggetto state_dict del modello in Cloud Storage.

Il file iniziale è simile al seguente:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")

    args.run(**kwargs)

Esempio di codice della pipeline

Il codice della pipeline in questo tutorial esegue il deployment di una pipeline Dataflow che esegue le seguenti operazioni:

  • Legge un prompt da Pub/Sub e codifica il testo in tensori di token.
  • Esegue la trasformazione RunInference.
  • Decodifica i tensori dei token di output in testo e scrive la risposta in Pub/Sub.

Il file iniziale è simile al seguente:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks an language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

Carica il modello

Gli LLM possono essere modelli molto grandi. I modelli più grandi addestrati con più parametri generalmente danno risultati migliori. Tuttavia, i modelli più grandi richiedono una macchina più grande e più memoria per essere eseguiti. I modelli più grandi possono anche essere più lenti da eseguire sulle CPU.

Prima di eseguire un modello PyTorch su Dataflow, devi caricare l'oggetto state_dict del modello. L'oggetto state_dict di un modello memorizza i pesi del modello.

In una pipeline Dataflow che utilizza la trasformazione RunInference di Apache Beam, l'oggetto state_dict del modello deve essere caricato in Cloud Storage. La macchina che utilizzi per caricare l'oggettostate_dict in Cloud Storage deve disporre di memoria sufficiente per caricare il modello. La macchina ha anche bisogno di una connessione a internet veloce per scaricare i pesi e caricarli in Cloud Storage.

La tabella seguente mostra il numero di parametri per ogni modello e la memoria minima necessaria per caricare ciascun modello.

Modello Parametri Memoria necessaria
google/flan-t5-small 80 milioni > 320 MB
google/flan-t5-base 250 milioni > 1 GB
google/flan-t5-large 780 milioni > 3,2 GB
google/flan-t5-xl 3 miliardi > 12 GB
google/flan-t5-xxl 11 miliardi > 44 GB
google/flan-ul2 20 miliardi > 80 GB

Sebbene sia possibile caricare un modello più piccolo in locale, questo tutorial mostra come avviare un job personalizzato Vertex AI che carica il modello con una VM di dimensioni appropriate.

Poiché gli LLM possono essere molto grandi, l'esempio di questo tutorial salva l'oggetto state_dict in formato float16 anziché nel formato float32 predefinito. Con questa configurazione, ogni parametro utilizza 16 bit anziché 32 bit, riducendo le dimensioni dell'oggetto state_dict della metà. Una dimensione più piccola riduce al minimo il tempo necessario per caricare il modello. Tuttavia, la conversione del formato implica che la VM deve adattare sia il modello sia l'oggetto state_dict alla memoria.

La seguente tabella mostra i requisiti minimi per caricare un modello dopo che l'oggetto state_dict è stato salvato nel formato float16. La tabella mostra anche i tipi di macchina suggeriti per caricare un modello utilizzando Vertex AI. La dimensione minima (e predefinita) del disco per Vertex AI è 100 GB, ma alcuni modelli potrebbero richiedere un disco più grande.

Nome modello Memoria necessaria Tipo di macchina Memoria VM Disco VM
google/flan-t5-small > 480 MB e2-standard-4 16 GB 100 GB
google/flan-t5-base > 1,5 GB e2-standard-4 16 GB 100 GB
google/flan-t5-large > 4,8 GB e2-standard-4 16 GB 100 GB
google/flan-t5-xl > 18 GB e2-highmem-4 32 GB 100 GB
google/flan-t5-xxl > 66 GB e2-highmem-16 128 GB 100 GB
google/flan-ul2 > 120 GB e2-highmem-16 128 GB 150 GB

Carica l'oggetto state_dict del modello in Cloud Storage utilizzando un job personalizzato Vertex AI:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

Sostituisci quanto segue:

  • MODEL_NAME: il nome del modello, ad esempio google/flan-t5-xl.
  • VERTEX_AI_MACHINE_TYPE: il tipo di macchina su cui eseguire il job personalizzato di Vertex AI, ad esempio e2-highmem-4.
  • DISK_SIZE_GB: le dimensioni del disco per la VM, in GB. La dimensione minima è 100 GB.

A seconda delle dimensioni del modello, il caricamento potrebbe richiedere alcuni minuti. Per visualizzare lo stato, vai alla pagina Job personalizzati di Vertex AI.

Vai a Job personalizzati

esegui la pipeline.

Dopo aver caricato il modello, esegui la pipeline Dataflow. Per eseguire la pipeline, sia il modello che la memoria utilizzata da ogni worker devono rientrare nella memoria.

La tabella seguente mostra i tipi di macchine consigliati per eseguire una pipeline di inferenza.

Nome modello Tipo di macchina Memoria VM
google/flan-t5-small n2-highmem-2 16 GB
google/flan-t5-base n2-highmem-2 16 GB
google/flan-t5-large n2-highmem-4 32 GB
google/flan-t5-xl n2-highmem-4 32 GB
google/flan-t5-xxl n2-highmem-8 64 GB
google/flan-ul2 n2-highmem-16 128 GB

Esegui la pipeline:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto
  • PROMPTS_TOPIC_ID: l'ID argomento per i prompt di input da inviare al modello
  • RESPONSES_TOPIC_ID: l'ID argomento per le risposte del modello
  • MODEL_NAME: il nome del modello, ad esempio google/flan-t5-xl
  • BUCKET_NAME: il nome del bucket
  • REGION: la regione in cui eseguire il deployment del job, ad esempio us-central1
  • DATAFLOW_MACHINE_TYPE: la VM su cui eseguire la pipeline, ad esempio n2-highmem-4

Per assicurarti che il modello venga caricato una sola volta per worker e non esaurisca la memoria, configura i worker in modo che utilizzino un singolo processo impostando l'opzione della pipeline --experiments=no_use_multiple_sdk_containers. Non devi limitare il numero di thread perché la trasformazione RunInference condivide lo stesso modello con più thread.

La pipeline in questo esempio viene eseguita con le CPU. Per un modello più grande, è necessario più tempo per elaborare ogni richiesta. Se hai bisogno di risposte più rapide, puoi attivare le GPU.

Per visualizzare lo stato della pipeline, vai alla pagina Job di Dataflow.

Vai a Job

Fare una domanda al modello

Dopo l'avvio della pipeline, fornisci un prompt al modello e ricevi una risposta.

  1. Invia il prompt pubblicando un messaggio su Pub/Sub. Utilizza il comando gcloud pubsub topics publish:

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"
    

    Sostituisci PROMPT_TEXT con una stringa che contiene il prompt che vuoi fornire. Racchiudi il prompt tra virgolette.

    Utilizza un prompt personalizzato o prova uno dei seguenti esempi:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. Per ottenere la risposta, utilizza il comando gcloud pubsub subscriptions pull.

    A seconda delle dimensioni del modello, la generazione di una risposta potrebbe richiedere alcuni minuti. I modelli più grandi richiedono più tempo per essere implementati e per generare una risposta.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack
    

    Sostituisci RESPONSES_SUBSCRIPTION_ID con l'ID abbonamento per le risposte del modello.

Esegui la pulizia

Per evitare che al tuo Account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

Elimina il progetto

    Elimina un progetto Google Cloud :

    gcloud projects delete PROJECT_ID

Elimina singole risorse

  1. Esci dall'ambiente virtuale Python:

    deactivate
  2. Arresta la pipeline:

    1. Elenca gli ID job per i job Dataflow in esecuzione e poi annota l'ID job per il job del tutorial:

      gcloud dataflow jobs list --region=REGION --status=active
    2. Annulla il job:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. Elimina il bucket e tutto ciò che contiene:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. Elimina gli argomenti e la sottoscrizione:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Revoca i ruoli che hai concesso all'account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. (Facoltativo) Revoca i ruoli dal tuo Account Google.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. (Facoltativo) Revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.

    gcloud auth application-default revoke
  8. (Facoltativo) Revoca le credenziali da gcloud CLI.

    gcloud auth revoke

Passaggi successivi