Questa pagina spiega come impostare le opzioni della pipeline per i job di Dataflow. Queste opzioni della pipeline configurano la modalità e la posizione di esecuzione della pipeline e le risorse che utilizza.
L'esecuzione della pipeline è separata dall'esecuzione del programma Apache Beam. Il programma Apache Beam che hai scritto crea una pipeline per l'esecuzione differita. Ciò significa che il programma genera una serie di passaggi che qualsiasi runner Apache Beam supportato può eseguire. I runner compatibili includono il runner Dataflow su Google Cloud e il runner diretto che esegue la pipeline direttamente in un ambiente locale.
Puoi passare i parametri a un job di Dataflow in fase di runtime. Per ulteriori informazioni sull'impostazione delle opzioni della pipeline in fase di runtime, consulta Configurare le opzioni della pipeline.
Utilizzare le opzioni della pipeline con gli SDK Apache Beam
Puoi utilizzare i seguenti SDK per impostare le opzioni della pipeline per i job di Dataflow:
- SDK Apache Beam per Python
- SDK Apache Beam per Java
- SDK Apache Beam per Go
Per utilizzare gli SDK, imposta il runner della pipeline e altri parametri di esecuzione utilizzando la classe PipelineOptions dell'SDK Apache Beam.
Esistono due metodi per specificare le opzioni della pipeline:
- Imposta le opzioni della pipeline in modo programmatico fornendo un elenco di opzioni della pipeline.
- Imposta le opzioni della pipeline direttamente nella riga di comando quando esegui il codice della pipeline.
Impostare le opzioni della pipeline in modo programmatico
Puoi impostare le opzioni della pipeline in modo programmatico creando e modificando un oggetto PipelineOptions.
Java
Crea un
PipelineOptions
oggetto utilizzando il metodo PipelineOptionsFactory.fromArgs.
Per un esempio, consulta la sezione Eseguire su Dataflow in questa pagina.
Python
Crea un
PipelineOptions
oggetto.
Per un esempio, consulta la sezione Eseguire su Dataflow in questa pagina.
Vai
L'impostazione delle opzioni della pipeline in modo programmatico utilizzando PipelineOptions non è supportata nell'SDK Apache Beam per Go. Utilizza gli argomenti della riga di comando Go.
Per un esempio, consulta la sezione Eseguire su Dataflow in questa pagina.
Impostare le opzioni della pipeline nella riga di comando
Puoi impostare le opzioni della pipeline utilizzando gli argomenti della riga di comando.
Java
La seguente sintassi di esempio proviene dalla WordCount pipeline nel
tutorial Java.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
Sostituisci quanto segue:
PROJECT_ID: l'ID Google Cloud progettoBUCKET_NAME: il nome del bucket Cloud StorageREGION: una regione Dataflow,us-central1
Python
La seguente sintassi di esempio proviene dalla WordCount pipeline nel
tutorial Python.
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
Sostituisci quanto segue:
DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job di Dataflow, ad esempioeurope-west1Il flag
--regionesegue l'override della regione predefinita impostata nel server di metadati, nel client locale o nelle variabili di ambiente.STORAGE_BUCKET: il nome del bucket Cloud StoragePROJECT_ID: l' Google Cloud ID progetto
Vai
La seguente sintassi di esempio proviene dalla WordCount pipeline nel
tutorial Go.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/Sostituisci quanto segue:
BUCKET_NAME: il nome del bucket Cloud StoragePROJECT_ID: l' Google Cloud ID progettoDATAFLOW_REGION: La regione in cui vuoi eseguire il deployment del job di Dataflow. Ad esempio,europe-west1. Il flag--regionesegue l'override della regione predefinita impostata nel server di metadati, nel client locale o nelle variabili di ambiente.
Impostare le opzioni della pipeline sperimentali
Negli SDK Java, Python e Go, l'opzione della experiments
pipeline
abilita le funzionalità di Dataflow sperimentali o pre-GA.
Impostare in modo programmatico
Per impostare l'opzione experiments in modo programmatico, utilizza la seguente sintassi.
Java
Nell'
PipelineOptions
oggetto, includi l'opzione experiments utilizzando la seguente sintassi.
Questo esempio imposta le dimensioni del disco di avvio su 80 GB con il flag di esperimento.
options.setExperiments("streaming_boot_disk_size_gb=80")
Per un esempio che mostra come creare l'oggetto PipelineOptions, consulta la sezione
Eseguire su Dataflow
in questa pagina.
Python
Nell'
PipelineOptions
oggetto, includi l'opzione experiments utilizzando la seguente sintassi.
Questo esempio imposta le dimensioni del disco di avvio su 80 GB con il flag di esperimento.
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
Per un esempio che mostra come creare l'oggetto PipelineOptions, consulta la sezione
Eseguire su Dataflow
in questa pagina.
Vai
L'impostazione delle opzioni della pipeline in modo programmatico utilizzando PipelineOptions non è supportata nell'SDK Apache Beam per Go. Utilizza gli argomenti della riga di comando Go.
Impostare nella riga di comando
Per impostare l'opzione experiments nella riga di comando, utilizza la seguente sintassi.
Java
Questo esempio imposta le dimensioni del disco di avvio su 80 GB con il flag di esperimento.
--experiments=streaming_boot_disk_size_gb=80
Python
Questo esempio imposta le dimensioni del disco di avvio su 80 GB con il flag di esperimento.
--experiments=streaming_boot_disk_size_gb=80
Vai
Questo esempio imposta le dimensioni del disco di avvio su 80 GB con il flag di esperimento.
--experiments=streaming_boot_disk_size_gb=80
Impostare in un modello
Per abilitare una funzionalità sperimentale durante l'esecuzione di un modello Dataflow, utilizza il flag --additional-experiments.
Modello classico
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Modello flessibile
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Accedere all'oggetto delle opzioni della pipeline
Quando crei l'oggetto Pipeline nel programma Apache Beam, passa
PipelineOptions. Quando il servizio Dataflow esegue la pipeline, invia una copia di PipelineOptions a ogni worker.
Java
Accedi a PipelineOptions all'interno di qualsiasi istanza DoFn della trasformazione ParDo utilizzando
il metodo ProcessContext.getPipelineOptions.
Python
Questa funzionalità non è supportata nell'SDK Apache Beam per Python.
Vai
Accedi alle opzioni della pipeline utilizzando beam.PipelineOptions.
Eseguire su Dataflow
Esegui il job sulle risorse gestite Google Cloud utilizzando il servizio runner Dataflow. L'esecuzione della pipeline con Dataflow crea un job di Dataflow, che utilizza le risorse di Compute Engine e Cloud Storage nel tuo Google Cloud progetto. Per informazioni sulle autorizzazioni di Dataflow, consulta Sicurezza e autorizzazioni di Dataflow.
I job di Dataflow utilizzano Cloud Storage per archiviare i file temporanei durante l'esecuzione della pipeline. Per evitare l'addebito per costi di archiviazione non necessari, disattiva la funzionalità di eliminazione temporanea sui bucket utilizzati dai job di Dataflow per l'archiviazione temporanea. Per ulteriori informazioni, consulta Disattivare l'eliminazione temporanea.
Impostare le opzioni obbligatorie
Per eseguire la pipeline utilizzando Dataflow, imposta le seguenti opzioni della pipeline:
Java
project: l'ID progetto. Google Cloudrunner: il runner della pipeline che esegue la pipeline. Per Google Cloud l'esecuzione, deve essereDataflowRunner.gcpTempLocation: un percorso di Cloud Storage in cui Dataflow esegue lo staging della maggior parte dei file temporanei. Il bucket specificato deve già esistere.Se non specifichi
gcpTempLocation, Dataflow utilizza il valore dell'opzionetempLocation. Se non specifichi una di queste opzioni, Dataflow crea un nuovo bucket Cloud Storage.
Python
project: l'ID progetto. Google Cloudregion: la regione per il job di Dataflow.runner: il runner della pipeline che esegue la pipeline. Per Google Cloud l'esecuzione, deve essereDataflowRunner.temp_location: un percorso di Cloud Storage in cui Dataflow esegue lo staging dei file di job temporanei creati durante l'esecuzione della pipeline.
Vai
project: l'ID progetto. Google Cloudregion: la regione per il job di Dataflow.runner: il runner della pipeline che esegue la pipeline. Per Google Cloud l'esecuzione, deve esseredataflow.staging_location: un percorso di Cloud Storage in cui Dataflow esegue lo staging dei file di job temporanei creati durante l'esecuzione della pipeline.
Impostare le opzioni della pipeline in modo programmatico
Il seguente codice di esempio mostra come creare una pipeline impostando in modo programmatico il runner e altre opzioni obbligatorie per eseguire la pipeline utilizzando Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Vai
L'SDK Apache Beam per Go utilizza gli argomenti della riga di comando Go. Utilizza
flag.Set() per impostare i valori dei flag.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
Dopo aver creato la pipeline, specifica tutte le letture, le trasformazioni e le scritture della pipeline ed esegui la pipeline.
Utilizzare le opzioni della pipeline dalla riga di comando
Il seguente esempio mostra come utilizzare le opzioni della pipeline specificate nella riga di comando. Questo esempio non imposta le opzioni della pipeline in modo programmatico.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
Utilizza il modulo argparse di Python per analizzare le opzioni della riga di comando.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Vai
Utilizza il pacchetto flagGo per analizzare
le opzioni della riga di comando. Devi analizzare le opzioni prima di chiamare beam.Init(). In questo esempio, output è un'opzione della riga di comando.
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
Dopo aver creato la pipeline, specifica tutte le letture, le trasformazioni e le scritture della pipeline ed esegui la pipeline.
Controllare le modalità di esecuzione
Quando un programma Apache Beam esegue una pipeline su un servizio come Dataflow, il programma può eseguire la pipeline in modo asincrono o può bloccare fino al completamento della pipeline. Puoi modificare questo comportamento seguendo le indicazioni riportate di seguito.
Java
Quando un programma Apache Beam Java esegue una pipeline su un servizio come Dataflow, in genere viene eseguito in modo asincrono. Per eseguire una pipeline e attendere il completamento del job, imposta DataflowRunner come runner della pipeline e chiama esplicitamente pipeline.run().waitUntilFinish().
Quando utilizzi DataflowRunner e chiami waitUntilFinish() sull'oggetto
PipelineResult restituito da pipeline.run(), la pipeline viene eseguita
su Google Cloud ma il codice locale attende il completamento del job cloud e
restituisce l'oggetto DataflowPipelineJob finale. Durante l'esecuzione del job, il servizio Dataflow stampa gli aggiornamenti dello stato del job e i messaggi della console durante l'attesa.
Python
Quando un programma Apache Beam Python esegue una pipeline su un servizio come Dataflow, in genere viene eseguito in modo asincrono. Per bloccare fino al completamento della pipeline, utilizza il metodo wait_until_finish() dell'oggetto PipelineResult restituito dal metodo run() del runner.
Vai
Quando un programma Apache Beam Go esegue una pipeline su Dataflow, per impostazione predefinita è sincrono e si blocca fino al completamento della pipeline. Se non vuoi bloccare, hai due opzioni:
Avvia il job in una routine Go.
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.Utilizza il flag della riga di comando
--async, che si trova neljoboptspacchetto.
Per visualizzare i dettagli dell'esecuzione, monitorare l'avanzamento e verificare lo stato di completamento del job, utilizza l' interfaccia di monitoraggio di Dataflow o l' interfaccia a riga di comando di Dataflow.
Utilizzare le origini di streaming
Java
Se la pipeline legge da un'origine dati illimitata, ad esempio Pub/Sub, viene eseguita automaticamente in modalità di streaming.
Python
Se la pipeline utilizza un'origine dati illimitata, ad esempio Pub/Sub, devi impostare l'opzione streaming su true.
Vai
Se la pipeline legge da un'origine dati illimitata, ad esempio Pub/Sub, viene eseguita automaticamente in modalità di streaming.
Per impostazione predefinita, i job di streaming utilizzano un tipo di macchina Compute Engine con 2 o più vCPU.
Eseguire localmente
Anziché eseguire la pipeline sulle risorse cloud gestite, puoi scegliere di eseguirla localmente. L'esecuzione locale presenta alcuni vantaggi per il test, il debug o l'esecuzione della pipeline su piccoli set di dati. Ad esempio, l'esecuzione locale rimuove la dipendenza dal servizio Dataflow remoto e dal progetto associato Google Cloud .
Quando utilizzi l'esecuzione locale, devi eseguire la pipeline con set di dati sufficientemente piccoli da poter essere contenuti nella memoria locale. Puoi creare un piccolo set di dati in memoria utilizzando una trasformazione Create oppure puoi utilizzare una trasformazione Read per lavorare con piccoli file locali o remoti. In genere, l'esecuzione locale offre un modo più rapido e semplice per eseguire test e debug con meno dipendenze esterne, ma è limitata dalla memoria disponibile nell'ambiente locale.
Il seguente codice di esempio mostra come creare una pipeline che viene eseguita nell'ambiente locale.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Vai
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
Dopo aver creato la pipeline, eseguila.
Creare opzioni della pipeline personalizzate
Puoi aggiungere le tue opzioni personalizzate oltre a PipelineOptions standard. La riga di comando di Apache Beam può anche analizzare le opzioni personalizzate utilizzando gli argomenti della riga di comando specificati nello stesso formato.
Java
Per aggiungere le tue opzioni, definisci un'interfaccia con metodi getter e setter per ogni opzione, come nell'esempio seguente:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
Per aggiungere le tue opzioni, utilizza il metodo add_argument() (che si comporta
esattamente come il modulo argparse standard di Python
),
come nell'esempio seguente:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Vai
Per aggiungere le tue opzioni, utilizza il pacchetto di flag Go come mostrato nell' esempio seguente:
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
Puoi anche specificare una descrizione, che viene visualizzata quando un utente passa --help come argomento della riga di comando, e un valore predefinito.
Java
Imposta la descrizione e il valore predefinito utilizzando le annotazioni, come segue:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Ti consigliamo di registrare l'interfaccia con PipelineOptionsFactory
e poi di passarla quando crei l'oggetto PipelineOptions. Quando registri l'interfaccia con PipelineOptionsFactory, --help può trovare l'interfaccia delle opzioni personalizzate e aggiungerla all'output del comando --help. PipelineOptionsFactory verifica che le opzioni personalizzate siano compatibili con tutte le altre opzioni registrate.
Il seguente codice di esempio mostra come registrare l'interfaccia delle opzioni personalizzate con PipelineOptionsFactory:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Ora la pipeline può accettare --myCustomOption=value come argomento della riga di comando.
Python
Imposta la descrizione e il valore predefinito come segue:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Vai
Imposta la descrizione e il valore predefinito come segue:
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)