Modello Pub/Sub a BigQuery con UDF Python

Il modello Da Pub/Sub a BigQuery con UDF Python è una pipeline di flusso che legge i messaggi in formato JSON da Pub/Sub e li scrive in una tabella BigQuery. Se vuoi, puoi fornire una funzione definita dall'utente (UDF) scritta in Python per elaborare i messaggi in entrata.

Requisiti della pipeline

  • La tabella BigQuery deve esistere e avere uno schema.
  • I dati dei messaggi Pub/Sub devono utilizzare il formato JSON oppure devi fornire una UDF che converta i dati dei messaggi in JSON. I dati JSON devono corrispondere allo schema della tabella BigQuery. Ad esempio, se i payload JSON sono formattati come {"k1":"v1", "k2":"v2"}, la tabella BigQuery deve avere due colonne di stringhe denominate k1 e k2.
  • Specifica il parametro inputSubscription o inputTopic, ma non entrambi.

Parametri del modello

Parametri obbligatori

  • outputTableSpec: la tabella BigQuery in cui scrivere, formattata come PROJECT_ID:DATASET_NAME.TABLE_NAME.

Parametri facoltativi

  • inputTopic: l'argomento Pub/Sub da cui leggere, formattato come projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: la sottoscrizione Pub/Sub da cui leggere, formattata come projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>.
  • outputDeadletterTable: la tabella BigQuery da utilizzare per i messaggi che non sono riusciti a raggiungere la tabella di output, formattata come PROJECT_ID:DATASET_NAME.TABLE_NAME. Se la tabella non esiste, viene creata quando viene eseguita la pipeline. Se questo parametro non viene specificato, viene utilizzato il valore OUTPUT_TABLE_SPEC_error_records.
  • useStorageWriteApiAtLeastOnce: quando si utilizza l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica di almeno una volta (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica di esattamente una volta, imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
  • useStorageWriteApi: se è true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per saperne di più, vedi Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: quando si utilizza l'API Storage Write, specifica il numero di flussi di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec: quando si utilizza l'API Storage Write, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.
  • pythonExternalTextTransformGcsPath: il pattern del percorso Cloud Storage per il codice Python contenente le funzioni definite dall'utente. Ad esempio, gs://your-bucket/your-function.py.
  • pythonExternalTextTransformFunctionName: il nome della funzione da chiamare dal file Python. Utilizza solo lettere, cifre e trattini bassi. Ad esempio, 'transform' or 'transform_udf1'.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per saperne di più, vedi Creare funzioni definite dall'utente per i modelli Dataflow.

Specifica della funzione

La UDF ha la seguente specifica:

  • Input: il campo dei dati dei messaggi Pub/Sub, serializzato come stringa JSON.
  • Output: una stringa JSON che corrisponde allo schema della tabella BigQuery di destinazione.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome job univoco.
    4. (Facoltativo) Per Endpoint regionale, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località di Dataflow.

    5. Nel menu a discesa Modello Dataflow, seleziona il modello Da Pub/Sub a BigQuery con UDF Python.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. (Facoltativo) Per passare dall'elaborazione di esattamente una volta alla modalità di flusso di dati di almeno una volta, seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/ \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Sostituisci quanto segue:

    • JOB_NAME: un nome job univoco a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la località per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: il nome dell'argomento Pub/Sub
    • DATASET: il set di dati BigQuery
    • TABLE_NAME: il nome della tabella BigQuery

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta HTTP POST. Per saperne di più sull'API e sui relativi ambiti di autorizzazione, vedi projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/",
       }
    }

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID progetto in cui vuoi eseguire il job Dataflow Google Cloud
    • JOB_NAME: un nome job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la località per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: il nome dell'argomento Pub/Sub
    • DATASET: il set di dati BigQuery
    • TABLE_NAME: il nome della tabella BigQuery

    Passaggi successivi