I/O gestite da Dataflow

L'I/O gestito consente a Dataflow di gestire connettori I/O specifici utilizzati nelle pipeline Apache Beam. Managed I/O semplifica la gestione delle pipeline che si integrano con origini e sink supportati.

L'I/O gestito è costituito da due componenti che funzionano insieme:

  • Una trasformazione Apache Beam che fornisce un'API comune per la creazione di connettori I/O (origini e sink).

  • Un servizio Dataflow che gestisce questi connettori I/O per tuo conto, inclusa la possibilità di eseguirne l'upgrade indipendentemente dalla versione di Apache Beam.

I vantaggi dell'I/O gestito includono:

  • Upgrade automatici. Dataflow esegue automaticamente l'upgrade dei connettori I/O gestiti nella pipeline. Ciò significa che la tua pipeline riceve correzioni di sicurezza, miglioramenti delle prestazioni e correzioni di bug per questi connettori, senza richiedere modifiche al codice. Per saperne di più, consulta Upgrade automatici.

  • API coerente. Tradizionalmente, i connettori I/O in Apache Beam hanno API distinte e ogni connettore è configurato in modo diverso. Managed I/O fornisce una singola API di configurazione che utilizza proprietà chiave-valore, il che si traduce in un codice della pipeline più semplice e coerente. Per ulteriori informazioni, consulta API di configurazione.

Requisiti

  • I seguenti SDK supportano I/O gestito:

    • Apache Beam SDK per Java versione 2.58.0 o successive.
    • SDK Apache Beam per Python versione 2.61.0 o successive.
  • Il servizio di backend richiede Dataflow Runner v2. Se Runner v2 non è abilitato, la pipeline viene comunque eseguita, ma non usufruisce dei vantaggi del servizio I/O gestito.

Upgrade automatici

Le pipeline Dataflow con connettori I/O gestiti utilizzano automaticamente l'ultima versione affidabile del connettore. Gli upgrade automatici si verificano nei seguenti punti del ciclo di vita del job:

  • Invio di un lavoro. Quando invii un job batch o di streaming, Dataflow utilizza la versione più recente del connettore I/O gestito che è stata testata e funziona correttamente.

  • Upgrade in sequenza. Per i job di streaming, Dataflow esegue l'upgrade dei connettori I/O gestiti nelle pipeline in esecuzione man mano che diventano disponibili nuove versioni. Non devi preoccuparti di aggiornare manualmente il connettore o la versione di Apache Beam della pipeline.

    Per impostazione predefinita, gli upgrade continui vengono eseguiti entro un periodo di 30 giorni, ovvero circa ogni 30 giorni. Puoi regolare la finestra o disattivare gli upgrade continui in base al job. Per saperne di più, vedi Impostare la finestra di upgrade progressivo.

    Una settimana prima dell'upgrade, Dataflow scrive un messaggio di notifica nei log dei messaggi del job.

  • Job di sostituzione. Per i job di streaming, Dataflow verifica la presenza di aggiornamenti ogni volta che avvii un job di sostituzione e utilizza automaticamente l'ultima versione nota e funzionante. Dataflow esegue questo controllo anche se non modifichi alcun codice nel job di sostituzione.

Il seguente diagramma mostra la procedura di upgrade. L'utente crea una pipeline Apache Beam utilizzando la versione dell'SDK X. Dataflow esegue l'upgrade della versione di Managed I/O all'ultima versione supportata. L'upgrade viene eseguito quando l'utente invia il job, dopo la finestra di upgrade in sequenza o quando l'utente invia un job di sostituzione.

Diagramma che mostra la procedura di upgrade di Managed I/O.

La procedura di upgrade aggiunge circa due minuti al tempo di avvio del primo job (per progetto) che utilizza I/O gestito e può richiedere circa mezzo minuto per i job successivi. Per gli upgrade continui, il servizio Dataflow avvia un job di sostituzione. Ciò può comportare tempi di inattività temporanei per la pipeline, poiché il pool di lavoratori esistente viene arrestato e viene avviato un nuovo pool di lavoratori. Per controllare lo stato delle operazioni di I/O gestite, cerca le voci di log che includono la stringa "Managed Transform(s)".

Imposta la finestra di aggiornamento in sequenza

Per specificare la finestra di upgrade per un job Dataflow di streaming, imposta l'opzione di servizio managed_transforms_rolling_upgrade_window uguale al numero di giorni. Il valore deve essere compreso tra 10 e 90 giorni inclusi.

Java

--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=DAYS

Python

--dataflow_service_options=managed_transforms_rolling_upgrade_window=DAYS

gcloud

Utilizza il comando gcloud dataflow jobs run con l'opzione additional-experiments. Se utilizzi un modello flessibile che utilizza Managed I/O, utilizza il comando gcloud dataflow flex-template run.

--additional-experiments=managed_transforms_rolling_upgrade_window=DAYS

Per disattivare gli upgrade continui, imposta l'opzione di servizio managed_transforms_rolling_upgrade_window su never. Puoi comunque attivare un aggiornamento avviando un job di sostituzione.

Java

--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=never

Python

--dataflow_service_options=managed_transforms_rolling_upgrade_window=never

Go

--dataflow_service_options=managed_transforms_rolling_upgrade_window=never

gcloud

Utilizza il comando gcloud dataflow jobs run con l'opzione additional-experiments. Se utilizzi modelli flessibili, utilizza il comando gcloud dataflow flex-template run.

--additional-experiments=managed_transforms_rolling_upgrade_window=never

API di configurazione

Managed I/O è una trasformazione Apache Beam chiavi in mano che fornisce un'API coerente per configurare origini e sink.

Java

Per creare qualsiasi origine o sink supportato da Managed I/O, utilizza la classe Managed. Specifica l'origine o il sink da istanziare e passa un insieme di parametri di configurazione, simili ai seguenti:

Map config = ImmutableMap.<String, Object>builder()
    .put("config1", "abc")
    .put("config2", 1);

pipeline.apply(Managed.read(/*Which source to read*/).withConfig(config))
    .getSinglePCollection();

Puoi anche passare i parametri di configurazione come file YAML. Per un esempio di codice completo, vedi Lettura da Apache Iceberg.

Python

Importa il modulo apache_beam.transforms.managed e chiama il metodo managed.Read o managed.Write. Specifica l'origine o il sink da istanziare e passa un insieme di parametri di configurazione, simile al seguente:

pipeline
| beam.managed.Read(
    beam.managed.SOURCE, # Example: beam.managed.KAFKA
    config={
      "config1": "abc",
      "config2": 1
    }
)

Puoi anche passare i parametri di configurazione come file YAML. Per un esempio di codice completo, consulta Lettura da Apache Kafka.

Destinazioni dinamiche

Per alcuni sink, il connettore I/O gestito può selezionare dinamicamente una destinazione in base ai valori dei campi nei record in entrata.

Per utilizzare le destinazioni dinamiche, fornisci una stringa di modello per la destinazione. La stringa del modello può includere nomi di campi tra parentesi graffe, ad esempio "tables.{field1}". In fase di runtime, il connettore sostituisce il valore del campo per ogni record in entrata, per determinare la destinazione del record.

Ad esempio, supponiamo che i tuoi dati abbiano un campo denominato airport. Potresti impostare la destinazione su "flights.{airport}". Se airport=SFO, il record viene scritto in flights.SFO. Per i campi nidificati, utilizza la notazione con il punto. Ad esempio: {top.middle.nested}.

Per un codice di esempio che mostra come utilizzare le destinazioni dinamiche, consulta Scrivere con destinazioni dinamiche.

Filtri

Potresti voler filtrare determinati campi prima che vengano scritti nella tabella di destinazione. Per i sink che supportano le destinazioni dinamiche, puoi utilizzare il parametro drop, keep o only a questo scopo. Questi parametri ti consentono di includere i metadati di destinazione nei record di input, senza scriverli nella destinazione.

Puoi impostare al massimo uno di questi parametri per un determinato sink.

Parametro di configurazione Tipo di dati Descrizione
drop elenco di stringhe Un elenco di nomi di campi da eliminare prima di scrivere nella destinazione.
keep elenco di stringhe Un elenco di nomi di campi da conservare durante la scrittura nella destinazione. Gli altri campi vengono eliminati.
only string Il nome di esattamente un campo da utilizzare come record di primo livello da scrivere durante la scrittura nella destinazione. Tutti gli altri campi vengono eliminati. Questo campo deve essere di tipo riga.

Origini e sink supportati

Managed I/O supporta le seguenti origini e destinazioni.

Per saperne di più, consulta la sezione Connettori I/O gestiti nella documentazione di Apache Beam.