Modello Streaming Data Generator a Pub/Sub, BigQuery e Cloud Storage

Il modello Generatore di dati di streaming viene utilizzato per generare un numero illimitato o fisso di record sintetici o messaggi in base allo schema fornito dall'utente alla frequenza specificata. Le destinazioni compatibili includono argomenti Pub/Sub, tabelle BigQuery e bucket Cloud Storage.

Di seguito sono riportati alcuni possibili casi d'uso:

  • Simula la pubblicazione di eventi in tempo reale su larga scala in un argomento Pub/Sub per misurare e determinare il numero e le dimensioni dei consumer necessari per elaborare gli eventi pubblicati.
  • Genera dati sintetici in una tabella BigQuery o in un bucket Cloud Storage per valutare i benchmark delle prestazioni o fungere da prova di fattibilità.

Sink e formati di codifica supportati

La tabella seguente descrive i sink e i formati di codifica supportati da questo modello:
JSON Avro Parquet
Pub/Sub No
BigQuery No No
Cloud Storage

Requisiti della pipeline

  • L'account di servizio worker deve avere il ruolo Dataflow Worker (roles/dataflow.worker) assegnato. Per ulteriori informazioni, consulta Introduzione a IAM.
  • Crea un file di schema che contenga un modello JSON per i dati generati. Questo modello utilizza la libreria JSON Data Generator, in modo da poter fornire varie funzioni faker per ogni campo dello schema. Per saperne di più, consulta la documentazione di json-data-generator.

    Ad esempio:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Carica il file dello schema in un bucket Cloud Storage.
  • La destinazione di output deve esistere prima dell'esecuzione. La destinazione deve essere un argomento Pub/Sub, una tabella BigQuery o un bucket Cloud Storage, a seconda del tipo di sink.
  • Se la codifica di output è Avro o Parquet, crea un file dello schema Avro e archivialo in una posizione Cloud Storage.
  • Assegna al service account worker un ruolo IAM aggiuntivo a seconda della destinazione desiderata.
    Destinazione Ruolo IAM necessario aggiuntivo A quale risorsa applicare
    Pub/Sub Publisher Pub/Sub (roles/pubsub.publisher)
    (per maggiori informazioni, consulta Controllo dell'controllo dell'accesso a Pub/Sub con IAM)
    Argomento Pub/Sub
    BigQuery Editor dati BigQuery (roles/bigquery.dataEditor)
    (per maggiori informazioni, vedi Controllo dell'controllo dell'accesso BigQuery con IAM)
    Set di dati BigQuery
    Cloud Storage Amministratore oggetti Storage (roles/storage.objectAdmin)
    (per ulteriori informazioni, consulta Controllo dell'controllo dell'accesso a Cloud Storage con IAM)
    Bucket Cloud Storage

Parametri del modello

Parametro Descrizione
schemaLocation Posizione del file di schema. Ad esempio: gs://mybucket/filename.json.
qps Numero di messaggi da pubblicare al secondo. Ad esempio: 100.
sinkType (Facoltativo) Tipo di sink di output. I valori possibili sono PUBSUB, BIGQUERY, GCS. Il valore predefinito è PUBSUB.
outputType (Facoltativo) Tipo di codifica dell'output. I valori possibili sono JSON, AVRO, PARQUET. Il valore predefinito è JSON.
avroSchemaLocation (Facoltativo) Posizione del file di schema AVRO. Obbligatorio quando outputType è AVRO o PARQUET. Ad esempio: gs://mybucket/filename.avsc.
topic (Facoltativo) Nome dell'argomento Pub/Sub a cui la pipeline deve pubblicare i dati. Obbligatorio quando sinkType è Pub/Sub. Ad esempio: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Facoltativo) Nome della tabella BigQuery di output. Obbligatorio quando sinkType è BigQuery. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Facoltativo) Istruzione di scrittura BigQuery. I valori possibili sono WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
outputDeadletterTable (Facoltativo) Nome della tabella BigQuery di output in cui archiviare i record non riusciti. Se non viene fornita, la pipeline crea la tabella durante l'esecuzione con il nome {output_table_name}_error_records. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Facoltativo) Percorso della posizione di output di Cloud Storage. Obbligatorio quando sinkType è Cloud Storage. Ad esempio: gs://mybucket/pathprefix/.
outputFilenamePrefix (Facoltativo) Il prefisso del nome file dei file di output scritti in Cloud Storage. Il valore predefinito è output-.
windowDuration (Facoltativo) Intervallo della finestra in cui l'output viene scritto in Cloud Storage. Il valore predefinito è 1 m (ovvero 1 minuto).
numShards (Facoltativo) Numero massimo di shard di output. Obbligatorio quando sinkType è Cloud Storage e deve essere impostato su 1 o un numero superiore.
messagesLimit (Facoltativo) Numero massimo di messaggi di output. Il valore predefinito è 0, che indica un valore illimitato.
autoscalingAlgorithm (Facoltativo) Algoritmo utilizzato per la scalabilità automatica dei worker. I valori possibili sono THROUGHPUT_BASED per attivare la scalabilità automatica o NONE per disattivarla.
maxNumWorkers (Facoltativo) Numero massimo di macchine worker. Ad esempio: 10.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Streaming Data Generator template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: il percorso del file schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per saperne di più sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: il percorso del file schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

Passaggi successivi