Modelo de UDFs de Python do Pub/Sub para o MongoDB

O modelo Pub/Sub para MongoDB com UDFs Python é um pipeline de streaming que lê mensagens codificadas em JSON de uma subscrição do Pub/Sub e escreve-as no MongoDB como documentos. Se necessário, este pipeline suporta transformações adicionais que podem ser incluídas através de uma função definida pelo utilizador (UDF) do Python.

Se ocorrerem erros durante o processamento de registos, o modelo escreve-os numa tabela do BigQuery, juntamente com a mensagem de entrada. Por exemplo, podem ocorrer erros devido a uma incompatibilidade de esquemas, a um JSON com formato incorreto ou durante a execução de transformações. Especifique o nome da tabela no parâmetro deadletterTable. Se a tabela não existir, o pipeline cria-a automaticamente.

Requisitos do pipeline

  • A subscrição do Pub/Sub tem de existir e as mensagens têm de estar codificadas num formato JSON válido.
  • O cluster do MongoDB tem de existir e deve estar acessível a partir das máquinas de trabalho do Dataflow.

Parâmetros de modelos

Parâmetro Descrição
inputSubscription Nome da subscrição do Pub/Sub. Por exemplo: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Lista de servidores MongoDB separados por vírgulas. Por exemplo: 192.285.234.12:27017,192.287.123.11:27017
database Base de dados no MongoDB para armazenar a coleção. Por exemplo: my-db.
collection Nome da coleção na base de dados do MongoDB. Por exemplo: my-collection.
deadletterTable Tabela do BigQuery que armazena mensagens devido a falhas (esquema em conflito, JSON com formato incorreto, etc.). Por exemplo: project-id:dataset-name.table-name.
pythonExternalTextTransformGcsPath Opcional: o URI do Cloud Storage do ficheiro de código Python que define a função definida pelo utilizador (FDU) que quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Opcional: O nome da função definida pelo utilizador (FDU) do Python que quer usar.
batchSize Opcional: tamanho do lote usado para a inserção em lote de documentos no MongoDB. Predefinição: 1000.
batchSizeBytes Opcional: tamanho do lote em bytes. Predefinição: 5242880.
maxConnectionIdleTime Opcional: tempo de inatividade máximo permitido em segundos antes de ocorrer o limite de tempo da ligação. Predefinição: 60000.
sslEnabled Opcional: valor booleano que indica se a ligação ao MongoDB tem o SSL ativado. Predefinição: true.
ignoreSSLCertificate Opcional: valor booleano que indica se o certificado SSL deve ser ignorado. Predefinição: true.
withOrdered Opcional: valor booleano que permite inserções em massa ordenadas no MongoDB. Predefinição: true.
withSSLInvalidHostNameAllowed Opcional: valor booleano que indica se o nome do anfitrião inválido é permitido para a ligação SSL. Predefinição: true.

Função definida pelo utilizador

Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: uma única linha de um ficheiro CSV de entrada.
  • Saída: um documento JSON convertido em string para inserir no MongoDB.

Execute o modelo

Consola

  1. Aceda à página Dataflow Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub to MongoDB with Python UDFs template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • INPUT_SUBSCRIPTION: a subscrição do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços do servidor MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome da base de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang",
   }
}
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • INPUT_SUBSCRIPTION: a subscrição do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços do servidor MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome da base de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

O que se segue?