Ejecuta varios trabajos de BigQuery en paralelo

BigQuery aloja varios conjuntos de datos públicos que están disponibles para que el público en general los consulte. En este instructivo, crearás un flujo de trabajo que ejecuta varios trabajos de consulta de BigQuery en paralelo, lo que demuestra una mejora en el rendimiento en comparación con la ejecución de los trabajos de forma serial, uno tras otro.

Ejecuta un trabajo de consulta de BigQuery

En BigQuery, puedes ejecutar un trabajo de consulta interactivo (a pedido). Para obtener más información, consulta Ejecuta trabajos de consulta interactivos y por lotes.

Console

  1. En la consola de Google Cloud , ve a la página BigQuery.

    Ir a BigQuery

  2. Ingresa la siguiente consulta en SQL en BigQuery en el área de texto del Editor de consultas:

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. Haz clic en Ejecutar.

bq

En tu terminal, ingresa el siguiente comando bq query para ejecutar una consulta interactiva con la sintaxis de SQL estándar:

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

Esto ejecuta una consulta que devuelve los 100 títulos de Wikipedia con más vistas en un mes específico y escribe el resultado en una tabla temporal.

Observa cuánto tiempo tarda en ejecutarse la consulta.

Implementa un flujo de trabajo que ejecuta varias consultas de forma serial

La definición de un flujo de trabajo está compuesta por una serie de pasos descritos con la sintaxis de Workflows. Después de crear un flujo de trabajo, debes implementarlo para que esté disponible para su ejecución. El paso de implementación también valida que se pueda ejecutar el archivo de origen.

El siguiente flujo de trabajo define una lista de cinco tablas para ejecutar una consulta con el conector de BigQuery de Workflows. Las consultas se ejecutan de forma serial, una después de la otra, y los títulos más vistos de cada tabla se guardan en un mapa de resultados.

Console

  1. En la consola de Google Cloud , ve a la página Workflows:

    Ir a Workflows

  2. Haz clic en Crear.

  3. Ingresa un nombre para el flujo de trabajo nuevo, como workflow-serial-bqjobs.

  4. Elegir una región adecuada por ejemplo, us-central1.

  5. Selecciona la cuenta de servicio que creaste anteriormente.

    Ya deberías haber otorgado los roles de IAM de BigQuery > Usuario de trabajo de BigQuery y Logging > Escritor de registros a la cuenta de servicio.

  6. Haz clic en Siguiente.

  7. En el editor de flujos de trabajo, ingresa la siguiente definición para el flujo de trabajo:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. Haz clic en Implementar.

gcloud

  1. Abre una terminal y crea un archivo de código fuente para tu flujo de trabajo:

    touch workflow-serial-bqjobs.yaml
  2. Copia el siguiente flujo de trabajo en tu archivo de código fuente:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. Para implementar el flujo de trabajo, ingresa el siguiente comando:

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    Reemplaza MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com por el correo electrónico de la cuenta de servicio que creaste anteriormente.

    Ya deberías haber otorgado los roles de IAM roles/bigquery.jobUser y roles/logging.logWriter a la cuenta de servicio.

Ejecuta el flujo de trabajo y ejecuta varias consultas de forma serial

Ejecuta la definición actual del flujo de trabajo asociada con el flujo de trabajo.

Console

  1. En la consola de Google Cloud , ve a la página Workflows:

    Ir a Workflows

  2. En la página Flujos de trabajo, selecciona el flujo de trabajo workflow-serial-bqjobs para ir a su página de detalles.

  3. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  4. Haz clic de nuevo en Ejecutar.

  5. Consulta los resultados del flujo de trabajo en el panel Output.

gcloud

  1. Abre una terminal.

  2. Ejecuta el flujo de trabajo:

     gcloud workflows run workflow-serial-bqjob

La ejecución del flujo de trabajo debería demorar aproximadamente un minuto o cinco veces el tiempo de ejecución anterior. El resultado incluirá cada tabla y se verá similar al siguiente:

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

Implementa y ejecuta un flujo de trabajo que ejecuta varias consultas en paralelo

En lugar de ejecutar cinco consultas de forma secuencial, puedes ejecutarlas en paralelo realizando algunos cambios:

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • Un paso parallel permite que cada iteración del bucle for se ejecute en paralelo.
  • La variable results se declara como shared, lo que permite que una rama la escriba, y el resultado de cada rama se puede agregar a ella.

Console

  1. En la consola de Google Cloud , ve a la página Workflows:

    Ir a Workflows

  2. Haz clic en Crear.

  3. Ingresa un nombre para el flujo de trabajo nuevo, como workflow-parallel-bqjobs.

  4. Elegir una región adecuada por ejemplo, us-central1.

  5. Selecciona la cuenta de servicio que creaste anteriormente.

  6. Haz clic en Siguiente.

  7. En el editor de flujos de trabajo, ingresa la siguiente definición para el flujo de trabajo:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. Haz clic en Implementar.

  9. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  10. Haz clic de nuevo en Ejecutar.

  11. Consulta los resultados del flujo de trabajo en el panel Output.

gcloud

  1. Abre una terminal y crea un archivo de código fuente para tu flujo de trabajo:

    touch workflow-parallel-bqjobs.yaml
  2. Copia el siguiente flujo de trabajo en tu archivo de código fuente:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. Para implementar el flujo de trabajo, ingresa el siguiente comando:

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    Reemplaza MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com por el correo electrónico de la cuenta de servicio que creaste anteriormente.

  4. Ejecuta el flujo de trabajo:

     gcloud workflows run workflow-parallel-bqjobs

El resultado será similar al anterior, pero la ejecución del flujo de trabajo debería tardar aproximadamente veinte segundos o menos.