Modelo do Pub/Sub para o Elasticsearch

O modelo Pub/Sub para Elasticsearch é um pipeline de streaming que lê mensagens de uma subscrição do Pub/Sub, executa uma função definida pelo utilizador (UDF) e escreve-as no Elasticsearch como documentos. O modelo do Dataflow usa a funcionalidade de streams de dados do Elasticsearch para armazenar dados de séries cronológicas em vários índices, ao mesmo tempo que lhe dá um recurso com um único nome para pedidos. As streams de dados são adequadas para registos, métricas, rastreios e outros dados gerados continuamente armazenados no Pub/Sub.

O modelo cria uma stream de dados denominada logs-gcp.DATASET-NAMESPACE, onde:

  • DATASET é o valor do parâmetro dataset do modelo ou pubsub se não for especificado.
  • NAMESPACE é o valor do parâmetro namespace do modelo ou default se não for especificado.

Requisitos do pipeline

  • A subscrição do Pub/Sub de origem tem de existir e as mensagens têm de estar codificadas num formato JSON válido.
  • Um anfitrião do Elasticsearch acessível publicamente numa instância da Google Cloud Platform ou no Elastic Cloud com a versão 7.0 ou superior do Elasticsearch. Consulte o artigo Integração do Google Cloud para o Elastic para ver mais detalhes.
  • Um tópico Pub/Sub para a saída de erros.

Parâmetros de modelos

Parâmetros obrigatórios

  • inputSubscription: subscrição do Pub/Sub para consumir a entrada. Por exemplo, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • errorOutputTopic: o tópico de saída do Pub/Sub para publicar registos com falhas, no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • connectionUrl: o URL do Elasticsearch no formato https://hostname:[port]. Se estiver a usar o Elastic Cloud, especifique o CloudID. Por exemplo, https://elasticsearch-host:9200.
  • apiKey: a chave da API codificada em Base64 a usar para autenticação.

Parâmetros opcionais

  • dataset: o tipo de registos enviados através do Pub/Sub, para o qual temos um painel de controlo pronto a usar. Os valores dos tipos de registos conhecidos são audit, vpcflow e firewall. Predefinição: pubsub.
  • namespace: um agrupamento arbitrário, como um ambiente (dev, prod ou qa), uma equipa ou uma unidade de negócio estratégica. Predefinição: default.
  • elasticsearchTemplateVersion: identificador da versão do modelo do Dataflow, normalmente definido pelo Google Cloud. A predefinição é: 1.0.0.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) de JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: especifica a frequência com que o UDF é recarregado, em minutos. Se o valor for superior a 0, o Dataflow verifica periodicamente o ficheiro de FDU no Cloud Storage e recarrega a FDU se o ficheiro for modificado. Este parâmetro permite-lhe atualizar a UDF enquanto o pipeline está em execução, sem ter de reiniciar a tarefa. Se o valor for 0, o recarregamento das FDU é desativado. O valor predefinido é 0.
  • elasticsearchUsername: o nome de utilizador do Elasticsearch para autenticação. Se for especificado, o valor de apiKey é ignorado.
  • elasticsearchPassword: a palavra-passe do Elasticsearch para autenticação. Se for especificado, o valor de apiKey é ignorado.
  • batchSize: o tamanho do lote em número de documentos. A predefinição é 1000.
  • batchSizeBytes: o tamanho do lote em número de bytes. A predefinição é 5242880 (5 MB).
  • maxRetryAttempts: o número máximo de tentativas. Tem de ser superior a zero. A predefinição é no retries.
  • maxRetryDuration: a duração máxima da repetição em milissegundos. Tem de ser superior a zero. A predefinição é no retries.
  • propertyAsIndex: a propriedade no documento a ser indexado cujo valor especifica os metadados _index a incluir com o documento em pedidos em massa. Tem precedência sobre uma UDF _index. A predefinição é none.
  • javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados _index a incluir com o documento em pedidos em massa. A predefinição é none.
  • javaScriptIndexFnName: o nome da função JavaScript da FDU que especifica os metadados _index a incluir no documento em pedidos em massa. A predefinição é none.
  • propertyAsId: uma propriedade no documento a ser indexado cujo valor especifica os metadados _id a incluir com o documento em pedidos em massa. Tem precedência sobre uma UDF _id. A predefinição é none.
  • javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript da função que especifica os metadados _id a incluir com o documento em pedidos em massa. A predefinição é none.
  • javaScriptIdFnName: o nome da função JavaScript UDF que especifica os metadados _id a incluir no documento em pedidos em massa. A predefinição é none.
  • javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados _type a incluir com documentos em pedidos em massa. A predefinição é none.
  • javaScriptTypeFnName: o nome da função JavaScript de FDU que especifica os metadados _type a incluir no documento em pedidos em massa. A predefinição é none.
  • javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript para a função que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de true ou false. A predefinição é none.
  • javaScriptIsDeleteFnName: o nome da função JavaScript de FDU que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de true ou false. A predefinição é none.
  • usePartialUpdate: se deve usar atualizações parciais (atualizar em vez de criar ou indexar, permitindo documentos parciais) com pedidos do Elasticsearch. A predefinição é false.
  • bulkInsertMethod: se deve usar INDEX (index, permite inserções/atualizações) ou CREATE (create, erros em _id duplicados) com pedidos em massa do Elasticsearch. A predefinição é CREATE.
  • trustSelfSignedCerts: se deve ou não confiar no certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative esta opção como verdadeira para ignorar a validação no certificado SSL. (Predefinição: false).
  • disableCertificateValidation: se for true, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina este parâmetro como true. A predefinição é false.
  • apiKeyKMSEncryptionKey: a chave do Cloud KMS para desencriptar a chave da API. Este parâmetro é obrigatório se o parâmetro apiKeySource estiver definido como KMS. Se este parâmetro for fornecido, transmita uma string apiKey encriptada. Encriptar parâmetros através do ponto final de encriptação da API KMS. Para a chave, use o formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Por exemplo, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: o ID do segredo do Secret Manager para a apiKey. Se o parâmetro apiKeySource estiver definido como SECRET_MANAGER, forneça este parâmetro. Use o formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: a origem da chave da API. Os valores permitidos são PLAINTEXT, KMS e SECRET_MANAGER. Este parâmetro é obrigatório quando usa o Secret Manager ou o KMS. Se apiKeySource estiver definido como KMS, tem de fornecer apiKeyKMSEncryptionKey e a apiKey encriptada. Se apiKeySource estiver definido como SECRET_MANAGER, tem de fornecer apiKeySecretId. Se apiKeySource estiver definido como PLAINTEXT, tem de fornecer apiKey. A predefinição é: PLAINTEXT.
  • socketTimeout: se definido, substitui o tempo limite máximo de repetição predefinido e o tempo limite de socket predefinido (30 000 ms) no Elastic RestClient.

Funções definidas pelo utilizador

Este modelo suporta funções definidas pelo utilizador (UDFs) em vários pontos do pipeline, descritos abaixo. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Função de transformação de texto

Transforma a mensagem do Pub/Sub num documento do Elasticsearch.

Parâmetros de modelo:

  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javascriptTextTransformFunctionName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: um documento JSON convertido em string para inserir no Elasticsearch.

Função ÍNDICE

Devolve o índice ao qual o documento pertence.

Parâmetros de modelo:

  • javaScriptIndexFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptIndexFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Resultado: o valor do campo de metadados do documento._index

Função ID do documento

Devolve o ID do documento.

Parâmetros de modelo:

  • javaScriptIdFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptIdFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Resultado: o valor do campo de metadados _id do documento.

Função de eliminação de documentos

Especifica se um documento deve ser eliminado. Para usar esta função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.

Parâmetros de modelo:

  • javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptIsDeleteFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: devolve a string "true" para eliminar o documento ou "false" para inserir/atualizar o documento.

Função de tipo de mapeamento

Devolve o tipo de mapeamento do documento.

Parâmetros de modelo:

  • javaScriptTypeFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptTypeFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Resultado: o valor do campo de metadados _type do documento.

Execute o modelo

Consola

  1. Aceda à página Dataflow Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub to Elasticsearch template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • ERROR_OUTPUT_TOPIC: o seu tópico do Pub/Sub para a saída de erros
  • SUBSCRIPTION_NAME: o nome da sua subscrição do Pub/Sub
  • CONNECTION_URL: o URL do Elasticsearch
  • DATASET: o tipo de registo
  • NAMESPACE: o seu espaço de nomes para o conjunto de dados
  • APIKEY: a chave da API codificada em base64 para autenticação

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex",
   }
}
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • ERROR_OUTPUT_TOPIC: o seu tópico do Pub/Sub para a saída de erros
  • SUBSCRIPTION_NAME: o nome da sua subscrição do Pub/Sub
  • CONNECTION_URL: o URL do Elasticsearch
  • DATASET: o tipo de registo
  • NAMESPACE: o seu espaço de nomes para o conjunto de dados
  • APIKEY: a chave da API codificada em base64 para autenticação

O que se segue?