Run multiple BigQuery jobs in parallel

BigQuery hosts a number of public datasets that anyone can query. In this tutorial, you create a workflow that runs multiple BigQuery query jobs in parallel, demonstrating the performance improvement over running the same jobs serially, one after the other.

Run a BigQuery query job

In BigQuery, you can run an interactive (on-demand) query job. For more information, see Running interactive and batch query jobs.

Console

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. Enter the following BigQuery SQL query in the Query editor text area:

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. Click Run.

bq

In your terminal, enter the following bq query command to run an interactive query using standard SQL syntax:

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

This runs a query that returns the top 100 Wikipedia titles with the most views in a specific month and writes the output to a temporary table.

Note how long the query takes to run.
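If you want to record durations consistently when you later compare the serial and parallel workflow runs, a small timing helper is enough. A minimal Python sketch (no BigQuery dependency; the callable you pass in stands in for whatever operation you are timing):

```python
import time

def timed(fn, *args, **kwargs):
    """Run fn and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed

# Example: time a stand-in for a query call.
result, elapsed = timed(lambda: sum(range(1000)))
print(f"took {elapsed:.3f}s")
```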

Deploy a workflow that runs multiple queries serially

A workflow definition is made up of a series of steps described using the Workflows syntax. After creating a workflow, you deploy it to make it available for execution. The deploy step also validates that the source file can be executed.

The following workflow defines a list of five tables to run a query against using the Workflows BigQuery connector. The queries are run serially, one after the other, and the most viewed titles from each table are saved to a results map.

Console

  1. In the Google Cloud console, go to the Workflows page:

    Go to Workflows

  2. Click Create.

  3. Enter a name for the new workflow, such as workflow-serial-bqjobs.

  4. Choose an appropriate region; for example, us-central1.

  5. Select the service account you previously created.

    You should have already granted both the BigQuery > BigQuery Job User and Logging > Logs Writer IAM roles to the service account.

  6. Click Next.

  7. In the workflow editor, enter the following definition for your workflow:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. Click Deploy.

gcloud

  1. Open a terminal and create a source code file for your workflow:

    touch workflow-serial-bqjobs.yaml
  2. Copy the following workflow to your source code file:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. Deploy the workflow by entering the following command:

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    Replace MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com with the email of the service account you previously created.

    You should have already granted both the roles/bigquery.jobUser and roles/logging.logWriter IAM roles to the service account.
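The returnResult step in the workflow indexes into the shape of the BigQuery jobs.query REST response, where each row's cells appear under f and each cell's value under v. A minimal Python sketch of the same extraction, run against a hypothetical, trimmed-down response (real responses carry additional fields such as schema, jobReference, and totalRows):

```python
# Hypothetical, trimmed-down jobs.query response for illustration.
query_result = {
    "rows": [
        {"f": [{"v": "Special:Search"}, {"v": "14591339"}]},
        {"f": [{"v": "Main_Page"}, {"v": "12345678"}]},
    ]
}

# Mirror of the workflow's returnResult step for one table.
results = {}
table = "201201h"
results[table] = {
    "title": query_result["rows"][0]["f"][0]["v"],
    "views": query_result["rows"][0]["f"][1]["v"],
}
print(results)
```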

Execute the workflow and run multiple queries serially

Executing a workflow runs the current workflow definition associated with the workflow.

Console

  1. In the Google Cloud console, go to the Workflows page:

    Go to Workflows

  2. On the Workflows page, select the workflow-serial-bqjobs workflow to go to its details page.

  3. On the Workflow Details page, click Execute.

  4. Click Execute again.

  5. View the results of the workflow in the Output pane.

gcloud

  1. Open a terminal.

  2. Execute the workflow:

     gcloud workflows run workflow-serial-bqjobs

The workflow execution should take approximately five times the duration you noted for a single query, or about a minute. The result includes an entry for each table and looks similar to the following:

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}
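Note that the view counts in the output are strings, because the BigQuery REST API encodes cell values as strings. If you post-process the workflow output, convert them first. A small sketch using the sample result above:

```python
output = {
    "201201h": {"title": "Special:Search", "views": "14591339"},
    "201202h": {"title": "Special:Search", "views": "132765420"},
    "201203h": {"title": "Special:Search", "views": "123316818"},
    "201204h": {"title": "Special:Search", "views": "116830614"},
    "201205h": {"title": "Special:Search", "views": "131357063"},
}

# Convert the string counts to integers before comparing them.
busiest = max(output, key=lambda t: int(output[t]["views"]))
print(busiest, output[busiest]["views"])
```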

Deploy and execute a workflow that runs multiple queries in parallel

Instead of running five queries sequentially, you can run the queries in parallel by making a few changes:

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • A parallel step allows each iteration of the for loop to run in parallel.
  • The results variable is declared as shared, which makes it writable by each parallel branch so that the result of each branch can be added to it.
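This pattern is analogous to fanning out work in a program while collecting results into a shared map. A rough Python analogy using concurrent.futures, where fake_query is a stand-in for the per-table BigQuery call (it is not part of the workflow):

```python
import time
from concurrent.futures import ThreadPoolExecutor

tables = ["201201h", "201202h", "201203h", "201204h", "201205h"]

def fake_query(table):
    """Stand-in for the per-table BigQuery call."""
    time.sleep(0.1)  # simulate query latency
    return table, {"title": "Special:Search", "views": "0"}

results = {}  # plays the role of the workflow's shared `results` map
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(tables)) as pool:
    for table, row in pool.map(fake_query, tables):
        results[table] = row
elapsed = time.perf_counter() - start
print(f"{len(results)} tables in {elapsed:.2f}s")  # roughly 0.1s, not 0.5s
```

Because the five simulated queries run concurrently, the total elapsed time is close to the latency of one query rather than the sum of all five, which is the same effect the parallel step produces in the workflow.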

Console

  1. In the Google Cloud console, go to the Workflows page:

    Go to Workflows

  2. Click Create.

  3. Enter a name for the new workflow, such as workflow-parallel-bqjobs.

  4. Choose an appropriate region; for example, us-central1.

  5. Select the service account you previously created.

  6. Click Next.

  7. In the workflow editor, enter the following definition for your workflow:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. Click Deploy.

  9. On the Workflow Details page, click Execute.

  10. Click Execute again.

  11. View the results of the workflow in the Output pane.

gcloud

  1. Open a terminal and create a source code file for your workflow:

    touch workflow-parallel-bqjobs.yaml
  2. Copy the following workflow to your source code file:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. Deploy the workflow by entering the following command:

    gcloud workflows deploy workflow-parallel-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    Replace MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com with the email of the service account you previously created.

  4. Execute the workflow:

     gcloud workflows run workflow-parallel-bqjobs

The result is similar to the previous output, but because the five queries run concurrently, the workflow execution should take approximately twenty seconds or less.