Modelo de fluxos de alterações do Spanner para o banco de dados de origem

Pipeline de streaming. Lê dados dos fluxos de alterações do Spanner e os grava em uma origem.

Parâmetros do modelo

Parâmetro Descrição
changeStreamName O nome do fluxo de alterações do Spanner que o pipeline lê.
instanceId O nome da instância do Spanner em que o fluxo de alterações está presente.
databaseId O nome do banco de dados do Spanner que o fluxo de alterações monitora.
spannerProjectId O nome do projeto do Spanner.
metadataInstance A instância para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API Change Stream.
metadataDatabase O banco de dados para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API Change Stream.
sourceShardsFilePath Caminho para um arquivo do Cloud Storage que contém informações do perfil de conexão para fragmentos de origem.
startTimestamp Opcional: o carimbo de data/hora inicial para ler as mudanças. O padrão é vazio.
endTimestamp Opcional: o carimbo de data/hora de término para leitura de mudanças. Se nenhum carimbo de data/hora for fornecido, a leitura será feita indefinidamente. O padrão é vazio.
shadowTablePrefix Opcional: o prefixo usado para nomear tabelas de sombra. Padrão: shadow_.
sessionFilePath Opcional: caminho da sessão no Cloud Storage que contém informações de mapeamento do HarbourBridge.
filtrationMode Opcional: modo de filtragem. Especifica como descartar determinados registros com base em um critério. Os modos compatíveis são: none (não filtrar nada) e forward_migration (filtrar registros gravados usando o pipeline de migração direta). O padrão é forward_migration.
shardingCustomJarPath Opcional: local do arquivo JAR personalizado no Cloud Storage que contém a lógica de personalização para buscar o ID do fragmento. Se você definir esse parâmetro, defina o parâmetro shardingCustomJarPath. O padrão é vazio.
shardingCustomClassName Opcional: nome de classe totalmente qualificado com a implementação personalizada do ID de fragmento. Se shardingCustomJarPath for especificado, esse parâmetro será obrigatório. O padrão é vazio.
shardingCustomParameters Opcional: string contendo os parâmetros personalizados que serão passados para a classe de fragmentação personalizada. O padrão é vazio.
sourceDbTimezoneOffset Opcional: o deslocamento de fuso horário do UTC para o banco de dados de origem. Exemplo de valor: +10:00. O padrão é +00:00.
dlqGcsPubSubSubscription Opcional: a assinatura do Pub/Sub que está sendo usada em uma política de notificação do Cloud Storage para o diretório de repetição de DLQ quando executado no modo normal. O nome precisa estar no formato projects/<project-id>/subscriptions/<subscription-name>. Quando definido, o deadLetterQueueDirectory e dlqRetryMinutes são ignorados.
skipDirectoryName Opcional: os registros ignorados da replicação inversa são gravados neste diretório. O nome do diretório padrão é "skip".
maxShardConnections Opcional: o número máximo de conexões que um determinado fragmento pode aceitar. O padrão é 10000.
deadLetterQueueDirectory Opcional: o caminho usado para armazenar a saída da fila de erros. O caminho padrão é um diretório no local temporário do job do Dataflow.
dlqMaxRetryCount Opcional: o número máximo de vezes que os erros temporários podem ser repetidos pela fila de mensagens inativas. O padrão é 500.
runMode Opcional: o tipo de modo de execução. Valores aceitos: regular, retryDLQ. Padrão: regular. Especifique retryDLQ para repetir somente registros graves da fila de mensagens não entregues.
dlqRetryMinutes Opcional: o número de minutos entre novas tentativas de fila de mensagens inativas (DLQ) O valor padrão é 10.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione o modelo Fluxos de mudanças do Spanner para o banco de dados de origem.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

CLI da gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/ \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • CHANGE_STREAM_NAME: o nome do fluxo de alterações de onde ler
  • INSTANCE_ID: o ID da instância do Cloud Spanner.
  • DATABASE_ID: o ID do banco de dados do Cloud Spanner.
  • SPANNER_PROJECT_ID: o ID do projeto do Cloud Spanner.
  • METADATA_INSTANCE: a instância do Cloud Spanner para armazenar metadados ao ler de fluxos de alterações
  • METADATA_DATABASE: o banco de dados do Cloud Spanner para armazenar metadados ao ler de fluxos de alterações.
  • SOURCE_SHARDS_FILE_PATH: o caminho para o arquivo do GCS que contém os detalhes do fragmento de origem

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/",
     "environment": { "maxWorkers": "10" }
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • CHANGE_STREAM_NAME: o nome do fluxo de alterações de onde ler
  • INSTANCE_ID: o ID da instância do Cloud Spanner.
  • DATABASE_ID: o ID do banco de dados do Cloud Spanner.
  • SPANNER_PROJECT_ID: o ID do projeto do Cloud Spanner.
  • METADATA_INSTANCE: a instância do Cloud Spanner para armazenar metadados ao ler de fluxos de alterações
  • METADATA_DATABASE: o banco de dados do Cloud Spanner para armazenar metadados ao ler de fluxos de alterações.
  • SOURCE_SHARDS_FILE_PATH: o caminho para o arquivo do GCS que contém os detalhes do fragmento de origem