Use contentores personalizados com bibliotecas C++

Neste tutorial, vai criar um pipeline que usa contentores personalizados com bibliotecas C++ para executar um fluxo de trabalho altamente paralelo de HPC do Dataflow. Use este tutorial para saber como usar o Dataflow e o Apache Beam para executar aplicações de computação em grelha que requerem que os dados sejam distribuídos a funções executadas em vários núcleos.

O tutorial demonstra como executar o pipeline primeiro usando o Direct Runner e, em seguida, usando o Dataflow Runner. Ao executar o pipeline localmente, pode testá-lo antes de o implementar.

Este exemplo usa associações Cython e funções da biblioteca GMP. Independentemente da biblioteca ou da ferramenta de associação que usar, pode aplicar os mesmos princípios ao seu pipeline.

O código de exemplo está disponível no GitHub.

Objetivos

  • Crie um pipeline que use contentores personalizados com bibliotecas C++.

  • Crie uma imagem de contentor Docker com um Dockerfile.

  • Empacote o código e as dependências num contentor do Docker.

  • Execute o pipeline localmente para o testar.

  • Execute o pipeline num ambiente distribuído.

Custos

Neste documento, usa os seguintes componentes faturáveis da Google Cloud Platform:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Para gerar uma estimativa de custos com base na sua utilização prevista, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação sem custo financeiro.

Quando terminar as tarefas descritas neste documento, pode evitar a faturação contínua eliminando os recursos que criou. Para mais informações, consulte o artigo Limpe.

Antes de começar

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  4. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  12. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  18. Crie uma conta de serviço de worker gerida pelo utilizador para o novo pipeline e conceda as funções necessárias à conta de serviço.

    1. Para criar a conta de serviço, execute o comando gcloud iam service-accounts create:

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Conceda funções à conta de serviço. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Substitua SERVICE_ACCOUNT_ROLE por cada função individual.

    3. Conceda à sua Conta Google uma função que lhe permita criar chaves de acesso para a conta de serviço:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Transfira o exemplo de código e altere os diretórios

Transfira o exemplo de código e, de seguida, altere os diretórios. Os exemplos de código no repositório do GitHub fornecem todo o código de que precisa para executar este pipeline. Quando estiver tudo pronto para criar o seu próprio pipeline, pode usar este exemplo de código como modelo.

Clone o repositório beam-cpp-example.

  1. Use o comando git clone para clonar o repositório do GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Mude para o diretório da aplicação:

    cd dataflow-sample-applications/beam-cpp-example
    

Código da pipeline

Pode personalizar o código do pipeline a partir deste tutorial. Este pipeline conclui as seguintes tarefas:

  • Produz dinamicamente todos os números inteiros num intervalo de entrada.
  • Executa os números inteiros através de uma função C++ e filtra os valores incorretos.
  • Escreve os valores incorretos num canal lateral.
  • Conta a ocorrência de cada tempo de paragem e normaliza os resultados.
  • Imprime a saída, formata e escreve os resultados num ficheiro de texto.
  • Cria um PCollection com um único elemento.
  • Processa o elemento único com uma função map e passa a frequência PCollection como uma entrada lateral.
  • Processa o PCollection e produz um único resultado.

O ficheiro inicial tem o seguinte aspeto:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Configure o ambiente de programação

  1. Use o SDK Apache Beam para Python.

  2. Instale a biblioteca GMP:

    apt-get install libgmp3-dev
    
  3. Para instalar as dependências, use o ficheiro requirements.txt.

    pip install -r requirements.txt
    
  4. Para criar as associações Python, execute o seguinte comando.

    python setup.py build_ext --inplace
    

Pode personalizar o ficheiro requirements.txt a partir deste tutorial. O ficheiro inicial inclui as seguintes dependências:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Execute o pipeline localmente

A execução do pipeline localmente é útil para testes. Ao executar o pipeline localmente, pode confirmar que o pipeline é executado e funciona conforme esperado antes de implementar o pipeline num ambiente distribuído.

Pode executar o pipeline localmente através do seguinte comando. Este comando gera uma imagem denominada out.png.

python pipeline.py

Crie os recursos da Google Cloud Platform

Esta secção explica como criar os seguintes recursos:

  • Um contentor do Cloud Storage para usar como localização de armazenamento temporário e uma localização de saída.
  • Um contentor do Docker para criar pacotes do código e das dependências do pipeline.

Crie um contentor do Cloud Storage

Comece por criar um contentor do Cloud Storage com a CLI do Google Cloud. Este contentor é usado como uma localização de armazenamento temporário pelo pipeline do Dataflow.

Para criar o contentor, use o comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Substitua o seguinte:

Crie e crie uma imagem de contentor

Pode personalizar o Dockerfile a partir deste tutorial. O ficheiro inicial tem o seguinte aspeto:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Este ficheiro Docker contém os comandos FROM, COPY e RUN, que pode ler na referência do ficheiro Docker.

  1. Para carregar artefactos, crie um repositório do Artifact Registry. Cada repositório pode conter artefactos para um único formato suportado.

    Todo o conteúdo do repositório é encriptado através de Google-owned and Google-managed encryption keys ou chaves de encriptação geridas pelo cliente. O Artifact Registry usa o Google-owned and Google-managed encryption keys por predefinição e não é necessária nenhuma configuração para esta opção.

    Tem de ter, pelo menos, acesso de gravação do Artifact Registry ao repositório.

    Execute o seguinte comando para criar um novo repositório. O comando usa a flag --async e é devolvido imediatamente, sem aguardar a conclusão da operação em curso.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Substitua REPOSITORY por um nome para o seu repositório. Para cada localização do repositório num projeto, os nomes dos repositórios têm de ser exclusivos.

  2. Crie o Dockerfile.

    Para que os pacotes façam parte do contentor do Apache Beam, tem de os especificar como parte do ficheiro requirements.txt. Certifique-se de que não especifica apache-beam como parte do ficheiro requirements.txt. O contentor do Apache Beam já tem apache-beam.

  3. Antes de poder enviar ou extrair imagens, configure o Docker para autenticar pedidos para o Artifact Registry. Para configurar a autenticação nos repositórios do Docker, execute o seguinte comando:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    O comando atualiza a configuração do Docker. Agora, pode estabelecer ligação ao Artifact Registry no seu Google Cloud projeto para enviar imagens.

  4. Crie a imagem do Docker usando o seu Dockerfile com o Cloud Build.

    Atualize o caminho no seguinte comando para corresponder ao Dockerfile que criou. Este comando cria o ficheiro e envia-o para o seu repositório do Artifact Registry.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Empacote o código e as dependências num contentor do Docker

  1. Para executar este pipeline num ambiente distribuído, agrupe o código e as dependências num contentor Docker.

    docker build . -t cpp_beam_container
    
  2. Depois de criar o pacote do código e das dependências, pode executar o pipeline localmente para o testar.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Este comando escreve o resultado na imagem do Docker. Para ver o resultado, execute o pipeline com o --output e escreva o resultado num contentor do Cloud Storage. Por exemplo, execute o seguinte comando.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

Execute o pipeline

Agora, pode executar o pipeline do Apache Beam no Dataflow consultando o ficheiro com o código do pipeline e transmitindo os parâmetros necessários para o pipeline.

Na shell ou no terminal, execute o pipeline com o Dataflow Runner.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Depois de executar o comando para executar o pipeline, o Dataflow devolve um ID do trabalho com o estado do trabalho Na fila. Pode demorar vários minutos até que o estado da tarefa atinja o valor Em execução e possa aceder ao gráfico de tarefas.

Veja os resultados

Ver dados escritos no seu contentor do Cloud Storage. Use o gcloud storage ls comando para apresentar o conteúdo no nível superior do seu contentor:

gcloud storage ls gs://BUCKET_NAME

Se for bem-sucedido, o comando devolve uma mensagem semelhante à seguinte:

gs://BUCKET_NAME/out.png

Limpar

Para evitar incorrer em custos na sua conta do Google Cloud pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.

Elimine o projeto

A forma mais fácil de eliminar a faturação é eliminar o Google Cloud projeto que criou para o tutorial.

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Elimine os recursos individuais

Se quiser reutilizar o projeto, elimine os recursos que criou para o tutorial.

Limpe os recursos do projeto da Google Cloud Platform

  1. Elimine o repositório do Artifact Registry.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Elimine o contentor do Cloud Storage e os respetivos objetos. Este conjunto sozinho não incorre em custos.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revogue as credenciais

  1. Revogue as funções que concedeu à conta de serviço do trabalhador gerida pelo utilizador. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

O que se segue?