Modelo do MongoDB para o BigQuery (stream)

Este modelo cria um pipeline de streaming que funciona com o MongoDB change streams. Para usar este modelo, publique os dados do fluxo de alterações no Pub/Sub. O pipeline lê os registos JSON do Pub/Sub e escreve-os no BigQuery. Os registos escritos no BigQuery têm o mesmo formato que o modelo de lote do MongoDB para o BigQuery.

Requisitos do pipeline

  • O conjunto de dados do BigQuery de destino tem de existir.
  • A instância do MongoDB de origem tem de estar acessível a partir das máquinas de trabalho do Dataflow.
  • Tem de criar um tópico do Pub/Sub para ler a stream de alterações. Enquanto o pipeline está em execução, ouça os eventos de captura de dados de alterações (CDC) na stream de alterações do MongoDB e publique-os no Pub/Sub como registos JSON. Para mais informações sobre a publicação de mensagens no Pub/Sub, consulte o artigo Publicar mensagens em tópicos.
  • Este modelo usa streams de alterações do MongoDB . Não suporta a captura de dados de alterações do BigQuery.

Parâmetros de modelos

Parâmetros obrigatórios

  • mongoDbUri: o URI de ligação do MongoDB no formato mongodb+srv://:@..
  • database: base de dados no MongoDB a partir da qual ler a coleção. Por exemplo, my-db.
  • collection: nome da coleção na base de dados do MongoDB. Por exemplo, my-collection.
  • userOption: FLATTEN, JSON ou NONE. FLATTEN une os documentos ao nível único. JSON armazena o documento no formato JSON do BigQuery. NONE armazena o documento completo como uma STRING formatada em JSON. A predefinição é: NENHUM.
  • inputTopic: o tópico de entrada do Pub/Sub a partir do qual ler, no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • outputTableSpec: a tabela do BigQuery para a qual escrever. Por exemplo, bigquery-project:dataset.output_table.

Parâmetros opcionais

  • useStorageWriteApiAtLeastOnce: quando usa a API Storage Write, especifica a semântica de escrita. Para usar a semântica, pelo menos, uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como true. Para usar a semântica exatamente uma vez, defina o parâmetro como false. Este parâmetro aplica-se apenas quando useStorageWriteApi é true. O valor predefinido é false.
  • KMSEncryptionKey: chave de encriptação do Cloud KMS para desencriptar a string de ligação do URI do MongoDB. Se for transmitida a chave do Cloud KMS, a string de ligação do URI do MongoDB tem de ser transmitida encriptada. Por exemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • filter: filtro Bson no formato JSON. Por exemplo, { "val": { $gt: 0, $lt: 9 }}.
  • useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é false. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro. A predefinição é: 0.
  • storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro.
  • bigQuerySchemaPath: o caminho do Cloud Storage para o esquema JSON do BigQuery. Por exemplo, gs://your-bucket/your-schema.json.
  • javascriptDocumentTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo, transform.

Função definida pelo utilizador

Opcionalmente, pode expandir este modelo escrevendo uma função definida pelo utilizador (FDU) em JavaScript. O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON.

Para usar uma FDU, carregue o ficheiro JavaScript para o Cloud Storage e defina os seguintes parâmetros do modelo:

ParâmetroDescrição
javascriptDocumentTransformGcsPath A localização do ficheiro JavaScript no Cloud Storage.
javascriptDocumentTransformFunctionName O nome da função JavaScript.

Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: um documento MongoDB.
  • Saída: um objeto serializado como uma string JSON.
  • Execute o modelo

    Consola

    1. Aceda à página Dataflow Criar tarefa a partir de um modelo.
    2. Aceda a Criar tarefa a partir de modelo
    3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
    4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

      Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.

    5. No menu pendente Modelo do fluxo de dados, selecione the MongoDB (CDC) to BigQuery template.
    6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
    7. Clique em Executar tarefa.

    gcloud

    Na shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela do BigQuery de destino.
    • MONGO_DB_URI: o seu URI do MongoDB.
    • DATABASE: a sua base de dados MongoDB.
    • COLLECTION: a sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.
    • INPUT_TOPIC: o seu tópico de entrada do Pub/Sub.

    API

    Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela do BigQuery de destino.
    • MONGO_DB_URI: o seu URI do MongoDB.
    • DATABASE: a sua base de dados MongoDB.
    • COLLECTION: a sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.
    • INPUT_TOPIC: o seu tópico de entrada do Pub/Sub.

    O que se segue?