Neste documento, descrevemos como executar um pipeline do Dataflow usando um contêiner personalizado.
Para informações sobre como criar a imagem do contêiner, consulte Criar imagens de contêiner personalizadas para o Dataflow.
Ao executar o pipeline, inicie-o usando o SDK do Apache Beam com a mesma versão de idioma e versão do SDK na imagem de contêiner personalizada. Isso evita erros inesperados de SDKs ou dependências incompatíveis.
Testar localmente
Antes de executar o pipeline no Dataflow, recomendamos testar a imagem do contêiner localmente, o que permite testes e depuração mais rápidos.
Para saber mais sobre o uso específico do Apache Beam, consulte o guia do Apache Beam Como executar pipelines com imagens de contêiner personalizadas.
Teste básico com PortableRunner
Para verificar se as imagens de contêiner remotas podem ser extraídas e executar um pipeline
simples, use o PortableRunner do Apache Beam. Quando você usa o
PortableRunner, o envio do job ocorre no ambiente local e a
execução de DoFn acontece no ambiente do Docker.
Quando você usa GPUs, o contêiner do Docker pode não ter acesso a elas. Para testar seu contêiner com GPUs, use o executor direto e siga as etapas para testar uma imagem de contêiner em uma VM autônoma com GPUs na página Depurar com um VM independente da página "Usar GPUs".
Veja a seguir um exemplo de pipeline:
Java
mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
-Dexec.args="--runner=PortableRunner \
--jobEndpoint=REGION \
--defaultEnvironmentType=DOCKER \
--defaultEnvironmentConfig=IMAGE_URI \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE"Python
python path/to/my/pipeline.py \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILEGo
go path/to/my/pipeline.go \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILESubstitua:
REGION: a região do serviço do job a ser usada, na forma de endereço e porta. Por exemplo,localhost:3000. Useembedpara executar um serviço de job no processo.IMAGE_URI: o URI da imagem do contêiner personalizado.INPUT_FILE: um arquivo de entrada que pode ser lido como um arquivo de texto. Esse arquivo precisa ser acessível pelo SDK utilizando a imagem do contêiner
, pré-carregada na imagem do contêiner ou em um arquivo remoto.OUTPUT_FILE: um caminho de arquivo para gravar a saída. Esse caminho é remoto ou local no contêiner.
Quando o pipeline for concluído, revise os registros do console para verificar se o pipeline foi concluído e se a imagem remota, especificada por IMAGE_URI, é usada.
Depois de executar o pipeline, os arquivos salvos no contêiner não estão no sistema de arquivos local
e o contêiner é interrompido. É possível copiar arquivos do sistema de arquivos de contêiner interrompido usando docker cp.
Como alternativa:
- Forneça saídas para um sistema de arquivos remoto, como o Cloud Storage. Talvez seja necessário configurar manualmente o acesso para fins de teste, inclusive para arquivos de credenciais ou Application Default Credentials.
- Para uma depuração rápida, adicione um registro temporário.
Usar o Direct Runner
Para testes locais mais detalhados da imagem do contêiner e do pipeline, use o Direct Runner do Apache Beam.
É possível verificar o pipeline separadamente do contêiner testando-o em um ambiente local correspondente à imagem do contêiner ou iniciando o pipeline em um contêiner em execução.
Java
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...
Python
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# python path/to/my/pipeline.py ...
Go
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# go path/to/my/pipeline.go ...
Substitua IMAGE_URI pelo URI da imagem do contêiner personalizado.
Os exemplos pressupõem que todos os arquivos do pipeline (inclusive o próprio pipeline) estejam no
próprio contêiner personalizado, tenham sido montados em um sistema de arquivos local ou sejam
remotos e acessíveis pelo Apache Beam e pelo contêiner. Por exemplo, para usar o Maven (mvn)
para executar o exemplo Java anterior, o Maven e as dependências dele
precisam ser testados no contêiner. Para mais informações, consulte
Armazenamento e
docker run
na documentação do Docker.
O objetivo do teste no Direct Runner é testar o pipeline
no ambiente de contêiner personalizado, não testar a execução do contêiner
com o ENTRYPOINT padrão. Modifique o ENTRYPOINT (por exemplo, docker run --entrypoint ...)
para executar diretamente seu pipeline ou permitir
comandos de execução manual no contêiner.
Se você depende de uma configuração específica baseada na execução do contêiner no Compute Engine, é possível executá-lo diretamente em uma VM do Compute Engine. Para mais informações, consulte Contêineres no Compute Engine.
Iniciar o job do Dataflow
Ao iniciar o pipeline do Apache Beam no Dataflow, especifique o caminho para a imagem do contêiner. Não use a tag :latest com suas imagens personalizadas. Marque seus builds com uma data ou um identificador exclusivo. Se algo der errado, usar esse tipo de tag possibilita reverter a execução do pipeline para uma configuração de trabalho conhecida anteriormente e permite uma inspeção de alterações.
Java
Use --sdkContainerImage para especificar uma imagem de contêiner do cliente para o ambiente de execução do Java.
Use --experiments=use_runner_v2 para ativar o Runner v2.
Python
Se estiver usando a versão 2.30.0 ou posterior do SDK, use a opção --sdk_container_image do pipeline para especificar uma imagem de contêiner do SDK.
Para versões mais recentes do SDK, use a opção --worker_harness_container_image do pipeline para especificar o local da imagem do contêiner a ser usada para o arcabouço do worker.
Os contêineres personalizados são compatíveis somente com o Dataflow Runner v2. Se você estiver iniciando um pipeline Python em lote, defina a sinalização --experiments=use_runner_v2.
Se você estiver iniciando um pipeline de streaming do Python, não é necessário especificar o experimento, porque os pipelines de streaming do Python usam o Runner v2 por padrão.
Go
Se estiver usando a versão 2.40.0 ou posterior do SDK, use a opção --sdk_container_image do pipeline para especificar uma imagem de contêiner do SDK.
Para versões mais recentes do SDK, use a opção --worker_harness_container_image do pipeline para especificar o local da imagem do contêiner a ser usada para o arcabouço do worker.
Os contêineres personalizados são compatíveis com todas as versões do SDK do Go porque usam o Dataflow Runner v2 por padrão.
No exemplo a seguir, demonstramos como iniciar o exemplo WordCount em lote
com um contêiner personalizado.
Java
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--gcpTempLocation=TEMP_LOCATION \
--diskSizeGb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdkContainerImage=IMAGE_URI"Python
Como usar o SDK do Apache Beam para Python versão 2.30.0 ou posterior:
python -m apache_beam.examples.wordcount \
--input=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--temp_location=TEMP_LOCATION \
--runner=DataflowRunner \
--disk_size_gb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdk_container_image=IMAGE_URIGo
wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--sdk_container_image=IMAGE_URISubstitua:
INPUT_FILE: o caminho de entrada do Cloud Storage lido pelo Dataflow ao executar o exemploOUTPUT_FILE: o caminho de saída do Cloud Storage gravado pelo pipeline de exemplo. Esse arquivo contém a contagem de palavras.PROJECT_ID: o ID do seu projeto do Google Cloud.REGION: a região onde o job do Dataflow será implantado.TEMP_LOCATION: o caminho o Cloud Storage para o Dataflow organizar os arquivos de job temporários criados durante a execução do pipeline.DISK_SIZE_GB: opcional. Se o contêiner for grande, aumente o tamanho do disco de inicialização padrão para evitar a falta de espaço em disco.IMAGE_URI: o URI da imagem do contêiner personalizado do SDK. Sempre use uma tag ou um SHA de contêiner com controle de versões. Não use a tag:latestou uma tag mutável.
Streaming de imagens de contêiner
É possível melhorar a inicialização e a latência de escalonamento automático do pipeline do Dataflow ativando o streaming de imagens. Esse recurso é útil se o contêiner personalizado tiver conteúdo desnecessário ou não usar todo o conteúdo em cada etapa. Por exemplo, seu contêiner pode ter conteúdo incidental, como código de biblioteca baseado em CPU para inferência baseada em GPU. Da mesma forma, você pode ter um contêiner que executa pipelines de ML com vários modelos que usam apenas um modelo em cada etapa. Assim, o conteúdo não é necessário de uma só vez. Ativar o streaming de imagens ajudaria a melhorar a latência nesses casos.
Java
--dataflowServiceOptions=enable_image_streaming
Python
--dataflow_service_options=enable_image_streaming
Go
--dataflow_service_options=enable_image_streaming
O streaming de imagens vai buscar partes do seu contêiner personalizado conforme o código do pipeline precisa delas, em vez de fazer o download de todo o contêiner antecipadamente. As partes do seu contêiner que não são usadas nunca precisam ser baixadas.
Você precisa ativar a API Container File System para aproveitar o streaming de imagens.