并行运行多个 BigQuery 作业

BigQuery 托管了许多可供公众查询的公共数据集。在本教程中,您将创建一个并行运行多个 BigQuery 查询作业的工作流,与按顺序依次运行作业相比,性能有所提升。

运行 BigQuery 查询作业

在 BigQuery 中,您可以运行交互式(按需)查询作业。如需了解详情,请参阅运行交互式查询作业和批量查询作业

控制台

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 查询编辑器文本区域中输入以下 BigQuery SQL 查询:

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. 点击运行

bq

在终端中,输入以下 bq query 命令,以使用标准 SQL 语法运行交互式查询:

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

此命令会运行一个查询,该查询会返回特定月份内浏览量最高的 100 个维基百科标题,并将输出写入临时表中。

注意查询运行所需的时间。

部署按顺序运行多个查询的工作流

工作流定义由一系列使用 Workflows 语法描述的步骤组成。创建工作流后,可以进行部署,使其可以执行。部署步骤还会验证源文件是否可以执行。

以下工作流定义了一个包含五个表的列表,用于使用 Workflows BigQuery 连接器针对这些表运行查询。 这些查询会按顺序依次运行,每个表中最受观看者欢迎的影视内容会保存到结果映射中。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    转到 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-serial-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务账号

    您应该已向该服务账号授予 BigQuery > BigQuery Job UserLogging > Logs Writer IAM 角色。

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

gcloud

  1. 打开终端,然后为工作流创建源代码文件:

    touch workflow-serial-bqjobs.yaml
  2. 将以下工作流复制到您的源代码文件中:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com 替换为您之前创建的服务账号的电子邮件地址。

    您应该已向服务账号授予 roles/bigquery.jobUserroles/logging.logWriter IAM 角色。

执行工作流并按顺序运行多个查询

执行某个工作流会运行与该工作流关联的当前工作流定义。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    转到 Workflows

  2. 工作流页面上,选择 workflow-serial-bqjobs 工作流以转到其详情页面。

  3. 工作流详细信息页面上,选择 执行

  4. 再次点击执行

  5. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端。

  2. 执行工作流:

     gcloud workflows run workflow-serial-bqjob

工作流执行时间应约为 1 分钟或之前运行时间的 5 倍。结果将包含每个表,并且类似于以下内容:

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

部署并执行可并行运行多个查询的工作流

您只需稍作更改,即可并行运行查询,而无需按顺序运行五个查询:

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • parallel 步允许 for 循环的每次迭代并行运行。
  • results 变量声明为 shared,这样分支就可以写入该变量,并且每个分支的结果都可以附加到该变量。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    转到 Workflows

  2. 点击创建

  3. 输入新工作流的名称,例如 workflow-parallel-bqjobs

  4. 选择适当的区域;例如 us-central1

  5. 选择您之前创建的服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. 点击部署

  9. 工作流详细信息页面上,选择 执行

  10. 再次点击执行

  11. 输出窗格中查看工作流的结果。

gcloud

  1. 打开终端,然后为工作流创建源代码文件:

    touch workflow-parallel-bqjobs.yaml
  2. 将以下工作流复制到您的源代码文件中:

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com 替换为您之前创建的服务账号的电子邮件地址。

  4. 执行工作流:

     gcloud workflows run workflow-parallel-bqjobs

结果将与之前的输出类似,但工作流执行时间应约为 20 秒或更短!