Usar o Datastream e o Dataflow para transmitir dados para o BigQuery

Nesta página, você vai encontrar práticas recomendadas para usar o Datastream e o Dataflow para transmitir dados para o BigQuery.

Particionar conjuntos de dados de réplica em chaves definidas pelo usuário

O conjunto de dados de staging no BigQuery é particionado automaticamente. No entanto, por padrão, o conjunto de dados da réplica não é particionado porque as chaves de partição nas tabelas de réplica precisam ser definidas com base em uma lógica de negócios específica, em vez de serem aplicadas pelo Datastream e pelo Dataflow.

Para cada tabela no conjunto de dados de réplica que precisa de particionamento:

  1. Interrompa e drene o job do Dataflow

  2. Use o editor de SQL no BigQuery para executar o seguinte script SQL em cada tabela no conjunto de dados de réplica. Neste exemplo, a tabela actor no conjunto de dados datastream_cdc tem uma coluna last_update que queremos definir como chave de partição. Ao executar o script, você recria a tabela com a chave de partição correta.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. Use o modelo do Datastream para o BigQuery para recriar um job do Dataflow.

Executar funções definidas pelo usuário para manipular dados de eventos

É possível usar o modelo do Datastream para o BigQuery para executar uma função JavaScript definida pelo usuário. Para fazer isso, primeiro coloque um arquivo que contenha a função em um local específico no Cloud Storage. Em seguida, faça o seguinte:

  • Use o parâmetro javascriptTextTransformGcsPath no modelo para especificar o local do arquivo no Cloud Storage que contém sua função definida pelo usuário.
  • Use o parâmetro javascriptTextTransformFunctionName para especificar o nome da função JavaScript que você quer chamar como função definida pelo usuário.

Por exemplo, é possível executar uma função definida pelo usuário para reter registros excluídos nas tabelas do conjunto de dados de réplica no BigQuery. Esse processo é conhecido como exclusão reversível.

Para isso, crie uma função que copie o valor da coluna _metadata_deleted para uma nova coluna chamada is_deleted e, em seguida, redefina o valor da coluna _metadata_deleted para false. Isso faz com que o job do Dataflow ignore os eventos de exclusão e mantenha os registros excluídos ao atualizar o conjunto de dados de réplica no BigQuery.

Confira o exemplo de código para essa função definida pelo usuário:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Definir a frequência de fusão

Use o parâmetro mergeFrequencyMinutes do modelo do Datastream para BigQuery para definir a frequência de mesclagem. É o número de minutos entre as fusões de uma determinada tabela no conjunto de dados de réplica no BigQuery. Enquanto os dados históricos são preenchidos, recomendamos manter a frequência de mesclagem baixa (12 ou 24 horas) para controlar os custos.

Por exemplo, se você definir o valor desse parâmetro como 10 minutos, o Dataflow vai executar o job que usa o modelo a cada 10 minutos. No entanto, na primeira vez que o job for executado, haverá um atraso de cinco minutos. Neste exemplo, se o job for executado às 9h14, a primeira fusão vai ocorrer às 9h29 (10 minutos para a fusão e 5 minutos para o atraso). A segunda fusão vai ocorrer às 9h39, e todas as fusões subsequentes vão acontecer em intervalos de 10 minutos (9h49, 9h59, 10h09 e assim por diante).

Se você definir a frequência de mesclagem como 60 minutos, o job será executado na hora, após um atraso de 5 minutos para a execução inicial. Se o job estiver programado para ser executado às 10h, ele será executado às 10h05 devido ao atraso de 5 minutos. Todas as fusões subsequentes vão ocorrer em intervalos de 60 minutos (11h05, 12h05, 13h05 e assim por diante).

Seja para controlar custos ou por outros motivos, talvez não seja possível fazer uma fusão com uma frequência que atenda às necessidades da sua empresa. Talvez você não tenha os dados mais recentes. Para acessar os dados mais recentes, crie uma visualização sobre as tabelas dos conjuntos de dados de teste e réplica no BigQuery, em que a visualização imita a mesclagem. Essa visualização é criada como uma tabela lógica (para os conjuntos de dados de staging e de réplica). Se a frequência de mesclagem for baixa e você precisar de acesso mais rápido aos dados, use a visualização.