Este modelo cria um pipeline de streaming que funciona com o MongoDB change streams. Para usar este modelo, publique os dados do fluxo de alterações no Pub/Sub. O pipeline lê os registos JSON do Pub/Sub e escreve-os no BigQuery. Os registos escritos no BigQuery têm o mesmo formato que o modelo de lote do MongoDB para o BigQuery.
Requisitos do pipeline
- O conjunto de dados do BigQuery de destino tem de existir.
- A instância do MongoDB de origem tem de estar acessível a partir das máquinas de trabalho do Dataflow.
- Tem de criar um tópico do Pub/Sub para ler a stream de alterações. Enquanto o pipeline está em execução, ouça os eventos de captura de dados de alterações (CDC) na stream de alterações do MongoDB e publique-os no Pub/Sub como registos JSON. Para mais informações sobre a publicação de mensagens no Pub/Sub, consulte o artigo Publicar mensagens em tópicos.
- Este modelo usa streams de alterações do MongoDB . Não suporta a captura de dados de alterações do BigQuery.
Parâmetros de modelos
Parâmetros obrigatórios
- mongoDbUri: o URI de ligação do MongoDB no formato
mongodb+srv://:@.. - database: base de dados no MongoDB a partir da qual ler a coleção. Por exemplo,
my-db. - collection: nome da coleção na base de dados do MongoDB. Por exemplo,
my-collection. - userOption:
FLATTEN,JSONouNONE.FLATTENune os documentos ao nível único.JSONarmazena o documento no formato JSON do BigQuery.NONEarmazena o documento completo como uma STRING formatada em JSON. A predefinição é: NENHUM. - inputTopic: o tópico de entrada do Pub/Sub a partir do qual ler, no formato
projects/<PROJECT_ID>/topics/<TOPIC_NAME>. - outputTableSpec: a tabela do BigQuery para a qual escrever. Por exemplo,
bigquery-project:dataset.output_table.
Parâmetros opcionais
- useStorageWriteApiAtLeastOnce: quando usa a API Storage Write, especifica a semântica de escrita. Para usar a semântica, pelo menos, uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como
true. Para usar a semântica exatamente uma vez, defina o parâmetro comofalse. Este parâmetro aplica-se apenas quandouseStorageWriteApiétrue. O valor predefinido éfalse. - KMSEncryptionKey: chave de encriptação do Cloud KMS para desencriptar a string de ligação do URI do MongoDB. Se for transmitida a chave do Cloud KMS, a string de ligação do URI do MongoDB tem de ser transmitida encriptada. Por exemplo,
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key. - filter: filtro Bson no formato JSON. Por exemplo,
{ "val": { $gt: 0, $lt: 9 }}. - useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é
false. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se
useStorageWriteApifortrueeuseStorageWriteApiAtLeastOnceforfalse, tem de definir este parâmetro. A predefinição é: 0. - storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se
useStorageWriteApifortrueeuseStorageWriteApiAtLeastOnceforfalse, tem de definir este parâmetro. - bigQuerySchemaPath: o caminho do Cloud Storage para o esquema JSON do BigQuery. Por exemplo,
gs://your-bucket/your-schema.json. - javascriptDocumentTransformGcsPath: o URI do Cloud Storage do ficheiro
.jsque define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo,gs://your-bucket/your-transforms/*.js. - javascriptDocumentTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for
myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo,transform.
Função definida pelo utilizador
Opcionalmente, pode expandir este modelo escrevendo uma função definida pelo utilizador (FDU) em JavaScript. O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON.
Para usar uma FDU, carregue o ficheiro JavaScript para o Cloud Storage e defina os seguintes parâmetros do modelo:
| Parâmetro | Descrição |
|---|---|
javascriptDocumentTransformGcsPath |
A localização do ficheiro JavaScript no Cloud Storage. |
javascriptDocumentTransformFunctionName |
O nome da função JavaScript. |
Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.
Especificação da função
A FDU tem a seguinte especificação:
Execute o modelo
Consola
- Aceda à página Dataflow Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the MongoDB (CDC) to BigQuery template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION,\ inputTopic=INPUT_TOPIC
Substitua o seguinte:
PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME: um nome de tarefa exclusivo à sua escolhaREGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1VERSION: a versão do modelo que quer usarPode usar os seguintes valores:
latestpara usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC: o nome da tabela do BigQuery de destino.MONGO_DB_URI: o seu URI do MongoDB.DATABASE: a sua base de dados MongoDB.COLLECTION: a sua coleção do MongoDB.USER_OPTION: FLATTEN, JSON ou NONE.INPUT_TOPIC: o seu tópico de entrada do Pub/Sub.
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION", "inputTopic": "INPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC", } }
Substitua o seguinte:
PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME: um nome de tarefa exclusivo à sua escolhaLOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1VERSION: a versão do modelo que quer usarPode usar os seguintes valores:
latestpara usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC: o nome da tabela do BigQuery de destino.MONGO_DB_URI: o seu URI do MongoDB.DATABASE: a sua base de dados MongoDB.COLLECTION: a sua coleção do MongoDB.USER_OPTION: FLATTEN, JSON ou NONE.INPUT_TOPIC: o seu tópico de entrada do Pub/Sub.
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.