Modello File di testo su Cloud Storage in Pub/Sub (stream)

Questo modello crea una pipeline di inserimento flussi che esegue il polling continuo per i nuovi file di testo caricati in Cloud Storage, legge ogni file riga per riga e pubblica le stringhe in un argomento Pub/Sub topic. Il modello pubblica i record in un file delimitato da accapo contenente record JSON o in un file CSV in un argomento Pub/Sub per l'elaborazione in tempo reale. Puoi utilizzare questo modello per replicare dati in Pub/Sub.

La pipeline viene eseguita a tempo indeterminato e deve essere terminata manualmente tramite un'operazione di "annullamento" e non di "svuotamento", a causa dell'utilizzo della trasformazione "Watch", che è una "SplittableDoFn" che non supporta lo svuotamento.

Al momento, l'intervallo di polling è fisso e impostato su 10 secondi. Questo modello non imposta un timestamp nei singoli record, quindi l'ora dell'evento corrisponde a quella di pubblicazione durante l'esecuzione. Se la tua pipeline necessita di un'indicazione accurata sull'ora dell'evento per l'elaborazione, non dovresti utilizzare questa pipeline.

Requisiti della pipeline

  • I file di input devono essere in formato JSON delimitato da accapo o in formato CSV. I record che comprendono più righe nei file di origine possono causare problemi downstream perché ogni riga presente nei file viene pubblicata come messaggio in Pub/Sub.
  • L'argomento Pub/Sub deve esistere prima dell'esecuzione.
  • La pipeline viene eseguita a tempo indeterminato e deve essere terminata manualmente.

Parametri del modello

Parametri obbligatori

  • inputFilePattern: il pattern del file di input da cui leggere. Ad esempio, gs://bucket-name/files/*.json.
  • outputTopic: l'argomento di input Pub/Sub in cui scrivere. Il nome deve essere nel formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>. Ad esempio, projects/your-project-id/topics/your-topic-name.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome job univoco.
  4. (Facoltativo) Per Endpoint regionale, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona il modello File di testo su Cloud Storage in Pub/Sub (flusso).
  6. Inserisci i valori dei parametri nei campi dei parametri forniti.
  7. (Facoltativo) Per passare dall'elaborazione esattamente una volta alla modalità flusso di dati almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

gcloud

Esegui il modello nella shell o nel terminale:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/ \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • STAGING_LOCATION: la località per l'inserimento gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage
  • FILE_PATTERN: il glob del pattern del file da cui leggere nel bucket Cloud Storage (ad esempio, path/*.csv)

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta HTTP POST. Per saperne di più sull' API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto in cui vuoi eseguire il job Dataflow Google Cloud
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • STAGING_LOCATION: la località per l'inserimento gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage
  • FILE_PATTERN: il glob del pattern del file da cui leggere nel bucket Cloud Storage (ad esempio, path/*.csv)

Passaggi successivi