DataStream API の使用

Datastream

多くの分離されたデータソースを持つ企業では、組織全体の企業データへのアクセスは、特にリアルタイムで困難になる可能性があります。その結果、データへのアクセスが制限されて遅くなり、組織の内省力を妨げることになります。

Datastream は、オンプレミスやクラウドベースのさまざまなデータソースから変更データにほぼリアルタイムでアクセスできるようにし、組織データへのアクセスを提供します。Datastream は、組織全体で最新のエンタープライズ データに誰でもアクセスできるようにする統合消費 API を提供し、統合されたほぼリアルタイムのシナリオを実現します。

そのようなシナリオの一つは、ソース データベースからクラウドベースのストレージ サービスまたはメッセージング キューにデータを転送し、ストレージ サービスまたはメッセージング キューと通信するその他のアプリケーションやサービスによって読み取り可能な形式にこのデータを変換することです。

このチュートリアルでは、Datastream を使用して、ソースの Oracle データベースから Cloud Storage バケット内のフォルダにスキーマ、テーブル、データを転送する方法を学習します。Cloud Storage は、 Google Cloud上にデータを保存してアクセスできるウェブサービスです。このサービスは、Google のクラウドのパフォーマンスとスケーラビリティに、高度なセキュリティ機能と共有機能を組み合わせたものです。

転送先の Cloud Storage バケット内のフォルダにこの情報を転送する操作の一環として、Datastream はこの情報を Avro に変換します。Avro は、JavaScript Object Notation(JSON)で記述されたスキーマによって定義されます。これにより、異なるデータソース間でのデータの読み取りを統一された方法で行うことができます。

環境変数の設定

この手順では、次の変数を設定します。

  • $PROJECT: この変数は Google Cloud プロジェクトに関連付けられています。割り当てて使用するGoogle Cloud リソースはすべて、プロジェクトに含まれている必要があります。
  • $TOKEN: この変数はアクセス トークンに関連付けられています。アクセス トークンは、Cloud Shell が REST API を使用して Datastream でタスクを実行するために使用するセッションを提供します。
  1. Cloud Shell アプリケーションを起動します。

  2. Google アカウントを使用してアプリケーションを認証したら、次のコマンドを入力します。

    gcloud auth login
    
  3. Do you want to continue (Y/n)? プロンプトで、Y を入力します。

  4. ウェブブラウザを開き、URL をブラウザにコピーします。

  5. Google アカウントを使用して Google Cloud SDK に対して認証します。[ログイン] ページにコードが表示されます。このコードがアクセス トークンです。

  6. アクセス トークンをコピーして、Cloud Shell アプリケーションの Enter verification code: パラメータに貼り付け、Enter を押します。

  7. プロンプトで「PROJECT=\"YOUR_PROJECT_NAME\"」と入力して、$PROJECT 環境変数を Google Cloudプロジェクトに設定します。

  8. プロンプトで「gcloud config set project YOUR_PROJECT_NAME」を入力して、作業するプロジェクトを Google Cloudプロジェクトに設定します。

    コマンド プロンプトは、アクティブなプロジェクトを反映するように更新され、次の形式が適用されます: USERNAME@cloudshell:~ (YOUR_PROJECT_NAME)$

  9. プロンプトで「TOKEN=$(gcloud auth print-access-token)」と入力してアクセス トークンを取得し、変数として保存します。

  10. プロンプトで次のコマンドを入力して、$PROJECT 変数と $TOKEN 変数が正しく設定されていることを確認します。

    • echo $PROJECT
    • echo $TOKEN

変数を設定したので、Datastream に対してリクエストを行い、接続プロファイルとストリームの両方を作成して管理できます。

接続プロファイルを作成して管理する

このセクションでは、Cloud Storage のソースの Oracle データベースと転送先バケットの接続プロファイルを作成して、管理します。

これらの接続プロファイルを作成すると、ソースのデータベースと転送先の Cloud Storage バケットに関する情報を含むレコードが作成されます。Datastream は、接続プロファイルの情報を使用して、ソースのデータベースから転送先バケットのフォルダにデータを転送します。

接続プロファイルの作成と管理には、次のものが含まれます。

  • ソースの Oracle データベースと Cloud Storage の転送先バケットの接続プロファイルの作成
  • 接続プロファイルに関する情報を取得する
  • 接続プロファイルの変更
  • ソース Oracle 接続プロファイルで Discover API 呼び出しを実行する。この呼び出しにより、データベース内部を覗いてそのオブジェクトに関連付けられているオブジェクトを確認できます。これらのオブジェクトには、データベースのデータを含むスキーマとテーブルが含まれます。Datastream を使用してストリームを構成する際には、すべてのオブジェクトをデータベースから pull するのではなく、オブジェクトのサブセット(たとえば、データベースの特定のテーブルとスキーマ)のみを pull することをおすすめします。Discover API を使用して、取得するデータベース オブジェクトのサブセットを見つけます。

接続プロファイルの作成

この手順では 2 つの接続プロファイル(移行元 Oracle データベースへの接続と、Cloud Storage 内の移行先バケットへの接続)を作成します。

  1. 移行元 Oracle データベースへの接続プロファイルを作成するプロンプトが表示されたら、次のコマンドを入力します。
ORACLE="{\"displayName\":\"DISPLAY_NAME\",\"oracle_profile\":{\"hostname\":\"HOSTNAME\",\"username\":\"USERNAME\",\"password\":\"PASSWORD\",\"database_service\":\"DATABASE_SERVICE\",\"port\":"PORT_NUMBER\"},\"no_connectivity\":{}}"
  

次の表は、ソースの Oracle データベースのパラメータ値を理解する際に役立ちます。

パラメータ値次のものに変更します
DISPLAY_NAMEソースのデータベースへの接続プロファイルの表示名。
HOSTNAMEソースのデータベース サーバーのホスト名。
USERNAMEソースのデータベースのアカウントのユーザー名(例: ROOT)。
PASSWORDソースのデータベース用のアカウントのパスワード。
DATABASE_SERVICEソースのデータベースが確実に保護され、モニタリングされるサービス。Oracle データベースの場合、データベース サービスは通常 ORCL です。
PORT_NUMBERソース データベース用に予約されているポート番号。Oracle データベースの場合、通常、ポート番号は 1521 です。

  1. プロンプトで echo $ORACLE | jq コマンドを入力すると、作成したソースの接続プロファイルが読みやすいテキストで表示されます。

    {
      "displayName": "DISPLAY_NAME",
      "oracle_profile": {
        "hostname": "HOSTNAME",
        "username": "USERNAME",
        "password": "PASSWORD",
        "database_service": "DATABASE_SERVICE",
        "port": PORT_NUMBER
       },
      "no_connectivity": {}
    }
  2. Oracle 接続プロファイルを送信して、この接続プロファイルを作成できるようにします。プロンプトが表示されたら、次のコマンドを入力します。

    curl -X POST -d $ORACLE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles?connection_profile_id=SOURCE_CONNECTION_PROFILE_ID

    次の表は、このコマンドのパラメータ値を理解する際に役立ちます。

    パラメータ値次のものに変更します
    DATASTREAM_API_VERSIONDatastream API の現在のバージョン(例: v1)。
    PROJECT_PATH Google Cloud プロジェクトのフルパス(例: projects/$PROJECT/locations/YOUR_PROJECT_LOCATION)。
    SOURCE_CONNECTION_PROFILE_IDこの接続プロファイル用に予約された一意の ID(cp-1 など)。
  3. 次のコードの行が表示されていることを確認します。

    {
      "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "datastream.googleapis.com/DATASREAM_VERSION/PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  4. Cloud Storage の宛先バケットへの接続プロファイルを作成します。プロンプトが表示されたら、次のコマンドを入力します。

    GOOGLECLOUDSTORAGE="{\"displayName\":\"DISPLAY_NAME\",\"gcs_profile\":{\"bucket_name\":\"BUCKET_NAME\",\"root_path\":\"/FOLDER_PATH\"},\"no_connectivity\":{}}"

    次の表は、転送先バケットのパラメータ値を理解する際に役立ちます。

    パラメータ値次のものに変更します
    DISPLAY_NAME転送先バケットの接続プロファイルの表示名。
    BUCKET_NAME転送先バケットの名前。
    FOLDER_PATHDatastream がソースのデータベースからデータを転送する転送先バケット内のフォルダ(例: /root/path)。
  5. プロンプトで echo $GOOGLECLOUDSTORAGE | jq コマンドを入力すると、作成した転送先の接続プロファイルが読みやすいテキストで表示されます。

    {
      "displayName": "DISPLAY_NAME",
      "gcs_profile": {
        "bucket_name": "BUCKET_NAME",
        "root_path": "/FOLDER_PATH"
      },
      "no_connectivity": {}
    }
  6. Cloud Storage 接続プロファイルを送信して、この接続プロファイルを作成できるようにします。プロンプトが表示されたら、次のコマンドを入力します。

    curl -X POST -d $GOOGLECLOUDSTORAGE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles?connection_profile_id=DESTINATION_CONNECTION_PROFILE_ID
  7. 次のコードの行が表示されていることを確認します。

    {
      "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "datastream.googleapis.com/DATASTREAM_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  8. 両方の接続プロファイルが作成されていることを確認します。プロンプトが表示されたら、次のコマンドを入力します。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles
  9. ソースと送信先の両方の接続プロファイルで、2 つの結果が返されることを確認します。

    {
      "connectionProfiles": [
        {
          "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "createTime": "DATE_AND_TIME_STAMP",
          "updateTime": "DATE_AND_TIME_STAMP",
          "displayName": "DISPLAY_NAME",
          "gcsProfile": {
            "bucketName": "BUCKET_NAME",
            "rootPath": "FOLDER_PATH"
          },
          "noConnectivity": {}
        },
       {
        "name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "createTime": "DATE_AND_TIME_STAMP",
        "updateTime": "DATE_AND_TIME_STAMP",
        "displayName": "DISPLAY_NAME",
        "oracleProfile": {
          "hostname": "HOSTNAME",
          "port": PORT_NUMBER,
          "username": "USERNAME",
          "databaseService": "DATABASE_SERVICE"
        },
        "noConnectivity": {}
        }
      ]
    }

接続プロファイルの管理

この手順では、ソースの Oracle データベース用と Cloud Storage 内の転送先のバケット用に作成した接続プロファイルを管理します。以下に例を示します。

  • 移行先の Cloud Storage 接続プロファイルに関する情報を取得する
  • この接続プロファイルを変更する。このチュートリアルでは、転送先の Cloud Storage バケットのフォルダを /root/tutorial に変更します。Datastream は、ソースのデータベースからこのフォルダにデータを転送します。
  • ソースの Oracle 接続プロファイルで Discover API 呼び出しを実行する。
  1. 転送先の Cloud Storage 接続プロファイルに関する情報を取得します。プロンプトが表示されたら、次のコマンドを入力します。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID
  2. この接続プロファイルに関する情報が表示されることを確認します。

    {
      "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "gcsProfile": {
        "bucketName": "BUCKET_NAME",
        "rootPath": "FOLDER_PATH"
      },
      "noConnectivity": {}
    }
  3. この接続プロファイルを変更する。これを行うには、まず UPDATE 変数を設定します。この変数には、変更する接続プロファイルの値が含まれます。このチュートリアルでは、転送先のバケットのフォルダを /root/tutorial に変更します。

    変数を設定するには、プロンプトで次のコマンドを入力します。

    UPDATE="{\"gcsProfile\":{\"rootPath\":\"/root/tutorial\"}}"
  4. プロンプトが表示されたら、次のコマンドを入力します。

    curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID?update_mask=gcsProfile.rootPath
  5. 次のコードの行が表示されていることを確認します。

    {
      "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  6. 接続プロファイルが変更されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID
  7. Cloud Storage 接続プロファイルの転送先バケットのフォルダが /root/tutorial になったことを確認します。

    {
      "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "gcsProfile": {
        "bucketName": "BUCKET_NAME",
        "rootPath": "/root/tutorial"
      },
      "noConnectivity": {}
    }
  8. Datastream の Discover API を使用して、ソースの Oracle データベースのスキーマとテーブルを検出します。Datastream は、ソースの接続プロファイルを介してこのデータベースへのアクセスを提供します。

    1. Oracle データベースのスキーマを検出します。プロンプトが表示されたら、次のコマンドを入力します。

      curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover
    2. データベースのすべてのスキーマが Datastream によって取得されていることを確認します。

    3. データベース内のスキーマのテーブルを取得します。このチュートリアルでは、Discover API を使用して ROOT スキーマのテーブルを取得します。ただし、データベース内の任意のスキーマのテーブルを検出できます。

      プロンプトが表示されたら、次のコマンドを入力します。

    curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover
    1. 指定したスキーマ(このチュートリアルでは ROOT スキーマ)のすべてのテーブルが Datastream によって取得されていることを確認します。

ソースの Oracle データベース用と Cloud Storage 内の転送先バケット用の接続プロファイルを作成して管理できたので、Datastream でストリームの作成と管理を行う準備ができました。

ストリームを作成、管理する

このセクションでは、ストリームを作成して管理します。Datastream は、このストリームを使用して、データ、スキーマ、テーブルをソース データベースから移行先の Cloud Storage バケットのフォルダに転送します。

ストリームの作成と管理には以下が含まれます。

  • ストリームを検証して、ストリームが正常に実行され、すべての検証チェックに合格することを確認する。次のチェック項目があります:
    • データストリームがソースからデータをストリーミングできるようにソースが適切に構成されているかどうか
    • ストリームがソースと宛先の両方に接続できるかどうか
    • ストリームのエンドツーエンド構成
  • 次のリストを使用してストリームを作成します。
    • 許可リスト。このリストでは、Datastream が Cloud Storage の転送先バケットのフォルダに転送できるソース データベース内のテーブルとスキーマを指定します。このチュートリアルでは、/root/tutorial フォルダを指定します。
    • 拒否リスト。このリストでは、Datastream が Cloud Storage の転送先バケットのフォルダへの転送を制限するソース データベース内のテーブルとスキーマを指定します。
  • ストリームに関する情報の取得
  • ストリームの変更
  • Datastream が、ソースのデータベースから転送先 Cloud Storage バケット内のフォルダにデータ、スキーマ、テーブルを転送することができるようにストリームを開始する
  • Fetch Errors API を使用して、ストリームに関連付けられているエラーを検出する
  • ストリームを一時停止する。ストリームが一時停止されると、Datastream はソースのデータベースから転送先バケットに新しいデータを pull しなくなります。
  • 一時停止したストリームを再開すると、Datastream は引き続き転送先バケットへのデータ転送を継続します。

ストリームの作成

この手順では、ソース Oracle データベースから転送先の Cloud Storage バケットのフォルダにストリームを作成します。作成するストリームには、許可リストと拒否リストの両方が含まれます。

  1. SCHEMAS 変数を設定します。この変数は、Datastream がソース データベースから取得して Cloud Storage の転送先バケットの /root/tutorial フォルダに転送するデータとテーブルを含むスキーマを定義します。このチュートリアルでは、SCHEMAS 変数を ROOT スキーマに関連付けるように設定します。

    プロンプトが表示されたら、次のコマンドを入力します。

    SCHEMAS="{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}"
  2. プロンプトで echo $SCHEMAS | jq コマンドを入力すると、この変数に定義した ROOT スキーマが読みやすいテキストで表示されます。

  3. ストリームの作成プロンプトが表示されたら、次のコマンドを入力します。

    STREAM="{\"display_name\":\"DISPLAY_NAME\",\"source_config\":{\"source_connection_profile_name\":\"PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",\"oracle_source_config\":{\"allowlist\":$SCHEMAS,\"rejectlist\":{}}},\"destination_config\":{\"destination_connection_profile_name\":\"PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID\",\"gcs_destination_config\":{\"file_rotation_mb\":5,\"file_rotation_interval\":{\"seconds\":15},\"avro_file_format\":{}},\"backfill_all\":{}}}"
  4. プロンプトで echo $STREAM | jq コマンドを入力して、作成したストリームを読みやすいテキストで表示します。

    {
      "display_name": "DISPLAY_NAME",
      "source_config": {
        "source_connection_profile_name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracle_source_config": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destination_config": {
        "destination_connection_profile_name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcs_destination_config": {
          "file_rotation_mb": 5,
          "file_rotation_interval": {
            "seconds": 15
          },
          "avro_file_format": {}
        }
      },
      "backfill_all": {}
    }

    次の表は、ストリームの次のパラメータを理解するのに役立ちます。

    パラメータ説明
    allowlistソースのデータベースから Cloud Storage の転送先バケットのフォルダに転送されるスキーマ(テーブルとデータを含む)。このチュートリアルでは、ROOT スキーマのすべてのテーブルとデータ(このスキーマのみ)が、転送先バケットの /root/tutorial フォルダに転送されます。
    rejectlistCloud Storage の転送先バケットのフォルダに転送されないスキーマ(テーブルとデータを含む)。このチュートリアルでは、{} 値は、ソースのデータベースから転送先バケットに転送されないテーブルとデータが 1 つもないことを意味しています。
    file_rotation_mbソース データベースから Cloud Storage の転送先バケットのフォルダに転送されるデータを含むファイルのサイズ(MB 単位)。このチュートリアルでは、データはソース データベースから取得されているため、5 MB のファイルに書き込まれます。このサイズを超えるデータは、複数の 5 MB ファイルに分割されます。
    file_rotation_intervalDatastream が Cloud Storage の転送先バケットのフォルダ内の既存のファイルを閉じて、ソース データベースから転送されるデータを含む別のファイルを開くまでの秒数。このチュートリアルでは、ファイル ローテーション間隔を 15 秒に設定します。
    avro_file_format

    Datastream がソースのデータベースから Cloud Storage の転送先バケットのフォルダに転送するファイルの形式。このチュートリアルでは、このファイル形式は Avro です。

    backfill_all

    このパラメータは、過去のバックフィルに関連付けられます。このパラメータに空の辞書({})を設定すると、データストリームは次の情報のバックフィルを行います。

    • ソース データベースから宛先への履歴データ(進行中のデータの変更を含む)
    • ソースから宛先へのスキーマとテーブル
  5. ストリームを検証して、ストリームが正常に実行され、すべての検証チェックに合格することを確認します。プロンプトが表示されたら、次のコマンドを入力します。

    curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" "https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id=STREAM_ID&validate_only=true"
  6. {} コード行が表示されていることを確認します。これは、ストリームがすべての検証チェックに合格し、ストリームに関連するエラーがないことを示します。

  7. ストリームを送信して、ストリームを作成できるようにします。プロンプトで、次のコマンドを入力します。

    curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id=STREAM_ID
  8. 次のコードの行が表示されていることを確認します。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  9. ストリームが作成されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams
  10. 作成したストリームの結果が返されることを確認します。

    {
      "streams": [
        {
          "name": "PROJECT_PATH/streams/STREAM_ID",
          "createTime": "DATE_AND_TIME_STAMP",
          "updateTime": "DATE_AND_TIME_STAMP",
          "displayName": "DISPLAY_NAME",
          "sourceConfig": {
            "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
            "oracleSourceConfig": {
              "allowlist": {
                "oracleSchemas": [
                  {
                    "schema": "ROOT"
                  }
                ]
              },
              "rejectlist": {}
            }
          },
          "destinationConfig": {
            "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "gcsDestinationConfig": {
              "fileRotationMb": 5,
              "fileRotationInterval": "15s"
              "avroFileFormat": {}
            }
          },
          "state": "CREATED",
          "backfillAll": {}
        }
      ]
    }

ストリームを管理する

この手順では、ソースの Oracle データベースから Cloud Storage 転送先バケットのフォルダにデータを転送するために作成したストリームを使用します。以下に例を示します。

  • ストリームに関する情報の取得
  • ストリームの変更
  • ストリームを開始する
  • Fetch Errors API を使用して、ストリームに関連付けられているエラーを検出する
  • ストリームの一時停止と再開
  1. ストリームに関する情報を取得します。プロンプトが表示されたら、次のコマンドを入力します。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  2. このストリームに関する情報が表示されていることを確認します。

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
         }
        },
        "destinationConfig": {
          "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "gcsDestinationConfig": {
            "fileRotationMb": 5,
            "fileRotationInterval": "15s"
            "avroFileFormat": {}
          }
        },
        "state": "CREATED",
        "backfillAll": {}
      }
  3. このストリームを変更します。これを行うには、まず UPDATE 変数を設定します。この変数には、変更するストリームの値が含まれます。このチュートリアルでは、ソース データベースから Cloud Storage 転送先バケットのフォルダに転送されるデータを含むファイルのサイズ(MB 単位)を 5 MB から 100 バイトに変更します。データはソース データベースから取得されているため、100 MB のファイルに書き込まれます。このサイズを超えるデータは、複数の 100 MB ファイルに分割されます。

    変数を設定するには、プロンプトで次のコマンドを入力します。

    UPDATE="{\"destination_config\":{\"gcs_destination_config\":{\"file_rotation_mb\":100}}}"
  4. プロンプトが表示されたら、次のコマンドを入力します。

    curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID/?update_mask=destination_config.gcs_destination_config.file_rotation_mb
  5. 次のコードの行が表示されていることを確認します。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  6. ストリームが変更されたことを確認します。プロンプトが表示されたら、次のコマンドを入力します。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  7. Cloud Storage 接続プロファイルの fileRotationMb パラメータの値が、100 になっていることを確認します。

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
         }
        },
        "destinationConfig": {
          "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "gcsDestinationConfig": {
            "fileRotationMb": 100,
            "fileRotationInterval": "15s"
            "avroFileFormat": {}
          }
        },
        "state": "CREATED",
        "backfillAll": {}
      }
  8. ストリームを開始する手順は次のとおりです。

    1. UPDATE 変数を変更します。プロンプトが表示されたら、次のコマンドを入力します。

      UPDATE="{\"state\":\"RUNNING\"}"
    2. 次のコマンドを入力します。

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID?updateMask=state
  9. 次のコードの行が表示されていることを確認します。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "start",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  10. 数分後、ストリームに関する情報を取得して、ストリームが開始されたことを確認します。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  11. ストリームの状態が CREATED から RUNNING に戻ったことを確認します。

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destinationConfig": {
        "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcsDestinationConfig": {
          "fileRotationMb": 100,
          "fileRotationInterval": "15s"
          "avroFileFormat": {}
        }
      },
      "state": "RUNNING",
      "backfillAll": {}
    }
  12. Fetch Errors API を使用して、ストリームに関連付けられたエラーを取得します。

    1. プロンプトが表示されたら、次のコマンドを入力します。

      curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID:fetchErrors
    2. 次のコードの行が表示されていることを確認します。

        {
          "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID",
          "metadata": {
            "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
            "createTime": "DATE_AND_TIME_STAMP",
            "target": "PROJECT_PATH/streams/STREAM_ID",
            "verb": "fetchErrors",
            "requestedCancellation": false,
            "apiVersion": "DATASTREAM_API_VERSION"
          },
          "done": false
        }
        

    3. プロンプトが表示されたら、次のコマンドを入力します。

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID
    4. 次のコードの行が表示されていることを確認します。

        {
          "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID",
          "metadata": {
            "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
            "createTime": "DATE_AND_TIME_STAMP",
            "endTime": "DATE_AND_TIME_STAMP",
            "target": "PROJECT_PATH/streams/STREAM_ID",
            "verb": "fetchErrors",
            "requestedCancellation": false,
            "apiVersion": "DATASTREAM_API_VERSION"
          },
          "done": true,
          "response": {
            "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.FetchErrorsResponse"
          }
        }
        

  13. 配信を一時停止する手順は次のとおりです。

    1. UPDATE 変数を変更します。プロンプトが表示されたら、次のコマンドを入力します。

      UPDATE="{\"state\":\"PAUSED\"}"
    2. 次のコマンドを入力します。

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID?updateMask=state
  14. 次のコードの行が表示されていることを確認します。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "start",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  15. ストリームに関する情報を取得して、ストリームが一時停止していることを確認します。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  16. ストリームの状態が RUNNING から PAUSED に戻ったことを確認します。

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destinationConfig": {
        "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcsDestinationConfig": {
          "fileRotationMb": 100,
          "fileRotationInterval": "15s"
          "avroFileFormat": {}
        }
      },
      "state": "PAUSED",
      "backfillAll": {}
    }
  17. 一時停止したストリームを再開します。手順は次のとおりです。

    1. UPDATE 変数を変更します。プロンプトが表示されたら、次のコマンドを入力します。

      UPDATE="{\"state\":\"RUNNING\"}"
    2. 次のコマンドを入力します。

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID?updateMask=state
  18. 次のコードの行が表示されていることを確認します。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "start",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  19. 数秒後に、ストリームに関する情報を取得して、ストリームが再度動作していることを確認します。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  20. ストリームの状態が PAUSED から RUNNING に戻ったことを確認します。

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destinationConfig": {
        "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcsDestinationConfig": {
          "fileRotationMb": 100,
          "fileRotationInterval": "15s"
          "avroFileFormat": {}
        }
      },
      "state": "RUNNING",
      "backfillAll": {}
    }

ストリームを作成して管理し、ストリームに関連するエラーがなく、ストリームの状態が RUNNING であることを確認したので、ソースのデータベースから Cloud Storage の転送先バケット内のフォルダにデータを転送できることを確認する準備ができました。

ストリームを確認する

この手順では、Datastream で以下のことができることを確認します。

  • ソースの Oracle データベースの ROOT スキーマに関連付けられているすべてのテーブルからのデータを、Cloud Storage の転送先バケットの /root/tutorial フォルダに転送します。
  • データを Avro ファイル形式に変換します。
  1. Cloud Storage の [ストレージ ブラウザ] ページに移動します。

    [ストレージ ブラウザ] ページに移動

  2. バケットを含むリンクをクリックします。

  3. [オブジェクト] タブが有効になっていない場合は、クリックします。

  4. root フォルダをクリックしてから、tutorial フォルダをクリックします。

  5. ソースの Oracle データベースの ROOT スキーマのテーブルを表すフォルダが表示されていることを確認します。

  6. いずれかのテーブル フォルダをクリックして、テーブルに関連付けられているデータを表示します。

  7. データを示すファイルをクリックしてから、[ダウンロード] をクリックします。

  8. このファイルを Avro ツール(Avro Viewer など)で開き、コンテンツが読めることを確認します。これにより、Datastream がデータを Avro ファイル形式に変換したことも確認できます。