Il modello Pub/Sub Avro to BigQuery è una pipeline in modalità flusso che importa dati Avro da una sottoscrizione Pub/Sub in una tabella BigQuery. Eventuali errori che si verificano durante la scrittura nella tabella BigQuery vengono inseriti in modalità flusso in un argomento Pub/Sub non elaborato.
Prima di eseguire una pipeline Dataflow per questo scenario, valuta se una sottoscrizione BigQuery di Pub/Sub con una UDF soddisfa i tuoi requisiti.
Requisiti della pipeline
- Deve esistere la sottoscrizione Pub/Sub di input.
- Il file dello schema per i record Avro deve esistere in Cloud Storage.
- Deve esistere l'argomento Pub/Sub non elaborato.
- Deve esistere il set di dati BigQuery di output.
Parametri del modello
Parametri obbligatori
- schemaPath: la posizione Cloud Storage del file dello schema Avro. Ad esempio,
gs://path/to/my/schema.avsc. - inputSubscription: la sottoscrizione Pub/Sub di input da cui leggere. Ad esempio,
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>. - outputTableSpec: la posizione della tabella di output BigQuery in cui scrivere l'output. Ad esempio,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.A seconda delcreateDispositionspecificato, la tabella di output potrebbe essere creata automaticamente utilizzando lo schema Avro fornito dall'utente. - outputTopic: l'argomento Pub/Sub da utilizzare per i record non elaborati. Ad esempio,
projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
Parametri facoltativi
- useStorageWriteApiAtLeastOnce: quando si utilizza l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica almeno una volta (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica di esecuzione esatta, imposta il parametro su
false. Questo parametro si applica solo quandouseStorageWriteApiètrue. Il valore predefinito èfalse. - writeDisposition: il valore di BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Ad esempio,
WRITE_APPEND,WRITE_EMPTYoWRITE_TRUNCATE. Il valore predefinito èWRITE_APPEND. - createDisposition: BigQuery CreateDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Ad esempio,
CREATE_IF_NEEDEDeCREATE_NEVER. Il valore predefinito èCREATE_IF_NEEDED. - useStorageWriteApi: se è true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è
false. Per saperne di più, consulta la pagina Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: quando utilizzi l'API Storage Write, specifica il numero di flussi di scrittura. Se
useStorageWriteApiètrueeuseStorageWriteApiAtLeastOnceèfalse, devi impostare questo parametro. Il valore predefinito è 0. - storageWriteApiTriggeringFrequencySec: quando si utilizza l'API Storage Write, specifica la frequenza di attivazione, in secondi. Se
useStorageWriteApiètrueeuseStorageWriteApiAtLeastOnceèfalse, devi impostare questo parametro.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Pub/Sub Avro to BigQuery template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Sostituisci quanto segue:
JOB_NAME: un nome univoco del job a tua sceltaREGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1VERSION: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latestper utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH: il percorso Cloud Storage del file schema Avro (ad esempiogs://MyBucket/file.avsc)SUBSCRIPTION_NAME: il nome della sottoscrizione Pub/Sub di inputBIGQUERY_TABLE: il nome della tabella di output BigQueryDEADLETTER_TOPIC: l'argomento Pub/Sub da utilizzare per la coda non elaborata
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per saperne di più sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Sostituisci quanto segue:
JOB_NAME: un nome univoco del job a tua sceltaLOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1VERSION: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latestper utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH: il percorso Cloud Storage del file schema Avro (ad esempiogs://MyBucket/file.avsc)SUBSCRIPTION_NAME: il nome della sottoscrizione Pub/Sub di inputBIGQUERY_TABLE: il nome della tabella di output BigQueryDEADLETTER_TOPIC: l'argomento Pub/Sub da utilizzare per la coda non elaborata
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.