Executar vários jobs do BigQuery em paralelo

O BigQuery hospeda vários conjuntos de dados públicos que estão disponíveis para consulta do público em geral. Neste tutorial, você vai criar um fluxo de trabalho que executa vários jobs de consulta do BigQuery em paralelo, demonstrando uma melhoria no desempenho em comparação com a execução dos jobs em série, um após o outro.

Executar um job de consulta do BigQuery

No BigQuery, é possível executar um job de consulta interativa (sob demanda). Para mais informações, consulte Como executar jobs de consulta interativos e em lote.

Console

  1. No console do Google Cloud , acesse a página BigQuery.

    Acessar o BigQuery

  2. Insira a seguinte consulta SQL do BigQuery na área de texto do Editor de consultas:

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. Clique em Executar.

bq

No terminal, insira o seguinte comando bq query para executar uma consulta interativa usando a sintaxe SQL padrão:

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

Isso executa uma consulta que retorna os 100 títulos da Wikipédia com mais visualizações em um mês específico e grava a saída em uma tabela temporária.

Observe quanto tempo a consulta leva para ser executada.

Implantar um fluxo de trabalho que executa várias consultas em série

Uma definição de fluxo de trabalho é composta por uma série de etapas descritas usando a sintaxe do Workflows. Depois de criar um fluxo de trabalho, implante-o para que ele fique disponível para execução. A etapa de implantação também valida a execução do arquivo de origem.

O fluxo de trabalho a seguir define uma lista de cinco tabelas para executar uma consulta usando o conector do BigQuery do Workflows. As consultas são executadas em série, uma após a outra, e os títulos mais assistidos de cada tabela são salvos em um mapa de resultados.

Console

  1. No console Google Cloud , acesse a página Fluxos de trabalho:

    Acessar fluxos de trabalho

  2. Clique em Criar.

  3. Insira um nome para o novo fluxo de trabalho, como workflow-serial-bqjobs.

  4. Escolha uma região apropriada; por exemplo: us-central1.

  5. Selecione a conta de serviço que você criou.

    Você já precisa ter concedido os papéis do IAM BigQuery > Usuário de jobs do BigQuery e Logging > Gravador de registros à conta de serviço.

  6. Clique em Próxima.

  7. No editor de fluxo de trabalho, insira a seguinte definição:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. Clique em Implantar.

gcloud

  1. Abra um terminal e crie um arquivo de código-fonte para seu fluxo de trabalho:

    touch workflow-serial-bqjobs.yaml
  2. Copie o fluxo de trabalho a seguir no arquivo de código-fonte:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. Implante o fluxo de trabalho digitando o seguinte comando:

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    Substitua MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com pelo e-mail da conta de serviço que você criou.

    Você já deve ter concedido os papéis do IAM roles/bigquery.jobUser e roles/logging.logWriter à conta de serviço.

Executar o fluxo de trabalho e várias consultas em série

Quando um fluxo de trabalho é executado, a definição atual associada a ele também é.

Console

  1. No console Google Cloud , acesse a página Fluxos de trabalho:

    Acessar fluxos de trabalho

  2. Na página Fluxos de trabalho, selecione o fluxo de trabalho workflow-serial-bqjobs para acessar a página de detalhes dele.

  3. Na página Detalhes do fluxo de trabalho, clique em Executar.

  4. Clique em Executar novamente.

  5. Veja os resultados do fluxo de trabalho no painel Saída.

gcloud

  1. Abra um terminal.

  2. Execute o fluxo de trabalho:

     gcloud workflows run workflow-serial-bqjob

A execução do fluxo de trabalho deve levar aproximadamente um minuto ou cinco vezes o tempo de execução anterior. O resultado vai incluir cada tabela e será semelhante a esta:

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

Implante e execute um fluxo de trabalho que executa várias consultas em paralelo

Em vez de executar cinco consultas em sequência, é possível executá-las em paralelo fazendo algumas mudanças:

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • Uma etapa parallel permite que cada iteração do loop for seja executada em paralelo.
  • A variável results é declarada como shared, o que permite que ela seja gravável por uma ramificação, e o resultado de cada ramificação pode ser anexado a ela.

Console

  1. No console Google Cloud , acesse a página Fluxos de trabalho:

    Acessar fluxos de trabalho

  2. Clique em Criar.

  3. Insira um nome para o novo fluxo de trabalho, como workflow-parallel-bqjobs.

  4. Escolha uma região apropriada; por exemplo: us-central1.

  5. Selecione a conta de serviço que você criou.

  6. Clique em Próxima.

  7. No editor de fluxo de trabalho, insira a seguinte definição:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. Clique em Implantar.

  9. Na página Detalhes do fluxo de trabalho, clique em Executar.

  10. Clique em Executar novamente.

  11. Veja os resultados do fluxo de trabalho no painel Saída.

gcloud

  1. Abra um terminal e crie um arquivo de código-fonte para seu fluxo de trabalho:

    touch workflow-parallel-bqjobs.yaml
  2. Copie o fluxo de trabalho a seguir no arquivo de código-fonte:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. Implante o fluxo de trabalho digitando o seguinte comando:

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    Substitua MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com pelo e-mail da conta de serviço que você criou.

  4. Execute o fluxo de trabalho:

     gcloud workflows run workflow-parallel-bqjobs

O resultado será semelhante à saída anterior, mas a execução do fluxo de trabalho deve levar aproximadamente 20 segundos ou menos.