Modelo do Pub/Sub para o Splunk

O modelo Pub/Sub para Splunk é um pipeline de streaming que lê mensagens de uma subscrição do Pub/Sub e escreve a carga útil da mensagem no Splunk através do HTTP Event Collector (HEC) do Splunk. O exemplo de utilização mais comum deste modelo é exportar registos para o Splunk. Para ver um exemplo do fluxo de trabalho subjacente, consulte o artigo Implementar exportações de registos preparadas para produção no Splunk através do Dataflow.

Antes de escrever no Splunk, também pode aplicar uma função definida pelo utilizador em JavaScript ao payload da mensagem. Todas as mensagens que apresentem falhas de processamento são encaminhadas para um tópico não processado do Pub/Sub para resolução de problemas e reprocessamento adicionais.

Como camada adicional de proteção para o seu token HEC, também pode transmitir uma chave do Cloud KMS juntamente com o parâmetro de token HEC codificado em base64 encriptado com a chave do Cloud KMS. Consulte o ponto final de encriptação da API Cloud KMS para ver detalhes adicionais sobre a encriptação do parâmetro do token HEC.

Requisitos do pipeline

  • A subscrição do Pub/Sub de origem tem de existir antes de executar o pipeline.
  • O tópico não processado do Pub/Sub tem de existir antes de executar o pipeline.
  • O ponto final do HEC do Splunk tem de estar acessível a partir da rede dos trabalhadores do Dataflow.
  • O token do HEC do Splunk tem de ser gerado e estar disponível.

Parâmetros de modelos

Parâmetros obrigatórios

  • inputSubscription: a subscrição do Pub/Sub a partir da qual ler a entrada. Por exemplo, projects/your-project-id/subscriptions/your-subscription-name.
  • url: o URL do HEC do Splunk. O URL tem de ser encaminhável a partir da VPC em que a pipeline é executada. Por exemplo, https://splunk-hec-host:8088.
  • outputDeadletterTopic: o tópico do Pub/Sub para o qual encaminhar mensagens não entregues. Por exemplo, projects/<PROJECT_ID>/topics/<TOPIC_NAME>.

Parâmetros opcionais

  • token: o token de autenticação do HEC do Splunk. Tem de ser fornecido se o parâmetro tokenSource estiver definido como PLAINTEXT ou KMS.
  • batchCount: o tamanho do lote para enviar vários eventos para o Splunk. A predefinição é 1 (sem processamento em lote).
  • disableCertificateValidation: desative a validação do certificado SSL. Predefinição false (validação ativada). Se true, os certificados não são validados (todos os certificados são fidedignos) e o parâmetro rootCaCertificatePath é ignorado.
  • parallelism: o número máximo de pedidos paralelos. A predefinição é 1 (sem paralelismo).
  • includePubsubMessage: inclua a mensagem completa do Pub/Sub na carga útil. Predefinição false (apenas o elemento de dados está incluído na carga útil).
  • tokenKMSEncryptionKey: a chave do Cloud KMS a usar para desencriptar a string do token HEC. Este parâmetro tem de ser fornecido quando tokenSource está definido como KMS. Se a chave do Cloud KMS for fornecida, a string do token HEC tem de ser transmitida encriptada. Por exemplo, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • tokenSecretId: o ID do segredo do Secret Manager para o token. Este parâmetro tem de ser fornecido quando o tokenSource estiver definido como SECRET_MANAGER. Por exemplo, projects/your-project-id/secrets/your-secret/versions/your-secret-version.
  • tokenSource: a origem do token. Os seguintes valores são permitidos: PLAINTEXT, KMS e SECRET_MANAGER. Tem de fornecer este parâmetro quando o Secret Manager é usado. Se tokenSource estiver definido como KMS, tokenKMSEncryptionKey e encriptado, tem de fornecer token. Se tokenSource estiver definido como SECRET_MANAGER, tem de fornecer tokenSecretId. Se tokenSource estiver definido como PLAINTEXT, tem de fornecer token.
  • rootCaCertificatePath: o URL completo para o certificado da AC de raiz no Cloud Storage. O certificado fornecido no Cloud Storage tem de estar codificado com DER e pode ser fornecido em codificação binária ou imprimível (Base64). Se o certificado for fornecido na codificação Base64, tem de estar delimitado no início por -----BEGIN CERTIFICATE----- e no final por -----END CERTIFICATE-----. Se este parâmetro for fornecido, este ficheiro de certificado de AC privada é obtido e adicionado ao armazenamento de confiança do trabalhador do Dataflow para validar o certificado SSL do ponto final HEC do Splunk. Se este parâmetro não for fornecido, é usada a loja de confiança predefinida. Por exemplo, gs://mybucket/mycerts/privateCA.crt.
  • enableBatchLogs: especifica se os registos devem ser ativados para lotes escritos no Splunk. Predefinição: true.
  • enableGzipHttpCompression: especifica se os pedidos HTTP enviados para o Splunk HEC devem ser comprimidos (conteúdo codificado em gzip). Predefinição: true.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) de JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: define o intervalo que os trabalhadores podem verificar quanto a alterações nas FDUs JavaScript para recarregar os ficheiros. A predefinição é: 0.
  • unwrapHecForDeadletter: quando ativada, se uma mensagem não for escrita no Splunk e for enviada para a fila de mensagens rejeitadas, a carga útil original é extraída do formato HEC do Splunk antes de ser escrita no tópico de mensagens rejeitadas. Isto impede a aninhagem de eventos quando as mensagens são repetidas. Predefinição: false.

Função definida pelo utilizador

Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: o campo de dados da mensagem do Pub/Sub, serializado como uma string JSON.
  • Saída: os dados de eventos a enviar para o ponto final de eventos do Splunk HEC. A saída tem de ser uma string ou um objeto JSON em forma de string.

Execute o modelo

Consola

  1. Aceda à página Dataflow Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub to Splunk template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
  8. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
  • INPUT_SUBSCRIPTION_NAME: o nome da subscrição do Pub/Sub
  • TOKEN: símbolo do coletor de eventos de HTTP do Splunk
  • URL: o caminho do URL para o coletor de eventos de HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo utilizador (FDU) JavaScript que quer usar

    Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte os exemplos de UDFs.

  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript que quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: o tamanho do lote a usar para enviar vários eventos para o Splunk
  • PARALLELISM: o número de pedidos paralelos a usar para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se quiser desativar a validação do certificado SSL
  • ROOT_CA_CERTIFICATE_PATH: o caminho para o certificado de AC raiz no Cloud Storage (por exemplo, gs://your-bucket/privateCA.crt)

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
  • INPUT_SUBSCRIPTION_NAME: o nome da subscrição do Pub/Sub
  • TOKEN: símbolo do coletor de eventos de HTTP do Splunk
  • URL: o caminho do URL para o coletor de eventos de HTTP do Splunk (por exemplo, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: o nome do tópico do Pub/Sub
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo utilizador (FDU) JavaScript que quer usar

    Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte os exemplos de UDFs.

  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript que quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: o tamanho do lote a usar para enviar vários eventos para o Splunk
  • PARALLELISM: o número de pedidos paralelos a usar para enviar eventos para o Splunk
  • DISABLE_VALIDATION: true se quiser desativar a validação do certificado SSL
  • ROOT_CA_CERTIFICATE_PATH: o caminho para o certificado de AC raiz no Cloud Storage (por exemplo, gs://your-bucket/privateCA.crt)

O que se segue?