O modelo Pub/Sub para Elasticsearch é um pipeline de streaming que lê mensagens de uma subscrição do Pub/Sub, executa uma função definida pelo utilizador (UDF) e escreve-as no Elasticsearch como documentos. O modelo do Dataflow usa a funcionalidade de streams de dados do Elasticsearch para armazenar dados de séries cronológicas em vários índices, ao mesmo tempo que lhe dá um recurso com um único nome para pedidos. As streams de dados são adequadas para registos, métricas, rastreios e outros dados gerados continuamente armazenados no Pub/Sub.
O modelo cria uma stream de dados denominada
logs-gcp.DATASET-NAMESPACE, onde:
- DATASET é o valor do parâmetro
datasetdo modelo oupubsubse não for especificado. - NAMESPACE é o valor do parâmetro
namespacedo modelo oudefaultse não for especificado.
Requisitos do pipeline
- A subscrição do Pub/Sub de origem tem de existir e as mensagens têm de estar codificadas num formato JSON válido.
- Um anfitrião do Elasticsearch acessível publicamente numa instância da Google Cloud Platform ou no Elastic Cloud com a versão 7.0 ou superior do Elasticsearch. Consulte o artigo Integração do Google Cloud para o Elastic para ver mais detalhes.
- Um tópico Pub/Sub para a saída de erros.
Parâmetros de modelos
Parâmetros obrigatórios
- inputSubscription: subscrição do Pub/Sub para consumir a entrada. Por exemplo,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. - errorOutputTopic: o tópico de saída do Pub/Sub para publicar registos com falhas, no formato
projects/<PROJECT_ID>/topics/<TOPIC_NAME>. - connectionUrl: o URL do Elasticsearch no formato
https://hostname:[port]. Se estiver a usar o Elastic Cloud, especifique o CloudID. Por exemplo,https://elasticsearch-host:9200. - apiKey: a chave da API codificada em Base64 a usar para autenticação.
Parâmetros opcionais
- dataset: o tipo de registos enviados através do Pub/Sub, para o qual temos um painel de controlo pronto a usar. Os valores dos tipos de registos conhecidos são
audit,vpcflowefirewall. Predefinição:pubsub. - namespace: um agrupamento arbitrário, como um ambiente (dev, prod ou qa), uma equipa ou uma unidade de negócio estratégica. Predefinição:
default. - elasticsearchTemplateVersion: identificador da versão do modelo do Dataflow, normalmente definido pelo Google Cloud. A predefinição é: 1.0.0.
- javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo,
gs://my-bucket/my-udfs/my_file.js. - javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) de JavaScript a usar. Por exemplo, se o código da função JavaScript for
myTransform(inJson) { /*...do stuff...*/ }, o nome da função émyTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: especifica a frequência com que o UDF é recarregado, em minutos. Se o valor for superior a 0, o Dataflow verifica periodicamente o ficheiro de FDU no Cloud Storage e recarrega a FDU se o ficheiro for modificado. Este parâmetro permite-lhe atualizar a UDF enquanto o pipeline está em execução, sem ter de reiniciar a tarefa. Se o valor for
0, o recarregamento das FDU é desativado. O valor predefinido é0. - elasticsearchUsername: o nome de utilizador do Elasticsearch para autenticação. Se for especificado, o valor de
apiKeyé ignorado. - elasticsearchPassword: a palavra-passe do Elasticsearch para autenticação. Se for especificado, o valor de
apiKeyé ignorado. - batchSize: o tamanho do lote em número de documentos. A predefinição é
1000. - batchSizeBytes: o tamanho do lote em número de bytes. A predefinição é
5242880(5 MB). - maxRetryAttempts: o número máximo de tentativas. Tem de ser superior a zero. A predefinição é
no retries. - maxRetryDuration: a duração máxima da repetição em milissegundos. Tem de ser superior a zero. A predefinição é
no retries. - propertyAsIndex: a propriedade no documento a ser indexado cujo valor especifica os metadados
_indexa incluir com o documento em pedidos em massa. Tem precedência sobre uma UDF_index. A predefinição énone. - javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados
_indexa incluir com o documento em pedidos em massa. A predefinição énone. - javaScriptIndexFnName: o nome da função JavaScript da FDU que especifica os metadados
_indexa incluir no documento em pedidos em massa. A predefinição énone. - propertyAsId: uma propriedade no documento a ser indexado cujo valor especifica os metadados
_ida incluir com o documento em pedidos em massa. Tem precedência sobre uma UDF_id. A predefinição énone. - javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript da função que especifica os metadados
_ida incluir com o documento em pedidos em massa. A predefinição énone. - javaScriptIdFnName: o nome da função JavaScript UDF que especifica os metadados
_ida incluir no documento em pedidos em massa. A predefinição énone. - javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados
_typea incluir com documentos em pedidos em massa. A predefinição énone. - javaScriptTypeFnName: o nome da função JavaScript de FDU que especifica os metadados
_typea incluir no documento em pedidos em massa. A predefinição énone. - javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript para a função que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de
trueoufalse. A predefinição énone. - javaScriptIsDeleteFnName: o nome da função JavaScript de FDU que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de
trueoufalse. A predefinição énone. - usePartialUpdate: se deve usar atualizações parciais (atualizar em vez de criar ou indexar, permitindo documentos parciais) com pedidos do Elasticsearch. A predefinição é
false. - bulkInsertMethod: se deve usar
INDEX(index, permite inserções/atualizações) ouCREATE(create, erros em _id duplicados) com pedidos em massa do Elasticsearch. A predefinição éCREATE. - trustSelfSignedCerts: se deve ou não confiar no certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative esta opção como verdadeira para ignorar a validação no certificado SSL. (Predefinição:
false). - disableCertificateValidation: se for
true, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina este parâmetro comotrue. A predefinição éfalse. - apiKeyKMSEncryptionKey: a chave do Cloud KMS para desencriptar a chave da API. Este parâmetro é obrigatório se o parâmetro
apiKeySourceestiver definido comoKMS. Se este parâmetro for fornecido, transmita uma stringapiKeyencriptada. Encriptar parâmetros através do ponto final de encriptação da API KMS. Para a chave, use o formatoprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Por exemplo,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name. - apiKeySecretId: o ID do segredo do Secret Manager para a apiKey. Se o parâmetro
apiKeySourceestiver definido comoSECRET_MANAGER, forneça este parâmetro. Use o formatoprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: a origem da chave da API. Os valores permitidos são
PLAINTEXT,KMSeSECRET_MANAGER. Este parâmetro é obrigatório quando usa o Secret Manager ou o KMS. SeapiKeySourceestiver definido comoKMS, tem de fornecerapiKeyKMSEncryptionKeye a apiKey encriptada. SeapiKeySourceestiver definido comoSECRET_MANAGER, tem de fornecerapiKeySecretId. SeapiKeySourceestiver definido comoPLAINTEXT, tem de fornecerapiKey. A predefinição é: PLAINTEXT. - socketTimeout: se definido, substitui o tempo limite máximo de repetição predefinido e o tempo limite de socket predefinido (30 000 ms) no Elastic RestClient.
Funções definidas pelo utilizador
Este modelo suporta funções definidas pelo utilizador (UDFs) em vários pontos do pipeline, descritos abaixo. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.
Função de transformação de texto
Transforma a mensagem do Pub/Sub num documento do Elasticsearch.
Parâmetros de modelo:
javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro JavaScript.javascriptTextTransformFunctionName: o nome da função JavaScript.
Especificação da função:
- Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
- Saída: um documento JSON convertido em string para inserir no Elasticsearch.
Função ÍNDICE
Devolve o índice ao qual o documento pertence.
Parâmetros de modelo:
javaScriptIndexFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.javaScriptIndexFnName: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Resultado: o valor do campo de metadados do documento.
_index
Função ID do documento
Devolve o ID do documento.
Parâmetros de modelo:
javaScriptIdFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.javaScriptIdFnName: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Resultado: o valor do campo de metadados
_iddo documento.
Função de eliminação de documentos
Especifica se um documento deve ser eliminado. Para usar esta função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.
Parâmetros de modelo:
javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.javaScriptIsDeleteFnName: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: devolve a string
"true"para eliminar o documento ou"false"para inserir/atualizar o documento.
Função de tipo de mapeamento
Devolve o tipo de mapeamento do documento.
Parâmetros de modelo:
javaScriptTypeFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.javaScriptTypeFnName: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Resultado: o valor do campo de metadados
_typedo documento.
Execute o modelo
Consola
- Aceda à página Dataflow Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub to Elasticsearch template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
Substitua o seguinte:
PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME: um nome de tarefa exclusivo à sua escolhaREGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1VERSION: a versão do modelo que quer usarPode usar os seguintes valores:
latestpara usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC: o seu tópico do Pub/Sub para a saída de errosSUBSCRIPTION_NAME: o nome da sua subscrição do Pub/SubCONNECTION_URL: o URL do ElasticsearchDATASET: o tipo de registoNAMESPACE: o seu espaço de nomes para o conjunto de dadosAPIKEY: a chave da API codificada em base64 para autenticação
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex", } }
Substitua o seguinte:
PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME: um nome de tarefa exclusivo à sua escolhaLOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1VERSION: a versão do modelo que quer usarPode usar os seguintes valores:
latestpara usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC: o seu tópico do Pub/Sub para a saída de errosSUBSCRIPTION_NAME: o nome da sua subscrição do Pub/SubCONNECTION_URL: o URL do ElasticsearchDATASET: o tipo de registoNAMESPACE: o seu espaço de nomes para o conjunto de dadosAPIKEY: a chave da API codificada em base64 para autenticação
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.