Modelo do MongoDB para o BigQuery

Este modelo cria um pipeline em lote que lê documentos do MongoDB e os escreve no BigQuery.

Se quiser capturar dados de stream de alterações do MongoDB, pode usar o modelo do MongoDB para o BigQuery (CDC).

Requisitos do pipeline

  • O conjunto de dados do BigQuery de destino tem de existir.
  • A instância do MongoDB de origem tem de estar acessível a partir das máquinas de trabalho do Dataflow.

Formato de saída

O formato dos registos de saída depende do valor do parâmetro userOption. Se userOption for NONE, a saída tem o seguinte esquema. O campo source_data contém o documento no formato JSON.

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Se userOption for FLATTEN, o pipeline reduz os documentos e escreve os campos de nível superior como colunas da tabela. Por exemplo, suponhamos que os documentos na coleção do MongoDB contêm os seguintes campos:

  • "_id" (string)
  • "title" (string)
  • "genre" (string)

Com o FLATTEN, a saída tem o seguinte esquema. O campo timestamp é adicionado pelo modelo.

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Se userOption for JSON, o pipeline armazena o documento no formato JSON do BigQuery. O BigQuery tem suporte incorporado para dados JSON através do tipo de dados JSON. Para mais informações, consulte o artigo Trabalhar com dados JSON no GoogleSQL.

Parâmetros de modelos

Parâmetros obrigatórios

  • mongoDbUri: o URI de ligação do MongoDB no formato mongodb+srv://:@..
  • database: base de dados no MongoDB a partir da qual ler a coleção. Por exemplo, my-db.
  • collection: nome da coleção na base de dados do MongoDB. Por exemplo, my-collection.
  • userOption: FLATTEN, JSON ou NONE. FLATTEN une os documentos ao nível único. JSON armazena o documento no formato JSON do BigQuery. NONE armazena o documento completo como uma STRING formatada em JSON. A predefinição é: NENHUM.
  • outputTableSpec: a tabela do BigQuery para a qual escrever. Por exemplo, bigquery-project:dataset.output_table.

Parâmetros opcionais

  • KMSEncryptionKey: chave de encriptação do Cloud KMS para desencriptar a string de ligação do URI do MongoDB. Se for transmitida a chave do Cloud KMS, a string de ligação do URI do MongoDB tem de ser transmitida encriptada. Por exemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • filter: filtro Bson no formato JSON. Por exemplo, { "val": { $gt: 0, $lt: 9 }}.
  • useStorageWriteApi: se true, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é false. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: quando usa a API Storage Write, especifica a semântica de escrita. Para usar a semântica, pelo menos, uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como true. Para usar a semântica exatamente uma vez, defina o parâmetro como false. Este parâmetro aplica-se apenas quando useStorageWriteApi é true. O valor predefinido é false.
  • bigQuerySchemaPath: o caminho do Cloud Storage para o esquema JSON do BigQuery. Por exemplo, gs://your-bucket/your-schema.json.
  • javascriptDocumentTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo, transform.

Função definida pelo utilizador

Opcionalmente, pode expandir este modelo escrevendo uma função definida pelo utilizador (FDU) em JavaScript. O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON.

Para usar uma FDU, carregue o ficheiro JavaScript para o Cloud Storage e defina os seguintes parâmetros do modelo:

ParâmetroDescrição
javascriptDocumentTransformGcsPath A localização do ficheiro JavaScript no Cloud Storage.
javascriptDocumentTransformFunctionName O nome da função JavaScript.

Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: um documento MongoDB.
  • Saída: um objeto serializado como uma string JSON. Se userOption for NONE, o objeto JSON tem de incluir uma propriedade denominada _id que contém o ID do documento.
  • Execute o modelo

    Consola

    1. Aceda à página Dataflow Criar tarefa a partir de um modelo.
    2. Aceda a Criar tarefa a partir de modelo
    3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
    4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

      Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.

    5. No menu pendente Modelo do fluxo de dados, selecione the MongoDB to BigQuery template.
    6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
    7. Clique em Executar tarefa.

    gcloud

    Na shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela do BigQuery de destino.
    • MONGO_DB_URI: o seu URI do MongoDB.
    • DATABASE: a sua base de dados MongoDB.
    • COLLECTION: a sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.

    API

    Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • JOB_NAME: um nome de tarefa exclusivo à sua escolha
    • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
    • VERSION: a versão do modelo que quer usar

      Pode usar os seguintes valores:

    • OUTPUT_TABLE_SPEC: o nome da tabela do BigQuery de destino.
    • MONGO_DB_URI: o seu URI do MongoDB.
    • DATABASE: a sua base de dados MongoDB.
    • COLLECTION: a sua coleção do MongoDB.
    • USER_OPTION: FLATTEN, JSON ou NONE.

    O que se segue?