O modelo do Cloud Storage para Elasticsearch é um pipeline em lote que lê dados de arquivos CSV armazenados em um bucket do Cloud Storage e os grava no Elasticsearch como documentos JSON.
Requisitos de pipeline
- O bucket do Cloud Storage precisa existir.
- É necessário que haja um host do Elasticsearch em uma instância do Google Cloud Platform ou no Elasticsearch Cloud acessível pelo Dataflow.
- Uma tabela do BigQuery para saída de erros precisa existir.
Esquema CSV
Se os arquivos CSV tiverem cabeçalhos, defina o parâmetro de modelo containsHeaders como true.
Caso contrário, crie um arquivo de esquema JSON que descreva os dados. Especifique o URI do Cloud Storage do arquivo de esquema no parâmetro de modelo jsonSchemaPath. O exemplo a seguir mostra um esquema JSON:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
Como alternativa, é possível fornecer uma função definida pelo usuário (UDF, na sigla em inglês) que analisa o texto CSV e gera documentos do Elasticsearch.
Parâmetros do modelo
Parâmetros obrigatórios
- deadletterTable: a tabela de mensagens inativas do BigQuery para onde enviar inserções com falha. Por exemplo,
your-project:your-dataset.your-table-name. - inputFileSpec: o padrão de arquivo do Cloud Storage para pesquisar arquivos CSV. Por exemplo,
gs://mybucket/test-*.csv. - connectionUrl: o URL do Elasticsearch no formato
https://hostname:[port]. Se estiver usando o Elastic Cloud, especifique o CloudID. Por exemplo,https://elasticsearch-host:9200. - apiKey: a chave de API codificada em Base64 que será usada para autenticação.
- index: o índice do Elasticsearch para o qual as solicitações são emitidas. Por exemplo,
my-index.
Parâmetros opcionais
- inputFormat: o formato do arquivo de entrada. O padrão é
CSV. - containsHeaders: os arquivos CSV de entrada contêm um registro de cabeçalho (verdadeiro/falso). Obrigatório apenas ao ler arquivos CSV. O padrão é: falso.
- delimiter: o delimitador de coluna dos arquivos de texto de entrada. Padrão:
,. Por exemplo,,. - csvFormat: especificação de formato CSV a ser usada para analisar registros. O padrão é
Default. Consulte https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html para mais detalhes. Precisa corresponder exatamente aos nomes de formato encontrados em: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html. - jsonSchemaPath: o caminho para o esquema JSON. O padrão é
null. Por exemplo,gs://path/to/schema. - largeNumFiles: defina como "verdadeiro" se o número de arquivos estiver na casa de milhares. O padrão é
false. - csvFileEncoding: formato de codificação de caracteres de arquivo CSV. Os valores permitidos são
US-ASCII,ISO-8859-1,UTF-8eUTF-16. O padrão é UTF-8. - logDetailedCsvConversionErrors: defina como
truepara ativar o registro detalhado de erros quando a análise de CSV falha. Isso pode expor dados sensíveis nos registros, por exemplo, se o arquivo CSV contiver senhas. Padrão:false. - elasticsearchUsername: o nome de usuário do Elasticsearch usado para autenticação. Se especificado, o valor de
apiKeyserá ignorado. - elasticsearchPassword: a senha do Elasticsearch para autenticação. Se especificado, o valor de
apiKeyserá ignorado. - batchSize: o tamanho do lote em número de documentos. O padrão é
1000. - batchSizeBytes: o tamanho do lote em número de bytes. O padrão é
5242880(5 MB). - maxRetryAttempts: o número máximo de novas tentativas. Precisa ser maior que zero. O padrão é
no retries. - maxRetryDuration: a duração máxima da nova tentativa em milissegundos. Precisa ser maior que zero. O padrão é
no retries. - propertyAsIndex: a propriedade no documento que está sendo indexada, cujo valor especifica metadados
_indexa serem incluídos com o documento em solicitações em massa. Tem precedência sobre uma UDF_index. O padrão énone. - javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript para uma função que especifica os metadados
_indexa serem incluídos no documento em solicitações em massa. O padrão énone. - javaScriptIndexFnName: o nome da função JavaScript da UDF que especifica os metadados
_indexa serem incluídos no documento em solicitações em massa. O padrão énone. - propertyAsId: uma propriedade no documento que está sendo indexada com um valor que especifica metadados
_ida serem incluídos com o documento em solicitações em massa. Tem precedência sobre uma UDF_id. O padrão énone. - javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript para a função que especifica os metadados
_ida serem incluídos no documento em solicitações em massa. O padrão énone. - javaScriptIdFnName: o nome da função JavaScript da UDF que especifica os metadados
_ida serem incluídos com o documento em solicitações em massa. O padrão énone. - javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript para uma função que especifica metadados
_typea serem incluídos com documentos em solicitações em massa. O padrão énone. - javaScriptTypeFnName: o nome da função JavaScript da UDF que especifica os metadados
_typea serem incluídos no documento em solicitações em massa. O padrão énone. - javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript para a função que determina se o documento precisa ser excluído em vez de inserido ou atualizado. A função retorna um valor de string de
trueoufalse. O padrão énone. - javaScriptIsDeleteFnName: o nome da função JavaScript da UDF que determina se é necessário excluir o documento em vez de inserir ou atualizar. A função retorna um valor de string de
trueoufalse. O padrão énone. - usePartialUpdate: indica se as atualizações parciais vão ser usadas (atualizar em vez de criar ou indexar, permitindo documentos parciais) com solicitações Elasticsearch. O padrão é
false. - bulkInsertMethod: indica se é necessário usar
INDEX(índice, permite ajustes) ouCREATE(criar, erros em _id duplicados) com solicitações em massa do Elasticsearch. O padrão éCREATE. - trustSelfSignedCerts: se é possível ou não confiar em um certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative essa opção como "True" para ignorar a validação no certificado SSL. O padrão é
false. - disableCertificateValidation: se
true, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina esse parâmetro comotrue. O padrão éfalse. - apiKeyKMSEncryptionKey: a chave do Cloud KMS para descriptografar a chave de API. Esse parâmetro é obrigatório se
apiKeySourceestiver definido comoKMS. Se esse parâmetro for fornecido, transmita uma stringapiKeycriptografada. Criptografe parâmetros usando o endpoint de criptografia da API KMS. Para a chave, use o formatoprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt. Por exemplo,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name. - apiKeySecretId: o ID do secret do Secret Manager para a apiKey. Forneça esse parâmetro se
apiKeySourceestiver definido comoSECRET_MANAGER. Use o formatoprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: a origem da chave de API. Os valores permitidos são
PLAINTEXT,KMSeSECRET_MANAGER. Esse parâmetro é obrigatório quando você usa o Secret Manager ou o KMS. SeapiKeySourceestiver definido comoKMS,apiKeyKMSEncryptionKeye a apiKey criptografada precisarão ser fornecidas. SeapiKeySourceestiver definido comoSECRET_MANAGER,apiKeySecretIdprecisará ser fornecido. SeapiKeySourceestiver definido comoPLAINTEXT,apiKeyprecisará ser fornecido. O padrão é: PLAINTEXT. - socketTimeout: se definido, substitui o tempo limite máximo padrão de nova tentativa e o tempo limite padrão do soquete (30.000 ms) no Elastic RestClient.
- javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo,
gs://my-bucket/my-udfs/my_file.js. - javascriptTextTransformFunctionName: o nome da função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }, o nome da função serámyTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
Funções definidas pelo usuário
Esse modelo é compatível com funções definidas pelo usuário (UDFs) em vários pontos do pipeline, descritas abaixo. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.
Função de transformação de texto
Transforma os dados CSV em um documento do Elasticsearch.
Parâmetros do modelo:
javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo JavaScript.javascriptTextTransformFunctionName: o nome da função JavaScript.
Especificação da função:
- Entrada: uma única linha de um arquivo CSV de entrada.
- Saída: um documento JSON em formato de string para inserir no Elasticsearch.
Função de índice
Retorna o índice ao qual o documento pertence.
Parâmetros do modelo:
javaScriptIndexFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.javaScriptIndexFnName: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: o valor do campo de metadados
_indexdo documento.
Função ID do documento
Retorna o ID do documento.
Parâmetros do modelo:
javaScriptIdFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.javaScriptIdFnName: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: o valor do campo de metadados
_iddo documento.
Função de exclusão de documentos
Especifica se um documento deve ser excluído. Para usar essa função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.
Parâmetros do modelo:
javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.javaScriptIsDeleteFnName: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: retorna a string
"true"para excluir o documento ou"false"para manter o documento.
Função do tipo de mapeamento
Retorna o tipo de mapeamento do documento.
Parâmetros do modelo:
javaScriptTypeFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.javaScriptTypeFnName: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: o valor do campo de metadados
_typedo documento.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Cloud Storage to Elasticsearch template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Substitua:
PROJECT_ID: o ID do projeto Google Cloud em que você quer executar o job do DataflowJOB_NAME: um nome de job de sua escolhaVERSION: a versão do modelo que você quer usarUse estes valores:
latestpara usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1INPUT_FILE_SPEC: o padrão de arquivo do Cloud Storage.CONNECTION_URL: seu URL do ElasticsearchAPIKEY: sua chave de API codificada em base64 para autenticação.INDEX: seu índice do Elasticsearch.DEADLETTER_TABLE: sua tabela do BigQuery.
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Substitua:
PROJECT_ID: o ID do projeto Google Cloud em que você quer executar o job do DataflowJOB_NAME: um nome de job de sua escolhaVERSION: a versão do modelo que você quer usarUse estes valores:
latestpara usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1INPUT_FILE_SPEC: o padrão de arquivo do Cloud Storage.CONNECTION_URL: seu URL do ElasticsearchAPIKEY: sua chave de API codificada em base64 para autenticação.INDEX: seu índice do Elasticsearch.DEADLETTER_TABLE: sua tabela do BigQuery.
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.