Creare un job personalizzato con il builder di job

Il builder dei job consente di creare job Dataflow di flusso e batch personalizzati. Puoi anche salvare i job del builder dei job come file Apache Beam YAML da condividere e riutilizzare.

Crea una nuova pipeline

Per creare una nuova pipeline nel builder dei job:

  1. Vai alla pagina Job nella Google Cloud console.

    Vai a Job

  2. Fai clic su Crea job dal builder.

  3. In Nome job, inserisci un nome per il job.

  4. Seleziona Batch o Flusso.

  5. Se selezioni Flusso, seleziona una modalità di finestra. Quindi, inserisci una specifica per la finestra, come segue:

    • Finestra fissa: inserisci una dimensione della finestra in secondi.
    • Finestra scorrevole: inserisci una dimensione della finestra e un periodo della finestra in secondi.
    • Finestra di sessione: inserisci un intervallo di sessione in secondi.

    Per saperne di più sulle finestre, consulta Finestre e funzioni di finestra.

A questo punto, aggiungi origini, trasformazioni e sink alla pipeline, come descritto nelle sezioni riportate di seguito.

Aggiungi un'origine alla pipeline

Una pipeline deve avere almeno un'origine. Inizialmente, il builder dei job viene compilato con un'origine vuota. Per configurare l'origine:

  1. Nella casella Nome origine, inserisci un nome per l'origine o utilizza il nome predefinito. Il nome viene visualizzato nel grafico del job quando esegui il job.

  2. Nell'elenco Tipo di origine, seleziona il tipo di origine dati.

  3. A seconda del tipo di origine, fornisci ulteriori informazioni di configurazione.

    • Ad esempio, se selezioni BigQuery, specifica la tabella da cui leggere.
    • Se selezioni Pub/Sub, specifica uno schema di messaggi. Inserisci il nome e il tipo di dati di ogni campo che vuoi leggere dai messaggi Pub/Sub. La pipeline elimina tutti i campi non specificati nello schema.
    • Se selezioni Apache Iceberg, specifica i dettagli della connessione per Iceberg REST Catalog (IRC), come l'identificatore della tabella Iceberg, il nome del catalogo, il tipo di catalogo, l'URI del catalogo e il nome del warehouse.
  4. (Facoltativo) Per alcuni tipi di origine, puoi fare clic su Visualizza l'anteprima dei dati di origine per visualizzare l'anteprima dei dati di origine.

Per aggiungere un'altra origine alla pipeline, fai clic su Aggiungi un'origine. Per combinare i dati di più origini, aggiungi una trasformazione SQL o Join alla pipeline.

Aggiungi una trasformazione alla pipeline

(Facoltativo) Aggiungi una o più trasformazioni alla pipeline. Puoi utilizzare le seguenti trasformazioni per manipolare, aggregare o unire i dati di origini e altre trasformazioni:

Tipo di trasformazione Descrizione Informazioni sulla trasformazione Beam YAML
Filtra (Python) Filtra i record con un'espressione Python.
Trasformazione SQL Gestisci i record o unisci più input con un'istruzione SQL.
Mappa campi (Python) Aggiungi nuovi campi o mappa di nuovo interi record con espressioni e funzioni Python.
Mappa campi (SQL) Aggiungi o mappa i campi dei record con espressioni SQL.
Trasformazioni YAML:
  1. AssertEqual
  2. AssignTimestamps
  3. Combine
  4. Explode
  5. Filter
  6. Flatten
  7. Join
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

Utilizza qualsiasi trasformazione nell'SDK Beam YAML.

Configurazione della trasformazione YAML: fornisci i parametri di configurazione per la trasformazione YAML sotto forma di mappa YAML. Le coppie chiave-valore vengono utilizzate per compilare la sezione di configurazione della trasformazione Beam YAML risultante. Per i parametri di configurazione supportati per ciascun tipo di trasformazione, consulta la documentazione sulla trasformazione Beam YAML. Esempio di parametri di configurazione:

Combina
group_by:
combine:
Partecipa
type:
equalities:
fields:
Log Registra i record nei log dei worker del job.
Raggruppa per Combina i record con funzioni come count() e sum().
Partecipa Unisci più input su campi uguali.
Espandi Suddividi i record appiattendo i campi dell'array.

Per aggiungere una trasformazione:

  1. Fai clic su Aggiungi una trasformazione.

  2. Nella casella Nome trasformazione, inserisci un nome per la trasformazione o utilizza il nome predefinito. Il nome viene visualizzato nel grafico del job quando esegui il job.

  3. Nell'elenco Tipo di trasformazione, seleziona il tipo di trasformazione.

  4. A seconda del tipo di trasformazione, fornisci ulteriori informazioni di configurazione. Ad esempio, se selezioni Filtra (Python), inserisci un'espressione Python da utilizzare come filtro.

  5. Seleziona il passaggio di input per la trasformazione. Il passaggio di input è l'origine o la trasformazione il cui output fornisce l'input per questa trasformazione.

Aggiungi un sink alla pipeline

Una pipeline deve avere almeno un sink. Inizialmente, il builder dei job viene compilato con un sink vuoto. Per configurare il sink:

  1. Nella casella Nome sink, inserisci un nome per il sink o utilizza il nome predefinito. Il nome viene visualizzato nel grafico del job quando esegui il job.

  2. Nell'elenco Tipo di sink, seleziona il tipo di sink.

  3. A seconda del tipo di sink, fornisci ulteriori informazioni di configurazione. Ad esempio, se selezioni il sink BigQuery, seleziona la tabella BigQuery in cui scrivere.

  4. Seleziona il passaggio di input per il sink. Il passaggio di input è l'origine o la trasformazione il cui output fornisce l'input per questa trasformazione.

  5. Per aggiungere un altro sink alla pipeline, fai clic su Aggiungi un sink.

Esegui la pipeline

Per eseguire una pipeline dal builder dei job:

  1. (Facoltativo) Imposta le opzioni del job Dataflow. Per espandere la sezione delle opzioni di Dataflow, fai clic sulla freccia espansore.

  2. Fai clic su Esegui job. Il builder dei job passa al grafico del job per il job inviato. Puoi utilizzare il grafico del job per monitorare lo stato del job.

Convalida la pipeline prima del lancio

Per le pipeline con configurazioni complesse, come i filtri Python e le espressioni SQL, può essere utile controllare la configurazione della pipeline per verificare la presenza di errori di sintassi prima del lancio. Per convalidare la sintassi della pipeline:

  1. Fai clic su Convalida per aprire Cloud Shell e avviare il servizio di convalida.
  2. Fai clic su Inizia la convalida.
  3. Se viene rilevato un errore durante la convalida, viene visualizzato un punto esclamativo rosso.
  4. Correggi gli errori rilevati e verifica le correzioni facendo clic su Convalida. Se non viene rilevato alcun errore, viene visualizzato un segno di spunta verde.

Esegui con gcloud CLI

Puoi anche eseguire le pipeline Beam YAML utilizzando gcloud CLI. Per eseguire una pipeline del builder dei job con gcloud CLI:

  1. Fai clic su Salva YAML per aprire la finestra Salva YAML.

  2. Esegui una delle seguenti azioni:

    • Per salvare in Cloud Storage, inserisci un percorso Cloud Storage e fai clic su Salva.
    • Per scaricare un file locale, fai clic su Scarica.
  3. Esegui questo comando nella shell o nel terminale:

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    Sostituisci YAML_FILE_PATH con il percorso del file YAML, in locale o in Cloud Storage.

Passaggi successivi