Transmite mensajes desde Pub/Sub con Dataflow y Cloud Storage
Dataflow es un servicio completamente administrado para transformar y enriquecer datos en modos de transmisión (en tiempo real) y por lotes con la misma confiabilidad y expresividad. Proporciona un entorno de desarrollo de canalización simplificado con el SDK de Apache Beam, que tiene un conjunto amplio de primitivas de análisis de sesiones y sistemas de ventanas, además de un ecosistema de conectores fuente y receptores. En esta guía de inicio rápido, se muestra cómo usar Dataflow para realizar las siguientes acciones:
- Leer mensajes publicados en un tema de Pub/Sub
- Mostrar mensajes en ventanas, o agruparlos, por marca de tiempo
- Escribir mensajes a Cloud Storage
En esta guía de inicio rápido, se explica el uso de Dataflow en Java y Python. SQL también es compatible. Esta guía de inicio rápido también se ofrece como un instructivo de Google Cloud Skills Boost, que proporciona credenciales temporales para ayudarte a comenzar.
Si tu intención no es realizar un procesamiento de datos personalizado, puedes comenzar a usar las plantillas de Dataflow basadas en IU.
Antes de comenzar
- Accede a tu cuenta de Google Cloud . Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
-
Instala Google Cloud CLI.
-
Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init -
Crea o selecciona un Google Cloud proyecto.
Roles necesarios para seleccionar o crear un proyecto
- Selecciona un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
-
Crear un proyecto: Para crear un proyecto, necesitas el rol de Creador de proyectos (
roles/resourcemanager.projectCreator), que contiene el permisoresourcemanager.projects.create. Obtén más información para otorgar roles.
-
Crea un proyecto de Google Cloud :
gcloud projects create PROJECT_ID
Reemplaza
PROJECT_IDpor un nombre para el proyecto Google Cloud que estás creando. -
Selecciona el proyecto Google Cloud que creaste:
gcloud config set project PROJECT_ID
Reemplaza
PROJECT_IDpor el nombre de tu Google Cloud proyecto.
-
Verifica que la facturación esté habilitada para tu proyecto de Google Cloud .
Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager y Cloud Scheduler:
Roles necesarios para habilitar las APIs
Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (
roles/serviceusage.serviceUsageAdmin), que contiene el permisoserviceusage.services.enable. Obtén más información para otorgar roles.gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configura la autenticación:
-
Asegúrate de tener los roles de IAM de creador de cuentas de servicio (
roles/iam.serviceAccountCreator) y administrador de IAM del proyecto (roles/resourcemanager.projectIamAdmin). Obtén más información para otorgar roles. -
Crea la cuenta de servicio:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Reemplaza
SERVICE_ACCOUNT_NAMEpor un nombre para la cuenta de servicio. -
Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Reemplaza lo siguiente:
SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicioPROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicioROLE: el rol a otorgar
-
Otorga el rol requerido a la principal que conectará la cuenta de servicio a otros recursos.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Reemplaza lo siguiente:
SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicioPROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicioUSER_EMAIL: La dirección de correo electrónico de una Cuenta de Google
-
Asegúrate de tener los roles de IAM de creador de cuentas de servicio (
-
Instala Google Cloud CLI.
-
Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init -
Crea o selecciona un Google Cloud proyecto.
Roles necesarios para seleccionar o crear un proyecto
- Selecciona un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
-
Crear un proyecto: Para crear un proyecto, necesitas el rol de Creador de proyectos (
roles/resourcemanager.projectCreator), que contiene el permisoresourcemanager.projects.create. Obtén más información para otorgar roles.
-
Crea un proyecto de Google Cloud :
gcloud projects create PROJECT_ID
Reemplaza
PROJECT_IDpor un nombre para el proyecto Google Cloud que estás creando. -
Selecciona el proyecto Google Cloud que creaste:
gcloud config set project PROJECT_ID
Reemplaza
PROJECT_IDpor el nombre de tu Google Cloud proyecto.
-
Verifica que la facturación esté habilitada para tu proyecto de Google Cloud .
Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager y Cloud Scheduler:
Roles necesarios para habilitar las APIs
Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (
roles/serviceusage.serviceUsageAdmin), que contiene el permisoserviceusage.services.enable. Obtén más información para otorgar roles.gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configura la autenticación:
-
Asegúrate de tener los roles de IAM de creador de cuentas de servicio (
roles/iam.serviceAccountCreator) y administrador de IAM del proyecto (roles/resourcemanager.projectIamAdmin). Obtén más información para otorgar roles. -
Crea la cuenta de servicio:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Reemplaza
SERVICE_ACCOUNT_NAMEpor un nombre para la cuenta de servicio. -
Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Reemplaza lo siguiente:
SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicioPROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicioROLE: el rol a otorgar
-
Otorga el rol requerido a la principal que conectará la cuenta de servicio a otros recursos.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Reemplaza lo siguiente:
SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicioPROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicioUSER_EMAIL: La dirección de correo electrónico de una Cuenta de Google
-
Asegúrate de tener los roles de IAM de creador de cuentas de servicio (
-
Crea credenciales de autenticación locales para tu cuenta de usuario:
gcloud auth application-default login
Si se devuelve un error de autenticación y usas un proveedor de identidad (IdP) externo, confirma que accediste a la gcloud CLI con tu identidad federada.
Configura tu proyecto de Pub/Sub
-
Crea variables para tu bucket, tu proyecto y la región. Los nombres de depósitos de Cloud Storage deben ser únicos a nivel global. Selecciona una región de Dataflow cercana a donde ejecutas los comandos de esta guía de inicio rápido. El valor de la variable
REGIONdebe ser un nombre de región válido. Para obtener más información acerca de las regiones y ubicaciones, consulta Ubicaciones de Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Crea un bucket de Cloud Storage que sea propiedad de este proyecto:
gcloud storage buckets create gs://$BUCKET_NAME
-
Crea un tema de Pub/Sub en este proyecto:
gcloud pubsub topics create $TOPIC_ID
-
Crea un trabajo de Cloud Scheduler en este proyecto. El trabajo publica un mensaje en un tema de Pub/Sub con intervalos de un minuto.
Si una app de App Engine no existe en el proyecto, crea una con el siguiente comando.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Inicia el trabajo.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Usa los siguientes comandos para clonar el repositorio de la guía de inicio rápido y navega al directorio de código de muestra:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Transmite mensajes desde Pub/Sub a Cloud Storage
Muestra de código
En este código de muestra, se usa Dataflow para realizar las siguientes acciones:
- Leer mensajes de Pub/Sub
- Mostrar mensajes en ventanas, o agruparlos, en intervalos de tamaño fijo con marcas de tiempo públicas
Escribir los mensajes en cada ventana en archivos en Cloud Storage
Java
Python
Comienza la canalización
Para iniciar la canalización, ejecuta el siguiente comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
El comando anterior se ejecuta de manera local y, luego, inicia un trabajo de Dataflow que se ejecuta en la nube. Cuando el comando muestre JOB_MESSAGE_DETAILED: Workers
have started successfully, sal del programa local con Ctrl+C.
Observa el progreso del trabajo y la canalización
Puedes observar el progreso del trabajo en la consola de Dataflow.
Abre la vista de detalles de trabajos para ver lo siguiente:
- Estructura del trabajo
- Registros del trabajo
- Métricas de etapas
Puede que debas esperar unos minutos para ver los archivos de salida en Cloud Storage.
También puedes usar la línea de comandos que se muestra a continuación para verificar qué archivos se escribieron.
gcloud storage ls gs://${BUCKET_NAME}/samples/
El resultado debe tener el siguiente aspecto:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1Realiza una limpieza
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página, borra el proyecto de Google Cloud que tiene los recursos.
Borra el trabajo de Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
En la consola de Dataflow, detén el trabajo. Cancela la canalización sin desviarla.
Borra el tema.
gcloud pubsub topics delete $TOPIC_ID
Borra los archivos que se crearon con la canalización.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Quita el bucket de Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
Borra la cuenta de servicio:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Opcional: Revoca las credenciales de autenticación que creaste y borra el archivo local de credenciales.
gcloud auth application-default revoke
-
Opcional: Revoca credenciales desde gcloud CLI.
gcloud auth revoke
¿Qué sigue?
Si deseas mostrar los mensajes de Pub/Sub en una marca de tiempo personalizada, puedes especificar la marca de tiempo como un atributo en el mensaje de Pub/Sub y, luego, usar la marca de tiempo personalizada con el comando
withTimestampAttributede PubsubIO.Observa las plantillas de código abierto de Dataflow diseñadas para la transmisión de Google.
Obtén más información sobre cómo Dataflow se integra com Pub/Sub.
Consulta este instructivo que lee desde Pub/Sub y escribe en BigQuery mediante las plantillas flexibles de Dataflow.
Para obtener más información sobre el sistema de ventanas, consulta el ejemplo en la página sobre canalización de videojuegos para dispositivos móviles de Apache Beam.