Crie os seus próprios componentes de pipeline

É comum que, quando executa um componente, queira ver não só o link para a tarefa do componente que está a ser iniciada, mas também o link para os recursos de nuvem subjacentes, como as tarefas de previsão em lote do Vertex ou as tarefas do Dataflow.

O gcp_resource proto é um parâmetro especial que pode usar no seu componente para permitir que a Google Cloud consola forneça uma vista personalizada dos registos e do estado do recurso na consola do Vertex AI Pipelines.

Produzir o parâmetro gcp_resource

Usar um componente baseado em contentores

Primeiro, tem de definir o parâmetro gcp_resource no seu componente, conforme mostrado no ficheiro de exemplo component.py seguinte:

Python

Para saber como instalar ou atualizar o SDK Vertex AI para Python, consulte o artigo Instale o SDK Vertex AI para Python. Para mais informações, consulte a Python documentação de referência da API.

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath


@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

Em seguida, no contentor, instale o Google Cloud pacote de componentes do pipeline:

pip install --upgrade google-cloud-pipeline-components

Em seguida, no código Python, defina o recurso como um parâmetro gcp_resource:

Python

Para saber como instalar ou atualizar o SDK Vertex AI para Python, consulte o artigo Instale o SDK Vertex AI para Python. Para mais informações, consulte a Python documentação de referência da API.

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Usar um componente Python

Em alternativa, pode devolver o parâmetro de saída gcp_resources como faria com qualquer parâmetro de saída de string:

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.21.0'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

Valores resource_type suportados

Pode definir o resource_type como uma string arbitrária, mas apenas os seguintes tipos têm links na consola Google Cloud :

  • BatchPredictionJob
  • BigQueryJob
  • CustomJob
  • DataflowJob
  • HyperparameterTuningJob

Escreva um componente para cancelar os recursos subjacentes

Quando uma tarefa de pipeline é cancelada, o comportamento predefinido é que os recursos Google Cloud subjacentes continuem a ser executados. Não são canceladas automaticamente. Para alterar este comportamento, deve anexar um controlador SIGTERM à tarefa do pipeline. Um bom local para o fazer é imediatamente antes de um ciclo de sondagem para uma tarefa que possa ser executada durante muito tempo.

O cancelamento foi implementado em vários Google Cloud componentes do pipeline, incluindo:

  • Tarefa de previsão em lote
  • Tarefa do BigQuery ML
  • Tarefa personalizada
  • Google Cloud Sem servidor para tarefas de lote do Apache Spark
  • Tarefa de aperfeiçoamento de hiperparâmetros

Para mais informações, incluindo um código de exemplo que mostra como anexar um controlador SIGTERM, consulte os seguintes links do GitHub:

Considere o seguinte ao implementar o controlador SIGTERM:

  • A propagação do cancelamento só funciona depois de o componente estar em execução durante alguns minutos. Isto deve-se normalmente a tarefas de arranque em segundo plano que têm de ser processadas antes de os controladores de sinais do Python serem chamados.
  • Alguns Google Cloud recursos podem não ter o cancelamento implementado. Por exemplo, a criação ou a eliminação de um ponto final ou de um modelo da Vertex AI pode criar uma operação de longa duração que aceite um pedido de cancelamento através da respetiva API REST, mas não implemente a própria operação de cancelamento.