הכנסת נתונים ל-BigQuery באמצעות משימה מקבילית מסוג For Each
במדריך הזה תיצרו Application Integration ושילוב משנה כדי לעבד סדרה של רשומות. לכל רשומה, השילוב הראשי מפעיל באופן אסינכרוני את השילוב המשני, שלוקח את הנתונים של כל רשומה ומוסיף אותם כשורה בטבלה במערך נתונים ב-BigQuery.
במדריך הזה תבצעו את הפעולות הבאות:
לפני שמתחילים
- מוודאים שיש לכם גישה ל-Application Integration.
-
מבצעים את הפעולות הבאות בפרויקט Google Cloud:
- מקצים לחשבון השירות שבו רוצים להשתמש כדי ליצור את החיבור את התפקידים הבאים:
roles/bigquery.dataEditorroles/bigquery.readSessionUserroles/secretmanager.viewerroles/secretmanager.secretAccessor
- מפעילים את השירותים הבאים:
-
secretmanager.googleapis.com(Secret Manager API) -
connectors.googleapis.com(Connectors API)
אם השירותים האלה לא הופעלו בפרויקט שלכם בעבר, תוצג לכם בקשה להפעיל אותם כשתיצרו את החיבור בדף 'יצירת חיבור'.
-
- מקצים לחשבון השירות שבו רוצים להשתמש כדי ליצור את החיבור את התפקידים הבאים:
הגדרת חיבור ל-BigQuery
מתחילים ביצירת מערך הנתונים והטבלה ב-BigQuery שבהם נשתמש במדריך הזה. אחרי שיוצרים את מערך הנתונים והטבלה, יוצרים חיבור ל-BigQuery. בהמשך המדריך הזה תשתמשו בחיבור הזה בשילוב.
הגדרת מערך נתונים וטבלה ב-BigQuery
כדי להגדיר את מערך הנתונים והטבלה ב-BigQuery, מבצעים את השלבים הבאים:
- בדף מסוף Cloud, בוחרים את הפרויקט. Google Cloud
- כדי להפעיל סשן של Cloud Shell מ Google Cloud המסוף, לוחצים על הסמל
Activate Cloud Shell במסוף Cloud. ייפתח סשן בחלונית התחתונה של Google Cloud המסוף.
-
כדי להפעיל את BigQuery APIs, מזינים את הפקודות הבאות בטרמינל של Cloud Shell:
בפקודה הזו, מחליפים את:export PROJECT_ID=project_id export REGION=region gcloud services enable --project "${PROJECT_ID}" \ bigquery.googleapis.com \ bigquerystorage.googleapis.com-
project_idבמזהה הפרויקט של הפרויקט. Google Cloud regionעם האזור שבו רוצים להשתמש כדי ליצור את מערך הנתונים ב-BigQuery.
-
- כדי ליצור מערך נתונים ב-BigQuery בשם
bq_tutorial, מזינים את הפקודה הבאה במסוף Cloud Shell:bq --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial - כדי ליצור טבלה ב-BigQuery בשם
tutorial, מזינים את הפקודה הבאה במסוף Cloud Shell:bq --project_id ${PROJECT_ID} \ query \ --nouse_legacy_sql \ 'create table bq_tutorial.tutorial ( unique_key STRING NOT NULL, created_date STRING, closed_date STRING, agency STRING, agency_name STRING, complaint_type STRING, descriptor STRING, location_type STRING, incident_zip STRING, incident_address STRING, street_name STRING, cross_street_1 STRING, cross_street_2 STRING, intersection_street_1 STRING, intersection_street_2 STRING, address_type STRING, city STRING, landmark STRING, facility_type STRING, status STRING, due_date STRING, resolution_action_updated_date STRING, community_board STRING, borough STRING, x_coordinate_state_plane STRING, y_coordinate_state_plane STRING, park_facility_name STRING, park_borough STRING, school_name STRING, school_number STRING, school_region STRING, school_code STRING, school_phone_number STRING, school_address STRING, school_city STRING, school_state STRING, school_zip STRING, school_not_found STRING, school_or_citywide_complaint STRING, vehicle_type STRING, taxi_company_borough STRING, taxi_pick_up_location STRING, bridge_highway_name STRING, bridge_highway_direction STRING, bridge_highway_segment STRING, road_ramp STRING, garage_lot_name STRING, ferry_direction STRING, ferry_terminal_name STRING, latitude STRING, longitude STRING, location STRING ) ' -
Verify that your BigQuery table is created.
- In the Cloud console page, click the Navigation menu.
- In the Analytics section, click BigQuery.
-
Expand your project and confirm that the
bq_tutorialdataset is listed. -
Expand the bq_tutorial dataset and confirm that the
tutorialtable is listed. - Click the documents table to view the schema.
Create a BigQuery connection
Next, you'll create a BigQuery connection. A BigQuery connection lets you insert, read, update and delete rows in a BigQuery table and use the resulting output in an integration. After creating the BigQuery connection, you'll use this connection in an integration later in this tutorial to add rows to the BigQuery table.
To create a BigQuery connection, complete the following steps:
- In the Cloud console page, select your Google Cloud project.
- Open the connections page.
- Click + CREATE NEW to open the Create Connection page.
- Configure the connection:
- In the Create Connection section, complete the following:
- Connector: Select BigQuery from the drop down list of available Connectors.
- Connector version: Select the latest Connector version from the drop down list of available versions.
- In the Connection Name field, enter a name for the Connection instance. For this tutorial, enter connector-bq-tutorial.
- Optionally, add a Description of the connection instance.
- Service Account: Select a service account that has the required roles.
- Project ID: Enter the ID of the Google Cloud project where the BigQuery data resides.
- Dataset ID: Enter the ID of the BigQuery dataset that you want to use. For this tutorial, enter bq_tutorial.
- Optionally, click + ADD LABEL to add a label in the form of a key/value pair.
- Click Next.
- Location: Select a region from where the connection will run. Supported
regions for connectors include:
- Click Next.
For the list of all the supported regions, see Locations.
- Authentication: The BigQuery connection does not require authentication configuration. Click Next.
- Review: Review your connection's configuration details. In this section, the connection and authentication details of the new connection are displayed for your review.
- In the Create Connection section, complete the following:
- Click Create.
Set up a sub-integration
In this tutorial, the sub-integration takes each record sent to it by the main integration and inserts it as a row in the tutorial table in the bq_tutorial dataset.
Create a sub-integration
To create the sub-integration, complete the following steps:
- In the Google Cloud console, go to the Application Integration page.
- Click Integrations from the left navigation menu to open the Integrations page.
- Click Create integration.
- In the Create Integration dialog, do the following:
- Enter a name, for example, enter Process-each-record
- Optionally, enter a description. For example, enter API Trigger to process each record (sub-integration)
- Select the region where you want to create your integration.
- Click Create to open the integration editor.
Add an API Trigger
To add an API Trigger to the integration, do the following:
- In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
- Drag the API Trigger element to the integration editor.
Add a Data Mapping task
To add a Data Mapping task in the integration, complete the following steps:
- Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
- Drag the Data Mapping element to the integration editor.
Configure the BigQuery connection
Now you are ready to use the BigQuery connection that you created earlier in the sub-integration. To configure the BigQuery connection in this integration, complete the following steps:
- Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
- Drag the Connectors element to the integration editor.
- Click the Connectors task element on the designer to view the task configuration pane.
- Click the edit icon on the right panel and update the Label to Insert row to BigQuery.
- Click Configure task.
The Configure connector task dialog appears.
- In the Configure connector task dialog, do the following:
- Select the connection region where you created your BigQuery connection.
- Select the BigQuery connection that you want to use. For this tutorial, select connector-bq-tutorial.
- Once a connection is chosen, the Type column appears. Select Entities and then tutorial from the list of available entities.
- Once a type is chosen, the Operation column appears. Select Create.
- Click Done to complete the connection configuration and close the dialog.
Connect the integration elements
Next, add edge connections to connect the API Trigger to the Data Mapping task and the Data Mapping task to the Connectors task. An edge connection is a connection between any two elements in an integration. For more information on edges and edge conditions, see Edges.
To add the edge connections, complete the following steps:
- Click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the Data Mapping element.
- Click the Fork control point at the bottom of the Data Mapping element. Drag and drop the edge connection at the Join control point at the top of the Connectors element.
Configure the Data Mapping task
To configure the Data Mapping task, complete the following steps:
- In the integration editor, click the Data Mapping task to view the task configuration pane.
- Click Open Data Mapping Editor.
- In the Data Mapping Editor, click Add to add a new variable.
- In the Create Variable dialog, enter the following information:
- Name: Enter record.
- Data Type: Select JSON.
-
Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
{ "unique_key":"304271", "created_date":"02/06/2007 12:00:00 AM", "closed_date":"03/01/2007 12:00:00 AM", "agency":"TLC", "agency_name":"Taxi and Limousine Commission", "complaint_type":"Taxi Complaint", "descriptor":"Driver Complaint", "location_type":"Street", "incident_zip":"10001", "incident_address":"", "street_name":"", "cross_street_1":"", "cross_street_2":"", "intersection_street_1":"WEST 29 STREET", "intersection_street_2":"7 AVENUE", "address_type":"INTERSECTION", "city":"NEW YORK", "landmark":"", "facility_type":"N/A", "status":"Closed", "due_date":"02/28/2007 12:00:00 AM", "resolution_action_updated_date":"03/01/2007 12:00:00 AM", "community_board":"05 MANHATTAN", "borough":"MANHATTAN", "x_coordinate_state_plane":"986215", "y_coordinate_state_plane":"211740", "park_facility_name":"", "park_borough":"MANHATTAN", "school_name":"", "school_number":"", "school_region":"", "school_code":"", "school_phone_number":"", "school_address":"", "school_city":"", "school_state":"", "school_zip":"", "school_not_found":"", "school_or_citywide_complaint":"", "vehicle_type":"", "taxi_company_borough":"", "taxi_pick_up_location":"Other", "bridge_highway_name":"", "bridge_highway_direction":"", "road_ramp":"", "bridge_highway_segment":"", "garage_lot_name":"", "ferry_direction":"", "ferry_terminal_name":"", "latitude":"40.74785373937869", "longitude":"-73.99290823133913", "location":"(40.74785373937869, -73.99290823133913)" } - לוחצים על יצירה.
- אחרי שיוצרים את המשתנה, מבצעים את השלבים הבאים בכלי לעריכת מיפוי נתונים:
- גוררים את משתנה התיעוד החדש לעמודה קלט.
- גוררים את המשתנה connectorInputPayload לעמודה פלט.
- סוגרים את הכלי לעריכת מיפוי נתונים כדי לחזור לכלי לעריכת שילובים.
פרסום השילוב המשני
כדי לפרסם את השילוב המשני, בעורך השילובים לוחצים על פרסום.
הגדרת השילוב הראשי
בקטע הזה מגדירים את השילוב הראשי, שמשתמש במשימה For Each Parallel כדי לעבד כל רשומה. השילוב הראשי מפעיל את השילוב המשני פעם אחת לכל רשומה.
יצירת השילוב הראשי
כדי ליצור את השילוב הראשי, מבצעים את השלבים הבאים:
- במסוף Google Cloud, עוברים לדף Application Integration.
- בתפריט הניווט שמימין, לוחצים על שילובים כדי לפתוח את הדף שילובים.
- לוחצים על יצירת שילוב.
- בתיבת הדו-שיח Create Integration:
- מזינים שם, למשל process-records.
- אפשר גם להוסיף תיאור. לדוגמה, מזינים API Trigger to process records (main integration)
- בוחרים את האזור שבו רוצים ליצור את השילוב.
- לוחצים על יצירה כדי לפתוח את כלי העריכה של השילוב.
הוספת טריגר API
כדי להוסיף טריגר API לשילוב:
- בכלי לעריכת שילובים, בוחרים באפשרות הוספת משימה או טריגר > טריגרים כדי להציג רשימה של טריגרים זמינים.
- גוררים את הרכיב API Trigger אל עורך השילוב.
הוספת משימה מסוג For Each Parallel
כדי להוסיף משימה מסוג For Each Parallel לשילוב, מבצעים את השלבים הבאים:
- בוחרים באפשרות +הוספת משימה או טריגר > משימות בכלי לעריכת שילובים כדי להציג את רשימת המשימות הזמינות.
- גוררים את הרכיב For Each Parallel אל כלי העריכה של השילוב.
חיבור רכיבי השילוב
בשלב הבא, מוסיפים חיבור קצה כדי לחבר את ההפעלה של ה-API למשימה For Each Parallel.
כדי להוסיף את חיבור הקצה, לוחצים על נקודת הבקרה Fork בתחתית של רכיב API Trigger. גוררים את חיבור הקצה אל נקודת הבקרה Join בחלק העליון של רכיב המשימה For Each Parallel.
הגדרת המשימה 'לכל אחד במקביל'
כדי להגדיר את המשימה For Each Parallel:
- בכלי לעריכת שילובים, לוחצים על המשימה For Each Parallel כדי לראות את חלונית הגדרת המשימה.
- בקטע Array Selection > List to iterate (בחירת מערך > רשימה לאיטרציה), לוחצים על Add new variable (הוספת משתנה חדש) כדי להוסיף משתנה חדש.
- בתיבת הדו-שיח Create Variable, מזינים את הפרטים הבאים:
-
שם: מזינים
records - Data Type (סוג הנתונים): בוחרים באפשרות JSON.
-
סכימה: בוחרים באפשרות הסקת מסקנות מתוך מטען ייעודי (payload) של JSON לדוגמה. מזינים את המטען הייעודי (payload) הבא של JSON לדוגמה:
[{ "unique_key":"304271", "created_date":"02/06/2007 12:00:00 AM", "closed_date":"03/01/2007 12:00:00 AM", "agency":"TLC", "agency_name":"Taxi and Limousine Commission", "complaint_type":"Taxi Complaint", "descriptor":"Driver Complaint", "location_type":"Street", "incident_zip":"10001", "incident_address":"", "street_name":"", "cross_street_1":"", "cross_street_2":"", "intersection_street_1":"WEST 29 STREET", "intersection_street_2":"7 AVENUE", "address_type":"INTERSECTION", "city":"NEW YORK", "landmark":"", "facility_type":"N/A", "status":"Closed", "due_date":"02/28/2007 12:00:00 AM", "resolution_action_updated_date":"03/01/2007 12:00:00 AM", "community_board":"05 MANHATTAN", "borough":"MANHATTAN", "x_coordinate_state_plane":"986215", "y_coordinate_state_plane":"211740", "park_facility_name":"", "park_borough":"MANHATTAN", "school_name":"", "school_number":"", "school_region":"", "school_code":"", "school_phone_number":"", "school_address":"", "school_city":"", "school_state":"", "school_zip":"", "school_not_found":"", "school_or_citywide_complaint":"", "vehicle_type":"", "taxi_company_borough":"", "taxi_pick_up_location":"Other", "bridge_highway_name":"", "bridge_highway_direction":"", "road_ramp":"", "bridge_highway_segment":"", "garage_lot_name":"", "ferry_direction":"", "ferry_terminal_name":"", "latitude":"40.74785373937869", "longitude":"-73.99290823133913", "location":"(40.74785373937869, -73.99290823133913)" }]
-
שם: מזינים
- לוחצים על יצירה.
- בקטע Sub-integration Details (פרטי שילוב משני), מזינים את הפרטים הבאים:
- API Trigger ID: בוחרים את רכיב ה-API Trigger (טריגר API) בשילוב המשני. לדוגמה, בוחרים באפשרות Process-each-record_API_1.
- אסטרטגיית ביצוע: בוחרים באפשרות ASYNC.
- בוחרים באפשרות הפעלת שילוב יחיד.
- בקטע On each execution (בכל הפעלה), בשדה Where to map individual array elements (איפה למפות רכיבי מערך בודדים), מזינים את שם המשתנה במשימת מיפוי הנתונים בשילוב המשנה. במקרה כזה, מזינים record. משתני השילוב המשני מפורטים רק בשילובים שפורסמו. אם המשתנים לא מופיעים, צריך לרענן את הדף, כי לוקח זמן עד שהמשתנים מופיעים אחרי פרסום שילוב המשנה.
פרסום השילוב הראשי
כדי לפרסם את השילוב הראשי, לוחצים על פרסום בכלי לעריכת שילובים.
בדיקת השילוב
כדי לבדוק את השילוב, פועלים לפי השלבים הבאים:
- מורידים נתוני דוגמה ל-Cloud Shell:
- כדי להפעיל סשן של Cloud Shell מ Google Cloud המסוף, לוחצים על הסמל
Activate Cloud Shell במסוף Cloud. כך מפעילים סשן בחלונית התחתונה של מסוף Google Cloud .
- מזינים את הפקודה הבאה בטרמינל של Cloud Shell:
wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json - כדי לוודא שהנתונים לדוגמה הורדו, מזינים את הפקודה הבאה בטרמינל של Cloud Shell:
הקובץ שהורדתם מופיע בטרמינל של Cloud Shell.ls -la bq-sample-dataset.json
- כדי להפעיל סשן של Cloud Shell מ Google Cloud המסוף, לוחצים על הסמל
- כדי לבחור שלוש רשומות אקראיות ממערך הנתונים לדוגמה ולאחסן אותן כך שאפשר יהיה להעביר אותן לשילוב, מזינים את הפקודות הבאות במסוף Cloud Shell:
AUTH=$(gcloud auth print-access-token) export SAMPLE_DOCS=$(jq $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json | jq -Rs '.') generate_post_data() { cat <<EOF { "triggerId": "api_trigger/process-records_API_1", "inputParameters": { "records": { "jsonValue": $SAMPLE_DOCS } } } EOF } - כדי להתחיל בבדיקה, מזינים את הפקודה הבאה במסוף Cloud Shell:
בפקודה הזו, מחליפים את:curl -X POST \ https://integrations.googleapis.com/v1/projects/project_id/locations/region/integrations/process-records:execute \ -H "Authorization: Bearer $AUTH" \ -H "Content-Type: application/json" \ -d "$(generate_post_data)"
-
project_idבמזהה הפרויקט של הפרויקט. Google Cloud regionבאזור שבו יצרתם את השילוב.
-
- כדי לוודא שהטבלה ב-BigQuery מכילה עכשיו את הרשומות האלה, מבצעים את השלבים הבאים:
- בדף מסוף Cloud, לוחצים על תפריט הניווט.
- בקטע Analytics, לוחצים על BigQuery.
-
מרחיבים את הפרויקט ולוחצים על מערך הנתונים
bq_tutorial. -
מרחיבים את מערך הנתונים bq_tutorial ולוחצים על הטבלה
tutorial. - לוחצים על הכרטיסייה Table Explorer כדי לראות את הרשומות שהוכנסו.
המאמרים הבאים
אפשר לנסות ליצור שילובים עם מחברים אחרים. רשימה של כל המחברים הנתמכים מופיעה כאן.