Modelo de streams de alterações do Spanner para o Pub/Sub

O modelo de streams de alterações do Spanner para o Pub/Sub é um pipeline de streaming que transmite registos de alterações de dados do Spanner e escreve-os em tópicos do Pub/Sub usando o Dataflow Runner V2.

Para enviar os seus dados para um novo tópico do Pub/Sub, primeiro tem de criar o tópico. Após a criação, o Pub/Sub gera e anexa automaticamente uma subscrição ao novo tópico. Se tentar enviar dados para um tópico do Pub/Sub que não existe, o pipeline do Dataflow gera uma exceção e fica bloqueado, pois tenta continuamente estabelecer uma ligação.

Se o tópico do Pub/Sub necessário já existir, pode enviar dados para esse tópico.

Para mais informações, consulte os artigos Acerca das streams de alterações, Crie ligações de streams de alterações com o Dataflow, e Práticas recomendadas para streams de alterações.

Requisitos do pipeline

  • A instância do Spanner tem de existir antes de executar o pipeline.
  • A base de dados do Spanner tem de existir antes de executar o pipeline.
  • A instância de metadados do Spanner tem de existir antes da execução do pipeline.
  • A base de dados de metadados do Spanner tem de existir antes de executar o pipeline.
  • A stream de alterações do Spanner tem de existir antes de executar o pipeline.
  • O tópico Pub/Sub tem de existir antes de executar o pipeline.

Parâmetros de modelos

Parâmetros obrigatórios

  • spannerInstanceId: a instância do Spanner a partir da qual ler streams de alterações.
  • spannerDatabase: a base de dados do Spanner a partir da qual ler as streams de alterações.
  • spannerMetadataInstanceId: a instância do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
  • spannerMetadataDatabase: a base de dados do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
  • spannerChangeStreamName: o nome da stream de alterações do Spanner a partir da qual os dados serão lidos.
  • pubsubTopic: o tópico do Pub/Sub para a saída de streams de alterações.

Parâmetros opcionais

  • spannerProjectId: o projeto a partir do qual ler os fluxos de alterações. É também neste projeto que a tabela de metadados do conetor de streams de alterações é criada. O valor predefinido deste parâmetro é o projeto no qual o pipeline do Dataflow está a ser executado.
  • spannerDatabaseRole: a função da base de dados do Spanner a usar quando executar o modelo. Este parâmetro só é necessário quando o principal do IAM que está a executar o modelo é um utilizador do controlo de acesso detalhado. A função da base de dados tem de ter o privilégio SELECT na stream de alterações e o privilégio EXECUTE na função de leitura da stream de alterações. Para mais informações, consulte o artigo Controlo de acesso detalhado para streams de alterações (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: o nome da tabela de metadados do conetor de streams de alterações do Spanner a usar. Se não for fornecida, o Spanner cria automaticamente a tabela de metadados do conector de streams durante a alteração do fluxo do pipeline. Tem de fornecer este parâmetro quando atualizar um pipeline existente. Não use este parâmetro para outros casos.
  • startTimestamp: o DateTime de início (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler streams de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. A predefinição é a data/hora em que o pipeline é iniciado, ou seja, a hora atual.
  • endTimestamp: o DateTime final (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler fluxos de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. A predefinição é um tempo infinito no futuro.
  • spannerHost: o ponto final do Cloud Spanner a chamar no modelo. Usado apenas para testes. Por exemplo, https://spanner.googleapis.com. O valor predefinido é: https://spanner.googleapis.com.
  • outputDataFormat: o formato da saída. O resultado é envolvido em muitas PubsubMessages e enviado para um tópico Pub/Sub. Os formatos permitidos são JSON e AVRO. A predefinição é JSON.
  • pubsubAPI: a API Pub/Sub usada para implementar o pipeline. As APIs permitidas são pubsubio e native_client. Para um pequeno número de consultas por segundo (CPS), o native_client tem uma latência inferior. Para um grande número de CPS, a pubsubio oferece um desempenho melhor e mais estável. A predefinição é pubsubio.
  • pubsubProjectId: projeto do tópico do Pub/Sub. O valor predefinido deste parâmetro é o projeto no qual o pipeline do Dataflow está a ser executado.
  • rpcPriority: a prioridade do pedido para chamadas do Spanner. Os valores permitidos são HIGH, MEDIUM e LOW. A predefinição é: HIGH).
  • includeSpannerSource: indica se deve ou não incluir o ID da base de dados do Spanner e o ID da instância para ler a stream de alterações dos dados da mensagem de saída. A predefinição é: false.
  • outputMessageMetadata: o valor da string para o campo personalizado outputMessageMetadata na mensagem de publicação/subscrição de saída. A predefinição é vazio e o campo outputMessageMetadata só é preenchido se este valor não estiver vazio. Introduza os carateres especiais de forma literal quando introduzir o valor aqui(ou seja, aspas duplas).

Execute o modelo

Consola

  1. Aceda à página Dataflow Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Cloud Spanner change streams to Pub/Sub template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: base de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: base de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: stream de alterações do Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para a saída de streams de alterações

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: base de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: base de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: stream de alterações do Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para a saída de streams de alterações

O que se segue?