Ejecutar plantillas Flex en Dataflow

En esta página se describe cómo ejecutar un trabajo de Dataflow mediante una plantilla flexible. Las plantillas flexibles te permiten empaquetar un flujo de procesamiento de Dataflow para que puedas ejecutarlo sin tener un entorno de desarrollo de Apache Beam.

Permisos obligatorios

Cuando ejecutas una plantilla flexible, Dataflow crea una tarea. Para crear el trabajo, la cuenta de servicio de Dataflow necesita el siguiente permiso:

  • dataflow.serviceAgent

Cuando usas Dataflow por primera vez, el servicio te asigna este rol, por lo que no tienes que conceder este permiso.

De forma predeterminada, la cuenta de servicio de Compute Engine se usa en las VMs de inicio y de trabajador. La cuenta de servicio necesita los siguientes roles y funciones:

  • Administrador de objetos de Storage (roles/storage.objectAdmin)
  • Lector (roles/viewer)
  • Trabajador de Dataflow (roles/dataflow.worker)
  • Acceso de lectura y escritura al bucket de almacenamiento provisional
  • Acceso de lectura a la imagen de la plantilla Flex

Para conceder acceso de lectura y escritura al segmento de almacenamiento provisional, puedes usar el rol Administrador de objetos de Storage (roles/storage.objectAdmin). Para obtener más información, consulta Roles de gestión de identidades y accesos para Cloud Storage.

Para conceder acceso de lectura a la imagen de la plantilla de Flex, puedes usar el rol Lector de objetos de Storage (roles/storage.objectViewer). Para obtener más información, consulta Configurar el control de acceso.

Ejecutar una plantilla Flex

Para ejecutar una plantilla Flex, usa el comando gcloud dataflow flex-template run:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \
  --region REGION \
  --staging-location STAGING_LOCATION \
  --temp-location TEMP_LOCATION \
  --parameters  PARAMETERS \
  --additional-user-labels LABELS \

Haz los cambios siguientes:

  • JOB_ID: el ID de tu trabajo

  • TEMPLATE_FILE_LOCATION: la ubicación del archivo de plantilla en Cloud Storage

  • REGION: la región en la que se ejecutará el trabajo de Dataflow

  • STAGING_LOCATION: la ubicación de Cloud Storage para almacenar archivos locales

  • TEMP_LOCATION: la ubicación de Cloud Storage en la que se escribirán los archivos temporales. Si no se define, se usará la ubicación de almacenamiento provisional de forma predeterminada.

  • PARAMETERS: parámetros de la canalización del trabajo

  • LABELS: opcional. Etiquetas adjuntas a tu trabajo, con el formato KEY_1=VALUE_1,KEY_2=VALUE_2,....

Durante el paso de almacenamiento provisional de la activación de una plantilla, Dataflow escribe archivos en la ubicación de almacenamiento provisional. Dataflow lee estos archivos de almacenamiento para crear el gráfico de la tarea. Durante el paso de ejecución, Dataflow escribe archivos en la ubicación temporal.

Definir opciones de flujo de procesamiento

Para definir las opciones de la canalización al ejecutar una plantilla Flex, usa las siguientes marcas en el comando gcloud dataflow flex-template run:

gcloud

Cuando transfieras parámetros de tipo List o Map, es posible que tengas que definir parámetros en un archivo YAML y usar la marca flags-file.

API

En el siguiente ejemplo se muestra cómo incluir opciones de canalización, experimentos y opciones adicionales en un cuerpo de solicitud:

{
  "jobName": "my-flex-template-job",
  "parameters": {
    "option_defined_in_metadata": "value"
  },
  "environment": {
    "additionalExperiments": [
      "use_runner_v2"
    ],
    "additionalPipelineOptions": {
      "common_pipeline_option": "value"
    }
  }
}

Cuando usas plantillas flexibles, puedes configurar algunas opciones de la canalización durante la inicialización, pero otras no se pueden cambiar. Si se sobrescriben los argumentos de línea de comandos que requiere la plantilla flex, es posible que el trabajo ignore, sobrescriba o descarte las opciones de la canalización que haya pasado el lanzador de plantillas. Es posible que la tarea no se inicie o que se inicie una tarea que no use la plantilla Flex. Para obtener más información, consulta No se ha podido leer el archivo de trabajo.

Durante la inicialización de la canalización, no cambies las siguientes opciones de canalización:

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Go

  • runner
  • project
  • job_name
  • template_location
  • region

Bloquear las claves SSH del proyecto en las VMs que usan claves SSH basadas en metadatos

Puedes impedir que las VMs acepten claves SSH almacenadas en los metadatos del proyecto bloqueando las claves SSH del proyecto en las VMs. Usa la marca additional-experiments con la opción de servicio block_project_ssh_keys:

--additional-experiments=block_project_ssh_keys

Para obtener más información, consulta las opciones de servicio de Dataflow.

Actualizar una tarea de plantilla Flex

En la siguiente solicitud de ejemplo se muestra cómo actualizar un trabajo de streaming de plantillas mediante el método projects.locations.flexTemplates.launch. Si quieres usar la herramienta de línea de comandos gcloud, consulta Actualizar un flujo de procesamiento.

Si quieres actualizar una plantilla clásica, usa projects.locations.templates.launch en su lugar.

  1. Sigue los pasos para crear una tarea de streaming a partir de una plantilla flexible. Envía la siguiente solicitud HTTP POST con los valores modificados:

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • Sustituye PROJECT_ID por el ID del proyecto.
    • Sustituye REGION por la región de Dataflow de la tarea que vas a actualizar.
    • Sustituye JOB_NAME por el nombre exacto del trabajo que quieras actualizar.
    • Asigna parameters a tu lista de pares clave-valor. Los parámetros que se indican son específicos de este ejemplo de plantilla. Si usas una plantilla personalizada, modifica los parámetros según sea necesario. Si utilizas la plantilla de ejemplo, sustituye las siguientes variables.
      • Sustituye SUBSCRIPTION_NAME por el nombre de tu suscripción a Pub/Sub.
      • Sustituye DATASET por el nombre de tu conjunto de datos de BigQuery.
      • Sustituye TABLE_NAME por el nombre de tu tabla de BigQuery.
    • Sustituye STORAGE_PATH por la ubicación de Cloud Storage del archivo de plantilla. La ubicación debe empezar por gs://.
  2. Usa el parámetro environment para cambiar la configuración del entorno. Para obtener más información, consulta FlexTemplateRuntimeEnvironment.

  3. Opcional: Para enviar tu solicitud con curl (Linux, macOS o Cloud Shell), guarda la solicitud en un archivo JSON y, a continuación, ejecuta el siguiente comando:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    Sustituye FILE_PATH por la ruta al archivo JSON que contiene el cuerpo de la solicitud.

  4. Usa la interfaz de monitorización de flujo de datos para comprobar que se ha creado un nuevo trabajo con el mismo nombre. Este trabajo tiene el estado Actualizado.

Siguientes pasos