這個範本可以建立串流管道,持續輪詢新上傳到 Cloud Storage 的文字檔案,並逐行讀取每個檔案,然後將字串發布至 Pub/Sub 主題。此外,這個範本還能以內含 JSON 記錄的換行符號分隔檔案或 CSV 檔案格式,將記錄發布到 Pub/Sub 主題進行即時處理;您也可以針對 Pub/Sub 重送資料。
管道會無限期持續執行,由於使用「Watch」轉換 (不支援排空的「SplittableDoFn」),因此必須透過「cancel」手動終止,而非「drain」。
目前的輪詢間隔固定為 10 秒一次。這個範本不會對個別記錄設定任何時間戳記,因此在執行期間,事件時間會等於發布時間。假如您的管道需要準確的時間才能處理作業,則請勿使用這個管道。
管道相關規定
- 輸入檔案必須是換行符號分隔的 JSON 或 CSV 格式。在來源檔案中跨越多行的記錄可能會導致下游出現問題,因為檔案中的每一行都會以一條訊息的形式發布至 Pub/Sub。
- 執行作業前必須先有 Pub/Sub 主題。
- 管道會無限期持續執行,直到手動終止。
範本參數
必要參數
- inputFilePattern:要讀取的輸入檔案模式,例如:
gs://bucket-name/files/*.json。 - outputTopic:要寫入的 Pub/Sub 輸入主題,名稱的格式必須為
projects/<PROJECT_ID>/topics/<TOPIC_NAME>。例如:projects/your-project-id/topics/your-topic-name。
執行範本
控制台
- 前往 Dataflow 的「Create job from template」(依據範本建立工作) 頁面。 前往「依範本建立工作」
- 在「Job name」(工作名稱) 欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1。如需可執行 Dataflow 工作的區域清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中,選取「Text Files on Cloud Storage to Pub/Sub」(從 Cloud Storage 匯入文字檔到 Pub/Sub) (串流) 範本。
- 在提供的參數欄位中輸入參數值。
- 選用:如要從「僅需處理一次」切換至「至少一次串流模式」,請選取「At Least Once」(至少一次)。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/ \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
更改下列內容:
JOB_NAME:您選擇的不重複工作名稱REGION_NAME:您要部署 Dataflow 工作的區域,例如us-central1STAGING_LOCATION:用於暫存本機檔案的位置 (例如gs://your-bucket/staging)TOPIC_NAME:Pub/Sub 主題名稱BUCKET_NAME:Cloud Storage bucket 的名稱FILE_PATTERN:要從 Cloud Storage bucket 讀取資料的檔案模式 glob (例如path/*.csv)
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/ { "jobName": "JOB_NAME", "environment": { "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
更改下列內容:
PROJECT_ID:您要執行 Dataflow 工作的 Google Cloud 專案 IDJOB_NAME:您選擇的不重複工作名稱LOCATION:您要部署 Dataflow 工作的區域,例如us-central1STAGING_LOCATION:用於暫存本機檔案的位置 (例如gs://your-bucket/staging)TOPIC_NAME:Pub/Sub 主題名稱BUCKET_NAME:Cloud Storage bucket 的名稱FILE_PATTERN:要從 Cloud Storage bucket 讀取資料的檔案模式 glob (例如path/*.csv)
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。