Pub/Sub to Elasticsearch テンプレートは、Pub/Sub サブスクリプションからメッセージを読み取り、ユーザー定義関数(UDF)を実行して、それらをドキュメントとして Elasticsearch に書き込むストリーミング パイプラインです。Dataflow テンプレートは、Elasticsearch のデータ ストリーム機能を使用して、複数のインデックスにまたがる時系列データを保存し、リクエストに対して単一の名前付きリソースを提供します。データ ストリームは、ログ、指標、トレースに適しています。また、継続的に生成され、Pub/Sub に保存されるデータにも適しています。
テンプレートは、logs-gcp.DATASET-NAMESPACE という名前のデータ ストリームを作成します。
- DATASET は、
datasetテンプレート パラメータの値です。指定しない場合はpubsubになります。 - NAMESPACE は、
namespaceテンプレート パラメータの値です。指定しない場合はdefaultになります。
パイプラインの要件
- ソース Pub/Sub サブスクリプションが存在し、メッセージが有効な JSON 形式でエンコードされていること。
- Google Cloud Platform インスタンス上または Elasticsearch バージョン 7.0 以上の Elastic Cloud 上の公開アクセス可能な Elasticsearch ホスト。詳細については、Elastic の Google Cloud 統合をご覧ください。
- エラー出力用の Pub/Sub トピック。
テンプレートのパラメータ
必須パラメータ
- inputSubscription: 入力を使用する Pub/Sub サブスクリプション。例:
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME> - errorOutputTopic: 失敗したレコードを公開するための Pub/Sub 出力トピック。形式:
projects/<PROJECT_ID>/topics/<TOPIC_NAME> - connectionUrl: Elasticsearch URL(
https://hostname:[port]形式)。Elastic Cloud を使用している場合は、CloudID を指定します。例:https://elasticsearch-host:9200 - apiKey: 認証に使用する Base64 でエンコードされた API キー。
オプション パラメータ
- dataset: Pub/Sub 経由で送信されるログのタイプ。直ちに使用を開始できるダッシュボードが用意されています。既知のログタイプ値は
audit、vpcflow、firewallです。デフォルトはpubsubです。 - namespace: 環境(開発、生産、QA)、チーム、戦略事業部門などの任意のグループ。デフォルト:
default - elasticsearchTemplateVersion: Dataflow テンプレート バージョン ID。通常は Google Cloud によって定義されます。デフォルトは 1.0.0 です。
- javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例:
gs://my-bucket/my-udfs/my_file.js - javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが
myTransform(inJson) { /*...do stuff...*/ }の場合、関数名はmyTransformです。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。 - javascriptTextTransformReloadIntervalMinutes: UDF を再読み込みする頻度を指定します(分単位)。値が 0 より大きい場合、Dataflow は Cloud Storage 内の UDF ファイルを定期的にチェックし、ファイルが変更された場合は UDF を再読み込みします。このパラメータを使用すると、パイプラインの実行中にジョブを再起動することなく、UDF を更新できます。値が
0の場合、UDF の再読み込みは無効になります。デフォルト値は0です。 - elasticsearchUsername: 認証に使用する Elasticsearch のユーザー名。指定すると、
apiKeyの値は無視されます。 - elasticsearchPassword: 認証に使用する Elasticsearch のパスワード。指定すると、
apiKeyの値は無視されます。 - batchSize: バッチサイズ(ドキュメント数)。デフォルトは
1000です。 - batchSizeBytes: バッチサイズ(バイト数)。デフォルト値は
5242880(5 MB)です。 - maxRetryAttempts: 再試行の最大回数。0 より大きい値にする必要があります。デフォルトは
no retriesです。 - maxRetryDuration: 最大再試行時間(ミリ秒)。0 より大きい値にする必要があります。デフォルトは
no retriesです。 - propertyAsIndex: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる
_indexメタデータを指定します。_indexUDF よりも優先されます。デフォルトはnoneです。 - javaScriptIndexFnGcsPath: 一括リクエストでドキュメントに含まれる
_indexメタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトはnoneです。 - javaScriptIndexFnName: 一括リクエストでドキュメントに含まれる
_indexメタデータを指定する UDF JavaScript 関数の名前。デフォルトはnoneです。 - propertyAsId: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる
_idメタデータを指定します。_idUDF よりも優先されます。デフォルトはnoneです。 - javaScriptIdFnGcsPath: 一括リクエストでドキュメントに含まれる
_idメタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトはnoneです。 - javaScriptIdFnName: 一括リクエストでドキュメントに含まれる
_idメタデータを指定する UDF JavaScript 関数の名前。デフォルトはnoneです。 - javaScriptTypeFnGcsPath: 一括リクエストでドキュメントに含まれる
_typeメタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトはnoneです。 - javaScriptTypeFnName: 一括リクエストでドキュメントに含まれる
_typeメタデータを指定する UDF JavaScript 関数の名前。デフォルトはnoneです。 - javaScriptIsDeleteFnGcsPath: ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値
trueまたはfalseを返します。デフォルトはnoneです。 - javaScriptIsDeleteFnName: ドキュメントを挿入または更新する代わりに削除するかどうかを決定する UDF JavaScript 関数の名前。この関数は、文字列値
trueまたはfalseを返します。デフォルトはnoneです。 - usePartialUpdate: Elasticsearch リクエストで部分的な更新(作成やインデックス登録ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルトは
falseです。 - bulkInsertMethod: Elasticsearch 一括リクエストで
INDEX(インデックス登録、upserts を許可する)またはCREATE(作成、duplicate _id でエラー)を使用するかどうか。デフォルトはCREATEです。 - trustSelfSignedCerts: 自己署名証明書を信頼するかどうか。インストールされた Elasticsearch インスタンスに自己署名証明書が存在する場合があります。SSL 証明書の検証をバイパスするには、この値を True に設定します(デフォルトは
falseです)。 - disableCertificateValidation:
trueの場合、自己署名 SSL 証明書を信頼します。Elasticsearch インスタンスには自己署名証明書が存在する場合があります。証明書の検証をバイパスするには、このパラメータをtrueに設定します。デフォルトはfalseです。 - apiKeyKMSEncryptionKey: API キーを復号するための Cloud KMS 鍵。
apiKeySourceがKMSに設定されている場合、このパラメータは必須です。このパラメータを指定する場合は、暗号化されたapiKey文字列を渡します。KMS API 暗号化エンドポイントを使用してパラメータを暗号化します。キーにはprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>の形式を使用します。https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt をご覧ください。例:projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name - apiKeySecretId: API キーの Secret Manager シークレット ID。
apiKeySourceがSECRET_MANAGERに設定されている場合は、このパラメータを指定します。projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,projects/your-project-id/secrets/your-secret/versions/your-secret-version` の形式を使用します。 - apiKeySource: API キーのソース。使用できる値は
PLAINTEXT、KMS、SECRET_MANAGERです。Secret Manager または KMS を使用する場合、このパラメータは必須です。apiKeySourceがKMSに設定されている場合は、apiKeyKMSEncryptionKeyと暗号化された apiKey を指定する必要があります。apiKeySourceがSECRET_MANAGERに設定されている場合は、apiKeySecretIdを指定する必要があります。apiKeySourceがPLAINTEXTに設定されている場合は、apiKeyを指定する必要があります。デフォルトは PLAINTEXT です。 - socketTimeout: 設定すると、Elastic RestClient のデフォルトの最大再試行タイムアウトとデフォルトのソケット タイムアウト(30,000 ms)が上書きされます。
ユーザー定義の関数
次のように、このテンプレートでは、パイプライン内の複数のポイントでユーザー定義関数(UDF)をサポートしています。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。
テキスト変換関数
Pub/Sub メッセージを Elasticsearch ドキュメントに変換します。
テンプレートのパラメータ:
javascriptTextTransformGcsPath: JavaScript ファイルの Cloud Storage URI。javascriptTextTransformFunctionName: JavaScript 関数の名前。
関数の仕様:
- 入力: JSON 文字列としてシリアル化された Pub/Sub メッセージ データ フィールド。
- 出力: Elasticsearch に挿入する文字列化された JSON ドキュメント。
インデックス関数
ドキュメントが属するインデックスを返します。
テンプレートのパラメータ:
javaScriptIndexFnGcsPath: JavaScript ファイルの Cloud Storage URI。javaScriptIndexFnName: JavaScript 関数の名前。
関数の仕様:
- 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
- 出力: ドキュメントの
_indexメタデータ フィールドの値。
ドキュメント ID 関数
ドキュメント ID を返します。
テンプレートのパラメータ:
javaScriptIdFnGcsPath: JavaScript ファイルの Cloud Storage URI。javaScriptIdFnName: JavaScript 関数の名前。
関数の仕様:
- 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
- 出力: ドキュメントの
_idメタデータ フィールドの値。
ドキュメント削除関数
ドキュメントを削除するかどうかを指定します。この関数を使用するには、一括挿入モードを INDEX に設定し、ドキュメント ID 関数を指定します。
テンプレートのパラメータ:
javaScriptIsDeleteFnGcsPath: JavaScript ファイルの Cloud Storage URI。javaScriptIsDeleteFnName: JavaScript 関数の名前。
関数の仕様:
- 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
- 出力: ドキュメントを削除する場合は文字列
"true"を、ドキュメントをアップサートする場合は"false"を返します。
マッピング タイプ関数
ドキュメントのマッピング タイプを返します。
テンプレートのパラメータ:
javaScriptTypeFnGcsPath: JavaScript ファイルの Cloud Storage URI。javaScriptTypeFnName: JavaScript 関数の名前。
関数の仕様:
- 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
- 出力: ドキュメントの
_typeメタデータ フィールドの値。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub to Elasticsearch template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
次のように置き換えます。
PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME: 一意の任意のジョブ名REGION_NAME: Dataflow ジョブをデプロイするリージョン(例:us-central1)VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
ERROR_OUTPUT_TOPIC: エラー出力用の Pub/Sub トピック。SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名CONNECTION_URL: Elasticsearch の URLDATASET: ログタイプNAMESPACE: データセットの名前空間APIKEY: 認証用に Base64 でエンコードされた API キー
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex", } }
次のように置き換えます。
PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME: 一意の任意のジョブ名LOCATION: Dataflow ジョブをデプロイするリージョン(例:us-central1)VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
ERROR_OUTPUT_TOPIC: エラー出力用の Pub/Sub トピック。SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名CONNECTION_URL: Elasticsearch の URLDATASET: ログタイプNAMESPACE: データセットの名前空間APIKEY: 認証用に Base64 でエンコードされた API キー
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。