Modello Pub/Sub a BigQuery

Il modello Da Pub/Sub a BigQuery è una pipeline di flusso che legge i messaggi in formato JSON da Pub/Sub e li scrive in una tabella BigQuery. Se vuoi, puoi fornire una funzione definita dall'utente (UDF) scritta in JavaScript per elaborare i messaggi in entrata.

Prima di eseguire una pipeline Dataflow per questo scenario, valuta se una sottoscrizione BigQuery di Pub/Sub con una UDF soddisfa i tuoi requisiti.

Requisiti della pipeline

  • La tabella BigQuery deve esistere e avere uno schema.
  • I dati dei messaggi Pub/Sub devono utilizzare il formato JSON oppure devi fornire una UDF che converta i dati dei messaggi in JSON. I dati JSON devono corrispondere allo schema della tabella BigQuery. Ad esempio, se i payload JSON sono formattati come {"k1":"v1", "k2":"v2"}, la tabella BigQuery deve avere due colonne di stringhe denominate k1 e k2.
  • Specifica il parametro inputSubscription o inputTopic, ma non entrambi.

Parametri del modello

Parametri obbligatori

  • outputTableSpec: la tabella BigQuery in cui scrivere, formattata come PROJECT_ID:DATASET_NAME.TABLE_NAME.

Parametri facoltativi

  • inputTopic: l'argomento Pub/Sub da cui leggere, formattato come projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: la sottoscrizione Pub/Sub da cui leggere, formattata come projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>.
  • outputDeadletterTable: la tabella BigQuery da utilizzare per i messaggi che non sono riusciti a raggiungere la tabella di output, formattata come PROJECT_ID:DATASET_NAME.TABLE_NAME. Se la tabella non esiste, viene creata quando viene eseguita la pipeline. Se questo parametro non viene specificato, viene utilizzato il valore OUTPUT_TABLE_SPEC_error_records.
  • useStorageWriteApiAtLeastOnce: quando si utilizza l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica di tipo "almeno una volta" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica di tipo "esattamente una volta", imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
  • useStorageWriteApi: se è true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per saperne di più, vedi Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: quando si utilizza l'API Storage Write, specifica il numero di flussi di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec: quando si utilizza l'API Storage Write, specifica la frequenza di attivazione, in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.
  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di UDF JavaScript, vedi Esempi di UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: specifica la frequenza di ricaricamento della UDF, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e ricarica la UDF se il file viene modificato. Questo parametro ti consente di aggiornare la UDF mentre la pipeline è in esecuzione, senza dover riavviare il job. Se il valore è 0, il ricaricamento della UDF è disabilitato. Il valore predefinito è 0.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per saperne di più, vedi Creare funzioni definite dall'utente per i modelli Dataflow.

Specifiche della funzione

La UDF ha le seguenti specifiche:

  • Input: il campo dei dati dei messaggi Pub/Sub, serializzato come stringa JSON.
  • Output: una stringa JSON che corrisponde allo schema della tabella BigQuery di destinazione.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome job univoco.
    4. (Facoltativo) Per Endpoint regionale, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località di Dataflow.

    5. Dal menu a discesa Modello Dataflow, seleziona il modello Da Pub/Sub a BigQuery.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. (Facoltativo) Per passare dalla modalità di elaborazione di tipo "esattamente una volta" alla modalità di flusso di dati di tipo "almeno una volta", seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/ \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Sostituisci quanto segue:

    • JOB_NAME: un nome job univoco a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la località per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: il nome dell'argomento Pub/Sub
    • DATASET: il set di dati BigQuery
    • TABLE_NAME: il nome della tabella BigQuery

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta HTTP POST. Per saperne di più sull'API e sui relativi ambiti di autorizzazione, vedi projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/",
       }
    }

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID progetto in cui vuoi eseguire il job Dataflow Google Cloud
    • JOB_NAME: un nome job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la località per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: il nome dell'argomento Pub/Sub
    • DATASET: il set di dati BigQuery
    • TABLE_NAME: il nome della tabella BigQuery

    Passaggi successivi