Crea una canalización de Dataflow con Go

En esta página, se muestra cómo usar el SDK de Apache Beam para Go a fin de compilar un programa que defina una canalización. Luego, deberás ejecutar la canalización de manera local y en el servicio de Dataflow. Para obtener una introducción a la canalización de WordCount, consulta el video Cómo usar WordCount en Apache Beam.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud . Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Instala Google Cloud CLI.

  3. Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.

  4. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  5. Crea o selecciona un Google Cloud proyecto.

    Roles necesarios para seleccionar o crear un proyecto

    • Selecciona un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
    • Crear un proyecto: Para crear un proyecto, necesitas el rol de Creador de proyectos (roles/resourcemanager.projectCreator), que contiene el permiso resourcemanager.projects.create. Obtén más información para otorgar roles.
    • Crea un proyecto de Google Cloud :

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto Google Cloud que estás creando.

    • Selecciona el proyecto Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre de tu Google Cloud proyecto.

  6. Verifica que la facturación esté habilitada para tu proyecto de Google Cloud .

  7. Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, JSON de Google Cloud Storage y Cloud Resource Manager:

    Roles necesarios para habilitar las APIs

    Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin), que contiene el permiso serviceusage.services.enable. Obtén más información para otorgar roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  8. Crea credenciales de autenticación locales para tu cuenta de usuario:

    gcloud auth application-default login

    Si se devuelve un error de autenticación y usas un proveedor de identidad (IdP) externo, confirma que accediste a la gcloud CLI con tu identidad federada.

  9. Otorga roles a tu cuenta de usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Reemplaza lo siguiente:

    • PROJECT_ID: ID del proyecto
    • USER_IDENTIFIER: Es el identificador de tu cuenta de usuario de . Por ejemplo, myemail@example.com.
    • ROLE: Es el rol de IAM que otorgas a tu cuenta de usuario.
  10. Instala Google Cloud CLI.

  11. Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.

  12. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  13. Crea o selecciona un Google Cloud proyecto.

    Roles necesarios para seleccionar o crear un proyecto

    • Selecciona un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
    • Crear un proyecto: Para crear un proyecto, necesitas el rol de Creador de proyectos (roles/resourcemanager.projectCreator), que contiene el permiso resourcemanager.projects.create. Obtén más información para otorgar roles.
    • Crea un proyecto de Google Cloud :

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto Google Cloud que estás creando.

    • Selecciona el proyecto Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre de tu Google Cloud proyecto.

  14. Verifica que la facturación esté habilitada para tu proyecto de Google Cloud .

  15. Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, JSON de Google Cloud Storage y Cloud Resource Manager:

    Roles necesarios para habilitar las APIs

    Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin), que contiene el permiso serviceusage.services.enable. Obtén más información para otorgar roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  16. Crea credenciales de autenticación locales para tu cuenta de usuario:

    gcloud auth application-default login

    Si se devuelve un error de autenticación y usas un proveedor de identidad (IdP) externo, confirma que accediste a la gcloud CLI con tu identidad federada.

  17. Otorga roles a tu cuenta de usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Reemplaza lo siguiente:

    • PROJECT_ID: ID del proyecto
    • USER_IDENTIFIER: Es el identificador de tu cuenta de usuario de . Por ejemplo, myemail@example.com.
    • ROLE: Es el rol de IAM que otorgas a tu cuenta de usuario.
  18. Otorga roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza PROJECT_NUMBER por el número del proyecto. Para encontrar el número de tu proyecto, consulta Identifica proyectos o usa el comando gcloud projects describe.
    • Reemplaza SERVICE_ACCOUNT_ROLE por cada rol individual.
  19. Crea un bucket de Cloud Storage y configúralo de la siguiente manera:
    • Establece la clase de almacenamiento en S (Standard).
    • Configura la ubicación de almacenamiento de la siguiente manera: US (Estados Unidos).
    • Reemplaza BUCKET_NAME con un nombre de bucket único. No incluyas información sensible en el nombre del bucket porque su espacio de nombres es global y visible a nivel público.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  20. Copia el ID del proyecto Google Cloud y el nombre del bucket de Cloud Storage. Necesitarás estos valores más adelante en esta guía de inicio rápido.

Configura tu entorno de desarrollo

El SDK de Apache Beam es un modelo de programación de código abierto para canalizaciones de datos. Debes definir una canalización con un programa de Apache Beam y, luego, elegir un ejecutor, como Dataflow, para ejecutar tu canalización.

Te recomendamos que uses la versión más reciente de Go cuando trabajes con el SDK de Apache Beam para Go. Si no tienes instalada la versión más reciente de Go, usa la guía de descarga e instalación de Go para descargar e instalar Go en tu sistema operativo específico.

Para verificar la versión de Go que instalaste, ejecuta el siguiente comando en tu terminal local:

go version

Ejecuta el ejemplo de recuento de palabras de Beam

El SDK de Apache Beam para Go incluye un ejemplo de canalización wordcount. El ejemplo de wordcount realiza lo siguiente:

  1. Lee un archivo de texto como entrada. De manera predeterminada, lee un archivo de texto ubicado en un bucket de Cloud Storage con el nombre del recurso gs://dataflow-samples/shakespeare/kinglear.txt.
  2. Analiza cada línea en palabras.
  3. Realiza un recuento de frecuencia en las palabras con asignación de token.

Para ejecutar la última versión del ejemplo de wordcount de Beam en tu máquina local, realiza los siguientes pasos:

  1. Usa el comando git clone para clonar el repositorio de GitHub apache/beam:

    git clone https://github.com/apache/beam.git
  2. Cambia al directorio beam/sdks/go:

    cd beam/sdks/go
  3. Usa el siguiente comando para ejecutar la canalización:

    go run examples/wordcount/wordcount.go \
      --input gs://dataflow-samples/shakespeare/kinglear.txt \
      --output outputs

    La marca input especifica el archivo que se leerá y la marca output especifica el nombre del archivo para el resultado del recuento de frecuencia.

Una vez completada la canalización, ve los resultados de salida:

more outputs*

Para salir, presiona q.

Modifica el código de canalización

La canalización wordcount de Beam distingue mayúsculas de minúsculas. En los pasos siguientes, se muestra cómo crear tu propio módulo de Go, modificar la canalización de wordcount para que no distinga mayúsculas de minúsculas, y ejecutarla en Dataflow.

Crea un módulo de Go

Para realizar cambios en el código de la canalización, sigue estos pasos.

  1. Crea un directorio para tu módulo de Go en la ubicación que elijas:

    mkdir wordcount
    cd wordcount
  2. Crea un módulo de Go. Para este ejemplo, usa example/dataflow como la ruta de acceso del módulo.

    go mod init example/dataflow
  3. Descarga la copia más reciente del código de wordcount del repositorio de GitHub de Apache Beam. Coloca este archivo en el directorio wordcount que creaste.

  4. Si usas un sistema operativo que no sea Linux, debes obtener el paquete unix de Go. Este paquete es necesario para ejecutar canalizaciones en el servicio de Dataflow.

    go get -u golang.org/x/sys/unix
  5. Asegúrate de que el archivo go.mod coincida con el código fuente del módulo:

    go mod tidy

Ejecuta la canalización sin modificar

Verifica que la canalización de wordcount sin modificar se ejecute de forma local.

  1. Desde la terminal, compila y ejecuta la canalización de manera local:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. Observa los resultados de salida:

     more outputs*
  3. Para salir, presiona q.

Cambia el código de canalización

Si deseas cambiar la canalización para que no distinga mayúsculas de minúsculas, modifica el código a fin de aplicar la función strings.ToLower a todas las palabras.

  1. En el editor que prefieras, abre el archivo wordcount.go.

  2. Examina el bloque init (se quitaron los comentarios para mayor claridad):

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. Agrega una línea nueva para registrar la función strings.ToLower:

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. Examina la función CountWords:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. Para dejar en minúsculas las palabras, agrega un ParDo que aplique strings.ToLower a cada palabra:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. Guarda el archivo.

Ejecuta la canalización actualizada de forma local

Ejecuta la canalización wordcount actualizada de forma local y verifica que el resultado haya cambiado.

  1. Compila y ejecuta la canalización wordcount modificada:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. Ve los resultados de la canalización modificada: Todas las palabras deben estar en minúsculas.

     more outputs*
  3. Para salir, presiona q.

Ejecuta la canalización en el servicio de Dataflow

Para ejecutar el ejemplo de wordcount actualizado en el servicio de Dataflow, usa el siguiente comando:

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

Reemplaza lo siguiente:

  • BUCKET_NAME: el nombre del bucket de Cloud Storage.

  • PROJECT_ID: Es el ID del proyecto de Google Cloud .

  • DATAFLOW_REGION: la región en la que deseas implementar el trabajo de Dataflow. Por ejemplo, europe-west1. Para obtener una lista de las ubicaciones disponibles, consulta Ubicaciones de Dataflow. La marca --region anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.

Ve los resultados

Puedes ver una lista de tus trabajos de Dataflow en laGoogle Cloud consola. En la consola de Google Cloud , ve a la página Trabajos de Dataflow.

Ir a Trabajos

En la página Trabajos, se muestran detalles del trabajo wordcount, incluido un estado En ejecución primero y, luego, Correcto.

Cuando ejecutas una canalización con Dataflow, los resultados se almacenan en un bucket de Cloud Storage. Para ver los resultados, usa la consola deGoogle Cloud o la terminal local.

Console

Para ver los resultados en la consola de Google Cloud , ve a la página Buckets de Cloud Storage.

Ir a Buckets

En la lista de buckets de tu proyecto, haz clic en el bucket de almacenamiento que creaste antes. Los archivos de salida que creó tu trabajo se muestran en el directorio results.

Terminal

Consulta los resultados desde tu terminal o mediante Cloud Shell.

  1. Para enumerar los archivos de salida, usa el comando gcloud storage ls:

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

    Reemplaza BUCKET_NAME por el nombre del bucket de Cloud Storage de salida especificado.

  2. Para ver los resultados en los archivos de salida, usa el comando gcloud storage cat:

    gcloud storage cat gs://BUCKET_NAME/results/outputs*

Realiza una limpieza

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página, borra el proyecto de Google Cloud que tiene los recursos.

  1. En la Google Cloud consola, ve a la página Buckets de Cloud Storage.

    Ir a Buckets

  2. Haz clic en la casilla de verificación del bucket que deseas borrar.
  3. Para borrar el bucket, haz clic en Borrar y sigue las instrucciones.
  4. Si conservas tu proyecto, revoca los roles que otorgaste a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Opcional: Revoca las credenciales de autenticación que creaste y borra el archivo local de credenciales.

    gcloud auth application-default revoke
  6. Opcional: Revoca credenciales desde gcloud CLI.

    gcloud auth revoke

¿Qué sigue?