Esportare dati in Pub/Sub (ETL inverso)

L'esportazione dei dati in Pub/Sub richiede l'utilizzo di query continue di BigQuery .

Questo documento descrive come configurare l'estrazione-trasformazione-caricamento (ETL) inversa da BigQuery a Pub/Sub. Puoi farlo utilizzando l' EXPORT DATA istruzione in una query continua per esportare i dati da BigQuery a un argomento Pub/Sub.

Puoi utilizzare un workflow RETL in Pub/Sub per combinare le funzionalità di analisi di BigQuery con il servizio di messaggistica globale asincrono e scalabile di Pub/Sub. Questo workflow ti consente di erogare dati alle applicazioni e ai servizi downstream in modo basato sugli eventi.

Prerequisiti

Devi creare un account di servizio. È necessario un account di servizio per eseguire una query continua che esporta i risultati in un argomento Pub/Sub.

Devi creare un argomento Pub/Sub per ricevere i risultati della query continua come messaggi e una sottoscrizione Pub/Sub che l'applicazione di destinazione può utilizzare per ricevere questi messaggi.

Ruoli obbligatori

Questa sezione fornisce informazioni sui ruoli e sulle autorizzazioni richiesti dall'account utente che crea la query continua e dal account di servizio che esegue la query continua.

Autorizzazioni account utente

Per creare un job in BigQuery, l'account utente deve disporre dell'autorizzazione IAM bigquery.jobs.create. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.jobs.create:

Per inviare un job che viene eseguito utilizzando un account di servizio, l'account utente deve disporre del ruolo Utente account di servizio (roles/iam.serviceAccountUser). Se utilizzi lo stesso account utente per creare il service account, l'account utente deve disporre del ruolo Amministratore account di servizio (roles/iam.serviceAccountAdmin). Per informazioni su come limitare l'accesso di un utente a un singolo account di servizio, anziché a tutti i service account all'interno di un progetto, consulta Concedere un singolo ruolo.

Se l'account utente deve abilitare le API richieste per il caso d'uso della query continua, l'account utente deve disporre del ruolo Amministratore Service Usage (roles/serviceusage.serviceUsageAdmin).

Autorizzazioni service account

Per esportare i dati da una tabella BigQuery, il account di servizio deve disporre dell'autorizzazione IAM bigquery.tables.export. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.tables.export:

Affinché il account di servizio possa accedere a Pub/Sub, devi concedergli entrambi i seguenti ruoli IAM:

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati.

Prima di iniziare

Abilita le API BigQuery e Pub/Sub.

Ruoli richiesti per abilitare le API

Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo servizi (roles/serviceusage.serviceUsageAdmin), che contiene l'autorizzazione serviceusage.services.enable. Scopri come concedere i ruoli.

Abilita le API

Esportare in Pub/Sub

Utilizza l' EXPORT DATAistruzione per esportare i dati in un argomento Pub/Sub:

Console

  1. Nella Google Cloud console, vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, fai clic su Altro > Impostazioni query.

  3. Nella sezione Query continua, seleziona la casella di controllo Utilizza la modalità query continua.

  4. Nella casella Service account, seleziona il account di servizio che hai creato.

  5. Fai clic su Salva.

  6. Nell'editor di query, inserisci la seguente istruzione:

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID'
    ) AS
    (
    QUERY
    );

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • TOPIC_ID: l'ID argomento Pub/Sub. Puoi ottenere l'ID argomento dalla pagina Argomenti Google Cloud della console.
    • QUERY: l'istruzione SQL per selezionare i dati da esportare. L'istruzione SQL deve contenere solo operazioni supportate. Devi utilizzare la funzione APPENDS nella clausola FROM di una query continua per specificare il punto nel tempo in cui iniziare l'elaborazione dei dati.
  7. Fai clic su Esegui.

bq

  1. Nella Google Cloud console, attiva Cloud Shell.

    Attiva Cloud Shell

    Nella parte inferiore della Google Cloud console viene avviata una sessione di Cloud Shell e viene visualizzato un prompt della riga di comando. Cloud Shell è un ambiente shell con Google Cloud CLI già inclusa e installata e con valori già impostati per il progetto corrente. L'inizializzazione della sessione può richiedere alcuni secondi.

  2. Nella riga di comando, esegui la query continua utilizzando il bq query comando con i seguenti flag:

    • Imposta il flag --continuous su true per rendere continua la query.
    • Utilizza il flag --connection_property per specificare un account di servizio da utilizzare.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • SERVICE_ACCOUNT_EMAIL: l'indirizzo email del service account. Puoi ottenere l'indirizzo email del account di servizio nella pagina Service account della Google Cloud console.
    • QUERY: l'istruzione SQL per selezionare i dati da esportare. L'istruzione SQL deve contenere solo operazioni supportate. Devi utilizzare la funzione APPENDS nella clausola FROM di una query continua per specificare il punto nel tempo in cui iniziare l'elaborazione dei dati.

API

  1. Esegui la query continua chiamando il jobs.insert metodo. Imposta i seguenti campi nella JobConfigurationQuery risorsa del Job resource che passi:

    • Imposta il campo continuous su true per rendere continua la query.
    • Utilizza il campo connection_property per specificare un account di servizio da utilizzare.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs'
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • QUERY: l'istruzione SQL per selezionare i dati da esportare. L'istruzione SQL deve contenere solo operazioni supportate. Devi utilizzare la funzione APPENDS nella clausola FROM di una query continua per specificare il punto nel tempo in cui iniziare l'elaborazione dei dati.
    • SERVICE_ACCOUNT_EMAIL: l'indirizzo email del service account. Puoi ottenere l'indirizzo email del account di servizio nella pagina Service account della Google Cloud console.

Esportare più colonne in Pub/Sub

Se vuoi includere più colonne nell'output, puoi creare una colonna struct per contenere i valori delle colonne, quindi convertire il valore struct in una stringa JSON utilizzando la TO_JSON_STRING funzione. L'esempio seguente esporta i dati da quattro colonne, formattati come stringa JSON:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Ottimizzazione dell'esportazione

Se il rendimento del job di query continua sembra essere limitato da risorse di calcolo disponibili, prova ad aumentare le dimensioni dell'assegnazione della prenotazione degli slot CONTINUOUS di BigQuery.

Limitazioni

  • I dati esportati devono essere costituiti da una singola STRING o BYTES colonna. Il nome della colonna può essere quello che preferisci.
  • Devi utilizzare una query continua per esportare in Pub/Sub.
  • Non puoi passare uno schema a un argomento Pub/Sub nella query continua.
  • Non puoi esportare i dati in un argomento Pub/Sub che utilizza uno schema.
  • Quando esporti in Pub/Sub, puoi esportare record in formato JSON in cui alcuni valori sono NULL, ma non puoi esportare record costituiti solo da valori NULL. Puoi escludere i record NULL dai risultati della query includendo un filtro WHERE message IS NOT NULL nella query continua.
  • Quando esporti i dati in un argomento Pub/Sub configurato con un endpoint regionale, l'endpoint deve essere configurato all'interno dello stesso Google Cloud limite regionale del set di dati BigQuery che contiene la tabella su cui stai eseguendo la query.
  • I dati esportati non devono superare le quote di Pub/Sub.

Prezzi

Quando esporti i dati in una query continua, la fatturazione viene effettuata in base ai prezzi di calcolo della capacità di BigQuery. Per eseguire query continue, devi disporre di una prenotazione che utilizzi la versione Enterprise o Enterprise Plus, e di un'assegnazione della prenotazione che utilizzi il tipo di prestazione CONTINUOUS.

Dopo l'esportazione dei dati, ti viene addebitato l'utilizzo di Pub/Sub. Per ulteriori informazioni, consulta i prezzi di Pub/Sub.