En este instructivo, crearás una canalización de transmisión de Dataflow que transforma los datos de comercio electrónico de los temas y las suscripciones de Pub/Sub y envía los datos a BigQuery y Bigtable. Este instructivo requiere Gradle.
En el instructivo, se proporciona una aplicación de ejemplo de comercio electrónico de extremo a extremo que transmite datos de una tienda web a BigQuery y Bigtable. En la aplicación de muestra, se ilustran casos de uso comunes y prácticas recomendadas para implementar las estadísticas de datos de transmisión y la inteligencia artificial (IA) en tiempo real. Usa este instructivo para aprender a responder de forma dinámica a las acciones de los clientes a fin de analizar eventos y reaccionar ante ellos en tiempo real. En este instructivo, se describe cómo almacenar, analizar y visualizar datos de eventos para obtener más estadísticas sobre el comportamiento de los clientes.
La aplicación de muestra está disponible en GitHub. Para ejecutar este instructivo mediante Terraform, sigue los pasos proporcionados con la aplicación de muestra en GitHub.
Objetivos
- Validar los datos de entrada y aplicar correcciones cuando sea posible
- Analizar los datos de flujo de clics para mantener un recuento de la cantidad de vistas por producto en un período determinado. Almacena esta información en un almacén de latencia baja. La aplicación puede usar los datos para proporcionar mensajes sobre la cantidad de personas que vieron este producto a los clientes en el sitio web.
Usar los datos de transacciones para informar los pedidos de inventario:
- Analizar los datos de las transacciones para calcular la cantidad total de ventas de cada artículo, por tienda y a nivel global, durante un período determinado
- Analizar los datos de inventario a fin de calcular el inventario entrante para cada elemento
- Pasar estos datos a los sistemas de inventario de forma continua para poder usarlos en la toma de decisiones de compra de inventario.
Validar los datos de entrada y aplicar correcciones cuando sea posible Escribir los datos que no se puedan corregir en una cola de mensajes no entregados para su análisis y procesamiento adicionales. Haz una métrica que represente el porcentaje de datos entrante que se envía a la cola de mensajes no entregados disponibles para supervisar y crear alertas
Procesar todos los datos de entrada en un formato estándar y almacenarlos en un almacén de datos para usarlos en próximos análisis y visualizaciones
Desnormalizar los datos de transacciones para las ventas en la tienda a fin de que puedan incluir información como la latitud y la longitud de la ubicación de la tienda. Para proporcionar la información de la tienda mediante una tabla que cambia con lentitud en BigQuery, usa el ID de tienda como clave.
Datos
La aplicación procesa los siguientes tipos de datos:
- Datos de flujo de clics que se envían mediante sistemas en línea a Pub/Sub.
- Datos de transacciones que se enviarán mediante sistemas locales o de software como servicio (SaaS) a Pub/Sub.
- Datos de archivo que se envían mediante sistemas locales o SaaS a Pub/Sub.
Patrones de la tarea
La aplicación contiene los siguientes patrones de tareas comunes a las canalizaciones compiladas con el SDK de Apache Beam para Java:
- Esquemas de Apache Beam para trabajar con datos estructurados
JsonToRowpara convertir datos JSON- El generador de código
AutoValuepara generar objetos antiguos y sin formato basados en Java (POJOs) - Pon en cola datos que no se pueden procesar para su análisis posterior
- Transformaciones de validación de datos en serie
DoFn.StartBundlea llamadas por microlotes a servicios externos- Patrones de entradas complementarias
Costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Para generar una estimación de costos en función del uso previsto,
usa la calculadora de precios.
Cuando completes las tareas que se describen en este documento, podrás borrar los recursos que creaste para evitar que se te siga facturando. Para obtener más información, consulta Realiza una limpieza.
Antes de comenzar
- Accede a tu cuenta de Google Cloud . Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
-
Instala Google Cloud CLI.
-
Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init -
Crea o selecciona un Google Cloud proyecto.
Roles necesarios para seleccionar o crear un proyecto
- Selecciona un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
-
Crear un proyecto: Para crear un proyecto, necesitas el rol de Creador de proyectos (
roles/resourcemanager.projectCreator), que contiene el permisoresourcemanager.projects.create. Obtén más información para otorgar roles.
-
Crea un proyecto de Google Cloud :
gcloud projects create PROJECT_ID
Reemplaza
PROJECT_IDpor un nombre para el proyecto Google Cloud que estás creando. -
Selecciona el proyecto Google Cloud que creaste:
gcloud config set project PROJECT_ID
Reemplaza
PROJECT_IDpor el nombre de tu Google Cloud proyecto.
-
Verifica que la facturación esté habilitada para tu proyecto de Google Cloud .
Habilita las APIs de Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin y Cloud Scheduler:
Roles necesarios para habilitar las APIs
Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (
roles/serviceusage.serviceUsageAdmin), que contiene el permisoserviceusage.services.enable. Obtén más información para otorgar roles.gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Crea credenciales de autenticación locales para tu cuenta de usuario:
gcloud auth application-default login
Si se devuelve un error de autenticación y usas un proveedor de identidad (IdP) externo, confirma que accediste a la gcloud CLI con tu identidad federada.
-
Otorga roles a tu cuenta de usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Reemplaza lo siguiente:
PROJECT_ID: ID del proyectoUSER_IDENTIFIER: Es el identificador de tu cuenta de usuario de . Por ejemplo,myemail@example.com.ROLE: Es el rol de IAM que otorgas a tu cuenta de usuario.
-
Instala Google Cloud CLI.
-
Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init -
Crea o selecciona un Google Cloud proyecto.
Roles necesarios para seleccionar o crear un proyecto
- Selecciona un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
-
Crear un proyecto: Para crear un proyecto, necesitas el rol de Creador de proyectos (
roles/resourcemanager.projectCreator), que contiene el permisoresourcemanager.projects.create. Obtén más información para otorgar roles.
-
Crea un proyecto de Google Cloud :
gcloud projects create PROJECT_ID
Reemplaza
PROJECT_IDpor un nombre para el proyecto Google Cloud que estás creando. -
Selecciona el proyecto Google Cloud que creaste:
gcloud config set project PROJECT_ID
Reemplaza
PROJECT_IDpor el nombre de tu Google Cloud proyecto.
-
Verifica que la facturación esté habilitada para tu proyecto de Google Cloud .
Habilita las APIs de Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin y Cloud Scheduler:
Roles necesarios para habilitar las APIs
Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (
roles/serviceusage.serviceUsageAdmin), que contiene el permisoserviceusage.services.enable. Obtén más información para otorgar roles.gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Crea credenciales de autenticación locales para tu cuenta de usuario:
gcloud auth application-default login
Si se devuelve un error de autenticación y usas un proveedor de identidad (IdP) externo, confirma que accediste a la gcloud CLI con tu identidad federada.
-
Otorga roles a tu cuenta de usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Reemplaza lo siguiente:
PROJECT_ID: ID del proyectoUSER_IDENTIFIER: Es el identificador de tu cuenta de usuario de . Por ejemplo,myemail@example.com.ROLE: Es el rol de IAM que otorgas a tu cuenta de usuario.
Crea una cuenta de servicio de trabajador administrado por el usuario para tu canalización nueva y otórgale los roles necesarios.
Para crear la cuenta de servicio, ejecuta el comando
gcloud iam service-accounts create:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.adminroles/dataflow.workerroles/pubsub.editorroles/bigquery.dataEditorroles/bigtable.adminroles/bigquery.jobUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Reemplaza
SERVICE_ACCOUNT_ROLEpor cada rol individual.Otorga a tu Cuenta de Google un rol que te permita crear tokens de acceso para la cuenta de servicio:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Si es necesario, descarga e instala Gradle.
Crea las fuentes y los receptores de ejemplo
En esta sección, se explica cómo crear lo siguiente:
- Un bucket de Cloud Storage para usar como ubicación de almacenamiento temporal
- Fuentes de datos de transmisión con Pub/Sub
- Conjuntos de datos para cargar los datos en BigQuery
- Una instancia de Bigtable
Crea un bucket de Cloud Storage
Primero, crea un bucket de Cloud Storage. La canalización de Dataflow usa este bucket como ubicación de almacenamiento temporal.
Usa el comando gcloud storage buckets create:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Reemplaza lo siguiente:
- BUCKET_NAME: Es un nombre para tu bucket de Cloud Storage que cumple con los requisitos de nombres de buckets. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global.
- LOCATION: la ubicación del bucket.
Crea temas y suscripciones de Pub/Sub
Crea cuatro temas de Pub/Sub y, a continuación, crea tres suscripciones.
Para crear tus temas, ejecuta el comando
gcloud pubsub topics create
una vez por cada tema. Para obtener información sobre cómo asignar un nombre a una suscripción, consulta los Lineamientos para asignar un nombre a un tema o una suscripción.
gcloud pubsub topics create TOPIC_NAME
Reemplaza TOPIC_NAME por los siguientes valores y ejecuta el comando cuatro veces, una vez por cada tema:
Clickstream-inboundTransactions-inboundInventory-inboundInventory-outbound
Para crear una suscripción a un tema, ejecuta el comando
gcloud pubsub subscriptions create
una vez por cada suscripción:
Crea una suscripción
Clickstream-inbound-sub:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-subCrea una suscripción
Transactions-inbound-sub:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-subCrea una suscripción
Inventory-inbound-sub:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
Crea conjuntos de datos y una tabla de BigQuery
Crea un conjunto de datos de BigQuery y una tabla particionada con el esquema adecuado para tu tema de Pub/Sub.
Usa el comando
bq mkpara crear el primer conjunto de datos.bq --location=US mk \ PROJECT_ID:Retail_StoreCrea el segundo conjunto de datos.
bq --location=US mk \ PROJECT_ID:Retail_Store_AggregationsUsa la instrucción de SQL CREATE TABLE para crear una tabla con un esquema y datos de prueba. Los datos de prueba tienen un almacén con un valor de ID de
1. El patrón de entrada complementaria de actualización lenta usa esta tabla.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Crea una instancia y una tabla de Bigtable
Crea una instancia y una tabla de Bigtable. Para obtener más información sobre cómo crear instancias de Bigtable, consulta Crea una instancia.
Si es necesario, ejecuta el siguiente comando para instalar la CLI de
cbt:gcloud components install cbtUsa el comando
bigtable instances createpara crear una instancia:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1Reemplaza CLUSTER_ZONE por la zona donde se ejecuta el clúster.
Usa el comando
cbt createtablepara crear una tabla permanente.cbt -instance=aggregate-tables createtable PageView5MinAggregatesUsa el siguiente comando para agregar una familia de columnas a la tabla:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
Ejecute la canalización
Usa Gradle para ejecutar una canalización de transmisión. Para ver el código Java que usa la canalización, consulta RetailDataProcessingPipeline.java.
Usa el comando
git clonepara clonar el repositorio de GitHub:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.gitCambia al directorio de la aplicación:
cd dataflow-sample-applications/retail/retail-java-applicationsPara probar la canalización, en tu shell o terminal, ejecuta el siguiente comando con Gradle:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasksPara ejecutar la canalización, ejecuta el siguiente comando con Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID \ --serviceAccount=retailpipeline.PROJECT_ID.iam.gserviceaccount.com"
Consulta el código fuente de la canalización en GitHub.
Crea y ejecuta trabajos de Cloud Scheduler
Crea y ejecuta tres trabajos de Cloud Scheduler: uno que publique datos de flujo de clics, uno para datos de inventario y otro para datos de transacciones. En este paso, se generan datos de muestra para la canalización.
Si quieres crear un trabajo de Cloud Scheduler para este instructivo, usa el comando
gcloud scheduler jobs create. En este paso, se crea un publicador para los datos de flujo de clics que publica un mensaje por minuto.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'Para iniciar el trabajo de Cloud Scheduler, usa el comando
gcloud scheduler jobs run.gcloud scheduler jobs run --location=LOCATION clickstreamCrea y ejecuta otro publicador similar para los datos de inventario que publique un mensaje cada dos minutos.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'Inicia el segundo trabajo de Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION inventoryCrea y ejecuta un tercer publicador para datos de transacciones que publique un mensaje cada dos minutos.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'Inicia el tercer trabajo de Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION transactions
Ve los resultados
Visualiza los datos escritos en tus tablas de BigQuery. Ejecuta las siguientes consultas para verificar los resultados en BigQuery: Mientras esta canalización se está ejecutando, puedes ver filas nuevas agregadas a las tablas de BigQuery cada minuto.
Es posible que debas esperar a que las tablas se propaguen con los datos.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Libera espacio
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.
Borra el proyecto
La manera más fácil de eliminar la facturación es borrar el Google Cloud proyecto que creaste para el instructivo.
- En la Google Cloud consola, ve a la página Administrar recursos.
- En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
- En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.
Borra los recursos individuales
Si deseas volver a usar el proyecto, borra los recursos que creaste para el instructivo.
Limpia los Google Cloud recursos del proyecto
Para borrar los trabajos de Cloud Scheduler, usa el comando
gcloud scheduler jobs delete.gcloud scheduler jobs delete transactions --location=LOCATIONgcloud scheduler jobs delete inventory --location=LOCATIONgcloud scheduler jobs delete clickstream --location=LOCATIONPara borrar las suscripciones y los temas de Pub/Sub, usa los comandos
gcloud pubsub subscriptions deleteygcloud pubsub topics delete.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAMEPara borrar la tabla de BigQuery, usa el comando
bq rm.bq rm -f -t PROJECT_ID:Retail_Store.Store_LocationsBorra los conjuntos de datos de BigQuery. El conjunto de datos por sí solo no genera cargos.
bq rm -r -f -d PROJECT_ID:Retail_Storebq rm -r -f -d PROJECT_ID:Retail_Store_AggregationsPara borrar la instancia de Bigtable, usa el comando
cbt deleteinstance. El bucket por sí solo no genera cargos.cbt deleteinstance aggregate-tablesPara borrar el bucket de Cloud Storage y sus objetos, usa el comando
gcloud storage rm. El bucket por sí solo no genera cargos.gcloud storage rm gs://BUCKET_NAME --recursive
Revoca credenciales
Revoca los roles que otorgaste a la cuenta de servicio de trabajador administrada por el usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.adminroles/dataflow.workerroles/pubsub.editorroles/bigquery.dataEditorroles/bigtable.adminroles/bigquery.jobUser
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Opcional: Revoca las credenciales de autenticación que creaste y borra el archivo local de credenciales.
gcloud auth application-default revoke
-
Opcional: Revoca credenciales desde gcloud CLI.
gcloud auth revoke
¿Qué sigue?
- Consulta la aplicación de muestra en GitHub.
- Lee la entrada de blog relacionada Obtén información sobre los patrones de Beam con el procesamiento de Clickstream de los datos de Google Tag Manager.
- Lee sobre el uso de Pub/Sub para crear y usar temas y cómo Usar suscripciones.
- Lee sobre el uso de BigQuery para crear conjuntos de datos.
- Explora arquitecturas de referencia, diagramas y prácticas recomendadas sobre Google Cloud. Consulta nuestro Cloud Architecture Center.