Modelo do Datastream para MySQL ou PostgreSQL (Stream)

O modelo Datastream para SQL é um pipeline de streaming que lê dados do Datastream e os replica em qualquer banco de dados MySQL ou PostgreSQL. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e replica esses dados em tabelas de réplica do PostgreSQL. Especifique o parâmetro gcsPubSubSubscription para ler dados de notificações do Pub/Sub OU forneça o parâmetro inputFilePattern para ler dados diretamente de arquivos no Cloud Storage.

O modelo não é compatível com a linguagem de definição de dados (DDL) e espera que todas as tabelas já existam no banco de dados. A replicação usa transformações com estado do Dataflow para filtrar dados desatualizados e garantir consistência nos dados fora de ordem. Por exemplo, se uma versão mais recente de uma linha já tiver passado, uma versão que chega atrasada dessa linha será ignorada. A linguagem de manipulação de dados (DML, na sigla em inglês) executada é a melhor tentativa de replicar perfeitamente os dados de origem nos de destino. As instruções DML executadas seguem as seguintes regras:

  • Se houver uma chave primária, as operações de inserção e atualização usarão sintaxe de mesclagem (isto é, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Se houver chaves primárias, as exclusões serão replicadas como uma DML de exclusão.
  • Se não houver uma chave primária, as operações de inserção e atualização serão inseridas na tabela.
  • Se não houver chaves primárias, as exclusões serão ignoradas.

Se você estiver usando os utilitários Oracle para Postgres, adicione ROWID no PostgreSQL como a chave primária quando não houver nenhuma.

Requisitos de pipeline

  • Um stream do Datastream que está pronto ou já está replicando dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Um banco de dados do PostgreSQL foi propagado com o esquema necessário.
  • O acesso à rede entre workers do Dataflow e o PostgreSQL está configurado.

Parâmetros do modelo

Parâmetros obrigatórios

  • inputFilePattern: o local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse local de arquivo é o caminho raiz do stream.
  • databaseHost: o host SQL para se conectar.
  • databaseUser: o usuário do PostgreSQL com todas as permissões necessárias para gravar em todas as tabelas na replicação.
  • databasePassword: a senha do usuário SQL.

Parâmetros opcionais

  • gcsPubSubSubscription: a assinatura do Pub/Sub com notificações de arquivos do Datastream. Por exemplo, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat: o formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro ou json. O padrão é avro.
  • streamName: o nome ou modelo do stream para pesquisar informações de esquema. O valor padrão é {_metadata_stream}.
  • rfcStartDateTime: o DateTime inicial usado para buscar do Cloud Storage (https://tools.ietf.org/html/rfc3339). O padrão é: 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl: URL raiz da API Datastream. O padrão é: https://datastream.googleapis.com/.
  • databaseType: o tipo de banco de dados para gravar (por exemplo, Postgres). O padrão é postgres.
  • databasePort: a porta do banco de dados SQL para a conexão. O valor padrão é 5432.
  • databaseName: o nome do banco de dados SQL a ser conectado. O valor padrão é postgres.
  • defaultCasing: uma alternância para o comportamento de maiúsculas e minúsculas da tabela. Por exemplo, LOWERCASE = mytable -> mytable, UPPERCASE = mytable -> MYTABLECAMEL = my_table -> myTable, SNAKE = myTable -> my_table. O padrão é: LOWERCASE.
  • columnCasing: uma alternância para a capitalização do nome da coluna de destino. LOWERCASE (padrão): my_column -> my_column. UPPERCASE: my_column -> MY_COLUMN. CAMEL: my_column -> myColumn. SNAKE: myColumn -> my_column.
  • schemaMap: um mapa de chaves/valores usados para ditar mudanças no nome do esquema e da tabela. Exemplos: esquema para esquema (SCHEMA1:SCHEMA2), tabela para tabela (SCHEMA1.table1:SCHEMA2.TABLE1) ou vários mapeamentos usando o delimitador de barra vertical "|" (por exemplo, schema1.source:schema2.target|schema3.source:schema4.target). O padrão é vazio.
  • customConnectionString: string de conexão opcional que será usada no lugar da string do banco de dados padrão.
  • numThreads: determina o paralelismo principal da etapa "Formatar para DML". Especificamente, o valor é transmitido para "Reshuffle.withNumBuckets". O padrão é: 100.
  • databaseLoginTimeout: o tempo limite em segundos para tentativas de login no banco de dados. Isso ajuda a evitar falhas de conexão quando vários trabalhadores tentam se conectar simultaneamente.
  • orderByIncludesIsDeleted: a ordenação por configurações de dados deve incluir a priorização de dados que não foram excluídos. O padrão é: falso.
  • datastreamSourceType: substitui a detecção do tipo de origem para dados de CDC do Datastream. Quando especificado, esse valor será usado em vez de derivar o tipo de origem do campo "read_method". Os valores válidos incluem "mysql", "postgresql", "oracle" etc. Esse parâmetro é útil quando o campo "read_method" contém "cdc" e o tipo de origem real não pode ser determinado automaticamente.
  • deadLetterQueueDirectory: o caminho usado pelo Dataflow para gravar a saída da fila de mensagens inativas. Esse caminho não pode estar no mesmo caminho de saída do arquivo do Datastream. O padrão é empty.
  • dlqRetryMinutes: o número de minutos entre as novas tentativas de DLQ. O padrão é 10.
  • dlqMaxRetries: o número máximo de vezes para repetir um registro com falha da DLQ antes de marcá-lo como uma falha permanente. O padrão é 5.
  • schemaCacheRefreshMinutes: o número de minutos para armazenar em cache os esquemas de tabela. O padrão é 1440 (24 horas).
  • runMode: é o tipo de modo de execução, regular ou com retryDLQ. O padrão é: normal.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione o modelo Cloud Datastream para SQL.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/ \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Substitua:

  • PROJECT_ID: o ID do projeto Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: o IP do host do SQL.
  • DATABASE_USER: seu usuário do SQL.
  • DATABASE_PASSWORD: sua senha do SQL.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: o IP do host do SQL.
  • DATABASE_USER: seu usuário do SQL.
  • DATABASE_PASSWORD: sua senha do SQL.

A seguir