ストリームの管理

このページでは、Datastream API を使用して次の操作を行う方法について説明します。

  • ストリームを作成する
  • ストリームとストリーム オブジェクトに関する情報を取得する
  • ストリームの開始、一時停止、再開、変更、ストリーム オブジェクトのバックフィルの開始と停止によってストリームを更新する
  • 恒久的な障害が発生したストリームの復元
  • Oracle ストリームのラージ オブジェクトのストリーミングを有効にする
  • ストリームの削除

Datastream API を使用する方法は 2 つあります。REST API 呼び出しを行うか、Google Cloud CLI(CLI)を使用できます。

Google Cloud CLI を使用したデータストリームのストリームの管理の概要については、gcloud CLI データストリームのストリームをご覧ください。

ストリームの作成

このセクションでは、ソースから宛先にデータを転送するために使用するストリームを作成する方法について説明します。次の例は包括的なものではなく、Datastream の特定の機能をハイライトしたものです。特定のユースケースに対応するには、これらの例を Datastream API リファレンス ドキュメントと組み合わせて使用します。

このセクションでは、次のユースケースについて説明します。

例 1: 特定のオブジェクトを BigQuery にストリーミングする

この例では、以下の方法について学習します。

  • MySQL から BigQuery へのストリーミング
  • ストリームにオブジェクトのセットを含める
  • ストリームの書き込みモードを追記専用として定義する
  • ストリームに含まれるすべてのオブジェクトをバックフィルする

次のリクエストは、schema1 からすべてのテーブルを、schema2 から 2 つの特定のテーブル(tableAtableC)をそれぞれ pull するリクエストです。イベントは BigQuery のデータセットに書き込まれます。

リクエストに customerManagedEncryptionKey パラメータが含まれていないため、 Google Cloud 内部の鍵管理システムを使用してデータを暗号化し、CMEK を使用します。

履歴バックフィル(またはスナップショット)の実行に関連する backfillAll パラメータが空の辞書({})に設定されます。つまり、Datastream は、ストリームに含まれるすべてのテーブルの履歴データをバックフィルします。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームを作成する方法については、Google Cloud SDK のドキュメントをご覧ください。

例 2: PostgreSQL ソースを含むストリームから特定のオブジェクトを除外する

この例では、以下の方法について学習します。

  • PostgreSQL から BigQuery へのストリーミング
  • ストリームからオブジェクトを除外する
  • バックフィルからオブジェクトを除外する

次のコードは、ソース PostgreSQL データベースから BigQuery にデータを転送するために使用するストリームの作成リクエストを示しています。 ソース PostgreSQL データベースからストリームを作成するときは、リクエストに PostgreSQL 固有の項目を 2 つ指定する必要があります。

  • replicationSlot: レプリケーション スロットは、レプリケーション用に PostgreSQL データベースを構成するための前提条件です。ストリームごとにレプリケーション スロットを作成する必要があります。
  • publication: パブリケーションは、変更を複製するテーブルのグループです。ストリームを開始する前に、パブリケーション名がデータベースに存在している必要があります。少なくとも、パブリケーションにはストリームの includeObjects リストで指定されたテーブルが含まれている必要があります。

履歴バックフィル(またはスナップショット)の実行に関連する backfillAll パラメータが、1 つのテーブルを除外するように設定されます。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

gcloud を使用してストリームを作成する方法については、Google Cloud SDK のドキュメントをご覧ください。

例 3: ストリームの追記専用の書き込みモードを指定する

BigQuery にストリーミングするときに、書き込みモード(merge または appendOnly)を定義できます。詳細については、書き込みモードを構成するをご覧ください。

ストリームを作成するリクエストで書き込みモードを指定しない場合、デフォルトの merge モードが使用されます。

次のリクエストは、MySQL から BigQuery へのストリームを作成するときに appendOnly モードを定義する方法を示しています。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームを作成する方法については、Google Cloud SDK のドキュメントをご覧ください。

例 4: BigQuery の別のプロジェクトにストリーミングする

Datastream リソースを 1 つのプロジェクトで作成したが、BigQuery の別のプロジェクトにストリーミングする場合は、次のようなリクエストを使用して行うことができます。

宛先データセットに sourceHierarchyDatasets を指定する場合は、projectId フィールドに入力する必要があります。

宛先データセットに singleTargetDataset を指定した場合は、projectId:datasetId 形式で datasetId フィールドに入力します。

REST

sourceHierarchyDatasets:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream1
{
  "displayName": "My cross-project stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        },
        "projectId": "myProjectId2"
      }
    }
  },
  "backfillAll": {}
}

singleTargetDataset:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream2
{
  "displayName": "My cross-project stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "singleTargetDataset": {
        "datasetId": "myProjectId2:myDatasetId"
      },
    }
  },
  "backfillAll": {}
}

gcloud

sourceHierarchyDatasets:

  datastream streams create crossProjectBqStream1 --location=us-central1
  --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json
  --destination=destination-cp --bigquery-destination-config=source_hierarchy_cross_project_config.json
  --backfill-none
  

source_hierarchy_cross_project_config.json 構成ファイルの内容:

  {"sourceHierarchyDatasets": {"datasetTemplate": {"location": "us-central1", "datasetIdPrefix": "prefix_"}, "projectId": "myProjectId2"}}
  

singleTargetDataset:

  datastream streams create crossProjectBqStream --location=us-central1
  --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json
  --destination=destination-cp --bigquery-destination-config=single_target_cross_project_config.json
  --backfill-none
  

single_target_cross_project_config.json 構成ファイルの内容:

  {"singleTargetDataset": {"datasetId": "myProjectId2:myDatastetId"}}
  

gcloud を使用してストリームを作成する方法については、Google Cloud SDK のドキュメントをご覧ください。

例 5: Cloud Storage の宛先にストリーミングする

この例では、以下の方法について学習します。

  • Oracle から Cloud Storage へのストリーミング
  • ストリームに含めるオブジェクトのセットを定義する
  • 保存データを暗号化するための CMEK を定義する

次のリクエストは、Cloud Storage のバケットにイベントを書き込むストリームを作成する方法を示しています。

このリクエスト例では、イベントは JSON 出力形式で書き込まれ、100 MB または 30 秒ごとに新しいファイルが作成されます(デフォルト値の 50 MB と 60 秒をオーバーライドしています)。

JSON 形式では、次のことが可能です。

  • パスに統合型スキーマ ファイルを含めるその結果、データストリームによって JSON データファイルと Avro スキーマ ファイルの 2 つのファイルが Cloud Storage に書き込まれます。スキーマ ファイルは、データファイルと同じ名前で、拡張子は .schema です。

  • gzip 圧縮を有効にする。これによって、Cloud Storage に書き込まれたファイルをデータストリームが圧縮するようにします。

backfillNone パラメータを使用すると、このリクエストでは、バックフィルなしに、進行中の変更のみが宛先にストリーミングされることが指定されます。

リクエストでは、 Google Cloud プロジェクト内の保存データの暗号化に使用する鍵を制御できる顧客管理の暗号鍵パラメータを指定します。このパラメータは、ソースから宛先にストリーミングされるデータの暗号化に Datastream が使用する CMEK を参照します。また、CMEK のキーリングも指定します。

キーリングの詳細については、Cloud KMS リソースをご覧ください。暗号鍵を使用してデータを保護する方法については、Cloud Key Management Service(KMS)をご覧ください。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

gcloud を使用してストリームを作成する方法については、Google Cloud SDK のドキュメントをご覧ください。

例 6: BigLake マネージド テーブルにストリーミングする

この例では、append-only モードで MySQL データベースから BigLake Iceberg テーブルにデータを複製するようにストリームを構成する方法について説明します。リクエストを作成する前に、次の手順を完了していることを確認してください。

  • データを保存する Cloud Storage バケットがある
  • Cloud リソース接続を作成する
  • Cloud Storage バケットへのアクセス権を Cloud リソース接続に付与する

次のリクエストを使用してストリームを作成できます。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream
{
  "displayName": "MySQL to BigLake stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          {
            "database": "my-mysql-database"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id",
    "bigqueryDestinationConfig": {
      "blmtConfig": {
        "bucket": "my-gcs-bucket-name",
        "rootPath": "my/folder",
        "connectionName": "my-project-id.us-central1.my-bigquery-connection-name",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG"
        },
      "singleTargetDataset": {
        "datasetId": "my-project-id:my-bigquery-dataset-id"
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

datastream streams create mysqlBigLakeStream --location=us-central1
--display-name=mysql-to-bl-stream --source=source --mysql-source-config=mysql_source_config.json
--destination=destination --bigquery-destination-config=bl_config.json
--backfill-none

mysql_source_config.json ソース構成ファイルの内容:

{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{"database":"my-mysql-database"}]}}

bl_config.json 構成ファイルの内容:

{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }

Terraform

resource "google_datastream_stream" "stream" {
  stream_id    = "mysqlBlStream"
  location     = "us-central1"
  display_name = "MySQL to BigLake stream"

  source_config {
    source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp"
    mysql_source_config {
      include_objects {
        mysql_databases {
          database = "my-mysql-database"
        }
      }
    }
  }

  destination_config {
    destination_connection_profile = "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id"
    bigquery_destination_config {
      single_target_dataset {
        dataset_id = "my-project-id:my-bigquery-dataset-id"
      }
      blmt_config {
        bucket          = "my-gcs-bucket-name"
        table_format    = "ICEBERG"
        file_format     = "PARQUET"
        connection_name = "my-project-id.us-central1.my-bigquery-connection-name"
        root_path       = "my/folder"
      }
      append_only {}
    }
  }

  backfill_none {}
}
    

ストリームの定義を検証する

ストリームは、作成する前にその定義を検証できます。これにより、すべての検証チェックに合格し、作成時にストリームが正常に実行されることが確実になります。

ストリームの検証では、次のことを確認します。

  • データストリームがソースからデータをストリーミングできるようにソースが適切に構成されているかどうか
  • ストリームがソースと宛先の両方に接続できるかどうか
  • ストリームのエンドツーエンド構成

ストリームを検証するには、リクエストの本文の前の URL に &validate_only=true を追加します。

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

このリクエストを行うと、Datastream がソースと宛先に対して実行する検証チェックと、チェックの合否が表示されます。検証で不合格となった場合は、失敗の理由と問題を解決する方法に関する情報が表示されます。

たとえば、ソースから宛先にストリーミングされるデータの暗号化に Datastream で使用する顧客管理の暗号鍵(CMEK)があるとします。ストリームの検証の一環として、Datastream はキーが存在することと、Datastream にキーを使用する権限があることを確認します。いずれかの条件が満たされていない場合、ストリームを検証すると次のエラー メッセージが返されます。

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

この問題を解決するには、指定した鍵が存在し、Datastream サービス アカウントにその鍵に対する cloudkms.cryptoKeys.get 権限があることを確認します。

適切な修正を行った後、リクエストを再度実行して、すべての検証チェックに合格することを確認します。上記の例では、CMEK_VALIDATE_PERMISSIONS チェックでエラー メッセージが返されなくなり、ステータスが PASSED になります。

ストリームに関する情報を取得する

次のコマンドでは、ストリームに関する情報を取得するリクエストを示します。これには以下の情報が含まれます。

  • ストリームの名前(固有識別子)
  • ストリームのユーザー フレンドリーな名前(表示名)
  • ストリームの作成日時と最終更新日時のタイムスタンプ
  • ストリームに関連付けられたソースと宛先の接続プロファイルに関する情報
  • ストリームの状態

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

レスポンスは次のように表示されます。

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

gcloud を使用してストリームに関する情報を取得する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームを一覧表示する

次のコードは、指定したプロジェクトと場所内の全ストリームのリストを取得するリクエストを示しています。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

gcloud を使用してすべてのストリームに関する情報を取得する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームのオブジェクトを一覧表示する

次のコマンドでは、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを示します。

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

gcloud を使用してストリームのすべてのオブジェクトに関する情報を取得する方法については、Google Cloud SDK のドキュメントをご覧ください。

返されるオブジェクトのリストは次のようになります。

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

gcloud を使用してストリームのオブジェクトを一覧表示する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームを開始する

次のコマンドでは、ストリームを開始するリクエストを示します。

リクエストで updateMask パラメータを使用することによって、指定したフィールドのみをリクエストの本文に含めば済むようになります。ストリームを開始するには、state フィールドの値を CREATED から RUNNING に変更します。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

gcloud を使用してストリームを開始する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームを一時停止する

次のコマンドでは、実行中のストリームを一時停止するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを一時停止することで、その状態は RUNNING から PAUSED に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

gcloud を使用してストリームを一時停止する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームを再開する

次のコマンドでは、一時停止されたストリームを再開するリクエストを示します。

この例では、updateMask パラメータに state フィールドを指定します。ストリームを再開すると、状態は PAUSED から RUNNING に変わります。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

gcloud を使用してストリームを再開する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームを復元する

RunStream メソッドを使用して、恒久的に失敗したストリームを復元できます。各ソース データベース タイプには、可能なストリーム復元オペレーションの独自の定義があります。詳細については、ストリームを復元するをご覧ください。

MySQL または Oracle ソースのストリームを復元する

次のコードサンプルは、さまざまなログファイル位置から MySQL または Oracle ソースのストリームを復元するリクエストを示しています。

REST

現在の位置からストリームを復元します。これはデフォルトのオプションです。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

次の使用可能な位置からストリームを復元します:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

最新の位置からストリームを復元します。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

ストリームを特定の位置から復元します(MySQL バイナリログベースのレプリケーション)。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

次のように置き換えます。

  • NAME_OF_THE_LOG_FILE: そこからストリームを復元するログファイルの名前
  • POSITION: ストリームを復元するログファイル内の位置。値を指定しない場合、Datastream はファイルの先頭からストリームを復元します。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

ストリームを特定の位置から復元します(MySQL GTID ベースのレプリケーション)。

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

GTID_SET は、ストリームを復元する 1 つ以上の単一の GTID または GTID の範囲に置き換えます。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3"
      }
    }
  }
}

ストリームを特定の位置から復元します(Oracle)。

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn を、そこからストリームを復元する redo ログファイル内のシステム変更番号(SCN)に置き換えます。この項目は必須です。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

利用可能な復元オプションについて詳しくは、ストリームを復元するをご覧ください。

gcloud

gcloud を使用したストリームの復元はサポートされていません。

PostgreSQL ソースのストリームを復元する

次のコードサンプルは、PostgreSQL ソースのストリームを復元するリクエストを示しています。復元中、ストリームはストリーム用に構成されたレプリケーション スロットの最初のログシーケンス番号(LSN)から読み取りを開始します。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

レプリケーション スロットを変更する場合は、最初に新しいレプリケーション スロット名を更新します。

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

gcloud を使用したストリームの復元はサポートされていません。

SQL Server ソースのストリームを復元する

次のコードサンプルは、SQL Server ソースのストリームを復元するリクエストの例を示しています。

REST

使用可能な最初の位置からストリームを復元します:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

指定のログシーケンス番号からストリームを復元します:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

gcloud を使用したストリームの復元はサポートされていません。

特定のポジションからストリームを開始または再開する

MySQL と Oracle のソースでは、特定の位置からストリームを開始したり、一時停止したストリームを再開したりできます。これは、外部ツールを使用してバックフィルを実行する場合や、指定した位置から CDC を開始する場合に便利です。MySQL ソースの場合は binlog 位置または GTID セット、Oracle ソースの場合は redo ログファイル内のシステム変更番号(SCN)を指定する必要があります。

次のコードは、特定の位置から作成済みのストリームを開始または再開するリクエストを示しています。

特定のバイナリログ位置からストリームを開始または再開する(MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

以下を置き換えます。

  • NAME_OF_THE_LOG_FILE: そこからストリームを開始するログファイルの名前。
  • POSITION: ストリームを開始するログファイル内の位置。値を指定しない場合、Datastream はファイルの先頭から読み取りを開始します。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

gcloud を使用して特定の位置からストリームを開始または再開することはできません。gcloud を使用してストリームを開始または再開する方法については、Cloud SDK のドキュメントをご覧ください。

特定の GTID セットからストリームを開始または再開する(MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

GTID_SET は、ストリームの開始または再開に使用する 1 つ以上の単一の GTID または GTID の範囲に置き換えます。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7"
      }
    }
  }
}

gcloud

gcloud を使用して特定の位置からストリームを開始または再開することはできません。gcloud を使用してストリームを開始または再開する方法については、Cloud SDK のドキュメントをご覧ください。

redo ログファイル内の特定のシステム変更番号からストリームを開始または再開する(Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn を、そこからストリームを開始する redo ログファイル内のシステム変更番号(SCN)に置き換えます。この項目は必須です。

次に例を示します。

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

gcloud を使用して特定の位置からストリームを開始または再開することはできません。gcloud を使用してストリームを開始する方法については、Cloud SDK のドキュメントをご覧ください。

ストリームを変更する

次のコマンドでは、ファイルを 75 MB または 45 秒ごとにローテーションするようにストリームのファイル ローテーション構成を更新するリクエストを示します。

この例では、updateMask パラメータで指定されたフィールドに fileRotationMb フィールドと fileRotationInterval フィールドが含まれ、それぞれ destinationConfig.gcsDestinationConfig.fileRotationMb フラグと destinationConfig.gcsDestinationConfig.fileRotationInterval フラグで表されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

次のコマンドでは、データストリームが Cloud Storage に書き込むファイルのパスに統合型スキーマ ファイルを含めるリクエストを示します。この結果、データストリームによって JSON データファイルと Avro スキーマ ファイルの 2 つのファイルが書き込まれます。

この例では、destinationConfig.gcsDestinationConfig.jsonFileFormat フラグにより表された jsonFileFormat フィールドが指定されます。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

次のコマンドでは、進行中のデータに加え、既存のデータをソース データベースから宛先にレプリケートするデータストリームのリクエストを示します。

コードの oracleExcludedObjects セクションには、宛先へのバックフィルが制限されているテーブルとスキーマが示されます。

この例では、schema3 の tableA を除くすべてのテーブルとスキーマがバックフィルされます。

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

gcloud を使用してストリームを変更する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームのオブジェクトのバックフィルを開始する

データストリームのストリームでは、過去のデータをバックフィルすることや、進行中の変更を宛先にストリーミングすることが可能です。進行中の変更は常に、ソースから宛先にストリーミングされます。ただし、履歴データをストリーミングするかどうかは指定できます。

過去のデータをソースから宛先にストリーミングする場合は、backfillAll パラメータを使用します。

Datastream では、特定のデータベース テーブルの履歴データのみをストリーミングすることもできます。これを行うには、backfillAll パラメータを使用して、履歴データが不要なテーブルを除外します。

進行中の変更のみを宛先にストリーミングする場合は、backfillNone パラメータを使用します。その後、データストリームですべてのソースのスナップショットをソースから宛先にストリーミングする場合は、そのデータを含むオブジェクトのバックフィルを手動で開始する必要があります。

オブジェクトのバックフィルを開始するもう一つの理由は、データがソースと宛先の間で同期していない場合です。たとえば、ユーザーが誤って宛先のデータを削除してしまうと、そのデータが失われています。この場合、オブジェクトのバックフィルを開始することが「リセットの仕組み」として機能します。これは、すべてのデータが一度に宛先へストリーミングされるためです。その結果、ソースと宛先の間でデータが同期します。

ストリームのオブジェクトのバックフィルを開始する前には、オブジェクトに関する情報を取得する必要があります。

各オブジェクトには、オブジェクトを一意に識別する OBJECT_ID があります。OBJECT_ID を使用して、ストリームのバックフィルを開始します。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

gcloud を使用してストリームのオブジェクトのバックフィルを開始する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームのオブジェクトのバックフィルを停止する

ストリームのオブジェクトのバックフィルを開始した後は、それを停止できます。たとえば、ユーザーがデータベース スキーマを変更すると、スキーマやデータが壊れる可能性があります。このスキーマやデータが宛先にストリーミングされないようにするには、オブジェクトのバックフィルを停止します。

また、ロード バランシングのためにオブジェクトのバックフィルを停止することもできます。データストリームは、複数のバックフィルを並行して実行できます。これにより、ソースの負荷が増加する可能性があります。負荷が著しく大きい場合は、各オブジェクトのバックフィルを停止してから、オブジェクトのバックフィルを 1 つずつ開始します。

ストリームのオブジェクトのバックフィルを停止する前に、ストリームのすべてのオブジェクトに関する情報を取得するリクエストを行う必要があります。返される各オブジェクトには、オブジェクトを一意に識別する OBJECT_ID があります。OBJECT_ID を使用して、ストリームのバックフィルを停止します。

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

gcloud を使用してストリームのオブジェクトのバックフィルを停止する方法については、Google Cloud SDK のドキュメントをご覧ください。

同時実行 CDC タスクの最大数を変更する

次のコードは、MySQL ストリームの同時実行変更データ キャプチャ(CDC)タスクの最大数を 7 に設定する方法を示しています。

この例では、updateMask パラメータに maxConcurrentCdcTasks フィールドを指定します。この値を 7 に設定すると、同時実行 CDC タスク数が以前の値から 7 に変更されます。0~50(50を含む) の値を使用できます。値を定義しない場合、または 0 として定義した場合、ストリームにはシステム デフォルトの 5 つのタスクが設定されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

gcloud の使用方法については、Google Cloud SDK のドキュメントをご覧ください。

同時実行バックフィル タスクの最大数を変更する

次のコードは、MySQL ストリームの同時実行バックフィル タスクの最大数を 25 に設定する方法を示しています。

この例では、updateMask パラメータに maxConcurrentBackfillTasks フィールドを指定します。この値を 25 に設定すると、同時実行バックフィル タスクの最大数が以前の値から 25 に変更されます。0~50(50を含む) の値を使用できます。値を定義しない場合、または 0 として定義した場合、ストリームにはシステムのデフォルトである 16 個のタスクが設定されます。

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

gcloud の使用方法については、Google Cloud SDK のドキュメントをご覧ください。

Oracle ソースのラージ オブジェクトのストリーミングを有効にする

Oracle ソースでのストリーミングでは、バイナリラージ オブジェクト(BLOB)、文字ラージ オブジェクト(CLOB)、国別文字ラージ オブジェクト(NCLOB)などの大規模オブジェクトのストリーミングを有効にできます。streamLargeObjects フラグを使用すると、新しいストリームでも既存のストリームでもラージ オブジェクトを作成できますこのフラグはストリームレベルで設定されラージ オブジェクト データ型の列を指定する必要はありません。

次の例は、大きなオブジェクトをストリーミングできるストリームを作成する方法を示しています。

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

gcloud を使用してストリームを更新する方法については、Google Cloud SDK のドキュメントをご覧ください。

ストリームを削除する

次のコマンドでは、ストリームを削除するリクエストを示します。

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

gcloud を使用してストリームを削除する方法については、Google Cloud SDK のドキュメントをご覧ください。

次のステップ