Ler dados em tempo real com fluxos de alterações

Os fluxos de alterações do Firestore com compatibilidade com o MongoDB permitem que os aplicativos acessem mudanças em tempo real (inserções, atualizações e exclusões) feitas em uma coleção ou em um banco de dados inteiro. Um fluxo de alterações ordena as atualizações por horário de modificação.

Os Change Streams podem ser acessados pelas APIs compatíveis com o MongoDB e pelos drivers tradicionais do MongoDB. A implementação do Firestore com compatibilidade com o MongoDB de fluxos de alterações pode processar qualquer capacidade de processamento de gravações e leituras por uma implementação exclusiva de particionamento automático em gravações e paralelismo de leitura. Isso permite criar cargas de trabalho de alta capacidade. Você também pode melhorar a infraestrutura de migração e sincronização de dados entre o Firestore e outras soluções de armazenamento.

Além da compatibilidade com os drivers do MongoDB, você pode usar o Firestore para ler fluxos de mudanças em paralelo. Isso permite criar cargas de trabalho de leitura paralelas e de alta capacidade. Cada stream representa uma partição bem distribuída de resultados.

Os fluxos de alterações são compatíveis com os seguintes recursos:

  • Fluxo de alterações configuráveis com escopo de banco de dados ou coleção.
  • Uma duração de retenção para um fluxo de alterações especificado na criação. A retenção padrão é de 7 dias, e a mínima é de 1 dia. O período de retenção precisa ser um múltiplo de um dia, até um máximo de sete dias. A duração da retenção não pode ser alterada depois da criação. Para mudar o período de armazenamento, é necessário soltar e recriar o fluxo de alterações.
  • delete, insert, update e drop são eventos de mudança observáveis usando db.collection.watch() e db.watch().
  • updateDescription.updatedFields contém diferenças de atualização.
  • Todas as opções fullDocument e fullDocumentBeforeChange.
    • Procurando o documento completo para atualizações.
    • Pré-imagem do documento antes de ser substituído, atualizado ou excluído.
    • Imagem do documento após a substituição ou atualização.
    • As imagens pré e pós com mais de uma hora exigem a ativação da recuperação pontual (PITR).
  • Todas as opções de retomada, incluindo resumeAfter e startAfter.
  • Ao usar watch() para observar mudanças, é possível encadear estágios de agregação como $addFields, $match, $project, $replaceRoot, $replaceWith, $set e $unset.

Configurar fluxos de alterações

Para criar, excluir ou ver fluxos de mudanças de um banco de dados, use o console doGoogle Cloud .

Papéis e permissões

Para criar, excluir e listar fluxos de mudanças, um principal precisa das permissões datastore.schemas.create, datastore.schemas.delete e datastore.schemas.list do Identity and Access Management (IAM), respectivamente.

O papel Administrador de índice do Datastore (roles/datastore.indexAdmin), por exemplo, concede essas permissões.

Criar um stream de alterações

Antes de abrir um cursor de fluxo de alterações correspondente, é necessário criar um fluxo de alterações. Não é possível ativar automaticamente o fluxo de alterações na criação de coleções ou bancos de dados.

Para criar um fluxo de alterações, use o console do Google Cloud .

  1. No console do Google Cloud , acesse a página Bancos de dados.

    Acessar "Bancos de dados"

  2. Na lista, selecione um banco de dados do Firestore com compatibilidade com o MongoDB. O painel do Firestore Studio será aberto.
  3. No painel Explorador, encontre o nó fluxo de alterações, clique em Mais ações e selecione Criar fluxo de alterações.
  4. Insira um nome, um escopo e um período de armazenamento exclusivos para o fluxo de mudanças e clique em Salvar

Ver fluxos de alterações

Confira detalhes sobre fluxos de alterações no console Google Cloud .

  1. No console do Google Cloud , acesse a página Bancos de dados.

    Acessar "Bancos de dados"

  2. Na lista, selecione um banco de dados do Firestore com compatibilidade com o MongoDB. O painel do Firestore Studio será aberto.
  3. No painel Explorer, encontre o nó fluxo de alterações.
  4. Para abrir ou fechar o nó, clique em Alternar nó.

Excluir um fluxo de alterações

Para excluir um fluxo de alterações, use o console Google Cloud .

  1. No console do Google Cloud , acesse a página Bancos de dados.

    Acessar "Bancos de dados"

  2. Na lista, selecione um banco de dados do Firestore com compatibilidade com o MongoDB. O painel do Firestore Studio será aberto.
  3. No painel Explorer, encontre o nó fluxo de alterações.
  4. Para abrir ou fechar o nó, clique em Alternar nó.
  5. No Explorer, localize o fluxo de mudanças que você quer excluir.
  6. Clique em Mais ações e selecione Excluir fluxo de alterações.
  7. Na caixa de diálogo, insira o nome do fluxo de mudanças para confirmar a exclusão e clique em Excluir.

Abrir ou retomar um cursor de fluxo de alterações

Os exemplos a seguir demonstram como criar, retomar e configurar um cursor de fluxo de mudanças.

Antes de criar um cursor de fluxo de alterações, é necessário criar um fluxo de alterações para o banco de dados ou a coleção.

Criar um cursor de fluxo de alterações

Para criar um novo cursor de fluxo de alterações, use o método watch nos drivers do MongoDB. Para detectar todas as mudanças em um banco de dados, crie um fluxo de alterações no escopo do banco de dados e chame o método watch no objeto db.

let cursor = db.watch()

Para criar um cursor no escopo de uma coleção, primeiro é preciso criar um fluxo de alterações para essa coleção. Em seguida, chame o método watch na coleção correspondente.

let cursor = db.my_collection.watch()

Agora que você criou um cursor de fluxo de alterações, pode começar a fazer streaming. Por exemplo, se você inserir um documento e chamar tryNext no cursor, a mudança vai aparecer no fluxo de alterações.

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

Se você atualizar e excluir o documento, essas mudanças vão aparecer no fluxo de mudanças:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

Retomar um fluxo de alterações

Para retomar um fluxo de alterações, use as opções resumeAfter ou startAfter. Para determinar de onde retomar no registro de alterações resumeAfter e startAfter, use um token de retomada.

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

Para usar startAfter:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

Incluir imagens de antes e depois em atualizações e exclusões

Se necessário, você pode incluir imagens pré e pós de documentos em eventos de mudança de atualização e exclusão. A disponibilidade de imagens está sujeita à janela de recuperação pontual (PITR). Para ler imagens de documentos com mais de uma hora, ative a PITR.

Os fluxos de alteração aproveitam a janela de PITR para fornecer uma visualização do documento antes e depois do evento de mudança especificado. Por padrão, os eventos de atualização contêm um campo updateDescription, que é o delta dos campos modificados pela operação de atualização.

Para incluir as imagens antes e depois em um evento de mudança, especifique as opções fullDocumentBeforeChange e fullDocument na consulta de fluxo de mudanças.

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

Se a consulta tentar ler um documento fora do período de retenção da PITR ou se a PITR não estiver ativada, o valor required vai gerar uma mensagem de erro do lado do servidor.

Como alternativa a gerar um erro, use o valor whenAvailable para retornar um valor null se as imagens não estiverem mais disponíveis.

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

Incluir a imagem atual nas atualizações

Por padrão, os eventos de atualização contêm um campo updateDescription, que é o delta dos campos modificados pela operação de atualização. Para pesquisar a versão mais atual de todo o documento, use o valor updateLookup na opção fullDocument.

Esse recurso não exige PITR e faz uma pesquisa do documento.

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

Leituras paralelas

Para aumentar a capacidade de processamento, use a opção firestoreWorkerConfig para dividir uma consulta de fluxo de alterações em vários workers. Cada worker é responsável por veicular as mudanças em um conjunto distinto de documentos. Você precisa criar um cursor paralelo usando uma consulta runCommand ou aggregate.

Por exemplo, é possível distribuir um fluxo de mudanças em três workers da seguinte maneira:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

Fluxos de mudanças e backups

Nem a configuração nem os dados do fluxo de alterações estão disponíveis em operações de restauração de backup. Se você restaurar um banco de dados com Change Streams, será necessário recriar esses fluxos de mudanças no banco de dados de destino para abrir cursores para esse banco de dados.

Faturamento

Diferenças de comportamento

A seção a seguir descreve as diferenças nos fluxos de mudanças entre o Firestore com compatibilidade com o MongoDB e o MongoDB.

updateDescription

updateDescription é um documento em um evento update que descreve os campos atualizados ou removidos pela operação de atualização. No Firestore, as diferenças notáveis são:

  • Em updateDescription, os campos truncatedArrays e disambiguatedPaths não são preenchidos.
  • updateDescription.updatedFields representa uma diferença canônica entre as imagens pré e pós de um documento antes e depois da aplicação de uma mutação.

Considere o seguinte estado inicial de um documento:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

Cenário 1: mude apenas o primeiro elemento da matriz.

Neste cenário, o comportamento do Firestore corresponde ao do MongoDB.

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Cenário 2: substituir com uma matriz inteira

Nesse cenário, a operação atualiza apenas o primeiro campo de matriz, mas substitui toda a matriz.

A diferença de atualização do Firestore não diferencia esses dois cenários e retorna o mesmo updateDescription.updatedFields para ambos:

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}