The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.
Pipeline requirements
- The Cloud Storage bucket must exist.
- A Elasticsearch host on a Google Cloud Platform instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.
- A BigQuery table for error output must exist.
CSV schema
If the CSV files contain headers, set the containsHeaders
template parameter to true.
Otherwise, create a JSON schema file that describes the data. Specify the
Cloud Storage URI of the schema file in the jsonSchemaPath
template parameter. The following example shows a JSON schema:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
Alternatively, you can provide a user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents.
Template parameters
Required parameters
- deadletterTable: The BigQuery dead-letter table to send failed inserts to. For example,
your-project:your-dataset.your-table-name. - inputFileSpec: The Cloud Storage file pattern to search for CSV files. For example,
gs://mybucket/test-*.csv. - connectionUrl: The Elasticsearch URL in the format
https://hostname:[port]. If using Elastic Cloud, specify the CloudID. For example,https://elasticsearch-host:9200. - apiKey: The Base64-encoded API key to use for authentication.
- index: The Elasticsearch index that the requests are issued to. For example,
my-index.
Optional parameters
- inputFormat: The input file format. Defaults to
CSV. - containsHeaders: Input CSV files contain a header record (true/false). Only required if reading CSV files. Defaults to: false.
- delimiter: The column delimiter of the input text files. Default:
,For example,,. - csvFormat: CSV format specification to use for parsing records. Default is:
Default. See https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html for more details. Must match format names exactly found at: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html. - jsonSchemaPath: The path to the JSON schema. Defaults to
null. For example,gs://path/to/schema. - largeNumFiles: Set to true if number of files is in the tens of thousands. Defaults to
false. - csvFileEncoding: The CSV file character encoding format. Allowed values are
US-ASCII,ISO-8859-1,UTF-8, andUTF-16. Defaults to: UTF-8. - logDetailedCsvConversionErrors: Set to
trueto enable detailed error logging when CSV parsing fails. Note that this may expose sensitive data in the logs (e.g., if the CSV file contains passwords). Default:false. - elasticsearchUsername: The Elasticsearch username to authenticate with. If specified, the value of
apiKeyis ignored. - elasticsearchPassword: The Elasticsearch password to authenticate with. If specified, the value of
apiKeyis ignored. - batchSize: The batch size in number of documents. Defaults to
1000. - batchSizeBytes: The batch size in number of bytes. Defaults to
5242880(5mb). - maxRetryAttempts: The maximum number of retry attempts. Must be greater than zero. Defaults to
no retries. - maxRetryDuration: The maximum retry duration in milliseconds. Must be greater than zero. Defaults to
no retries. - propertyAsIndex: The property in the document being indexed whose value specifies
_indexmetadata to include with the document in bulk requests. Takes precedence over an_indexUDF. Defaults tonone. - javaScriptIndexFnGcsPath: The Cloud Storage path to the JavaScript UDF source for a function that specifies
_indexmetadata to include with the document in bulk requests. Defaults tonone. - javaScriptIndexFnName: The name of the UDF JavaScript function that specifies
_indexmetadata to include with the document in bulk requests. Defaults tonone. - propertyAsId: A property in the document being indexed whose value specifies
_idmetadata to include with the document in bulk requests. Takes precedence over an_idUDF. Defaults tonone. - javaScriptIdFnGcsPath: The Cloud Storage path to the JavaScript UDF source for the function that specifies
_idmetadata to include with the document in bulk requests. Defaults tonone. - javaScriptIdFnName: The name of the UDF JavaScript function that specifies the
_idmetadata to include with the document in bulk requests. Defaults tonone. - javaScriptTypeFnGcsPath: The Cloud Storage path to the JavaScript UDF source for a function that specifies
_typemetadata to include with documents in bulk requests. Defaults tonone. - javaScriptTypeFnName: The name of the UDF JavaScript function that specifies the
_typemetadata to include with the document in bulk requests. Defaults tonone. - javaScriptIsDeleteFnGcsPath: The Cloud Storage path to the JavaScript UDF source for the function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of
trueorfalse. Defaults tonone. - javaScriptIsDeleteFnName: The name of the UDF JavaScript function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of
trueorfalse. Defaults tonone. - usePartialUpdate: Whether to use partial updates (update rather than create or index, allowing partial documents) with Elasticsearch requests. Defaults to
false. - bulkInsertMethod: Whether to use
INDEX(index, allows upserts) orCREATE(create, errors on duplicate _id) with Elasticsearch bulk requests. Defaults toCREATE. - trustSelfSignedCerts: Whether to trust self-signed certificate or not. An Elasticsearch instance installed might have a self-signed certificate, Enable this to true to by-pass the validation on SSL certificate. (Defaults to:
false). - disableCertificateValidation: If
true, trust the self-signed SSL certificate. An Elasticsearch instance might have a self-signed certificate. To bypass validation for the certificate, set this parameter totrue. Defaults tofalse. - apiKeyKMSEncryptionKey: The Cloud KMS key to decrypt the API key. This parameter is required if the
apiKeySourceis set toKMS. If this parameter is provided, pass in an encryptedapiKeystring. Encrypt parameters using the KMS API encrypt endpoint. For the key, use the formatprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. See: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt For example,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name. - apiKeySecretId: The Secret Manager secret ID for the apiKey. If the
apiKeySourceis set toSECRET_MANAGER, provide this parameter. Use the formatprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: The source of the API key. Allowed values are
PLAINTEXT,KMSorandSECRET_MANAGER. This parameter is required when you use Secret Manager or KMS. IfapiKeySourceis set toKMS,apiKeyKMSEncryptionKeyand encrypted apiKey must be provided. IfapiKeySourceis set toSECRET_MANAGER,apiKeySecretIdmust be provided. IfapiKeySourceis set toPLAINTEXT,apiKeymust be provided. Defaults to: PLAINTEXT. - socketTimeout: If set, overwrites the default max retry timeout and default socket timeout (30000ms) in the Elastic RestClient.
- javascriptTextTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example,
gs://my-bucket/my-udfs/my_file.js. - javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }, then the function name ismyTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
User-defined functions
This template supports user-defined functions (UDFs) at several points in the pipeline, described below. For more information, see Create user-defined functions for Dataflow templates.
Text transform function
Transforms the CSV data into an Elasticsearch document.
Template parameters:
javascriptTextTransformGcsPath: the Cloud Storage URI of the JavaScript file.javascriptTextTransformFunctionName: the name of the JavaScript function.
Function specification:
- Input: a single line from an input CSV file.
- Output: a stringified JSON document to insert into Elasticsearch.
Index function
Returns the index to which the document belongs.
Template parameters:
javaScriptIndexFnGcsPath: the Cloud Storage URI of the JavaScript file.javaScriptIndexFnName: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's
_indexmetadata field.
Document ID function
Returns the document ID.
Template parameters:
javaScriptIdFnGcsPath: the Cloud Storage URI of the JavaScript file.javaScriptIdFnName: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's
_idmetadata field.
Document deletion function
Specifies whether to delete a document. To use this function, set the bulk
insert mode to INDEX and provide a
document ID function.
Template parameters:
javaScriptIsDeleteFnGcsPath: the Cloud Storage URI of the JavaScript file.javaScriptIsDeleteFnName: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: return the string
"true"to delete the document, or"false"to upsert the document.
Mapping type function
Returns the document's mapping type.
Template parameters:
javaScriptTypeFnGcsPath: the Cloud Storage URI of the JavaScript file.javaScriptTypeFnName: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's
_typemetadata field.
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default
region is
us-central1.For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Cloud Storage to Elasticsearch template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Replace the following:
PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME: a unique job name of your choiceVERSION: the version of the template that you want to useYou can use the following values:
latestto use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
REGION_NAME: the region where you want to deploy your Dataflow job—for example,us-central1INPUT_FILE_SPEC: your Cloud Storage file pattern.CONNECTION_URL: your Elasticsearch URL.APIKEY: your base64 encoded API key for authentication.INDEX: your Elasticsearch index.DEADLETTER_TABLE: your BigQuery table.
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Replace the following:
PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME: a unique job name of your choiceVERSION: the version of the template that you want to useYou can use the following values:
latestto use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
LOCATION: the region where you want to deploy your Dataflow job—for example,us-central1INPUT_FILE_SPEC: your Cloud Storage file pattern.CONNECTION_URL: your Elasticsearch URL.APIKEY: your base64 encoded API key for authentication.INDEX: your Elasticsearch index.DEADLETTER_TABLE: your BigQuery table.
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.