Airflow gerenciado (Geração 3) | Airflow gerenciado (Geração 2) | Airflow gerenciado (Geração 1 legada)
Nesta página, descrevemos como usar o Airflow gerenciado (Geração 2) para executar cargas de trabalho do Serviço Gerenciado para Apache Spark em Google Cloud.
Os exemplos nas seções a seguir mostram como usar operadores para gerenciar cargas de trabalho em lote do Serviço Gerenciado para Apache Spark. Esses operadores são usados em DAGs que criam, excluem, listam e recebem uma carga de trabalho em lote do Serviço Gerenciado para Apache Spark:
Crie DAGs para operadores que funcionam com cargas de trabalho em lote do Serviço Gerenciado para Apache Spark:
Crie DAGs que usam contêineres personalizados e o metastore do Dataproc.
Configure o servidor de histórico permanente para esses DAGs.
Antes de começar
Ative a API Dataproc:
Console
Ative a API do Serviço Gerenciado para Apache Spark.
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador de uso do serviço (
roles/serviceusage.serviceUsageAdmin), que contém a permissãoserviceusage.services.enable. Saiba como conceder papéis.gcloud
Ative a API do Serviço Gerenciado para Apache Spark:
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador de Service Usage role (
roles/serviceusage.serviceUsageAdmin), que contém aserviceusage.services.enablepermissão. Saiba como conceder papéis.gcloud services enable dataproc.googleapis.com
Selecione o local do arquivo de carga de trabalho em lote. É possível usar qualquer uma das seguintes opções:
- Crie um bucket do Cloud Storage que armazene esse arquivo.
- Use o bucket do ambiente. Como não é necessário sincronizar esse arquivo com o Airflow, é possível criar uma subpasta separada fora das pastas
/dagsou/data. Por exemplo,/batches. - Use um bucket atual.
Configurar arquivos e variáveis do Airflow
Esta seção demonstra como configurar arquivos e variáveis do Airflow para este tutorial.
Fazer upload de um arquivo de carga de trabalho de ML do Serviço Gerenciado para Apache Spark para um bucket
A carga de trabalho neste tutorial executa um script do pyspark:
Salve qualquer script do pyspark em um arquivo local chamado
spark-job.py. Por exemplo, é possível usar o script de exemplo do pyspark.Faça upload do arquivo para o local selecionado em Antes de começar.
Definir variáveis do Airflow
Os exemplos nas seções a seguir usam variáveis do Airflow. Você define valores para essas variáveis no Airflow. Em seguida, o código DAG pode acessar esses valores.
Os exemplos neste tutorial usam as seguintes variáveis do Airflow. É possível defini-las conforme necessário, dependendo do exemplo usado.
Defina as seguintes variáveis do Airflow para uso no código DAG:
project_id: ID do projeto.bucket_name: URI de um bucket em que o arquivo Python principal da carga de trabalho (spark-job.py) está localizado. Você selecionou esse local em Antes de começar.phs_cluster: nome do cluster do servidor de histórico permanente. Você define essa variável ao criar um servidor de histórico permanente.image_name: nome e tag da imagem do contêiner personalizada (image:tag). Você define essa variável ao usar a imagem do contêiner personalizada com DataprocCreateBatchOperator.metastore_cluster: nome do serviço do metastore do Dataproc. Você define essa variável ao usar o serviço do metastore do Dataproc com DataprocCreateBatchOperator.region_name: região em que o serviço do metastore do Dataproc está localizado. Você define essa variável ao usar o serviço do metastore do Dataproc com DataprocCreateBatchOperator.
Use o Google Cloud console e a interface do Airflow para definir cada variável do Airflow
No Google Cloud console, acesse a página Ambientes.
Na lista de ambientes, clique no link Airflow do seu ambiente. A interface do Airflow é aberta.
Na interface do Airflow, selecione Admin > Variables.
Clique em Add a new record.
Especifique o nome da variável no campo Key e defina o valor dela no campo Val.
Clique em Salvar.
Criar um servidor de histórico permanente
Use um servidor de histórico permanente (PHS, na sigla em inglês) para conferir os arquivos de histórico do Spark das cargas de trabalho em lote:
- Criar um servidor de histórico permanente.
- Verifique se você especificou o nome do cluster do PHS na
phs_clustervariável do Airflow.
DataprocCreateBatchOperator
O DAG a seguir inicia uma carga de trabalho em lote do Serviço Gerenciado para Apache Spark.
Para mais informações sobre os argumentos DataprocCreateBatchOperator, consulte
o código-fonte do operador.
Para mais informações sobre os atributos que podem ser transmitidos no batch
parâmetro de DataprocCreateBatchOperator, consulte a
descrição da classe Batch.
Usar a imagem do contêiner personalizada com DataprocCreateBatchOperator
O exemplo a seguir mostra como usar uma imagem de contêiner personalizada para executar cargas de trabalho. É possível usar um contêiner personalizado, por exemplo, para adicionar dependências do Python não fornecidas pela imagem de contêiner padrão.
Para usar uma imagem de contêiner personalizada:
Crie uma imagem de contêiner personalizada e faça upload dela para o Container Registry.
Especifique a imagem na
image_namevariável do Airflow.Use DataprocCreateBatchOperator com sua imagem personalizada:
Usar o serviço do metastore do Dataproc com DataprocCreateBatchOperator
Para usar um serviço Dataproc Metastore de um DAG:
Verifique se o serviço do metastore já foi iniciado.
Para saber como iniciar um serviço do metastore, consulte Ativar e desativar o metastore do Dataproc.
Para informações detalhadas sobre o operador de lote para criar a configuração, consulte PeripheralsConfig.
Depois que o serviço do metastore estiver em funcionamento, especifique o nome dele em a variável
metastore_clustere a região emregion_namea variável do Airflow.Use o serviço do metastore no DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
É possível usar DataprocDeleteBatchOperator para excluir um lote com base no ID da carga de trabalho.
DataprocListBatchesOperator
DataprocDeleteBatchOperator lista os lotes que existem em um determinado project_id e região.
DataprocGetBatchOperator
DataprocGetBatchOperator busca uma carga de trabalho em lote específica.