JDBC to BigQuery テンプレートは、リレーショナル データベース テーブルから既存の BigQuery テーブルにデータをコピーするバッチ パイプラインです。このパイプラインは、JDBC を使用してリレーショナル データベースに接続します。このテンプレートを使用すると、使用可能な JDBC ドライバがある任意のリレーショナル データベースから BigQuery にデータをコピーできます。
保護をさらに強化するために、Cloud KMS 鍵と一緒に、Cloud KMS 鍵で暗号化された Base64 でエンコードされたユーザー名、パスワード、接続文字列パラメータを渡すことができます。ユーザー名、パスワード、接続文字列パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。
パイプラインの要件
- リレーショナル データベース用の JDBC ドライバが使用可能である必要があります。
- パイプラインを実行する前に、BigQuery テーブルが存在する必要があります。
- BigQuery テーブルに互換性のあるスキーマが必要です。
- リレーショナル データベースは、Dataflow が実行されているサブネットからアクセス可能である必要があります。
テンプレートのパラメータ
必須パラメータ
- driverJars: ドライバ JAR ファイルのカンマ区切りのリスト。例:
gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar - driverClassName: JDBC ドライバのクラス名。例:
com.mysql.jdbc.Driver - connectionUrl: JDBC 接続 URL の文字列。例:
jdbc:mysql://some-host:3306/sampledb。この値は、Cloud KMS 鍵で暗号化され、Base64 でエンコードされた文字列として渡すことができます。Base64 でエンコードされた文字列から空白文字を削除します。Oracle の RAC 以外のデータベース接続文字列(jdbc:oracle:thin:@some-host:<port>:<sid>)と Oracle RAC データベース接続文字列(jdbc:oracle:thin:@//some-host[:<port>]/<service_name>)の違いに注意してください。例:jdbc:mysql://some-host:3306/sampledb - outputTable: BigQuery 出力テーブルの場所。例:
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME> - bigQueryLoadingTemporaryDirectory: BigQuery 読み込みプロセスの一時ディレクトリ。例:
gs://your-bucket/your-files/temp_dir
オプション パラメータ
- connectionProperties: JDBC 接続に使用するプロパティ文字列。文字列の形式は
[propertyName=property;]*にする必要があります。詳細については、MySQL ドキュメントの構成プロパティ(https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html)をご覧ください。例:unicode=true;characterEncoding=UTF-8 - username: JDBC 接続に使用するユーザー名。Cloud KMS 鍵で暗号化された文字列として渡すことができます。または、projects/{project}/secrets/{secret}/versions/{secret_version} 形式の Secret Manager シークレットとして渡すこともできます。
- password: JDBC 接続に使用するパスワード。Cloud KMS 鍵で暗号化された文字列として渡すことができます。または、projects/{project}/secrets/{secret}/versions/{secret_version} 形式の Secret Manager シークレットとして渡すこともできます。
- query: データを抽出するためにソースで実行されるクエリ。一部の JDBC SQL 型と BigQuery 型は同じ名前を共有していますが、いくつかの違いがあります。重要な SQL と BigQuery のデータ型のマッピングには、
DATETIME --> TIMESTAMPなどがあります。スキーマが一致しない場合は、型キャストが必要になることがあります。例:select * from sampledb.sample_table - KMSEncryptionKey: ユーザー名、パスワード、接続文字列の復号に使用する Cloud KMS 暗号鍵。Cloud KMS 鍵を渡す場合は、ユーザー名、パスワード、接続文字列も暗号化する必要があります。例:
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key - useColumnAlias:
trueに設定すると、パイプラインは列名の代わりに列エイリアス(AS)を使用して、行を BigQuery にマッピングします。デフォルトはfalseです。 - isTruncate:
trueに設定すると、パイプラインは BigQuery にデータを読み込む前にデータを切り捨てます。デフォルトはfalseで、パイプラインはデータを追加します。 - partitionColumn:
partitionColumnがtableとともに指定されている場合、JdbcIO は範囲を使用して同じテーブルに対して複数のクエリ インスタンス(サブクエリ)を実行し、テーブルを並列で読み取ります。現在、Longパーティション列とDateTimeパーティション列をサポートしています。列のタイプはpartitionColumnTypeで渡します。 - partitionColumnType:
partitionColumnのタイプ。longまたはdatetimeのいずれかを指定します。デフォルトは long です。 - table: パーティションの使用時に読み取るテーブル。このパラメータは、かっこ内のサブクエリも受け入れます。例:
(select id, name from Person) as subq - numPartitions: パーティションの数。この値は、下限と上限とともに、パーティション列を均等に分割するための生成された
WHERE句式でパーティション ストライドを形成します。入力が1より小さい場合、数値は1に設定されます。 - lowerBound: パーティション スキームで使用する下限。指定しない場合、サポートされているタイプのこの値は Apache Beam によって自動的に推測されます。
datetimepartitionColumnType は、yyyy-MM-dd HH:mm:ss.SSSZ形式の下限を受け入れます。例:2024-02-20 07:55:45.000+03:30 - upperBound: パーティション スキームで使用する上限。指定しない場合、サポートされているタイプのこの値は Apache Beam によって自動的に推測されます。
datetimepartitionColumnType は、yyyy-MM-dd HH:mm:ss.SSSZ形式の上限を受け入れます。例:2024-02-20 07:55:45.000+03:30 - fetchSize: データベースから一度に取得される行数。パーティション分割される読み取りには使用されません。デフォルトは 50,000 です。
- createDisposition: 使用する BigQuery CreateDisposition。例:
CREATE_IF_NEEDEDまたはCREATE_NEVER。デフォルトは CREATE_NEVER です。 - bigQuerySchemaPath: BigQuery JSON スキーマの Cloud Storage パス。
createDispositionがCREATE_IF_NEEDEDに設定されている場合は、このパラメータを指定する必要があります。例:gs://your-bucket/your-schema.json - outputDeadletterTable: 出力テーブルに到達できなかったメッセージに使用する BigQuery テーブル(
"PROJECT_ID:DATASET_NAME.TABLE_NAME"形式)。テーブルが存在しない場合は、パイプラインの実行時に作成されます。このパラメータが指定されていない場合、書き込みエラーが発生するとパイプラインは失敗します。このパラメータは、useStorageWriteApiまたはuseStorageWriteApiAtLeastOnceが true に設定されている場合にのみ指定できます。 - disabledAlgorithms: 無効にするためのカンマ区切りのアルゴリズム。この値が
noneに設定されている場合、アルゴリズムは無効になりません。デフォルトで無効になっているアルゴリズムには脆弱性やパフォーマンスの問題が存在する可能性があるため、このパラメータは慎重に使用してください。例:SSLv3, RC4 - extraFilesToStage: ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例:
gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID> - useStorageWriteApi:
trueの場合、パイプラインでは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値はfalseです。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。 - useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。「1 回以上」のセマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを
trueに設定します。「正確に 1 回」のセマンティクスを使用するには、パラメータをfalseに設定します。このパラメータは、useStorageWriteApiがtrueの場合にのみ適用されます。デフォルト値はfalseです。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the JDBC to BigQuery with BigQuery Storage API support template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Jdbc_to_BigQuery_Flex \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ driverJars=DRIVER_JARS,\ driverClassName=DRIVER_CLASS_NAME,\ connectionURL=CONNECTION_URL,\ outputTable=OUTPUT_TABLE,\ bigQueryLoadingTemporaryDirectory=BIG_QUERY_LOADING_TEMPORARY_DIRECTORY,\
次のように置き換えます。
JOB_NAME: 一意の任意のジョブ名VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
REGION_NAME: Dataflow ジョブをデプロイするリージョン(例:us-central1)DRIVER_JARS: カンマで区切った JDBC ドライバの Cloud Storage パスDRIVER_CLASS_NAME: JDBC ドライバのクラス名CONNECTION_URL: JDBC 接続 URL 文字列OUTPUT_TABLE: BigQuery 出力テーブルBIG_QUERY_LOADING_TEMPORARY_DIRECTORY: BigQuery 読み込みプロセスで使用する一時ディレクトリ
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "driverJars": "DRIVER_JARS", "driverClassName": "DRIVER_CLASS_NAME", "connectionURL": "CONNECTION_URL", "outputTable": "OUTPUT_TABLE", "bigQueryLoadingTemporaryDirectory": "BIG_QUERY_LOADING_TEMPORARY_DIRECTORY", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Jdbc_to_BigQuery_Flex", "environment": { "maxWorkers": "10" } } }
次のように置き換えます。
PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME: 一意の任意のジョブ名VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
LOCATION: Dataflow ジョブをデプロイするリージョン(例:us-central1)DRIVER_JARS: カンマで区切った JDBC ドライバの Cloud Storage パスDRIVER_CLASS_NAME: JDBC ドライバのクラス名CONNECTION_URL: JDBC 接続 URL 文字列OUTPUT_TABLE: BigQuery 出力テーブルBIG_QUERY_LOADING_TEMPORARY_DIRECTORY: BigQuery 読み込みプロセスで使用する一時ディレクトリ
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。