這個範本會建立批次管道,從 MongoDB 讀取文件並寫入 BigQuery。
如要擷取 MongoDB 變更串流資料,可以使用 MongoDB to BigQuery (CDC) 範本。
管道相關規定
- 目標 BigQuery 資料集必須存在。
- Dataflow 工作站機器必須能夠存取來源 MongoDB 執行個體。
輸出格式
輸出記錄的格式取決於 userOption 參數的值。如果 userOption 是 NONE,輸出內容會採用下列結構定義。source_data 欄位包含 JSON 格式的文件。
[ {"name":"id","type":"STRING"}, {"name":"source_data","type":"STRING"}, {"name":"timestamp","type":"TIMESTAMP"} ]
如果 userOption 為 FLATTEN,管道會將文件扁平化,並將頂層欄位寫入為資料表欄。舉例來說,假設 MongoDB 集合中的文件包含下列欄位:
"_id"(string)"title"(string)"genre"(string)
使用 FLATTEN 時,輸出內容會採用下列結構定義。範本會新增 timestamp 欄位。
[ {"name":"_id","type":"STRING"}, {"name":"title","type":"STRING"}, {"name":"genre","type":"STRING"}, {"name":"timestamp","type":"TIMESTAMP"} ]
如果 userOption 為 JSON,管道會以 BigQuery JSON 格式儲存文件。BigQuery 內建支援使用 JSON 資料類型的 JSON 資料。詳情請參閱在 GoogleSQL 中使用 JSON 資料。
範本參數
必要參數
- mongoDbUri:MongoDB 連線 URI,格式為
mongodb+srv://:@.。 - 資料庫:要從中讀取集合的 MongoDB 資料庫。例如:
my-db。 - collection:MongoDB 資料庫中的集合名稱。例如:
my-collection。 - userOption:
FLATTEN、JSON或NONE。FLATTEN會將文件扁平化至單一層級。JSON會以 BigQuery JSON 格式儲存文件。NONE會將整份文件儲存為 JSON 格式的字串。預設值為「NONE」。 - outputTableSpec:要寫入的 BigQuery 資料表。例如:
bigquery-project:dataset.output_table。
選用參數
- KMSEncryptionKey:用來解密 MongoDB URI 連線字串的 Cloud KMS 加密金鑰。如果傳入 Cloud KMS 金鑰,則必須以加密方式傳送 MongoDB URI 連線字串。例如:
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key。 - filter:JSON 格式的 Bson 篩選器。例如:
{ "val": { $gt: 0, $lt: 9 }}。 - useStorageWriteApi:如果為
true,管道會使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。預設值為false。詳情請參閱「使用 Storage Write API」(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。 - useStorageWriteApiAtLeastOnce:使用 Storage Write API 時,指定寫入語意。如要使用「至少一次」語意 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),請將這個參數設為
true。如要使用「僅限一次」語意,請將參數設為false。只有在useStorageWriteApi為true時,這個參數才會生效。預設值為false。 - bigQuerySchemaPath:BigQuery JSON 結構定義的 Cloud Storage 路徑。例如:
gs://your-bucket/your-schema.json。 - javascriptDocumentTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的
.js檔案 Cloud Storage URI。例如:gs://your-bucket/your-transforms/*.js。 - javascriptDocumentTransformFunctionName:要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是
myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。例如:transform。
使用者定義函式
您也可以選擇以 JavaScript 編寫使用者定義函式 (UDF),擴充這個範本。範本會針對每個輸入元素呼叫 UDF。 元素酬載會序列化為 JSON 字串。
如要使用 UDF,請將 JavaScript 檔案上傳至 Cloud Storage,並設定下列範本參數:
| 參數 | 說明 |
|---|---|
javascriptDocumentTransformGcsPath |
JavaScript 檔案的 Cloud Storage 位置。 |
javascriptDocumentTransformFunctionName |
JavaScript 函式的名稱。 |
詳情請參閱「為 Dataflow 範本建立使用者定義函式」。
函式規格
UDF 的規格如下:
userOption 為 NONE,JSON 物件必須包含名為 _id 的屬性,其中包含文件 ID。執行範本
控制台
- 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。 前往「依範本建立工作」
- 在「Job name」(工作名稱) 欄位中,輸入不重複的工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1。如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中,選取 the MongoDB to BigQuery template。
- 在提供的參數欄位中輸入參數值。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION
更改下列內容:
PROJECT_ID: 您要執行 Dataflow 工作的 Google Cloud 專案 IDJOB_NAME: 您選擇的不重複工作名稱REGION_NAME:您要部署 Dataflow 工作的區域,例如us-central1VERSION:您要使用的範本版本您可以使用下列值:
latest使用最新版範本,該範本位於值區中非依日期命名的上層資料夾: gs://dataflow-templates-REGION_NAME/latest/- 版本名稱,例如
2023-09-12-00_RC00,可使用特定版本的範本,該範本會以巢狀結構存放在值區中相應的依日期命名上層資料夾中:gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC:目標 BigQuery 資料表名稱。MONGO_DB_URI:您的 MongoDB URI。DATABASE:您的 MongoDB 資料庫。COLLECTION:您的 MongoDB 集合。USER_OPTION:FLATTEN、JSON 或 NONE。
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery", } }
更改下列內容:
PROJECT_ID: 您要執行 Dataflow 工作的 Google Cloud 專案 IDJOB_NAME: 您選擇的不重複工作名稱LOCATION:您要部署 Dataflow 工作的區域,例如us-central1VERSION:您要使用的範本版本您可以使用下列值:
latest使用最新版範本,該範本位於值區中非依日期命名的上層資料夾: gs://dataflow-templates-REGION_NAME/latest/- 版本名稱,例如
2023-09-12-00_RC00,可使用特定版本的範本,該範本會以巢狀結構存放在值區中相應的依日期命名上層資料夾中:gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC:目標 BigQuery 資料表名稱。MONGO_DB_URI:您的 MongoDB URI。DATABASE:您的 MongoDB 資料庫。COLLECTION:您的 MongoDB 集合。USER_OPTION:FLATTEN、JSON 或 NONE。
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。