Questo modello crea una pipeline di streaming che esegue il polling continuo per rilevare i nuovi file di testo caricati in Cloud Storage, legge ogni file riga per riga e pubblica le stringhe in un argomento Pub/Sub. Il modello pubblica i record in un file delimitato da accapo contenente record JSON o in un file CSV in un argomento Pub/Sub per l'elaborazione in tempo reale. Puoi utilizzare questo modello per replicare dati in Pub/Sub.
La pipeline viene eseguita a tempo indeterminato e deve essere terminata manualmente tramite un'operazione di "annullamento" e non di "svuotamento", a causa dell'utilizzo della trasformazione "Watch", che è una "SplittableDoFn" che non supporta lo svuotamento.
Al momento, l'intervallo di polling è fisso e impostato su 10 secondi. Questo modello non imposta alcun timestamp nei singoli record, quindi l'ora dell'evento corrisponde a quella di pubblicazione durante l'esecuzione. Se la tua pipeline si basa su un'indicazione accurata sull'ora dell'evento per l'elaborazione, non dovresti utilizzare questa pipeline.
Requisiti della pipeline
- I file di input devono essere in formato JSON delimitato da nuova riga o in formato CSV. I record che si estendono su più righe nei file di origine possono causare problemi downstream, perché ogni riga all'interno dei file viene pubblicata come messaggio in Pub/Sub.
- L'argomento Pub/Sub deve esistere prima dell'esecuzione.
- La pipeline viene eseguita a tempo indeterminato e deve essere terminata manualmente.
Parametri del modello
Parametri obbligatori
- inputFilePattern: il pattern del file di input da cui leggere. Ad esempio,
gs://bucket-name/files/*.json. - outputTopic: l'argomento di input Pub/Sub in cui scrivere. Il nome deve essere nel formato
projects/<PROJECT_ID>/topics/<TOPIC_NAME>. Ad esempio:projects/your-project-id/topics/your-topic-name.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Text Files on Cloud Storage to Pub/Sub (Stream) template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Sostituisci quanto segue:
JOB_NAME: un nome univoco del job a tua sceltaREGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1STAGING_LOCATION: la posizione per lo staging dei file locali (ad esempio,gs://your-bucket/staging)TOPIC_NAME: il nome dell'argomento Pub/SubBUCKET_NAME: il nome del bucket Cloud StorageFILE_PATTERN: il pattern glob del file da leggere nel bucket Cloud Storage (ad esempio,path/*.csv)
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per saperne di più sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Sostituisci quanto segue:
PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME: un nome univoco del job a tua sceltaLOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1STAGING_LOCATION: la posizione per lo staging dei file locali (ad esempio,gs://your-bucket/staging)TOPIC_NAME: il nome dell'argomento Pub/SubBUCKET_NAME: il nome del bucket Cloud StorageFILE_PATTERN: il pattern glob del file da leggere nel bucket Cloud Storage (ad esempio,path/*.csv)
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.