Questa pagina mostra come scrivere, eseguire il deployment e attivare l'esecuzione di una pipeline utilizzando una funzione Cloud Functions basata su eventi con un trigger Cloud Pub/Sub. Segui questi passaggi:
Definisci una pipeline ML utilizzando l'SDK Kubeflow Pipelines (KFP) e compilala in un file YAML.
Carica la definizione della pipeline compilata in un bucket Cloud Storage.
Utilizza le funzioni Cloud Run per creare, configurare ed eseguire il deployment di una funzione attivata da un argomento Pub/Sub nuovo o esistente.
Definisci e compila una pipeline
Utilizzando l'SDK Kubeflow Pipelines, crea una pipeline pianificata e compila un file YAML.
Esempio hello-world-scheduled-pipeline
:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
Carica il file YAML della pipeline compilato nel bucket Cloud Storage
Apri il browser Cloud Storage nella Google Cloud console.
Fai clic sul bucket Cloud Storage che hai creato quando hai configurato il progetto.
Utilizzando una cartella esistente o una nuova, carica il file YAML della pipeline compilato (in questo esempio
hello_world_scheduled_pipeline.yaml
) nella cartella selezionata.Fai clic sul file YAML caricato per accedere ai dettagli. Copia l'URI gsutil per un utilizzo successivo.
Crea una funzione Cloud Run con un trigger Pub/Sub
Visita la pagina Cloud Run Functions nella console.
Fai clic sul pulsante Crea funzione.
Nella sezione Nozioni di base, assegna un nome alla funzione (ad esempio
my-scheduled-pipeline-function
).Nella sezione Trigger, seleziona Cloud Pub/Sub come tipo di trigger.
Nell'elenco Seleziona un argomento Cloud Pub/Sub, fai clic su Crea un argomento.
Nella casella Crea un argomento, assegna un nome al nuovo argomento (ad esempio
my-scheduled-pipeline-topic
) e seleziona Crea argomento.Lascia invariati i valori predefiniti degli altri campi e fai clic su Salva per salvare la configurazione della sezione Trigger.
Lascia invariati i valori predefiniti degli altri campi e fai clic su Avanti per passare alla sezione Codice.
In Runtime, seleziona Python 3.7.
In Entry point, inserisci "subscribe" (il nome della funzione del punto di ingresso del codice di esempio).
In Codice sorgente, seleziona Editor incorporato se non è già selezionato.
Nel file
main.py
, aggiungi il seguente codice:import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()
Sostituisci quanto segue:
- PROJECT_ID: Il Google Cloud progetto in cui viene eseguita questa pipeline.
- REGION: la regione in cui viene eseguita questa pipeline.
- PIPELINE_ROOT: specifica un URI Cloud Storage a cui può accedere l'account di servizio dei tuoi pipeline. Gli artefatti delle esecuzioni della pipeline sono archiviati nella radice della pipeline.
Nel file
requirements.txt
, sostituisci i contenuti con i seguenti requisiti del pacchetto:google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
Fai clic su Esegui il deployment per eseguire il deployment della funzione.
Passaggi successivi
- Scopri di più su Google Cloud Pub/Sub.
- Visualizza e analizza i risultati della pipeline.
- Scopri come creare trigger in Cloud Run dagli eventi Pub/Sub.
- Per visualizzare esempi di codice per l'utilizzo di Pub/Sub, consulta il Google Cloud browser degli esempi.