複数の BigQuery ジョブを並列実行する

BigQuery は、クエリ対象として一般公開できるいくつかの一般公開データセットをホストしています。このチュートリアルでは、複数の BigQuery クエリジョブを同時に実行するワークフローを作成して、ジョブを順次実行する場合に比べてパフォーマンスが向上していることを示します。

BigQuery クエリジョブを実行する

BigQuery では、インタラクティブ(オンデマンド)クエリジョブを実行できます。詳細については、インタラクティブ クエリとバッチクエリのジョブの実行をご覧ください。

コンソール

  1. Google Cloud コンソールで、[BigQuery] ページに移動します。

    BigQuery に移動

  2. [クエリエディタ] のテキスト領域に、次の BigQuery SQL クエリを入力します。

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. [実行] をクリックします。

bq

ターミナルで、次の bq query コマンドを入力し、標準 SQL 構文を使用してインタラクティブ クエリを実行します。

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

このクエリは、特定の月で閲覧数が最も多い上位 100 件の Wikipedia タイトルを返すクエリを実行し、その出力を一時テーブルに書き込みます。

クエリの実行に要した時間をメモします。

複数のクエリを順次実行するワークフローをデプロイする

ワークフロー定義は、ワークフロー構文を使用して説明した一連のステップで構成されています。ワークフローを作成したら、デプロイして実行できるようにします。デプロイの手順では、ソースファイルを実行できることも検証されます。

次のワークフローでは、Workflows の BigQuery コネクタを使用して、クエリを実行する 5 つのテーブルのリストを定義しています。クエリは順次実行され、各テーブルで最も閲覧されたタイトルが結果マップに保存されます。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [作成] をクリックします。

  3. 新しいワークフローの名前を入力します(例: workflow-serial-bqjobs)。

  4. 適切なリージョンを選択します(例: us-central1)。

  5. 先ほど作成したサービス アカウントを選択します。

    BigQuery > BigQuery ジョブユーザーLogging > ログ書き込み IAM ロールの両方がすでにサービス アカウントに付与されている必要があります。

  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. [デプロイ] をクリックします。

gcloud

  1. ターミナルを開き、ワークフローのソースコード ファイルを作成します。

    touch workflow-serial-bqjobs.yaml
  2. 次のワークフローをソースコード ファイルにコピーします。

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com は、先ほど作成したサービス アカウントのメールアドレスに置き換えます。

    すでにサービス アカウントに roles/bigquery.jobUserroles/logging.logWriter の両方の IAM ロールを付与している必要があります。

ワークフローを実行し、複数のクエリを順次実行する

ワークフローを実行すると、そのワークフローに関連付けられた現在のワークフロー定義が実行されます。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [Workflows] ページで、[workflow-serial-bqjobs] ワークフローを選択して、詳細ページに移動します。

  3. [ワークフローの詳細] ページで [ 実行] を選択します。

  4. もう一度 [Execute] をクリックします。

  5. ワークフローの結果が [出力] ペインに表示されます。

gcloud

  1. ターミナルを開きます。

  2. ワークフローを実行します。

     gcloud workflows run workflow-serial-bqjob

ワークフローの実行には、約 1 分すなわち前回の実行時間の 5 倍ほどの時間を要します。結果には各テーブルが含まれており、次のようになります。

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

複数のクエリを並列に実行するワークフローをデプロイして実行する

いくつかの変更を加えることで、5 つのクエリを順次実行する代わりに、クエリを並列に実行することができます。

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • parallel ステップを使用すると、for ループの各イテレーションを並列に実行できます。
  • results 変数は shared として宣言されます。これにより、ブランチによる書き込みが可能になり、各ブランチの結果を追加できます。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [作成] をクリックします。

  3. 新しいワークフローの名前を入力します(例: workflow-parallel-bqjobs)。

  4. 適切なリージョンを選択します(例: us-central1)。

  5. 先ほど作成したサービス アカウントを選択します。

  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. [デプロイ] をクリックします。

  9. [ワークフローの詳細] ページで [ 実行] を選択します。

  10. もう一度 [Execute] をクリックします。

  11. ワークフローの結果が [出力] ペインに表示されます。

gcloud

  1. ターミナルを開き、ワークフローのソースコード ファイルを作成します。

    touch workflow-parallel-bqjobs.yaml
  2. 次のワークフローをソースコード ファイルにコピーします。

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com

    MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com は、先ほど作成したサービス アカウントのメールアドレスに置き換えます。

  4. ワークフローを実行します。

     gcloud workflows run workflow-parallel-bqjobs

結果は先ほどの出力と類似していますが、ワークフローの実行は約 20 秒以内に完了します。