Spanner 變更串流到來源資料庫範本

串流管道。從 Spanner 變更串流讀取資料,並寫入來源。

範本參數

參數 說明
changeStreamName 管道讀取的 Spanner 變更串流名稱。
instanceId 變更串流所在的 Spanner 執行個體名稱。
databaseId 變更串流監控的 Spanner 資料庫名稱。
spannerProjectId Spanner 專案名稱。
metadataInstance 這個執行個體會儲存連接器使用的中繼資料,以控管變更串流 API 資料的用量。
metadataDatabase 資料庫:用於儲存連接器使用的中繼資料,以控管變更串流 API 資料的用量。
sourceShardsFilePath Cloud Storage 檔案路徑,內含來源分片的連線設定檔資訊。
startTimestamp 選用:朗讀的開始時間戳記會變更。預設為空白。
endTimestamp 選用:讀取變更的結束時間戳記。如未提供時間戳記,則會無限期讀取。預設為空白。
shadowTablePrefix 選用:用於命名影子資料表的前置字串。預設值:shadow_
sessionFilePath 選填:Cloud Storage 中的工作階段路徑,內含 HarbourBridge 的對應資訊。
filtrationMode 選用:篩選模式。指定如何根據條件捨棄特定記錄。支援的模式包括:none (不篩選任何內容)、forward_migration (篩選使用轉送遷移管道寫入的記錄)。預設值為 forward_migration
shardingCustomJarPath 選用:Cloud Storage 中自訂 JAR 檔案的位置,其中包含擷取分片 ID 的自訂邏輯。如果設定這個參數,請設定 shardingCustomJarPath 參數。預設為空白。
shardingCustomClassName 選用:具有自訂分片 ID 實作的完整類別名稱。如果指定 shardingCustomJarPath,則必須提供這項參數。預設為空白。
shardingCustomParameters 選用:包含要傳遞至自訂分片類別的任何自訂參數的字串。預設為空白。
sourceDbTimezoneOffset 選用:來源資料庫與世界標準時間的時差。範例值:+10:00。預設值為 +00:00
dlqGcsPubSubSubscription 選用:在一般模式下執行時,Cloud Storage 通知政策中使用的 Pub/Sub 訂閱項目,適用於 DLQ 重試目錄。名稱格式應為 projects/<project-id>/subscriptions/<subscription-name>。設定後,系統會忽略 deadLetterQueueDirectory 和 dlqRetryMinutes。
skipDirectoryName 選用:從反向複製作業中略過的記錄會寫入這個目錄。預設目錄名稱為 skip。
maxShardConnections 選用:特定分片可接受的連線數上限。預設值為 10000
deadLetterQueueDirectory 選用:儲存錯誤佇列輸出內容時使用的路徑。預設路徑是 Dataflow 工作暫時位置下的目錄。
dlqMaxRetryCount 選用:透過死信佇列重試暫時性錯誤的次數上限。預設值為 500。
runMode 選用:執行模式類型。支援的值:regularretryDLQ。預設值:regular。指定 retryDLQ,只重試嚴重無效信件佇列記錄。
dlqRetryMinutes 選用:無效信件佇列重試的間隔分鐘數。預設值為 10。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依範本建立工作」
  3. 在「Job name」(工作名稱) 欄位中,輸入不重複的工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中,選取 the Spanner Change Streams to Source Database template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud CLI

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

更改下列內容:

  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION:您要使用的範本版本

    您可以使用下列值:

  • REGION_NAME:您要部署 Dataflow 工作的區域,例如 us-central1
  • CHANGE_STREAM_NAME:要讀取的變更串流名稱
  • INSTANCE_ID:Cloud Spanner 執行個體 ID。
  • DATABASE_ID:Cloud Spanner 資料庫 ID。
  • SPANNER_PROJECT_ID:Cloud Spanner 專案 ID。
  • METADATA_INSTANCE:從變更串流讀取資料時,用於儲存中繼資料的 Cloud Spanner 執行個體
  • METADATA_DATABASE:從變更串流讀取資料時,用於儲存中繼資料的 Cloud Spanner 資料庫
  • SOURCE_SHARDS_FILE_PATH:包含來源分片詳細資料的 GCS 檔案路徑

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的 Google Cloud 專案 ID
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION:您要使用的範本版本

    您可以使用下列值:

  • LOCATION:您要部署 Dataflow 工作的區域,例如 us-central1
  • CHANGE_STREAM_NAME:要讀取的變更串流名稱
  • INSTANCE_ID:Cloud Spanner 執行個體 ID。
  • DATABASE_ID:Cloud Spanner 資料庫 ID。
  • SPANNER_PROJECT_ID:Cloud Spanner 專案 ID。
  • METADATA_INSTANCE:從變更串流讀取資料時,用於儲存中繼資料的 Cloud Spanner 執行個體
  • METADATA_DATABASE:從變更串流讀取資料時,用於儲存中繼資料的 Cloud Spanner 資料庫
  • SOURCE_SHARDS_FILE_PATH:包含來源分片詳細資料的 GCS 檔案路徑