Ejecutar varias tareas de BigQuery en paralelo

BigQuery aloja varios conjuntos de datos públicos que están disponibles para que el público en general los consulte. En este tutorial, crearás un flujo de trabajo que ejecuta varias tareas de consulta de BigQuery en paralelo, lo que demuestra una mejora del rendimiento en comparación con la ejecución de las tareas de forma secuencial, una después de otra.

Ejecutar una tarea de consulta de BigQuery

En BigQuery, puedes ejecutar una tarea de consulta interactiva (bajo demanda). Para obtener más información, consulta Ejecutar tareas de consulta interactivas y por lotes.

Consola

  1. En la Google Cloud consola, ve a la página BigQuery.

    Ir a BigQuery

  2. Introduce la siguiente consulta de SQL de BigQuery en el área de texto Editor de consultas:

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. Haz clic en Ejecutar.

bq

En el terminal, introduce el siguiente comando bq query para ejecutar una consulta interactiva con la sintaxis de SQL estándar:

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

Ejecuta una consulta que devuelve los 100 títulos de Wikipedia con más vistas en un mes concreto y escribe el resultado en una tabla temporal.

Anota cuánto tiempo tarda en ejecutarse la consulta.

Desplegar un flujo de trabajo que ejecute varias consultas de forma secuencial

Una definición de flujo de trabajo se compone de una serie de pasos descritos mediante la sintaxis de Workflows. Después de crear un flujo de trabajo, debes implementarlo para que se pueda ejecutar. El paso de implementación también valida que el archivo de origen se puede ejecutar.

El siguiente flujo de trabajo define una lista de cinco tablas en las que se va a ejecutar una consulta mediante el conector de BigQuery de Workflows. Las consultas se ejecutan de forma secuencial, una tras otra, y los títulos más vistos de cada tabla se guardan en un mapa de resultados.

Consola

  1. En la Google Cloud consola, ve a la página Flujos de trabajo:

    Ir a Workflows

  2. Haz clic en Crear.

  3. Escribe un nombre para el nuevo flujo de trabajo, como workflow-serial-bqjobs.

  4. Elige una región adecuada, como us-central1.

  5. Selecciona la cuenta de servicio que has creado anteriormente.

    Ya deberías haber asignado los roles de gestión de identidades y accesos BigQuery > Usuario de tareas de BigQuery y Logging > Escritor de registros a la cuenta de servicio.

  6. Haz clic en Siguiente.

  7. En el editor del flujo de trabajo, introduce la siguiente definición para tu flujo de trabajo:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. Haz clic en Desplegar.

gcloud

  1. Abre un terminal y crea un archivo de código fuente para tu flujo de trabajo:

    touch workflow-serial-bqjobs.yaml
  2. Copia el siguiente flujo de trabajo en tu archivo de código fuente:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. Para desplegar el flujo de trabajo, introduce el siguiente comando:

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    Sustituye MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com por el correo de la cuenta de servicio que has creado anteriormente.

    Ya deberías haber concedido los roles de gestión de identidades y accesos roles/bigquery.jobUser y roles/logging.logWriter a la cuenta de servicio.

Ejecutar el flujo de trabajo y realizar varias consultas de forma secuencial

Al ejecutar un flujo de trabajo, se ejecuta la definición del flujo de trabajo actual asociada al flujo de trabajo.

Consola

  1. En la Google Cloud consola, ve a la página Flujos de trabajo:

    Ir a Workflows

  2. En la página Flujos de trabajo, selecciona el flujo de trabajo workflow-serial-bqjobs para ir a su página de detalles.

  3. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  4. Vuelve a hacer clic en Ejecutar.

  5. Consulta los resultados del flujo de trabajo en el panel Información.

gcloud

  1. Abre la terminal.

  2. Ejecuta el flujo de trabajo:

     gcloud workflows run workflow-serial-bqjob

La ejecución del flujo de trabajo debería durar aproximadamente un minuto o cinco veces el tiempo de ejecución anterior. El resultado incluirá cada tabla y tendrá un aspecto similar al siguiente:

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

Desplegar y ejecutar un flujo de trabajo que ejecute varias consultas en paralelo

En lugar de ejecutar cinco consultas de forma secuencial, puedes ejecutarlas en paralelo haciendo algunos cambios:

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • Un paso parallel permite que cada iteración del bucle for se ejecute en paralelo.
  • La variable results se declara como shared, lo que permite que una rama pueda escribir en ella y que se le pueda añadir el resultado de cada rama.

Consola

  1. En la Google Cloud consola, ve a la página Flujos de trabajo:

    Ir a Workflows

  2. Haz clic en Crear.

  3. Escribe un nombre para el nuevo flujo de trabajo, como workflow-parallel-bqjobs.

  4. Elige una región adecuada, como us-central1.

  5. Selecciona la cuenta de servicio que has creado anteriormente.

  6. Haz clic en Siguiente.

  7. En el editor del flujo de trabajo, introduce la siguiente definición para tu flujo de trabajo:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. Haz clic en Desplegar.

  9. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  10. Vuelve a hacer clic en Ejecutar.

  11. Consulta los resultados del flujo de trabajo en el panel Información.

gcloud

  1. Abre un terminal y crea un archivo de código fuente para tu flujo de trabajo:

    touch workflow-parallel-bqjobs.yaml
  2. Copia el siguiente flujo de trabajo en tu archivo de código fuente:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. Para desplegar el flujo de trabajo, introduce el siguiente comando:

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    Sustituye MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com por el correo de la cuenta de servicio que has creado anteriormente.

  4. Ejecuta el flujo de trabajo:

     gcloud workflows run workflow-parallel-bqjobs

El resultado será similar al de la salida anterior, pero la ejecución del flujo de trabajo debería tardar unos 20 segundos o menos.