Cargar datos de Cloud Storage en BigQuery con Workflows

En este tutorial se muestra cómo ejecutar de forma fiable flujos de trabajo sin servidor mediante Workflows, Cloud Run functions, y Firestore para cargar datos sin procesar, como registros de eventos, desde Cloud Storage hasta BigQuery. Las plataformas de analíticas suelen tener una herramienta de orquestación para cargar periódicamente datos en BigQuery mediante trabajos de BigQuery y, a continuación, transformar los datos para proporcionar métricas empresariales mediante instrucciones SQL, incluidas las instrucciones del lenguaje procedimental de BigQuery. Este tutorial está dirigido a desarrolladores y arquitectos que quieran crear flujos de procesamiento de datos basados en eventos sin servidor. En este tutorial se da por supuesto que tienes conocimientos de YAML, SQL y Python.

Arquitectura

En el siguiente diagrama se muestra la arquitectura general de una canalización de extracción, carga y transformación (ELT) sin servidor que usa Workflows.

Proceso de extracción, carga y transformación.

En el diagrama anterior, se muestra una plataforma de comercio que recoge periódicamente eventos de ventas como archivos de varias tiendas y, a continuación, escribe los archivos en un segmento de Cloud Storage. Los eventos se usan para proporcionar métricas empresariales importándolos y procesándolos en BigQuery. Esta arquitectura proporciona un sistema de orquestación fiable y sin servidor para importar tus archivos a BigQuery, y se divide en los dos módulos siguientes:

  • Lista de archivos: mantiene la lista de archivos sin procesar añadidos a un segmento de Cloud Storage en una colección de Firestore. Este módulo funciona a través de una función de Cloud Run que se activa mediante un evento de almacenamiento Object Finalize, que se genera cuando se añade un archivo nuevo al segmento de Cloud Storage. El nombre de archivo se añade a la files matriz de la colección llamada new en Firestore.
  • Flujo de trabajo: ejecuta los flujos de trabajo programados. Cloud Scheduler activa un flujo de trabajo que ejecuta una serie de pasos según una sintaxis basada en YAML para orquestar la carga y, a continuación, transformar los datos en BigQuery llamando a funciones de Cloud Run. Los pasos del flujo de trabajo llaman a funciones de Cloud Run para ejecutar las siguientes tareas:

    • Crea e inicia una tarea de carga de BigQuery.
    • Sondea el estado de la tarea de carga.
    • Crea e inicia la tarea de consulta de transformación.
    • Sondea el estado del trabajo de transformación.

Usar transacciones para mantener la lista de archivos nuevos en Firestore ayuda a asegurar que no se pierda ningún archivo cuando un flujo de trabajo los importe a BigQuery. Las ejecuciones independientes del flujo de trabajo se hacen idempotentes almacenando los metadatos y el estado de los trabajos en Firestore.

Prepara tu entorno

Para preparar tu entorno, crea una base de datos de Firestore, clona las muestras de código del repositorio de GitHub, crea recursos con Terraform, edita el archivo YAML de Workflows e instala los requisitos del generador de archivos.

  1. Para crear una base de datos de Firestore, sigue estos pasos:

    1. En la Google Cloud consola, ve a la página de Firestore.

      Ir a Firestore

    2. Haz clic en Seleccionar modo nativo.

    3. En el menú Selecciona una ubicación, elige la región en la que quieras alojar la base de datos de Firestore. Te recomendamos que elijas una región que esté cerca de tu ubicación física.

    4. Haz clic en Crear base de datos.

  2. En Cloud Shell, clona el repositorio de origen:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. En Cloud Shell, crea los siguientes recursos con Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Haz los cambios siguientes:

    • PROJECT_ID: tu ID de proyecto Google Cloud
    • REGION: una ubicación geográfica Google Cloud específica para alojar tus recursos, como us-central1
    • ZONE: una ubicación dentro de una región para alojar tus recursos, por ejemplo, us-central1-b

    Debería ver un mensaje similar al siguiente: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform puede ayudarte a crear, cambiar y actualizar la infraestructura a gran escala de forma segura y predecible. En tu proyecto se crearán los siguientes recursos:

    • Cuentas de servicio con los privilegios necesarios para garantizar el acceso seguro a tus recursos.
    • Un conjunto de datos de BigQuery llamado serverless_elt_dataset y una tabla llamada word_count para cargar los archivos entrantes.
    • Un segmento de Cloud Storage llamado ${project_id}-ordersbucket para los archivos de entrada de staging.
    • Las cinco funciones de Cloud Run siguientes:
      • file_add_handler añade el nombre de los archivos que se han añadido al segmento de Cloud Storage a la colección de Firestore.
      • create_job crea una tarea de carga de BigQuery y asocia los archivos de la colección de Firebase con la tarea.
      • create_query crea una nueva tarea de consulta de BigQuery.
      • poll_bigquery_job obtiene el estado de una tarea de BigQuery.
      • run_bigquery_job inicia una tarea de BigQuery.
  4. Obtén las URLs de las funciones de create_job, create_query, poll_job y run_bigquery_job de Cloud Run que has implementado en el paso anterior.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    El resultado debería ser similar al siguiente:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    Anota estas URLs, ya que las necesitarás cuando implementes tu flujo de trabajo.

Crear y desplegar un flujo de trabajo

  1. En Cloud Shell, abre el archivo de origen del flujo de trabajo, workflow.yaml:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    Haz los cambios siguientes:

    • CREATE_JOB_URL: URL de la función para crear un nuevo trabajo.
    • POLL_BIGQUERY_JOB_URL: la URL de la función para sondear el estado de un trabajo en curso
    • RUN_BIGQUERY_JOB_URL: la URL de la función para iniciar una tarea de carga de BigQuery
    • CREATE_QUERY_URL: la URL de la función para iniciar un trabajo de consulta de BigQuery
    • BQ_REGION: la región de BigQuery en la que se almacenan los datos (por ejemplo, US
    • BQ_DATASET_TABLE_NAME: nombre de la tabla del conjunto de datos de BigQuery en formato PROJECT_ID.serverless_elt_dataset.word_count
  2. Implementa el archivo workflow:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    Haz los cambios siguientes:

    • WORKFLOW_NAME: nombre único del flujo de trabajo
    • WORKFLOW_REGION: la región en la que se implementa el flujo de trabajo (por ejemplo, us-central1)
    • WORKFLOW_DESCRIPTION: la descripción del flujo de trabajo
  3. Crea un entorno virtual de Python 3 e instala los requisitos del generador de archivos:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Generar archivos para importar

La secuencia de comandos de Python gen.py genera contenido aleatorio en formato Avro. El esquema es el mismo que el de la tabla word_count de BigQuery. Estos archivos Avro se copian en el segmento de Cloud Storage especificado.

En Cloud Shell, genera los archivos:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

Haz los cambios siguientes:

  • RECORDS_PER_FILE: número de registros de un solo archivo
  • NUM_FILES: número total de archivos que se van a subir
  • FILE_PREFIX: el prefijo de los nombres de los archivos generados.

Ver entradas de archivos en Firestore

Cuando los archivos se copian en Cloud Storage, se activa la función handle_new_fileCloud Run. Esta función añade la lista de archivos al array de la lista de archivos del documento new de la colección jobs de Firestore.

Para ver la lista de archivos, en la Google Cloud consola, ve a la página Datos de Firestore.

Ir a Datos

Lista de archivos añadidos a la colección.

Activar el flujo de trabajo

Workflows vincula una serie de tareas sin servidor deGoogle Cloud y servicios de API. Los pasos individuales de este flujo de trabajo se ejecutan como funciones de Cloud Run y el estado se almacena en Firestore. Todas las llamadas a las funciones de Cloud Run se autentican mediante la cuenta de servicio del flujo de trabajo.

En Cloud Shell, ejecuta el flujo de trabajo:

gcloud workflows execute WORKFLOW_NAME

En el siguiente diagrama se muestran los pasos que se siguen en el flujo de trabajo:

Pasos utilizados en el flujo de trabajo principal y en el secundario.

El flujo de trabajo se divide en dos partes: el flujo de trabajo principal y el subflujo de trabajo. El flujo de trabajo principal gestiona la creación de tareas y la ejecución condicional, mientras que el subflujo de trabajo ejecuta una tarea de BigQuery. El flujo de trabajo realiza las siguientes operaciones:

  • La función create_job Cloud Run crea un objeto de trabajo, obtiene la lista de archivos añadidos a Cloud Storage desde el documento de Firestore y asocia los archivos al trabajo de carga. Si no hay archivos que cargar, la función no crea ningún trabajo.
  • La función create_query de Cloud Run toma la consulta que se debe ejecutar junto con la región de BigQuery en la que se debe ejecutar la consulta. La función crea el trabajo en Firestore y devuelve el ID del trabajo.
  • La función run_bigquery_job Cloud Run obtiene el ID del trabajo que se debe ejecutar y, a continuación, llama a la API de BigQuery para enviar el trabajo.
  • En lugar de esperar a que se complete el trabajo en la función de Cloud Run, puedes sondear periódicamente el estado del trabajo.
    • La función poll_bigquery_job de Cloud Run proporciona el estado del trabajo. Se llama repetidamente hasta que se completa el trabajo.
    • Para añadir un retraso entre las llamadas a la función de poll_bigquery_job Cloud Run, se llama a una sleep rutina desde Workflows.

Ver el estado del trabajo

Puedes ver la lista de archivos y el estado del trabajo.

  1. En la consola deGoogle Cloud , ve a la página Datos de Firestore.

    Ir a Datos

  2. Se genera un identificador único (UUID) para cada trabajo. Para ver el job_type y el status, haz clic en el ID de la tarea. Cada trabajo puede tener uno de los siguientes tipos y estados:

    • job_type: el tipo de trabajo que ejecuta el flujo de trabajo con uno de los siguientes valores:

      • 0: Carga datos en BigQuery.
      • 1: Ejecuta una consulta en BigQuery.
    • status: el estado actual del trabajo, con uno de los siguientes valores:

      • 0: La tarea se ha creado, pero no se ha iniciado.
      • 1: La tarea se está ejecutando.
      • 2: El trabajo ha completado su ejecución correctamente.
      • 3: Se ha producido un error y el trabajo no se ha completado correctamente.

    El objeto de trabajo también contiene atributos de metadatos, como la región del conjunto de datos de BigQuery, el nombre de la tabla de BigQuery y, si se trata de un trabajo de consulta, la cadena de consulta que se está ejecutando.

Lista de archivos con el estado de la tarea resaltado.

Ver datos en BigQuery

Para confirmar que el trabajo de ELT se ha realizado correctamente, compruebe que los datos aparecen en la tabla.

  1. En la Google Cloud consola, ve a la página Editor de BigQuery.

    Ir al editor

  2. Haz clic en la tabla serverless_elt_dataset.word_count.

  3. Haga clic en la pestaña Vista previa.

    Pestaña de vista previa que muestra los datos en una tabla.

Programar el flujo de trabajo

Para ejecutar el flujo de trabajo periódicamente según una programación, puedes usar Cloud Scheduler.