Modelo Gerador de dados de streaming para Pub/Sub, BigQuery e Cloud Storage

O modelo Gerador de dados de streaming é usado para gerar um número ilimitado ou fixo de registros sintéticos ou mensagens com base no esquema fornecido pelo usuário na taxa especificada. Os destinos compatíveis incluem tópicos do Pub/Sub, tabelas do BigQuery e buckets do Cloud Storage.

Veja a seguir um conjunto de alguns casos de uso possíveis:

  • Simule a publicação de eventos em tempo real em grande escala em um tópico do Pub/Sub para medir e determinar o número e o tamanho dos consumidores necessários para processar eventos publicados.
  • Gere dados sintéticos para uma tabela do BigQuery ou um bucket do Cloud Storage para avaliar a performance dos comparativos de mercado ou servir como prova de conceito.

Coletores e formatos de codificação compatíveis

A tabela a seguir descreve quais coletores e formatos de codificação são compatíveis com este modelo:
JSON Avro Parquet
Pub/Sub Sim Sim Não
BigQuery Sim Não Não
Cloud Storage Sim Sim Sim

Requisitos de pipeline

  • A conta de serviço do worker precisa do papel atribuído do Dataflow Worker (roles/dataflow.worker). Para mais informações, consulte Introdução ao IAM.
  • Crie um arquivo de esquema que contenha um modelo JSON para os dados gerados. Esse modelo usa a biblioteca Gerador de dados JSON para que você possa fornecer várias funções fictícias para cada campo no esquema. Para mais informações, consulte a documentação do json-data-generator.

    Por exemplo:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Faça upload do arquivo de esquema para um bucket do Cloud Storage.
  • O destino da saída precisa existir antes da execução. O destino precisa ser um tópico do Pub/Sub, uma tabela do BigQuery ou um bucket do Cloud Storage, dependendo do tipo de coletor.
  • Se a codificação de saída for Avro ou Parquet, crie um arquivo de esquema Avro e armazene-o em um local do Cloud Storage.
  • Atribua à conta de serviço do worker um papel adicional do IAM, dependendo do destino desejado.
    Destino Além disso, papel do IAM necessário Aplicar a qual recurso
    Pub/Sub Publicador do Pub/Sub (roles/pubsub.publisher)
    (para mais informações, consulte Controle de acesso do Pub/Sub com IAM)
    Tópico do Pub/Sub
    BigQuery Editor de dados do BigQuery (roles/bigquery.dataEditor)
    (para mais informações, consulte Controle de acesso do BigQuery com o IAM)
    Conjunto de dados do BigQuery
    Cloud Storage Administrador de objetos do Cloud Storage (roles/storage.objectAdmin)
    (para mais informações, consulte Controle de acesso do Cloud Storage com IAM)
    Bucket do Cloud Storage

Parâmetros do modelo

Parâmetro Descrição
schemaLocation Local do arquivo de esquema. Por exemplo, gs://mybucket/filename.json.
qps Número de mensagens a serem publicadas por segundo. Por exemplo, 100.
sinkType [Opcional] Tipo de coletor de saída. Os valores possíveis são PUBSUB, BIGQUERY, GCS. O padrão é PUBSUB.
outputType [Opcional] Tipo de codificação de saída. Os valores possíveis são JSON, AVRO, PARQUET. O padrão é JSON.
avroSchemaLocation [Opcional] Local do arquivo de esquema AVRO. Obrigatório quando outputType for AVRO ou PARQUET. Por exemplo, gs://mybucket/filename.avsc.
topic [Opcional] Nome do tópico do Pub/Sub em que o pipeline precisa publicar dados. Obrigatório quando for Pub/Sub. Por exemplo: . Obrigatório quando sinkType for Pub/Sub. Por exemplo, projects/my-project-id/topics/my-topic-id.
outputTableSpec (Opcional) Nome da tabela de saída do BigQuery. Obrigatório quando sinkType for o BigQuery. Por exemplo, my-project-ID:my_dataset_name.my-table-name.
writeDisposition [Opcional] Disposição de gravação do BigQuery. Os valores possíveis são WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. O padrão é WRITE_APPEND.
outputDeadletterTable [Opcional] Nome da tabela de saída do BigQuery para guardar os registros com falha. Se não for fornecido, o pipeline criará a tabela durante a execução com o nome {nome_da_tabela_de_saida}_registros_de_erros. Por exemplo, my-project-ID:my_dataset_name.my-table-name.
outputDirectory [Opcional] Caminho do local de saída do Cloud Storage. Obrigatório quando sinkType for Cloud Storage. Por exemplo, gs://mybucket/pathprefix/.
outputFilenamePrefix [Opcional] Prefixo do nome do arquivo de saída gravado no Cloud Storage. O padrão é output-.
windowDuration [Opcional] Intervalo de janela em que a saída é gravada no Cloud Storage. O padrão é 1 min (ou seja, 1 minuto).
numShards [Opcional] Número máximo de fragmentos de saída. Obrigatório quando sinkType é Cloud Storage e precisa ser definido como um número maior ou igual a 1.
messagesLimit [Opcional] Número máximo de mensagens de saída. O padrão é 0, que indica ilimitado.
autoscalingAlgorithm [Opcional] Algoritmo usado para escalonamento automático dos workers. Os valores possíveis são THROUGHPUT_BASED para ativar o escalonamento automático ou NONE para desativá-lo.
maxNumWorkers [Opcional] Número máximo de máquinas de worker. Por exemplo, 10.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione o modelo Gerador de dados de streaming.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/ \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Substitua:

  • PROJECT_ID: o ID do projeto em que você quer executar o job do Dataflow Google Cloud
  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SCHEMA_LOCATION: o caminho para o arquivo de esquema no Cloud Storage. Por exemplo, gs://mybucket/filename.json.
  • QPS: o número de mensagens a serem publicadas por segundo
  • PUBSUB_TOPIC: o tópico de saída do Pub/Sub. Por exemplo, projects/my-project-id/topics/my-topic-id.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto em que você quer executar o job do Dataflow Google Cloud
  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SCHEMA_LOCATION: o caminho para o arquivo de esquema no Cloud Storage. Por exemplo, gs://mybucket/filename.json.
  • QPS: o número de mensagens a serem publicadas por segundo
  • PUBSUB_TOPIC: o tópico de saída do Pub/Sub. Por exemplo, projects/my-project-id/topics/my-topic-id.

A seguir