L'I/O gestito consente a Dataflow di gestire connettori I/O specifici utilizzati nelle pipeline Apache Beam. Managed I/O semplifica la gestione delle pipeline che si integrano con origini e sink supportati.
L'I/O gestito è costituito da due componenti che funzionano insieme:
Una trasformazione Apache Beam che fornisce un'API comune per la creazione di connettori I/O (origini e sink).
Un servizio Dataflow che gestisce questi connettori I/O per tuo conto, inclusa la possibilità di eseguirne l'upgrade indipendentemente dalla versione di Apache Beam.
I vantaggi dell'I/O gestito includono:
Upgrade automatici. Dataflow esegue automaticamente l'upgrade dei connettori I/O gestiti nella pipeline. Ciò significa che la tua pipeline riceve correzioni di sicurezza, miglioramenti delle prestazioni e correzioni di bug per questi connettori, senza richiedere modifiche al codice. Per saperne di più, consulta Upgrade automatici.
API coerente. Tradizionalmente, i connettori I/O in Apache Beam hanno API distinte e ogni connettore è configurato in modo diverso. Managed I/O fornisce una singola API di configurazione che utilizza proprietà chiave-valore, il che si traduce in un codice della pipeline più semplice e coerente. Per ulteriori informazioni, consulta API di configurazione.
Requisiti
I seguenti SDK supportano I/O gestito:
- Apache Beam SDK per Java versione 2.58.0 o successive.
- SDK Apache Beam per Python versione 2.61.0 o successive.
Il servizio di backend richiede Dataflow Runner v2. Se Runner v2 non è abilitato, la pipeline viene comunque eseguita, ma non usufruisce dei vantaggi del servizio I/O gestito.
Upgrade automatici
Le pipeline Dataflow con connettori I/O gestiti utilizzano automaticamente l'ultima versione affidabile del connettore. Gli upgrade automatici si verificano nei seguenti punti del ciclo di vita del job:
Invio di un lavoro. Quando invii un job batch o di streaming, Dataflow utilizza la versione più recente del connettore I/O gestito che è stata testata e funziona correttamente.
Upgrade in sequenza. Per i job di streaming, Dataflow esegue l'upgrade dei connettori I/O gestiti nelle pipeline in esecuzione man mano che diventano disponibili nuove versioni. Non devi preoccuparti di aggiornare manualmente il connettore o la versione di Apache Beam della pipeline.
Per impostazione predefinita, gli upgrade continui vengono eseguiti entro un periodo di 30 giorni, ovvero circa ogni 30 giorni. Puoi regolare la finestra o disattivare gli upgrade continui in base al job. Per saperne di più, vedi Impostare la finestra di upgrade progressivo.
Una settimana prima dell'upgrade, Dataflow scrive un messaggio di notifica nei log dei messaggi del job.
Job di sostituzione. Per i job di streaming, Dataflow verifica la presenza di aggiornamenti ogni volta che avvii un job di sostituzione e utilizza automaticamente l'ultima versione nota e funzionante. Dataflow esegue questo controllo anche se non modifichi alcun codice nel job di sostituzione.
Il seguente diagramma mostra la procedura di upgrade. L'utente crea una pipeline Apache Beam utilizzando la versione dell'SDK X. Dataflow esegue l'upgrade della versione di Managed I/O all'ultima versione supportata. L'upgrade viene eseguito quando l'utente invia il job, dopo la finestra di upgrade in sequenza o quando l'utente invia un job di sostituzione.

La procedura di upgrade aggiunge circa due minuti al tempo di avvio del primo job (per progetto) che utilizza I/O gestito e può richiedere circa mezzo minuto per i job successivi. Per gli upgrade continui, il servizio Dataflow
avvia un
job di sostituzione. Ciò può
comportare tempi di inattività temporanei per la pipeline, poiché il pool di lavoratori esistente viene
arrestato e viene avviato un nuovo pool di lavoratori. Per controllare lo stato delle operazioni di I/O gestite, cerca le voci di log che includono la stringa "Managed Transform(s)".
Imposta la finestra di aggiornamento in sequenza
Per specificare la finestra di upgrade per un job Dataflow di streaming, imposta
l'opzione di servizio managed_transforms_rolling_upgrade_window uguale al numero
di giorni. Il valore deve essere compreso tra 10 e 90 giorni inclusi.
Java
--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=DAYS
Python
--dataflow_service_options=managed_transforms_rolling_upgrade_window=DAYS
gcloud
Utilizza il comando gcloud dataflow jobs run con l'opzione additional-experiments. Se utilizzi un modello flessibile che
utilizza Managed I/O, utilizza il
comando gcloud dataflow flex-template run.
--additional-experiments=managed_transforms_rolling_upgrade_window=DAYS
Per disattivare gli upgrade continui, imposta l'opzione di servizio managed_transforms_rolling_upgrade_window su never. Puoi comunque attivare un aggiornamento avviando un job di sostituzione.
Java
--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=never
Python
--dataflow_service_options=managed_transforms_rolling_upgrade_window=never
Go
--dataflow_service_options=managed_transforms_rolling_upgrade_window=never
gcloud
Utilizza il comando gcloud dataflow jobs run con l'opzione additional-experiments. Se utilizzi modelli flessibili, utilizza il comando
gcloud dataflow flex-template run.
--additional-experiments=managed_transforms_rolling_upgrade_window=never
API di configurazione
Managed I/O è una trasformazione Apache Beam chiavi in mano che fornisce un'API coerente per configurare origini e sink.
Java
Per creare qualsiasi origine o sink supportato da Managed I/O, utilizza la
classe Managed. Specifica l'origine o il sink da istanziare
e passa un insieme di parametri di configurazione, simili ai seguenti:
Map config = ImmutableMap.<String, Object>builder()
.put("config1", "abc")
.put("config2", 1);
pipeline.apply(Managed.read(/*Which source to read*/).withConfig(config))
.getSinglePCollection();
Puoi anche passare i parametri di configurazione come file YAML. Per un esempio di codice completo, vedi Lettura da Apache Iceberg.
Python
Importa il modulo apache_beam.transforms.managed e chiama il metodo managed.Read o managed.Write. Specifica l'origine o il sink da istanziare e passa un insieme di parametri di configurazione, simile al seguente:
pipeline
| beam.managed.Read(
beam.managed.SOURCE, # Example: beam.managed.KAFKA
config={
"config1": "abc",
"config2": 1
}
)
Puoi anche passare i parametri di configurazione come file YAML. Per un esempio di codice completo, consulta Lettura da Apache Kafka.
Destinazioni dinamiche
Per alcuni sink, il connettore I/O gestito può selezionare dinamicamente una destinazione in base ai valori dei campi nei record in entrata.
Per utilizzare le destinazioni dinamiche, fornisci una stringa di modello per la destinazione. La
stringa del modello può includere nomi di campi tra parentesi graffe, ad esempio
"tables.{field1}". In fase di runtime, il connettore sostituisce il valore del
campo per ogni record in entrata, per determinare la destinazione del record.
Ad esempio, supponiamo che i tuoi dati abbiano un campo denominato airport. Potresti impostare la
destinazione su "flights.{airport}". Se airport=SFO, il record viene scritto
in flights.SFO. Per i campi nidificati, utilizza la notazione con il punto. Ad esempio:
{top.middle.nested}.
Per un codice di esempio che mostra come utilizzare le destinazioni dinamiche, consulta Scrivere con destinazioni dinamiche.
Filtri
Potresti voler filtrare determinati campi prima che vengano scritti nella tabella di destinazione. Per i sink che supportano le destinazioni dinamiche, puoi utilizzare
il parametro drop, keep o only a questo scopo. Questi parametri ti consentono di includere i metadati di destinazione nei record di input, senza scriverli nella destinazione.
Puoi impostare al massimo uno di questi parametri per un determinato sink.
| Parametro di configurazione | Tipo di dati | Descrizione |
|---|---|---|
drop |
elenco di stringhe | Un elenco di nomi di campi da eliminare prima di scrivere nella destinazione. |
keep |
elenco di stringhe | Un elenco di nomi di campi da conservare durante la scrittura nella destinazione. Gli altri campi vengono eliminati. |
only |
string | Il nome di esattamente un campo da utilizzare come record di primo livello da scrivere durante la scrittura nella destinazione. Tutti gli altri campi vengono eliminati. Questo campo deve essere di tipo riga. |
Origini e sink supportati
Managed I/O supporta le seguenti origini e destinazioni.
Per saperne di più, consulta la sezione Connettori I/O gestiti nella documentazione di Apache Beam.