Crie funções definidas pelo utilizador para modelos do Dataflow

Alguns modelos do Dataflow fornecidos pela Google suportam funções definidas pelo utilizador (UDFs). As FDU permitem-lhe expandir a funcionalidade de um modelo sem modificar o código do modelo.

Vista geral

Para criar uma FDU, escreve uma função JavaScript ou uma função Python, consoante o modelo. Armazena o ficheiro de código da FDU no Cloud Storage e especifica a localização como um parâmetro de modelo. Para cada elemento de entrada, o modelo chama a sua função. A função transforma o elemento ou executa outra lógica personalizada e devolve o resultado ao modelo.

Por exemplo, pode usar uma FDU para:

  • Reformate os dados de entrada para corresponderem a um esquema de destino.
  • Oculte dados confidenciais.
  • Filtrar alguns elementos da saída.

A entrada para a função UDF é um único elemento de dados, serializado como uma string JSON. A função devolve uma string JSON serializada como resultado. O formato dos dados depende do modelo. Por exemplo, no modelo Subscrição do Pub/Sub para o BigQuery, a entrada são os dados da mensagem do Pub/Sub serializados como um objeto JSON e a saída é um objeto JSON serializado que representa uma linha da tabela do BigQuery. Para mais informações, consulte a documentação de cada modelo.

Execute um modelo com uma FDU

Para executar um modelo com uma FDU, especifica a localização do Cloud Storage do ficheiro JavaScript e o nome da função como parâmetros do modelo.

Com alguns modelos fornecidos pela Google, também pode criar a FDU diretamente na Google Cloud consola, da seguinte forma:

  1. Aceda à página Dataflow na Google Cloud consola.

    Aceda à página Fluxo de dados

  2. Clique em Criar tarefa a partir de modelo.

  3. Selecione o modelo fornecido pela Google que quer executar.

  4. Expanda Parâmetros opcionais. Se o modelo suportar FDUs, tem um parâmetro para a localização do Cloud Storage da FDU e outro parâmetro para o nome da função.

  5. Junto ao parâmetro do modelo, clique em Criar FDU.

  6. No painel Selecionar ou criar uma função definida pelo utilizador (FDU):

    1. Introduza um nome de ficheiro. Exemplo: my_udf.js.
    2. Selecione uma pasta do Cloud Storage. Exemplo: gs://your-bucket/your-folder.
    3. Use o editor de código inline para escrever a função. O editor é pré-preenchido com código automático que pode usar como ponto de partida.
    4. Clique em Criar FDU.

      A Google Cloud consola guarda o ficheiro UDF e preenche a localização do Cloud Storage.

    5. Introduza o nome da função no campo correspondente.

Escreva uma FDU de JavaScript

O código seguinte mostra uma FDU JavaScript no-op a partir da qual pode começar:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

O código JavaScript é executado no motor JavaScript Nashorn. Recomendamos que teste a sua FDU no motor Nashorn antes de a implementar. O motor Nashorn não corresponde exatamente à implementação do JavaScript do Node.js. Um problema comum é usar console.log() ou Number.isNaN(), nenhum dos quais está definido no motor Nashorn.

Pode testar a sua FDU no motor Nashorn através do Cloud Shell, que tem o JDK 11 pré-instalado. Inicie o Nashorn no modo interativo da seguinte forma:

jjs --language=es6

Na shell interativa do Nashorn, siga estes passos:

  1. Chame load para carregar o ficheiro JavaScript de FDU.
  2. Defina um objeto JSON de entrada consoante as mensagens esperadas do seu pipeline.
  3. Use a função JSON.stringify para serializar a entrada numa string JSON.
  4. Chame a função UDF para processar a string JSON.
  5. Chame JSON.parse para desserializar o resultado.
  6. Valide o resultado.

Exemplo:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Escreva uma FDU de Python

O código seguinte mostra uma UDF Python no-op a partir da qual pode começar:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

As UDFs Python suportam pacotes de dependências que são padrão do Python e do Apache Beam. Não podem usar pacotes de terceiros.

Processamento de erros

Normalmente, quando ocorre um erro durante a execução da FDU, o erro é escrito numa localização de mensagens rejeitadas. Os detalhes dependem do modelo. Por exemplo, o modelo Subscrição do Pub/Sub para o BigQuery cria uma tabela _error_records e escreve erros na mesma. Os erros de UDF de tempo de execução podem ocorrer devido a erros de sintaxe ou exceções não capturadas. Para verificar se existem erros de sintaxe, teste a FDU localmente.

Pode gerar programaticamente uma exceção para um elemento que não deve ser processado. Neste caso, o elemento é escrito na localização de mensagens rejeitadas, se o modelo suportar uma. Para ver um exemplo que mostra esta abordagem, consulte o artigo Encaminhe eventos.

Exemplos de utilização

Esta secção descreve alguns padrões comuns para UDFs, com base em exemplos de utilização reais.

Enriqueça eventos

Use uma FDU para enriquecer eventos com novos campos para obter informações mais contextuais.

Exemplo:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

Transforme eventos

Use uma FDU para transformar o formato de evento completo, consoante o que o destino espera.

O exemplo seguinte reverte uma entrada de registo do Cloud Logging (LogEntry) para a string de registo original quando disponível. (Consoante a origem de registo, a string de registo original é, por vezes, preenchida no campo textPayload.) Pode usar este padrão para enviar os registos não processados no respetivo formato original, em vez de enviar o LogEntry completo do Cloud Logging.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

Oculte ou remova dados de eventos

Use uma FDU para ocultar ou remover uma parte do evento.

O exemplo seguinte oculta o nome do campo sensitiveField substituindo o respetivo valor e remove completamente o campo denominado redundantField.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

Eventos de trajeto

Use uma FDU para encaminhar eventos para destinos separados no ponto de recolha a jusante.

O exemplo seguinte, baseado no modelo Pub/Sub para Splunk, encaminha cada evento para o índice do Splunk correto. Chama uma função local definida pelo utilizador para mapear eventos para índices.

function process(inJson) {
  const obj = JSON.parse(inJson);
  
  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}  

O exemplo seguinte encaminha eventos não reconhecidos para a fila de mensagens rejeitadas, partindo do princípio de que o modelo suporta uma fila de mensagens rejeitadas. (Por exemplo, consulte o modelo Pub/Sub para JDBC.) Pode usar este padrão para filtrar entradas inesperadas antes de escrever no destino.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

Filtre eventos

Use uma FDU para filtrar eventos indesejados ou não reconhecidos da saída.

O exemplo seguinte rejeita eventos em que data.severity é igual a "DEBUG".

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

O que se segue?