Criar pipelines usando o formulário do criador na UI do job builder

Neste tutorial, mostramos como usar a sintaxe YAML do Apache Beam para criar pipelines de processamento de dados do Dataflow. Você vai aprender a ler dados de um arquivo, aplicar transformações e gravar os resultados em outro arquivo usando a UI do job builder no console Google Cloud . Este tutorial é destinado a desenvolvedores que não conhecem o Apache Beam ou querem aprender a usar a API YAML para criar pipelines.

A tabela a seguir mostra um gráfico de pipeline no console do Google Cloud e a especificação YAML correspondente.

Um gráfico do job do Dataflow.
pipeline:
  transforms:
    - name: ReadFromCsv
      type: ReadFromCsv
      config:
        path: 'gs://[...]/restaurant-data.csv'
    - name: MapToFields
      type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          Lowercase_menu_item: Item.lower()
          Total_price: Price + Tax
        append: true
    - name: WriteToJson
      type: WriteToJson
      input: MapToFields
      config:
        path: 'gs://[...]/restaurant-data_map-fields.json'

Objetivos

Neste tutorial, você vai aprender a:

  • Criar pipelines YAML do Beam que leem, gravam e transformam dados.
  • Filtra dados com base no conteúdo.
  • Mapear campos usando expressões Python.
  • Use SQL para consultar e agregar dados.
  • Crie e execute pipelines Beam YAML usando o formulário de builder na UI do criador de jobs no console Google Cloud .

Custos

Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.

Novos usuários do Google Cloud podem estar qualificados para um teste sem custo financeiro.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Para mais informações, consulte Limpeza.

Antes de começar

Conclua as etapas a seguir antes de executar o pipeline.

Criar o projeto

  1. Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Criar um bucket do Cloud Storage

Antes de executar um pipeline, é necessário criar um bucket do Cloud Storage.

  1. Crie um bucket do Cloud Storage:

    1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

      Acessar buckets

    2. Clique em Criar.
    3. Na página Criar um bucket, insira as informações do seu bucket. Para ir à próxima etapa, clique em Continuar.
      1. Em Nomear o bucket, insira um nome exclusivo. Não inclua informações confidenciais no nome do bucket já que o namespace dele é global e visível para o público.
      2. Na seção Escolha onde armazenar seus dados, faça o seguinte:
        1. Selecione um tipo de local.
        2. Escolha um local onde os dados do bucket são armazenados permanentemente no menu suspenso Tipo de local.
        3. Para configurar a replicação entre buckets, selecione Adicionar replicação entre buckets usando o Serviço de transferência do Cloud Storage e siga estas etapas:

          Configurar a replicação entre buckets

          1. No menu Bucket, selecione um bucket.
          2. Na seção Configurações de replicação, clique em Configurar para definir as configurações do job de replicação.

            O painel Configurar a replicação entre buckets aparece.

            • Para filtrar objetos a serem replicados por prefixo de nome de objeto, insira um prefixo com que você quer incluir ou excluir objetos e clique em Adicionar um prefixo.
            • Para definir uma classe de armazenamento para os objetos replicados, selecione uma classe de armazenamento no menu Classe de armazenamento. Se você pular esta etapa, os objetos replicados vão usar a classe de armazenamento do bucket de destino por padrão.
            • Clique em Concluído.
      3. Na seção Escolha como armazenar seus dados, faça o seguinte:
        1. Na seção Definir uma classe padrão, selecione o seguinte: Padrão.
        2. Para ativar o namespace hierárquico, na seção Otimizar o armazenamento para cargas de trabalho com uso intensivo de dados, selecione Ativar namespace hierárquico neste bucket.
      4. Na seção Escolha como controlar o acesso a objetos, selecione se o bucket aplica ou não a prevenção de acesso público e selecione um método de controle de acesso para os objetos do bucket.
      5. Na seção Escolha como proteger os dados do objeto, faça o seguinte:
        • Selecione qualquer uma das opções em Proteção de dados que você quer definir para o bucket.
          • Para ativar a exclusão reversível, clique na caixa de seleção Política de exclusão reversível (para recuperação de dados) e especifique o número de dias que você quer reter os objetos após a exclusão.
          • Para definir o controle de versões de objetos, clique na caixa de seleção Controle de versões de objetos (para controle de versões) e especifique o número máximo de versões por objeto e o número de dias após os quais as versões não atuais expiram.
          • Para ativar a política de retenção em objetos e buckets, clique na caixa de seleção Retenção (para compliance) e faça o seguinte:
            • Para ativar o bloqueio de retenção de objetos, clique na caixa de seleção Ativar retenção de objetos.
            • Para ativar o Bloqueio de buckets, clique na caixa de seleção Definir política de retenção de buckets e escolha uma unidade e um período de armazenamento para a retenção.
        • Para escolher como os dados do objeto serão criptografados, expanda a seção Criptografia de dados () e selecione um método de Criptografia de dados.
    4. Clique em Criar.
  2. Copie o seguinte, conforme necessário em uma seção posterior:

    • Seu nome do bucket do Cloud Storage.
    • O ID do projeto do Google Cloud .

    Para encontrar esse ID, consulte Como identificar projetos.

Rede VPC

Por padrão, cada novo projeto começa com uma rede padrão. Se a rede padrão do projeto estiver desativada ou tiver sido excluída, você precisará ter uma rede no projeto em que sua conta de usuário tem o papel de usuário da rede do Compute (roles/compute.networkUser).

Ler, gravar e transformar dados

Esta seção mostra como usar a sintaxe do Beam YAML com o Dataflow para ler, gravar e filtrar dados usando o seguinte:

  • Desenvolvimento orientado por interface do usuário para criar e executar jobs na UI do criador de jobs no console Google Cloud . Especificamente, você vai usar o formulário de builder na UI do builder de jobs, sem precisar criar os arquivos YAML manualmente.
  • Dados de arquivos CSV armazenados em um bucket do Cloud Storage visível publicamente. Esses dados contêm dados simulados de cardápios de restaurantes e têm esta aparência:

    restaurant-data.csv

    Menu item,Category,Price,Tax
    Classic Cheeseburger,Entree,9.99,0.7
    Margherita Pizza,Entree,14.50,1.02
    Grilled Salmon with Asparagus,Entree,21.99,1.54
    Chicken Caesar Salad,Salad,12.75,0.89
    Spaghetti Carbonara,Entree,16.25,1.14
    Beef Tacos (3),Entree,10.50,0.74
    Vegetable Stir-Fry,Entree,13.00,0.91
    Shrimp Scampi,Entree,19.75,1.38
    Chicken Pot Pie,Entree,15.50,1.09
    Steak Frites,Entree,28.00,1.96
    Lobster Mac and Cheese,Entree,25.50,1.79
    Pork Belly Bao Buns (2),Appetizer/Side,11.25,0.79
    Mushroom Risotto,Entree,17.50,1.23
    Fish and Chips,Entree,14.00,0.98
    Buffalo Wings (6),Appetizer/Side,9.50,0.67
    French Onion Soup,Appetizer/Side,7.00,0.49
    Tomato Soup with Grilled Cheese,Appetizer/Side,10.00,0.7
    Avocado Toast,Appetizer/Side,8.50,0.6
    Quesadilla with Chicken,Appetizer/Side,11.75,0.82
    Pad Thai,Entree,15.00,1.05
    Chicken Tikka Masala,Entree,18.50,1.3
    Burrito Bowl,Entree,13.50,0.95
    Sushi Combo (8 pieces),Entree,22.00,1.54
    Greek Salad,Salad,11.00,0.77
    Clam Chowder,Appetizer/Side,8.00,0.56
    New York Cheesecake,Dessert,6.50,0.46
    Chocolate Lava Cake,Dessert,7.50,0.53
    Apple Pie,Dessert,5.00,0.35
    Tiramisu,Dessert,8.00,0.56
    Crème brûlée,Dessert,7.00,0.49
    Iced Coffee,Beverage,3.50,0.25
    Lemonade,Beverage,3.00,0.21
    Orange Juice,Beverage,4.00,0.28
    Soda,Beverage,2.50,0.18
    Craft Beer,Beverage,6.00,0.42
    Glass of Wine,Beverage,9.00,0.63
    Margarita,Beverage,12.00,0.84
    Moscow Mule,Beverage,11.50,0.81
    Old Fashioned,Beverage,13.00,0.91
    Espresso,Beverage,3.00,0.21
    Cappuccino,Beverage,4.50,0.32
    Latte,Beverage,5.00,0.35
    Mocha,Beverage,5.50,0.39
    Hot Chocolate,Beverage,4.00,0.28
    Breakfast Burrito,Breakfast,10.50,0.74
    Pancakes (3),Breakfast,8.00,0.56
    Waffles,Breakfast,9.00,0.63
    Eggs Benedict,Breakfast,14.00,0.98
    Omelette,Breakfast,11.00,0.77
    Fruit Salad,Salad,7.50,0.53
    Yogurt Parfait,Breakfast,6.00,0.42

Ler e filtrar dados

O exemplo a seguir mostra como ler dados de um arquivo CSV, filtrar informações específicas e gravar os dados filtrados em um arquivo JSON.

Esta amostra usa a transformaçãoFilter, que permite manter seletivamente os dados que atendem a determinados critérios. O exemplo a seguir filtra um conjunto de dados para manter apenas os registros em que Price é maior ou igual a 20.00.

Para ler os dados CSV e gerar conteúdo JSON filtrado, siga estas etapas:

  1. No console Google Cloud , acesse a página Jobs do Dataflow.

    Acessar "Jobs"

  2. Clique em Criar job no Builder.

  3. Na guia Criador de jobs, deixe Formulário do criador selecionado.

  4. No campo Nome do job, insira filter-python-job.

  5. Em Tipo de serviço, deixe Lote selecionado.

  6. Na seção Fontes:

    1. No campo Nome da fonte do painel Nova fonte, mude o nome para ReadCsv.

    2. Na lista Tipo de origem, selecione CSV do Cloud Storage.

    3. No campo Local do CSV, insira:

      cloud-samples-data/dataflow/tutorials/restaurant-data.csv
      
    4. Clique em Concluído.

  7. Na seção Transformações:

    1. Clique em Adicionar uma transformação.

    2. No campo Nome da transformação, insira FilterPrice.

    3. Na lista Tipo de transformação, selecione Filtro (Python).

    4. No campo Expressão de filtro do Python, insira Price >= 20.00.

    5. Na lista Etapa de entrada para a transformação, deixe ReadCsv selecionado.

    6. Clique em Concluído.

  8. Na seção Coletores:

    1. No campo Nome do coletor, mude o nome para WriteJson.

    2. Na lista Tipo de coletor, selecione Arquivos JSON no Cloud Storage.

    3. No campo Localização do JSON, insira:

      BUCKET_NAME/output/restaurant-data_filtered.json
      

      Substitua BUCKET_NAME pelo nome do bucket do seu Cloud Storage.

    4. Na lista Etapa de entrada para o coletor, deixe FilterPrice selecionado.

    5. Clique em Concluído.

  9. Na seção Opções do Dataflow, clique em Executar job.

Examinar a saída do job

Quando o job for concluído, siga estas etapas para conferir a saída do pipeline:

  1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Na lista de buckets, clique no nome do bucket que você criou em Criar um bucket do Cloud Storage.

  3. Clique no arquivo chamado restaurant-data_filtered.json-00000-of-00001.

  4. Na página Detalhes do objeto, clique no URL autenticado para visualizar a saída do pipeline.

A saída será parecida com esta:

{"Item":"Grilled Salmon with Asparagus","Category":"Entree","Price":21.99,"Tax":1.54}
{"Item":"Steak Frites","Category":"Entree","Price":28.0,"Tax":1.96}
{"Item":"Lobster Mac and Cheese","Category":"Entree","Price":25.5,"Tax":1.79}
{"Item":"Sushi Combo (8 pieces)","Category":"Entree","Price":22.0,"Tax":1.54}

Mapear campos usando Python

Com a transformação MapToFields, é possível criar novos campos com base nos atuais. O exemplo a seguir cria uma versão em letras minúsculas de um item de menu, calcula um preço total e anexa os valores depois dos valores atuais.

  1. Acesse a página Jobs do Dataflow no consoleGoogle Cloud .

    Acessar "Jobs"

  2. Clique em Criar job usando o criador.

  3. Na guia Criador de jobs, deixe Formulário do criador selecionado.

  4. No campo Nome do job, insira map-python-job.

  5. Em Tipo de serviço, deixe Lote selecionado.

  6. Na seção Fontes:

    1. No campo Nome da fonte do painel Nova fonte, mude o nome para ReadFromCsvPy.

    2. Na lista Tipo de origem, selecione CSV do Cloud Storage.

    3. No campo Local do CSV, insira:

      cloud-samples-data/dataflow/tutorials/restaurant-data.csv
      
    4. Clique em Concluído.

  7. Na seção Transformações:

    1. Clique em Adicionar uma transformação.

    2. No campo Nome da transformação, insira MapToFieldsPy.

    3. Na lista Tipo de transformação, selecione Mapear campos (Python).

    4. Deixe a opção Preservar campos atuais selecionada.

    5. Na seção Campos mapeados, clique em Adicionar um campo.

    6. No painel Novo campo que é aberto, coloque Lowercase_menu_item como o Nome do campo.

    7. No campo Expressão Python, insira Item.lower().

    8. Clique em Concluído.

    9. Na mesma seção Campos mapeados, clique em Adicionar um campo novamente.

    10. No painel Novo campo que é aberto, coloque Total_price como o Nome do campo.

    11. No campo Expressão Python, insira Price + Tax.

    12. No painel Novo campo, clique em Concluído.

    13. No painel Nova transformação, clique em Concluído.

  8. Na seção Coletores:

    1. No campo Nome do coletor, mude o nome para WriteToJsonPy.

    2. Na lista Tipo de coletor, selecione Arquivos JSON no Cloud Storage.

    3. No campo Localização do JSON, insira:

      BUCKET_NAME/output/restaurant-data_map-fields.json
      

      Substitua BUCKET_NAME pelo nome do bucket do seu Cloud Storage.

    4. Na lista Etapa de entrada para o coletor, deixe MapToFieldsPy selecionado.

    5. Clique em Concluído.

  9. Na seção Opções do Dataflow, clique em Executar job.

Examinar a saída do job

Quando o job for concluído, siga estas etapas para conferir a saída do pipeline:

  1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Na lista de buckets, clique no nome do bucket que você criou em Criar um bucket do Cloud Storage.

  3. Clique no arquivo chamado restaurant-data_map-fields.json-00000-of-00001.

  4. Na página Detalhes do objeto, clique no URL autenticado para visualizar a saída do pipeline.

A saída será parecida com esta:

{"Item":"Classic Cheeseburger","Category":"Entree","Price":9.99,"Tax":0.7,"Lowercase_menu_item":"classic cheeseburger","Total_price":10.69}
{"Item":"Margherita Pizza","Category":"Entree","Price":14.5,"Tax":1.02,"Lowercase_menu_item":"margherita pizza","Total_price":15.52}
{"Item":"Grilled Salmon with Asparagus","Category":"Entree","Price":21.99,"Tax":1.54,"Lowercase_menu_item":"grilled salmon with asparagus","Total_price":23.53}
{"Item":"Chicken Caesar Salad","Category":"Salad","Price":12.75,"Tax":0.89,"Lowercase_menu_item":"chicken caesar salad","Total_price":13.64}
{"Item":"Spaghetti Carbonara","Category":"Entree","Price":16.25,"Tax":1.14,"Lowercase_menu_item":"spaghetti carbonara","Total_price":17.39}
{"Item":"Beef Tacos (3)","Category":"Entree","Price":10.5,"Tax":0.74,"Lowercase_menu_item":"beef tacos (3)","Total_price":11.24}
[...]

Transformar dados usando SQL

A transformação Sql transform permite executar consultas SQL nos seus dados. O exemplo a seguir agrupa itens de menu por categoria (como Entree, Beverage ou Dessert) e adiciona uma coluna com a contagem de itens em cada categoria.

Para usar a interface do criador de jobs e criar seu pipeline, siga estas etapas:

  1. Acesse a página Jobs do Dataflow no console Google Cloud .

    Acessar "Jobs"

  2. Clique em Criar job usando o criador.

  3. Na guia Criador de jobs, no campo Nome do job, insira sql-transform-job.

  4. Em Tipo de serviço, deixe Lote selecionado.

  5. Na seção Fontes:

    1. No campo Nome da origem, mude o nome para SqlTransformSource.

    2. Na guia Nova origem, em Tipo de origem, selecione CSV do Cloud Storage. O campo Local do CSV é aberto.

    3. Em Local do CSV, insira:

      cloud-samples-data/dataflow/tutorials/restaurant-data.csv
      
    4. Clique em Concluído.

  6. Na seção Transformações:

    1. Clique em Adicionar uma transformação.

    2. No campo Nome da transformação, atualize o nome para SqlTransform.

    3. Em Tipo de transformação, selecione Transformação de SQL. As opções de transformação de SQL são abertas.

    4. No campo Expressão SQL, insira:

      select Category, count(*) as category_count from PCOLLECTION group by Category
      
    5. Clique em Concluído.

  7. Na seção Coletores:

    1. Em Nome do coletor, insira SqlTransformSink.

    2. Em Tipo de coletor, selecione Arquivos JSON no Cloud Storage. As opções Gravar em arquivos JSON no Cloud Storage são abertas.

    3. Em Local do JSON, insira:

      BUCKET_NAME/output/restaurant-data_transform-sql.json
      

      Substitua BUCKET_NAME pelo nome do bucket do seu Cloud Storage.

    4. Clique em Concluído.

  8. Opcional: veja a definição YAML gerada para esse pipeline.

    1. Navegue até a parte de cima da guia "Criador de jobs".

    2. Selecione Editor do YAML. A definição YAML vai aparecer. Ele vai ficar assim:

      Especificação YAML gerada

      pipeline:
        transforms:
          - name: SqlTransformSource
            type: ReadFromCsv
            config:
              path: 'gs://cloud-samples-data/dataflow/tutorials/restaurant-data.csv'
          - name: SqlTransform
            type: Sql
            config:
              query: >-
                select Category, count(*) as category_count from PCOLLECTION group by
                Category
            input:
              input0: SqlTransformSource
          - name: SqlTransformSink
            type: WriteToJson
            input: SqlTransform
            config:
              path: 'gs://BUCKET_NAME/output/restaurant-data_transform-sql.json'
  9. Na seção Opções do Dataflow, clique em Executar job.

Examinar a saída do job

Quando o job for concluído, siga estas etapas para conferir a saída do pipeline:

  1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Na lista de buckets, clique no nome do bucket que você criou em Criar um bucket do Cloud Storage.

  3. Clique no arquivo chamado restaurant-data_transform-sql.json-00000-of-00001.

  4. Na página Detalhes do objeto, clique no URL autenticado para visualizar a saída do pipeline.

A saída será parecida com esta:

{"Category":"Entree","category_count":16}
{"Category":"Beverage","category_count":14}
{"Category":"Appetizer\/Side","category_count":7}
{"Category":"Dessert","category_count":5}
{"Category":"Breakfast","category_count":6}
{"Category":"Salad","category_count":3}

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

Excluir o projeto

  1. No console Google Cloud , acesse a página Gerenciar recursos.

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Excluir recursos individuais

Se você quiser reutilizar o projeto mais tarde, poderá mantê-lo, mas excluir os recursos criados durante o tutorial.

Pare o pipeline do Dataflow

  1. No console Google Cloud , acesse a página Jobs do Dataflow.

    Acessar "Jobs"

  2. Clique no job que você quer interromper.

    Para interromper um job, o status dele precisa serem execução.

  3. Na página de detalhes do job, clique em Parar.

  4. Clique em Cancelar.

  5. Confirme sua escolha clicando em Interromper job.

Exclua o bucket do Cloud Storage

  1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Clique na caixa de seleção do bucket que você quer excluir.
  3. Para excluir o bucket, clique em Excluir e siga as instruções.

A seguir