Datastream to MySQL or PostgreSQL (Stream) 範本

「Datastream to SQL」範本是串流管道,可讀取 Datastream 資料,並將資料複製到任何 MySQL 或 PostgreSQL 資料庫。範本會使用 Pub/Sub 通知從 Cloud Storage 讀取資料,並將資料複製到 SQL 副本資料表。指定 gcsPubSubSubscription 參數,從 Pub/Sub 通知讀取資料,或提供 inputFilePattern 參數,直接從 Cloud Storage 中的檔案讀取資料。

範本不支援資料定義語言 (DDL),且預期所有資料表都已存在於資料庫中。 複製作業會使用 Dataflow 有狀態轉換來篩選過時資料,並確保無序資料的一致性。舉例來說,如果資料列的較新版本已通過,系統就會忽略該資料列的延遲版本。執行的資料操縱語言 (DML) 會盡可能完美地將來源資料複製到目標資料。執行的 DML 陳述式遵循下列規則:

  • 如果存在主鍵,插入和更新作業會使用 upsert 語法 (即 INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE)。
  • 如果存在主鍵,系統會將刪除作業複製為刪除 DML。
  • 如果沒有主鍵,插入和更新作業都會插入資料表。
  • 如果沒有主鍵,系統會忽略刪除作業。

如果您使用 Oracle 至 Postgres 公用程式,請在 SQL 中新增 ROWID 做為主鍵 (如果沒有主鍵)。

管道相關規定

  • 已準備好或正在複製資料的 Datastream 串流。
  • 為 Datastream 資料啟用 Cloud Storage Pub/Sub 通知
  • 已使用必要結構定義植入 PostgreSQL 資料庫。
  • 設定 Dataflow 工作站與 PostgreSQL 之間的網路存取權。

範本參數

必要參數

  • inputFilePattern:Cloud Storage 中要複製的 Datastream 檔案位置。這個檔案位置通常是串流的根路徑。
  • databaseHost:要連線的 SQL 主機。
  • databaseUser:具備所有必要權限的 SQL 使用者,可寫入所有複寫資料表。
  • databasePassword:SQL 使用者的密碼。

選用參數

  • gcsPubSubSubscription:具有 Datastream 檔案通知的 Pub/Sub 訂閱項目。例如:projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>
  • inputFileFormat:Datastream 產生的輸出檔案格式。例如 avrojson。預設值為 avro
  • streamName:要輪詢結構定義資訊的串流名稱或範本。預設值為 {_metadata_stream}
  • rfcStartDateTime:用於從 Cloud Storage 擷取的開始日期時間 (https://tools.ietf.org/html/rfc3339)。預設值為:1970-01-01T00:00:00.00Z。
  • dataStreamRootUrl:Datastream API 根網址。預設值為 https://datastream.googleapis.com/
  • databaseType:要寫入的資料庫類型 (例如 Postgres)。預設值為 postgres。
  • databasePort:要連線的 SQL 資料庫連接埠。預設值為 5432
  • databaseName:要連線的 SQL 資料庫名稱。預設值為 postgres
  • defaultCasing:切換資料表大小寫行為。例如 (即 LOWERCASE = mytable -> mytable、UPPERCASE = mytable -> MYTABLECAMEL = my_table -> myTable、SNAKE = myTable -> my_table。預設值為 LOWERCASE。
  • columnCasing:目標資料欄名稱大小寫的切換鈕。小寫 (預設):my_column -> my_column。大寫:my_column -> MY_COLUMN。CAMEL:my_column -> myColumn。蛇形:myColumn -> my_column。
  • schemaMap:用於指定結構定義名稱變更的鍵/值對應 (即 old_name:new_name、CaseError:case_error)。預設為空白。
  • customConnectionString:選用連線字串,將取代預設資料庫字串。
  • numThreads:決定「格式」至「DML」步驟的索引鍵平行處理,具體來說,這個值會傳遞至 Reshuffle.withNumBuckets。預設值為 100。
  • databaseLoginTimeout:嘗試登入資料庫的逾時時間 (以秒為單位)。以免多個工作人員同時嘗試連線時,連線發生停滯。
  • datastreamSourceType:覆寫 Datastream CDC 資料的來源類型偵測結果。指定這個值後,系統就會使用這個值,而不是從 read_method 欄位衍生來源類型。有效值包括「mysql」、「postgresql」、「oracle」等。如果 read_method 欄位包含「cdc」,但系統無法自動判斷實際來源類型,這個參數就很有用。
  • orderByIncludesIsDeleted:資料的排序依據設定應優先處理未刪除的資料。預設值為 false。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依範本建立工作」
  3. 在「Job name」(工作名稱) 欄位中,輸入不重複的工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中,選取 the Cloud Datastream to SQL template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的 Google Cloud 專案 ID
  • JOB_NAME: 您選擇的不重複工作名稱
  • REGION_NAME:您要部署 Dataflow 工作的區域,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH:Datastream 資料的 Cloud Storage 路徑。例如:gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:用於讀取變更檔案的 Pub/Sub 訂閱項目。例如:projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST:您的 SQL 主機 IP。
  • DATABASE_USER:您的 SQL 使用者。
  • DATABASE_PASSWORD:您的 SQL 密碼。

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的 Google Cloud 專案 ID
  • JOB_NAME: 您選擇的不重複工作名稱
  • LOCATION:您要部署 Dataflow 工作的區域,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH:Datastream 資料的 Cloud Storage 路徑。例如:gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:用於讀取變更檔案的 Pub/Sub 訂閱項目。例如:projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST:您的 SQL 主機 IP。
  • DATABASE_USER:您的 SQL 使用者。
  • DATABASE_PASSWORD:您的 SQL 密碼。

後續步驟