Transferir dados de outros serviços com operadores de transferência do Google

Airflow gerenciado (Geração 3) | Airflow gerenciado (Geração 2) | Airflow gerenciado (Geração 1 legada)

Esta página demonstra como transferir dados de outros serviços com os operadores de transferência do Google nos seus DAGs.

Sobre os operadores de transferência do Google

Os operadores de transferência do Google são um conjunto de operadores do Airflow que podem ser usados para extrair dados de outros serviços para o Google Cloud.

Este guia mostra operadores do Armazenamento de compartilhamento de arquivos do Azure e do Amazon S3 que funcionam com o Cloud Storage. Há muitos outros operadores de transferência que funcionam com serviços dentro Google Cloud e fora do Google Cloud.

Amazon S3 para Cloud Storage

Esta seção demonstra como sincronizar dados do Amazon S3 com um bucket do Cloud Storage.

Instalar o pacote do provedor da Amazon

O pacote apache-airflow-providers-amazon contém os tipos de conexão e a funcionalidade que interage com o Amazon S3. Instale esse pacote PyPI no seu ambiente.

Configurar uma conexão com o Amazon S3

O pacote do provedor da Amazon oferece um tipo de conexão para o Amazon S3. Crie uma conexão desse tipo. A conexão do Cloud Storage, chamada google_cloud_default, já está configurada no seu ambiente.

Configure uma conexão com o Amazon S3 da seguinte maneira:

  1. Na interface do Airflow, acesse Admin > Connections.
  2. Criar uma nova conexão.
  3. Selecione Amazon S3 como o tipo de conexão.
  4. O exemplo a seguir usa uma conexão chamada aws_s3. Você pode usar esse nome ou qualquer outro para a conexão.
  5. Especifique os parâmetros de conexão conforme descrito na documentação do Airflow para a conexão do Amazon Web Services. Por exemplo, para configurar uma conexão com chaves de acesso da AWS, gere uma chave de acesso para sua conta na AWS e forneça o ID da chave de acesso da AWS como um login e a chave de acesso secreta da AWS como uma senha para a conexão.

Transferir dados do Amazon S3

Se você quiser operar nos dados sincronizados mais tarde em outro DAG ou tarefa, extraia-os para a pasta /data do bucket do seu ambiente. Essa pasta é sincronizada com outros workers do Airflow para que as tarefas no seu DAG possam operar nela.

O exemplo de DAG a seguir faz o seguinte:

  • Sincroniza o conteúdo do diretório /data-for-gcs de um bucket do S3 com a pasta /data/from-s3/data-for-gcs/ no bucket do seu ambiente.
  • Espera dois minutos para que os dados sejam sincronizados com todos os workers do Airflow no seu ambiente.
  • Gera a lista de arquivos nesse diretório usando o comando ls. Substitua essa tarefa por outros operadores do Airflow que funcionam com seus dados.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare para Cloud Storage

Esta seção demonstra como sincronizar dados do Azure FileShare com um bucket do Cloud Storage.

Instalar o pacote do provedor do Microsoft Azure

O pacote apache-airflow-providers-microsoft-azure contém os tipos de conexão e a funcionalidade que interage com o Microsoft Azure. Instale esse pacote PyPI no seu ambiente.

Configurar uma conexão com o Azure FileShare

O pacote do provedor do Microsoft Azure oferece um tipo de conexão para o Azure File Share. Crie uma conexão desse tipo. A conexão do Cloud Storage, chamada google_cloud_default, já está configurada no seu ambiente.

Configure uma conexão com o Azure FileShare da seguinte maneira:

  1. Na interface do Airflow, acesse Admin > Connections.
  2. Criar uma nova conexão.
  3. Selecione Azure FileShare como o tipo de conexão.
  4. O exemplo a seguir usa uma conexão chamada azure_fileshare. Você pode usar esse nome ou qualquer outro para a conexão.
  5. Especifique os parâmetros de conexão conforme descrito na documentação do Airflow para a conexão do Microsoft Azure File Share. Por exemplo, é possível especificar uma string de conexão para a chave de acesso da conta de armazenamento.

Transferir dados do Azure FileShare

Se você quiser operar nos dados sincronizados mais tarde em outro DAG ou tarefa, extraia-os para a pasta /data do bucket do seu ambiente. Essa pasta é sincronizada com outros workers do Airflow para que as tarefas no seu DAG possam operar nela.

O DAG a seguir faz o seguinte:

O exemplo de DAG a seguir faz o seguinte:

  • Sincroniza o conteúdo do diretório /data-for-gcs do Azure File Share com a pasta /data/from-azure no bucket do seu ambiente.
  • Espera dois minutos para que os dados sejam sincronizados com todos os workers do Airflow no seu ambiente.
  • Gera a lista de arquivos nesse diretório usando o comando ls. Substitua essa tarefa por outros operadores do Airflow que funcionam com seus dados.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

A seguir