Insert data into BigQuery using the For Each Parallel task

In this tutorial, you'll create an Application Integration and a sub-integration to process a series of records. For each record, the main integration asynchronously invokes the sub-integration, which takes the data for that record and inserts it as a row in a table in a BigQuery dataset.

In this tutorial, you will complete the following tasks:

  • Set up a BigQuery connection
  • Set up a sub-integration
  • Set up the main integration
  • Test the integration

Before you begin

  • Ensure that you have access to Application Integration.
  • Do the following in your Google Cloud project:

    • Grant the following roles to the service account that you want to use to create the connection (a command-line sketch follows this list):
      • roles/bigquery.dataEditor
      • roles/bigquery.readSessionUser
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor
    • Enable the following services:
      • secretmanager.googleapis.com (Secret Manager API)
      • connectors.googleapis.com (Connectors API)

      If these services were not previously enabled for your project, you are prompted to enable them when you create the connection on the Create Connection page.
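
    If you prefer the command line, the following is a minimal Cloud Shell sketch for enabling these services and granting the roles listed above. The project_id and service_account_email values are placeholders that you replace with your own values:

      # Placeholders: replace with your project ID and the connection's service account.
      export PROJECT_ID=project_id
      export SERVICE_ACCOUNT=service_account_email

      # Enable the required services.
      gcloud services enable --project "${PROJECT_ID}" \
          secretmanager.googleapis.com \
          connectors.googleapis.com

      # Grant each required role to the service account.
      for role in roles/bigquery.dataEditor roles/bigquery.readSessionUser \
                  roles/secretmanager.viewer roles/secretmanager.secretAccessor; do
        gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
            --member="serviceAccount:${SERVICE_ACCOUNT}" \
            --role="${role}"
      done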

Set up a BigQuery connection

First, create the BigQuery dataset and table that are used in this tutorial. After creating the dataset and table, create a BigQuery connection. You'll use this connection in an integration later in this tutorial.

Set up a BigQuery dataset and table

To set up the BigQuery dataset and table, complete the following steps:

  1. In the Cloud console page, select your Google Cloud project.
  2. To launch a Cloud Shell session from the Google Cloud console, click the Activate Cloud Shell icon. This launches a session in the bottom pane of the Google Cloud console.
  3. To enable the BigQuery APIs, enter the following commands in your Cloud Shell terminal:
    export PROJECT_ID=project_id
    export REGION=region
    gcloud services enable --project "${PROJECT_ID}" \
        bigquery.googleapis.com \
        bigquerystorage.googleapis.com
    In this command, replace:
    • project_id with the project ID of your Google Cloud project.
    • region with the region that you want to use to create your BigQuery dataset.
  4. To create a BigQuery dataset named bq_tutorial, enter the following command in your Cloud Shell terminal:
          bq --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
  5. To create a BigQuery table named tutorial, enter the following command in your Cloud Shell terminal:
          bq --project_id ${PROJECT_ID} \
            query  \
            --nouse_legacy_sql \
          'create table bq_tutorial.tutorial (
          unique_key STRING NOT NULL,
          created_date STRING,
          closed_date STRING,
          agency STRING,
          agency_name STRING,
          complaint_type STRING,
          descriptor STRING,
          location_type STRING,
          incident_zip STRING,
          incident_address STRING,
          street_name STRING,
          cross_street_1 STRING,
          cross_street_2 STRING,
          intersection_street_1 STRING,
          intersection_street_2 STRING,
          address_type STRING,
          city STRING,
          landmark STRING,
          facility_type STRING,
          status STRING,
          due_date STRING,
          resolution_action_updated_date STRING,
          community_board STRING,
          borough STRING,
          x_coordinate_state_plane STRING,
          y_coordinate_state_plane STRING,
          park_facility_name STRING,
          park_borough STRING,
          school_name STRING,
          school_number STRING,
          school_region STRING,
          school_code STRING,
          school_phone_number STRING,
          school_address STRING,
          school_city STRING,
          school_state STRING,
          school_zip STRING,
          school_not_found STRING,
          school_or_citywide_complaint STRING,
          vehicle_type STRING,
          taxi_company_borough STRING,
          taxi_pick_up_location STRING,
          bridge_highway_name STRING,
          bridge_highway_direction STRING,
          bridge_highway_segment STRING,
          road_ramp STRING,
          garage_lot_name STRING,
          ferry_direction STRING,
          ferry_terminal_name STRING,
          latitude STRING,
          longitude STRING,
          location STRING
          ) '
      
  6. Verify that your BigQuery table is created.
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and confirm that the bq_tutorial dataset is listed.
    4. Expand the bq_tutorial dataset and confirm that the tutorial table is listed.
    5. Click the tutorial table to view the schema. You can also verify the table from the command line, as shown after these steps.
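
    Alternatively, a quick check from the Cloud Shell terminal can confirm the dataset and table. This sketch assumes that the PROJECT_ID variable set in the earlier steps is still defined:

      # List the datasets in the project and show the schema of the new table.
      bq --project_id ${PROJECT_ID} ls
      bq --project_id ${PROJECT_ID} show --schema --format=prettyjson bq_tutorial.tutorial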

Create a BigQuery connection

Next, you'll create a BigQuery connection. A BigQuery connection lets you insert, read, update and delete rows in a BigQuery table and use the resulting output in an integration. After creating the BigQuery connection, you'll use this connection in an integration later in this tutorial to add rows to the BigQuery table.

To create a BigQuery connection, complete the following steps:

  1. In the Cloud console page, select your Google Cloud project.
  2. Open the connections page.
  3. Click + CREATE NEW to open the Create Connection page.
  4. Configure the connection:
    1. In the Create Connection section, complete the following:
      • Connector: Select BigQuery from the drop down list of available Connectors.
      • Connector version: Select the latest Connector version from the drop down list of available versions.
      • In the Connection Name field, enter a name for the Connection instance. For this tutorial, enter connector-bq-tutorial.
      • Optionally, add a Description of the connection instance.
      • Service Account: Select a service account that has the required roles.
      • Project ID: Enter the ID of the Google Cloud project where the BigQuery data resides.
      • Dataset ID: Enter the ID of the BigQuery dataset that you want to use. For this tutorial, enter bq_tutorial.
      • Optionally, click + ADD LABEL to add a label in the form of a key/value pair.
      • Click Next.
    2. Location: Select a region from where the connection will run. For the list of all supported regions, see Locations.

      • Click Next.
    3. Authentication: The BigQuery connection does not require authentication configuration. Click Next.
    4. Review: Review your connection's configuration details. In this section, the connection and authentication details of the new connection are displayed for your review.
  5. Click Create.

Set up a sub-integration

In this tutorial, the sub-integration takes each record sent to it by the main integration and inserts it as a row in the tutorial table in the bq_tutorial dataset.

Create a sub-integration

To create the sub-integration, complete the following steps:

  1. In the Google Cloud console, go to the Application Integration page.

    Go to Application Integration

  2. Click Integrations from the left navigation menu to open the Integrations page.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name. For example, enter Process-each-record.
    • Optionally, enter a description. For example, enter API Trigger to process each record (sub-integration).
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a Data Mapping task

To add a Data Mapping task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Data Mapping element to the integration editor.

Configure the BigQuery connection

Now you are ready to use the BigQuery connection that you created earlier in the sub-integration. To configure the BigQuery connection in this integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Connectors element to the integration editor.
  3. Click the Connectors task element on the designer to view the task configuration pane.
  4. Click the edit icon on the right panel and update the Label to Insert row to BigQuery.
  5. Click Configure task.

    The Configure connector task dialog appears.

  6. In the Configure connector task dialog, do the following:
    1. Select the connection region where you created your BigQuery connection.
    2. Select the BigQuery connection that you want to use. For this tutorial, select connector-bq-tutorial.
    3. Once a connection is chosen, the Type column appears. Select Entities and then tutorial from the list of available entities.
    4. Once a type is chosen, the Operation column appears. Select Create.
    5. Click Done to complete the connection configuration and close the dialog.

Connect the integration elements

Next, add edge connections to connect the API Trigger to the Data Mapping task and the Data Mapping task to the Connectors task. An edge connection is a connection between any two elements in an integration. For more information on edges and edge conditions, see Edges.

To add the edge connections, complete the following steps:

  1. Click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the Data Mapping element.
  2. Click the Fork control point at the bottom of the Data Mapping element. Drag and drop the edge connection at the Join control point at the top of the Connectors element.

Configure the Data Mapping task

To configure the Data Mapping task, complete the following steps:

  1. In the integration editor, click the Data Mapping task to view the task configuration pane.
  2. Click Open Data Mapping Editor.
  3. In the Data Mapping Editor, click Add to add a new variable.
  4. In the Create Variable dialog, enter the following information:
    • Name: Enter record.
    • Data Type: Select JSON.
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                  {
                    "unique_key":"304271",
                    "created_date":"02/06/2007 12:00:00 AM",
                    "closed_date":"03/01/2007 12:00:00 AM",
                    "agency":"TLC",
                    "agency_name":"Taxi and Limousine Commission",
                    "complaint_type":"Taxi Complaint",
                    "descriptor":"Driver Complaint",
                    "location_type":"Street",
                    "incident_zip":"10001",
                    "incident_address":"",
                    "street_name":"",
                    "cross_street_1":"",
                    "cross_street_2":"",
                    "intersection_street_1":"WEST 29 STREET",
                    "intersection_street_2":"7 AVENUE",
                    "address_type":"INTERSECTION",
                    "city":"NEW YORK",
                    "landmark":"",
                    "facility_type":"N/A",
                    "status":"Closed",
                    "due_date":"02/28/2007 12:00:00 AM",
                    "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                    "community_board":"05 MANHATTAN",
                    "borough":"MANHATTAN",
                    "x_coordinate_state_plane":"986215",
                    "y_coordinate_state_plane":"211740",
                    "park_facility_name":"",
                    "park_borough":"MANHATTAN",
                    "school_name":"",
                    "school_number":"",
                    "school_region":"",
                    "school_code":"",
                    "school_phone_number":"",
                    "school_address":"",
                    "school_city":"",
                    "school_state":"",
                    "school_zip":"",
                    "school_not_found":"",
                    "school_or_citywide_complaint":"",
                    "vehicle_type":"",
                    "taxi_company_borough":"",
                    "taxi_pick_up_location":"Other",
                    "bridge_highway_name":"",
                    "bridge_highway_direction":"",
                    "road_ramp":"",
                    "bridge_highway_segment":"",
                    "garage_lot_name":"",
                    "ferry_direction":"",
                    "ferry_terminal_name":"",
                    "latitude":"40.74785373937869",
                    "longitude":"-73.99290823133913",
                    "location":"(40.74785373937869, -73.99290823133913)"
                  }
                
    • Click Create.
    • After the variable is created, complete the following steps in the Data Mapping Editor:
      • Drag the new record variable to the Input column.
      • Drag the connectorInputPayload variable to the Output column.
    • Close the Data Mapping Editor to return to the integration editor.

Publish the sub-integration

To publish the sub-integration, click Publish in the integration editor.

Set up the main integration

In this section, you set up the main integration, which uses the For Each Parallel task to process each record. The main integration then invokes the sub-integration once for each record.

Create the main integration

To create the main integration, complete the following steps:

  1. In the Google Cloud console, go to the Application Integration page.

    Go to Application Integration

  2. Click Integrations from the left navigation menu to open the Integrations page.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name. For example, enter process-records.
    • Optionally, enter a description. For example, enter API Trigger to process records (main integration).
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a For Each Parallel task

To add a For Each Parallel task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the For Each Parallel element to the integration editor.

Connect the integration elements

Next, add an edge connection to connect the API Trigger to the For Each Parallel task.

To add the edge connection, click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the For Each Parallel task element.

Configure the For Each Parallel task

To configure the For Each Parallel task, complete the following steps:

  1. In the integration editor, click the For Each Parallel task to view the task configuration pane.
  2. Under Array Selection > List to iterate, click Add new variable to add a new variable.
  3. In the Create Variable dialog, enter the following information:
    • Name: Enter records.
    • Data Type: Select JSON.
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                    [{
                      "unique_key":"304271",
                      "created_date":"02/06/2007 12:00:00 AM",
                      "closed_date":"03/01/2007 12:00:00 AM",
                      "agency":"TLC",
                      "agency_name":"Taxi and Limousine Commission",
                      "complaint_type":"Taxi Complaint",
                      "descriptor":"Driver Complaint",
                      "location_type":"Street",
                      "incident_zip":"10001",
                      "incident_address":"",
                      "street_name":"",
                      "cross_street_1":"",
                      "cross_street_2":"",
                      "intersection_street_1":"WEST 29 STREET",
                      "intersection_street_2":"7 AVENUE",
                      "address_type":"INTERSECTION",
                      "city":"NEW YORK",
                      "landmark":"",
                      "facility_type":"N/A",
                      "status":"Closed",
                      "due_date":"02/28/2007 12:00:00 AM",
                      "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                      "community_board":"05 MANHATTAN",
                      "borough":"MANHATTAN",
                      "x_coordinate_state_plane":"986215",
                      "y_coordinate_state_plane":"211740",
                      "park_facility_name":"",
                      "park_borough":"MANHATTAN",
                      "school_name":"",
                      "school_number":"",
                      "school_region":"",
                      "school_code":"",
                      "school_phone_number":"",
                      "school_address":"",
                      "school_city":"",
                      "school_state":"",
                      "school_zip":"",
                      "school_not_found":"",
                      "school_or_citywide_complaint":"",
                      "vehicle_type":"",
                      "taxi_company_borough":"",
                      "taxi_pick_up_location":"Other",
                      "bridge_highway_name":"",
                      "bridge_highway_direction":"",
                      "road_ramp":"",
                      "bridge_highway_segment":"",
                      "garage_lot_name":"",
                      "ferry_direction":"",
                      "ferry_terminal_name":"",
                      "latitude":"40.74785373937869",
                      "longitude":"-73.99290823133913",
                      "location":"(40.74785373937869, -73.99290823133913)"
                    }]
                  
  4. Click Create.
  5. In the Sub-integration details section, enter the following information:
    • API Trigger ID: Select the API Trigger element in the sub-integration. For example, select Process-each-record_API_1.
    • Execution strategy: Select ASYNC.
    • Select Run a single integration.
  6. In the On each execution section, for Where to map individual array elements, enter the name of the variable in the Data Mapping task in the sub-integration. In this case, enter record. Sub-integration variables are listed only for published integrations. If the variables are not listed, refresh the page; it takes some time for the variables to become visible after the sub-integration is published.

Publish the main integration

To publish the main integration, click Publish in the integration editor.

Test the integration

To test the integration, complete the following steps:

  1. Download the sample data to your Cloud Shell:
    1. To launch a Cloud Shell session from the Google Cloud console, click the Activate Cloud Shell icon. This launches a session in the bottom pane of the Google Cloud console.
    2. Enter the following command in your Cloud Shell terminal:
      wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
    3. To verify that the sample data was downloaded, enter the following command in your Cloud Shell terminal:
      ls -la bq-sample-dataset.json
      The downloaded file is listed in your Cloud Shell terminal.
  2. To select three random entries from the sample dataset and store them in a way that they can be passed to the integration, enter the following commands in your Cloud Shell terminal:
    AUTH=$(gcloud auth print-access-token)
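    # The next line picks a random start index between 0 and 999, slices three
    # consecutive records from the sample array, and re-encodes them as a single
    # JSON string so they can be passed in the jsonValue field of the request.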
    export SAMPLE_DOCS=$(jq $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json | jq -Rs '.')
                
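    # generate_post_data builds the request body for the execute call in the next
    # step: the sampled records are passed to the main integration's records input variable.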
    generate_post_data()
      {
        cat <<EOF
          {
            "triggerId": "api_trigger/process-records_API_1",
            "inputParameters": 
              {
                "records": 
                  {
                    "jsonValue": $SAMPLE_DOCS
                  }
              }
          }
EOF
      }
  3. To start your test, enter the following command in your Cloud Shell terminal:
    curl -X POST \
      https://integrations.googleapis.com/v1/projects/project_id/locations/region/integrations/process-records:execute \
      -H "Authorization: Bearer $AUTH" \
      -H "Content-Type: application/json" \
      -d "$(generate_post_data)"
    In this command, replace:
    • project_id with the project ID of your Google Cloud project.
    • region with the region in which you created your integration.
    This command invokes the main integration and passes the entries from the sample dataset to it. The main integration then passes each entry to the sub-integration, which adds each entry as a row in the BigQuery table.
  4. To verify that your BigQuery table now contains these records, perform the following steps (a command-line check is shown after this list):
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and click the bq_tutorial dataset.
    4. Expand the bq_tutorial dataset and click the tutorial table.
    5. Click the Table Explorer tab to view the inserted records.
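
    If you prefer the command line, you can also confirm the inserted rows from the Cloud Shell terminal. The following sketch assumes that the PROJECT_ID variable is still set from the setup steps:

      # Count the rows and preview a few of the inserted records.
      bq --project_id ${PROJECT_ID} query --nouse_legacy_sql \
          'SELECT COUNT(*) AS row_count FROM bq_tutorial.tutorial'
      bq --project_id ${PROJECT_ID} query --nouse_legacy_sql \
          'SELECT unique_key, complaint_type, status FROM bq_tutorial.tutorial LIMIT 10'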

What's next

Try building integrations with other connectors. For the list of all supported connectors, see the Connector reference.