Nesta página, mostramos como criar pipelines do Dataflow que consomem e encaminham dados de mudança do Spanner usando fluxos de mudanças. Você pode usar o exemplo de código nesta página para criar pipelines personalizados.
Principais conceitos
Confira alguns conceitos básicos para pipelines do Dataflow para fluxo de alterações.
Dataflow
O Dataflow é um serviço sem servidor, rápido e econômico que oferece suporte ao processamento de stream e em lote. Ele oferece portabilidade com jobs de processamento escritos usando as bibliotecas de código aberto Apache Beam e automatiza o provisionamento de infraestrutura e o gerenciamento de clusters. O Dataflow oferece streaming quase em tempo real ao ler fluxo de alterações.
É possível usar o Dataflow para consumir fluxos de alterações do Spanner com o conector SpannerIO, que oferece uma abstração sobre a API Spanner para consultar fluxo de alterações. Com esse conector, não é necessário gerenciar o ciclo de vida da partição fluxo de alterações, o que é necessário ao usar a API Spanner diretamente. O conector fornece um fluxo de registros de mudança de dados para que você possa se concentrar mais na lógica do aplicativo e menos em detalhes específicos da API e no particionamento dinâmico do fluxo de mudanças. Recomendamos usar o conector SpannerIO em vez da API Spanner na maioria das circunstâncias em que você precisa ler dados de fluxo de alterações.
Os modelos do Dataflow são pipelines pré-criados que implementam casos de uso comuns. Consulte Modelos do Dataflow para uma visão geral.
Pipeline do Dataflow
Um pipeline do Dataflow de fluxo de alterações do Spanner é composto de quatro partes principais:
- Um banco de dados do Spanner com um fluxo de alterações
- O conector SpannerIO
- Transformações e receptores definidos pelo usuário
- Um gravador de E/S de coletor do Apache Beam
Fluxo de alterações do Spanner
Para detalhes sobre como criar um fluxo de alterações, consulte Criar um fluxo de alterações.
Conector SpannerIO do Apache Beam
Esse é o conector SpannerIO descrito na seção anterior do Dataflow.
É um conector de E/S de origem que emite um PCollection de registros de mudança de dados
para etapas posteriores do pipeline. A hora do evento de cada registro de mudança de dados emitido será o carimbo de data/hora de confirmação. Os registros emitidos não são ordenados, e o conector SpannerIO garante que não haverá registros atrasados.
Ao trabalhar com fluxo de alterações, o Dataflow usa o checkpointing. Como resultado, cada worker pode aguardar até o intervalo de checkpoint configurado para armazenar as mudanças em buffer antes de enviá-las para processamento adicional.
Transformações definidas pelo usuário
Uma transformação definida pelo usuário permite agregar, transformar ou modificar dados de processamento em um pipeline do Dataflow. Os casos de uso comuns incluem a remoção de informações de identificação pessoal, a satisfação dos requisitos de formato de dados downstream e a classificação. Consulte a documentação oficial do Apache Beam para o guia de programação sobre transformações.
Gravador de E/S de coletor do Apache Beam
O Apache Beam contém conectores de E/S integrados que podem ser usados para gravar de um pipeline do Dataflow em um coletor de dados, como o BigQuery. Os destinos de dados mais comuns são compatíveis de forma nativa.
Modelos do Dataflow
Os modelos do Dataflow oferecem um método para criar jobs do Dataflow com base em imagens Docker pré-criadas para casos de uso comuns usando o console Google Cloud , a CLI Google Cloud ou chamadas da API REST.
Para fluxo de alterações do Spanner, oferecemos três modelos flexíveis do Dataflow:
As seguintes restrições se aplicam ao usar o modelo streams fluxo de alterações para o Pub/Sub:
O Pub/Sub tem uma limitação de tamanho de mensagem de 10 MB. Para mais informações, consulte Cotas e limites do Pub/Sub.
O modelo Fluxos fluxo de alterações para Pub/Sub não oferece suporte ao processamento de mensagens grandes devido às limitações do Pub/Sub.
Definir permissões do IAM para modelos do Dataflow
Antes de criar um job do Dataflow com os três modelos flexíveis listados, verifique se você tem as permissões necessárias do IAM para as seguintes contas de serviço:
Se você não tiver as permissões necessárias do IAM, será preciso especificar uma conta de serviço de worker gerenciada pelo usuário para criar o job do Dataflow. Para mais informações, consulte Segurança e permissões do Dataflow.
Quando você tenta executar um job com base em um modelo flexível do Dataflow sem todas as permissões necessárias, o job pode falhar com um erro ao ler o arquivo de resultado ou um erro de permissão negada no recurso. Para mais informações, consulte Solução de problemas de modelos flexíveis.
criar um pipeline do Dataflow
Esta seção aborda a configuração inicial do conector e fornece exemplos de integrações comuns com o recurso de fluxo de alterações do Spanner.
Para seguir estas etapas, você precisa de um ambiente de desenvolvimento Java para o Dataflow. Para mais informações, consulte Criar um pipeline do Dataflow usando Java.
Criar um stream de alterações
Para detalhes sobre como criar um fluxo de alterações, consulte Criar um fluxo de alterações. Para continuar com as próximas etapas, você precisa ter um banco de dados do Spanner com um fluxo de alterações configurado.
Conceder privilégios de controle de acesso refinado
Se você espera que usuários de controle de acesso refinado executem o job do Dataflow,
garanta que eles tenham acesso a uma função de banco de dados
com o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE
na função com valor de tabela do fluxo de alterações. Verifique também se o
principal especifica a função de banco de dados na configuração do SpannerIO ou no
modelo flex do Dataflow.
Para mais informações, consulte Sobre o controle de acesso minucioso.
Adicionar o conector SpannerIO como uma dependência
O conector SpannerIO do Apache Beam encapsula a complexidade de consumir os fluxos de alterações diretamente usando a API Cloud Spanner, emitindo uma PCollection de registros de dados de fluxo de alterações para etapas posteriores do pipeline.
Esses objetos podem ser consumidos em outras etapas do pipeline do Dataflow do usuário. A integração do fluxo de alterações faz parte do conector SpannerIO. Para usar o conector SpannerIO, a dependência precisa ser adicionada ao arquivo pom.xml:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>
Criar um banco de dados de metadados
O conector precisa rastrear cada partição ao executar o pipeline do Apache Beam. Ele mantém esses metadados em uma tabela do Spanner criada pelo conector durante a inicialização. Você especifica o banco de dados em que essa tabela será criada ao configurar o conector.
Conforme descrito em Práticas recomendadas para fluxos de alteração, recomendamos criar um novo banco de dados para essa finalidade, em vez de permitir que o conector use o banco de dados do aplicativo para armazenar a tabela de metadados.
O proprietário de um job do Dataflow que usa o conector SpannerIO precisa ter as seguintes permissões do IAM definidas com esse banco de dados de metadados:
spanner.databases.updateDdlspanner.databases.beginReadOnlyTransactionspanner.databases.beginOrRollbackReadWriteTransactionspanner.databases.readspanner.databases.selectspanner.databases.writespanner.sessions.createspanner.sessions.get
Configurar o conector
O conector de fluxo de alterações do Spanner pode ser configurado da seguinte maneira:
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
startTime.getSeconds() + (10 * 60),
startTime.getNanos()
);
SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-meta-instance-id")
.withMetadataDatabase("my-meta-database-id")
.withMetadataTable("my-meta-table-name")
.withRpcPriority(RpcPriority.MEDIUM)
.withInclusiveStartAt(startTime)
.withInclusiveEndAt(endTime);
Confira abaixo as descrições das opções de readChangeStream():
Configuração do Spanner (obrigatório)
Usado para configurar o projeto, a instância e o banco de dados em que o fluxo de alterações foi criado e de onde ele deve ser consultado. Também especifica opcionalmente o papel de banco de dados a ser usado quando o principal do IAM que executa o job do Dataflow é um usuário de controle de acesso minucioso. O job assume essa função de banco de dados para acessar o fluxo de alterações. Para mais informações, consulte Sobre o controle de acesso refinado.
Nome do fluxo de alterações (obrigatório)
Esse nome identifica exclusivamente o fluxo de mudanças. O nome aqui precisa ser igual ao usado na criação.
ID da instância de metadados (opcional)
Essa é a instância para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API Change Stream.
ID do banco de dados de metadados (obrigatório)
É o banco de dados para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API de fluxo de alterações.
Nome da tabela de metadados (opcional)
Isso só deve ser usado ao atualizar um pipeline atual.
Esse é o nome da tabela de metadados preexistente a ser usado pelo conector. Usado pelo conector para armazenar os metadados e controlar o consumo dos dados da API Change Stream. Se essa opção for omitida, o Spanner vai criar uma tabela com um nome gerado na inicialização do conector.
Prioridade da RPC (opcional)
A prioridade
da solicitação a ser
usada para as consultas de fluxo de mudanças. Se esse parâmetro for omitido, high
priority será usado.
InclusiveStartAt (obrigatório)
As mudanças do carimbo de data/hora especificado são retornadas ao autor da chamada.
InclusiveEndAt (opcional)
As mudanças até o carimbo de data/hora especificado são retornadas ao autor da chamada. Se esse parâmetro for omitido, as mudanças serão emitidas indefinidamente.
Adicionar transformações e gravadores para processar dados de mudança
Com as etapas anteriores concluídas, o conector SpannerIO configurado está pronto
para emitir uma PCollection de objetos DataChangeRecord.
Consulte Exemplos de transformações e sinks para ver várias configurações de pipeline de amostra que processam esses dados transmitidos de várias maneiras.
Os registros de fluxo de alterações emitidos pelo conector SpannerIO não são ordenados. Isso acontece porque as PCollections não oferecem garantias de ordenação. Se você precisar de um fluxo ordenado, agrupe e classifique os registros como transformações nos seus pipelines. Consulte Exemplo: ordenar por chave. Você pode estender essa amostra para classificar os registros com base em qualquer campo deles, como IDs de transação.
Exemplo de transformações e coletores
Você pode definir suas próprias transformações e especificar coletores para gravar os dados. A documentação do Apache Beam oferece várias transformações que podem ser aplicadas, além de conectores de E/S prontos para uso que gravam os dados em sistemas externos.
Exemplo: ordenar por chave
Este exemplo de código emite registros de mudança de dados ordenados por carimbo de data/hora de confirmação e agrupados por chaves primárias usando o conector do Dataflow.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new BreakRecordByModFn()))
.apply(ParDo.of(new KeyByIdFn()))
.apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
// Subsequent processing goes here
Este exemplo de código usa estados e timers para armazenar em buffer registros de cada chave e define o tempo de expiração do timer para algum tempo T configurado pelo usuário no futuro (definido na função BufferKeyUntilOutputTimestamp). Quando a marca d'água do Dataflow passa do tempo T, esse código libera todos os registros no buffer com carimbo de data/hora menor que T, ordena esses registros por carimbo de data/hora de confirmação e gera um par de chave-valor em que:
- A chave é a chave de entrada, ou seja, a chave primária com hash para uma matriz de bucket de tamanho 1.000.
- O valor são os registros de alteração de dados ordenados que foram armazenados em buffer para a chave.
Para cada chave, temos as seguintes garantias:
- Os timers têm garantia de serem acionados na ordem do carimbo de data/hora de expiração.
- Os estágios downstream têm a garantia de receber os elementos na mesma ordem em que foram produzidos.
Por exemplo, com uma chave de valor 100, o timer é acionado em T1 e T10, respectivamente, produzindo um pacote de registros de mudança de dados em cada carimbo de data/hora. Como os registros de mudança de dados gerados em T1 foram produzidos antes dos registros gerados em T10, os registros de mudança de dados gerados em T1 também são recebidos pela próxima etapa antes dos registros gerados em T10. Esse mecanismo ajuda a garantir uma ordenação estrita de carimbos de data/hora de confirmação por chave primária para processamento downstream.
Esse processo se repete até que o pipeline termine e todos os registros de mudança de dados sejam processados (ou se repete indefinidamente se nenhum horário de término for especificado).
Esta exemplo de código usa estados e timers, em vez de janelas, para realizar a ordenação por chave. O motivo é que não há garantia de que as janelas serão processadas em ordem. Isso significa que janelas mais antigas podem ser processadas depois de janelas mais recentes, o que pode resultar em processamento fora de ordem.
BreakRecordByModFn
Cada registro de alteração de dados pode conter várias modificações. Cada mod representa uma inserção, atualização ou exclusão de um único valor de chave primária. Essa função divide cada registro de mudança de dados em registros separados, um por mod.
private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
DataChangeRecord> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
outputReceiver) {
record.getMods().stream()
.map(
mod ->
new DataChangeRecord(
record.getPartitionToken(),
record.getCommitTimestamp(),
record.getServerTransactionId(),
record.isLastRecordInTransactionInPartition(),
record.getRecordSequence(),
record.getTableName(),
record.getRowType(),
Collections.singletonList(mod),
record.getModType(),
record.getValueCaptureType(),
record.getNumberOfRecordsInTransaction(),
record.getNumberOfPartitionsInTransaction(),
record.getTransactionTag(),
record.isSystemTransaction(),
record.getMetadata()))
.forEach(outputReceiver::output);
}
}
KeyByIdFn
Essa função usa um DataChangeRecord e gera um DataChangeRecord com chave pela chave primária do Spanner com hash para um valor inteiro.
private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
// NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
// Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
// of states and timers for performance purposes.
// Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
// On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
private static final int NUMBER_OF_BUCKETS = 1000;
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
// Hash the received keys into a bucket in order to have a
// deterministic number of buffers and timers.
String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);
outputReceiver.output(KV.of(bucketIndex, record));
}
}
BufferKeyUntilOutputTimestamp
Timers e buffers são por chave. Essa função armazena em buffer cada registro de mudança de dados até que a marca-d'água passe o carimbo de data/hora em que queremos gerar os registros de mudança de dados armazenados em buffer.
Este código usa um timer de loop para determinar quando limpar o buffer:
- Quando ele vê um registro de mudança de dados para uma chave pela primeira vez, define o temporizador para ser acionado no carimbo de data/hora de confirmação do registro de mudança de dados +
incrementIntervalSeconds(uma opção configurável pelo usuário). - Quando o timer é acionado, ele adiciona todos os registros de mudança de dados no buffer com carimbo de data/hora menor que o prazo de validade do timer a
recordsToOutput. Se o buffer tiver registros de mudança de dados com um carimbo de data/hora maior ou igual ao tempo de expiração do timer, ele adicionará esses registros de volta ao buffer em vez de gerá-los. Em seguida, ele define o próximo timer como o tempo de expiração do timer atual maisincrementIntervalInSeconds. - Se
recordsToOutputnão estiver vazio, a função vai ordenar os registros de mudança de dados emrecordsToOutputpor carimbo de data/hora de confirmação e ID da transação e, em seguida, vai gerar a saída deles.
private static class BufferKeyUntilOutputTimestamp extends
DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>> {
private static final Logger LOG =
LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);
private final long incrementIntervalInSeconds = 2;
private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
this.incrementIntervalInSeconds = incrementIntervalInSeconds;
}
@SuppressWarnings("unused")
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("keyString")
private final StateSpec<ValueState<String>> keyString =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void process(
@Element KV<String, DataChangeRecord> element,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
buffer.add(element.getValue());
// Only set the timer if this is the first time we are receiving a data change
// record with this key.
String elementKey = keyString.read();
if (elementKey == null) {
Instant commitTimestamp =
new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
Instant outputTimestamp =
commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
timer.set(outputTimestamp);
keyString.write(element.getKey());
}
}
@OnTimer("timer")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
if (!buffer.isEmpty().read()) {
String elementKey = keyString.read();
final List<DataChangeRecord> records =
StreamSupport.stream(buffer.read().spliterator(), false)
.collect(Collectors.toList());
buffer.clear();
List<DataChangeRecord> recordsToOutput = new ArrayList<>();
for (DataChangeRecord record : records) {
Instant recordCommitTimestamp =
new Instant(record.getCommitTimestamp().toSqlTimestamp());
final String recordString =
record.getMods().get(0).getNewValuesJson().isEmpty()
? "Deleted record"
: record.getMods().get(0).getNewValuesJson();
// When the watermark passes time T, this means that all records with
// event time < T have been processed and successfully committed. Since the
// timer fires when the watermark passes the expiration time, we should
// only output records with event time < expiration time.
if (recordCommitTimestamp.isBefore(context.timestamp())) {
LOG.info(
"Outputting record with key {} and value {} at expiration " +
"timestamp {}",
elementKey,
recordString,
context.timestamp().toString());
recordsToOutput.add(record);
} else {
LOG.info(
"Expired at {} but adding record with key {} and value {} back to " +
"buffer due to commit timestamp {}",
context.timestamp().toString(),
elementKey,
recordString,
recordCommitTimestamp.toString());
buffer.add(record);
}
}
// Output records, if there are any to output.
if (!recordsToOutput.isEmpty()) {
// Order the records in place, and output them. The user would need
// to implement DataChangeRecordComparator class that sorts the
// data change records by commit timestamp and transaction ID.
Collections.sort(recordsToOutput, new DataChangeRecordComparator());
context.outputWithTimestamp(
KV.of(elementKey, recordsToOutput), context.timestamp());
LOG.info(
"Expired at {}, outputting records for key {}",
context.timestamp().toString(),
elementKey);
} else {
LOG.info("Expired at {} with no records", context.timestamp().toString());
}
}
Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
LOG.info("Setting next timer to {}", nextTimer.toString());
timer.set(nextTimer);
} else {
LOG.info(
"Timer not being set since the buffer is empty: ");
keyString.clear();
}
}
}
Como ordenar transações
Esse pipeline pode ser alterado para ordenar por ID da transação e carimbo de data/hora do commit. Para isso, armazene em buffer os registros de cada par ID da transação / carimbo de data/hora de confirmação, em vez de cada chave do Spanner. Isso exige a modificação do código em KeyByIdFn.
Exemplo: reunir transações
Este exemplo de código lê registros de mudança de dados, reúne todos os registros pertencentes à mesma transação em um único elemento e gera esse elemento. As transações geradas por este exemplo de código não são ordenadas por carimbo de data/hora de confirmação.
Este exemplo de código usa buffers para montar transações de registros de mudança de dados. Ao receber um registro de mudança de dados pertencente a uma transação pela primeira vez, ele lê o campo numberOfRecordsInTransaction no registro, que descreve o número esperado de registros de mudança de dados pertencentes a essa transação. Ele armazena em buffer os registros de mudança de dados pertencentes a essa transação até que o número de registros armazenados em buffer corresponda a numberOfRecordsInTransaction, quando ele gera os registros de mudança de dados agrupados.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new KeyByTransactionIdFn()))
.apply(ParDo.of(new TransactionBoundaryFn()))
// Subsequent processing goes here
KeyByTransactionIdFn
Essa função usa um DataChangeRecord e gera um DataChangeRecord com chave pelo ID da transação.
private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
outputReceiver.output(KV.of(record.getServerTransactionId(), record));
}
}
TransactionBoundaryFn
Os buffers TransactionBoundaryFn recebem pares de chave-valor de
{TransactionId, DataChangeRecord} de KeyByTransactionIdFn e
os armazenam em grupos com base em TransactionId. Quando o número de registros em buffer é igual ao número de registros contidos em toda a transação, essa função classifica os objetos DataChangeRecord no grupo por sequência de registros e gera um par de chave-valor de {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.
Aqui, estamos supondo que SortKey é uma classe definida pelo usuário que representa um par {CommitTimestamp, TransactionId}. Para mais informações sobre o
SortKey, consulte a implementação de amostra.
private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>> {
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@StateId("count") ValueState<Integer> countState) {
final KV<String, DataChangeRecord> element = context.element();
final DataChangeRecord record = element.getValue();
buffer.add(record);
int count = (countState.read() != null ? countState.read() : 0);
count = count + 1;
countState.write(count);
if (count == record.getNumberOfRecordsInTransaction()) {
final List<DataChangeRecord> sortedRecords =
StreamSupport.stream(buffer.read().spliterator(), false)
.sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
.collect(Collectors.toList());
final Instant commitInstant =
new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
.getTime());
context.outputWithTimestamp(
KV.of(
new SortKey(sortedRecords.get(0).getCommitTimestamp(),
sortedRecords.get(0).getServerTransactionId()),
sortedRecords),
commitInstant);
buffer.clear();
countState.clear();
}
}
}
Exemplo: filtrar por tag de transação
Quando uma transação que modifica dados do usuário é marcada, a tag correspondente e o tipo dela são armazenados como parte de DataChangeRecord. Estes exemplos mostram como filtrar registros de fluxo de alterações com base em tags de transação definidas pelo usuário e tags do sistema:
Filtragem de tags definida pelo usuário para my-tx-tag:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
!record.isSystemTransaction()
&& record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
// Subsequent processing goes here
Filtragem de tags do sistema/auditoria de TTL:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
record.isSystemTransaction()
&& record.getTransactionTag().equals("RowDeletionPolicy")))
// Subsequent processing goes here
Exemplo: buscar linha completa
Este exemplo funciona com uma tabela do Spanner chamada Singer que tem a seguinte definição:
CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024)
) PRIMARY KEY (SingerId);
No modo de captura de valor OLD_AND_NEW_VALUES padrão dos fluxo de alterações,
quando há uma atualização em uma linha do Spanner, o registro
de mudança de dados recebido contém apenas as colunas que foram alteradas. As colunas rastreadas, mas
inalteradas, não serão incluídas no registro. A chave primária da
modificação pode ser usada para fazer uma leitura de snapshot do Spanner no carimbo de data/hora
de confirmação do registro de alteração de dados para buscar as colunas inalteradas ou até mesmo
recuperar a linha completa.
A política de retenção do banco de dados pode precisar ser alterada para um valor maior ou igual à política de retenção do fluxo de alterações para que a leitura do snapshot seja bem-sucedida.
Além disso, usar o tipo de captura de valor NEW_ROW é a maneira recomendada e mais eficiente de fazer isso, já que ele retorna todas as colunas rastreadas da linha por padrão e não exige uma leitura de instantâneo extra no Spanner.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
// Assume we have a change stream "my-change-stream" that watches Singers table.
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
// Subsequent processing goes here
ToFullRowJsonFn
Essa transformação vai realizar uma leitura desatualizada no carimbo de data/hora de confirmação de cada registro recebido e mapear a linha completa para JSON.
public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
// Since each instance of this DoFn will create its own session pool and will
// perform calls to Spanner sequentially, we keep the number of sessions in
// the pool small. This way, we avoid wasting resources.
private static final int MIN_SESSIONS = 1;
private static final int MAX_SESSIONS = 5;
private final String projectId;
private final String instanceId;
private final String databaseId;
private transient DatabaseClient client;
private transient Spanner spanner;
public ToFullRowJsonFn(SpannerConfig spannerConfig) {
this.projectId = spannerConfig.getProjectId().get();
this.instanceId = spannerConfig.getInstanceId().get();
this.databaseId = spannerConfig.getDatabaseId().get();
}
@Setup
public void setup() {
SessionPoolOptions sessionPoolOptions = SessionPoolOptions
.newBuilder()
.setMinSessions(MIN_SESSIONS)
.setMaxSessions(MAX_SESSIONS)
.build();
SpannerOptions options = SpannerOptions
.newBuilder()
.setProjectId(projectId)
.setSessionPoolOption(sessionPoolOptions)
.build();
DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
spanner = options.getService();
client = spanner.getDatabaseClient(id);
}
@Teardown
public void teardown() {
spanner.close();
}
@ProcessElement
public void process(
@Element DataChangeRecord element,
OutputReceiver<String> output) {
com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
element.getMods().forEach(mod -> {
JSONObject keysJson = new JSONObject(mod.getKeysJson());
JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
ModType modType = element.getModType();
JSONObject jsonRow = new JSONObject();
long singerId = keysJson.getLong("SingerId");
jsonRow.put("SingerId", singerId);
if (modType == ModType.INSERT) {
// For INSERT mod, get non-primary key columns from mod.
jsonRow.put("FirstName", newValuesJson.get("FirstName"));
jsonRow.put("LastName", newValuesJson.get("LastName"));
} else if (modType == ModType.UPDATE) {
// For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
try (ResultSet resultSet = client
.singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
.read(
"Singers",
KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
Arrays.asList("FirstName", "LastName"))) {
if (resultSet.next()) {
jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
JSONObject.NULL : resultSet.getString("FirstName"));
jsonRow.put("LastName", resultSet.isNull("LastName") ?
JSONObject.NULL : resultSet.getString("LastName"));
}
}
} else {
// For DELETE mod, there is nothing to do, as we already set SingerId.
}
output.output(jsonRow.toString());
});
}
}
Esse código cria um cliente de banco de dados do Spanner para realizar a busca completa de linhas e configura o pool de sessões para ter apenas algumas sessões, realizando leituras em uma instância de ToFullReowJsonFn sequencialmente.
O Dataflow gera muitas instâncias dessa função, cada uma com o próprio pool de clientes.
Exemplo: Spanner para Pub/Sub
Nesse cenário, o autor da chamada transmite registros para o Pub/Sub o mais rápido possível, sem agrupamento ou agregação. Isso é adequado para acionar o processamento downstream, como o streaming de todas as novas linhas inseridas em uma tabela do Spanner para o Pub/Sub para processamento posterior.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(PubsubIO.writeStrings().to("my-topic"));
O coletor do Pub/Sub pode ser configurado para garantir a semântica exatamente uma vez.
Exemplo: Spanner para Cloud Storage
Nesse cenário, o caller agrupa todos os registros em uma determinada janela e salva o grupo em arquivos separados do Cloud Storage. Isso é adequado para análises e arquivamento pontual, que é independente do período de armazenamento do Spanner.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO
.write()
.to("gs://my-bucket/change-stream-results-")
.withSuffix(".txt")
.withWindowedWrites()
.withNumShards(1));
A gravidade do Cloud Storage oferece semântica de "pelo menos uma vez" por padrão. Com processamento extra, ele pode ser modificado para ter semântica de exatamente uma vez.
Também oferecemos um modelo do Dataflow para esse caso de uso. Consulte Conectar fluxo de alterações ao Cloud Storage.
Exemplo: do Spanner para o BigQuery (tabela de razão)
Aqui, o caller transmite registros de mudanças para o BigQuery. Cada registro de mudança de dados é refletido como uma linha no BigQuery. Isso é adequado para análises. Esse código usa as funções definidas anteriormente, na seção Buscar linha completa, para recuperar a linha completa do registro e gravá-la no BigQuery.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
.apply(BigQueryIO
.<String>write()
.to("my-bigquery-table")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withSchema(new TableSchema().setFields(Arrays.asList(
new TableFieldSchema()
.setName("SingerId")
.setType("INT64")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("FirstName")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("LastName")
.setType("STRING")
.setMode("REQUIRED")
)))
.withAutoSharding()
.optimizedWrites()
.withFormatFunction((String element) -> {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree(element);
} catch (IOException e) {
e.printStackTrace();
}
return new TableRow()
.set("SingerId", jsonNode.get("SingerId").asInt())
.set("FirstName", jsonNode.get("FirstName").asText())
.set("LastName", jsonNode.get("LastName").asText());
}
)
);
Por padrão, o gravador do BigQuery oferece semântica do tipo "pelo menos uma vez". Com processamento extra, ele pode ser modificado para ter semântica de exatamente uma vez.
Também oferecemos um modelo do Dataflow para esse caso de uso. Consulte Conectar fluxo de alterações ao BigQuery.
Monitorar um pipeline
Há duas classes de métricas disponíveis para monitorar um pipeline do Dataflow de fluxo de alterações.
Métricas padrão do Dataflow
O Dataflow oferece várias métricas para garantir que seu job esteja íntegro, como atualização de dados, atraso do sistema, capacidade de processamento do job, uso da CPU do worker e muito mais. Para mais informações, consulte Como usar o Monitoring em pipelines do Dataflow.
Para pipelines de fluxo de alterações, há duas métricas principais que precisam ser consideradas: a latência do sistema e a atualização de dados.
A latência do sistema informa a duração máxima atual (em segundos) em que um item de dados é processado ou aguarda processamento.
A atualização de dados mostra a quantidade de tempo entre o momento atual (tempo real) e a marca d'água de saída. A marca-d'água de saída do tempo T indica que todos os elementos com um tempo de evento (estritamente) anterior a T foram processados para computação. Em outras palavras, a atualização de dados mede o quão atualizado está o pipeline em relação ao processamento dos eventos recebidos.
Se o pipeline tiver poucos recursos, você poderá ver esse efeito nessas duas métricas. A latência do sistema vai aumentar, porque os itens precisam esperar mais tempo antes de serem processados. A atualização de dados também vai aumentar, porque o pipeline não vai conseguir acompanhar a quantidade de dados recebidos.
Métricas personalizadas de fluxo de mudanças
Essas métricas são expostas no Cloud Monitoring e incluem:
- Latência agrupada (histograma) entre a confirmação de um registro no Spanner e a emissão dele em uma PCollection pelo conector. Essa métrica pode ser usada para identificar problemas de desempenho (latência) com o pipeline.
- Número total de registros de dados lidos. É uma indicação geral do número de registros emitidos pelo conector. Esse número deve aumentar continuamente, refletindo a tendência de gravações no banco de dados do Spanner subjacente.
- Número de partições que estão sendo lidas. Sempre deve haver partições sendo lidas. Se esse número for zero, isso indica que ocorreu um erro no pipeline.
- Número total de consultas emitidas durante a execução do conector. Essa é uma indicação geral das consultas de fluxo de alterações feitas na instância do Spanner durante a execução do pipeline. Isso pode ser usado para estimar a carga do conector no banco de dados do Spanner.
Atualizar um pipeline
É possível atualizar um pipeline em execução que usa o conector SpannerIO
para processar fluxo de alterações se as verificações de compatibilidade
de jobs forem aprovadas. Para fazer isso, defina explicitamente o parâmetro de nome da tabela de metadados do novo job ao atualizá-lo. Use o valor da opção de pipeline metadataTable do job que você está atualizando.
Se você estiver usando um modelo do Dataflow fornecido pelo Google, defina o nome da tabela usando o parâmetro spannerMetadataTableName. Você também pode modificar
o job atual para usar explicitamente a tabela de metadados com o método
withMetadataTable(your-metadata-table-name) na
configuração do conector. Depois disso, siga as instruções em Como iniciar o job de substituição na documentação do Dataflow para atualizar um job em execução.
Práticas recomendadas para fluxo de alterações e Dataflow
Confira algumas práticas recomendadas para criar conexões de fluxo de alterações usando o Dataflow.
Usar um banco de dados de metadados separado
Recomendamos criar um banco de dados separado para o conector SpannerIO usar no armazenamento de metadados, em vez de configurá-lo para usar o banco de dados do aplicativo.
Para mais informações, consulte Considerar um banco de dados de metadados separado.
Dimensionar o cluster
Uma regra prática para um número inicial de workers em um job de fluxo de alterações do Spanner é um worker por 1.000 gravações por segundo. Essa estimativa pode variar dependendo de vários fatores, como o tamanho de cada transação, quantos registros de fluxo de mudanças são produzidos em uma única transação e outras transformações, agregações ou gravadores que estão sendo usados no pipeline.
Após o provisionamento inicial, é importante acompanhar as métricas mencionadas em Monitorar um pipeline para garantir que ele esteja funcionando bem. Recomendamos testar um tamanho inicial de pool de trabalhadores e monitorar como o pipeline lida com a carga, aumentando o número de nós se necessário. A utilização da CPU é uma métrica importante para verificar se a carga está adequada e se mais nós são necessários.
Limitações conhecidas
Há algumas limitações conhecidas ao usar fluxo de alterações do Spanner com o Dataflow:
Escalonamento automático
O suporte ao escalonamento automático para pipelines que incluem SpannerIO.readChangeStream
requer o Apache Beam 2.39.0 ou mais recente.
Se você usa uma versão do Apache Beam anterior a 2.39.0, os pipelines que incluem
SpannerIO.readChangeStream precisam especificar explicitamente o algoritmo de escalonamento automático
como NONE, conforme descrito em Escalonamento automático horizontal.
Para escalonar manualmente um pipeline do Dataflow em vez de usar o escalonamento automático, consulte Como escalonar manualmente um pipeline de streaming.
Runner V2
O conector de fluxo de alterações do Spanner requer o
Dataflow Runner V2.
Isso precisa ser especificado manualmente durante a execução, caso contrário, um erro será gerado. É possível especificar Runner V2 ao configurar o job com
--experiments=use_unified_worker,use_runner_v2.
Snapshot
O conector de fluxo de alterações do Spanner não é compatível com snapshots do Dataflow.
Reduzindo
O conector de fluxo de alterações do Spanner não é compatível com drenagem de um job. Só é possível cancelar um job em andamento.
Também é possível atualizar um pipeline sem precisar interrompê-lo.
OpenCensus
Para usar o OpenCensus para monitorar seu pipeline, especifique a versão 0.28.3 ou mais recente.
NullPointerException no início do pipeline
Um bug na versão 2.38.0 do Apache Beam pode causar um NullPointerException ao iniciar o pipeline em determinadas condições. Isso impediria o início do job e mostraria esta mensagem de erro:
java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null
Para resolver esse problema, use a versão 2.39.0 ou mais recente do Apache Beam ou
especifique manualmente a versão de beam-sdks-java-core como 2.37.0:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.37.0</version>
</dependency>