En esta página, se describe cómo publicar eventos de canalización de Cloud Data Fusion, como el estado de la canalización, en temas de Pub/Sub. También se describe cómo crear funciones de Cloud Run que procesan los mensajes de Pub/Sub y realizan acciones, como identificar y reintentar canalizaciones con errores.
Antes de comenzar
- Crea un tema en el que Pub/Sub pueda publicar eventos de canalización de Cloud Data Fusion.
Roles obligatorios
Para asegurarte de que la cuenta de servicio de Cloud Data Fusion tenga los permisos necesarios para publicar eventos de canalización en un tema de Pub/Sub, pídele a tu administrador que le otorgue el rol de IAM de publicador de Pub/Sub (roles/pubsub.publisher) a la cuenta de servicio de Cloud Data Fusion en el proyecto en el que crees el tema de Pub/Sub.
Es posible que tu administrador también pueda otorgar a la cuenta de servicio de Cloud Data Fusion los permisos necesarios mediante roles personalizados o con otros roles predefinidos.
Administra la publicación de eventos en una instancia de Cloud Data Fusion
Puedes administrar la publicación de eventos en instancias nuevas y existentes de Cloud Data Fusion con la API de REST en las versiones 6.7.0 y posteriores.
Publica eventos en una instancia nueva
Crea una instancia nueva y, luego, incluye el campo EventPublishConfig. Para obtener más
información sobre los campos obligatorios para las instancias nuevas, consulta la
referencia
del recurso Instances.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Reemplaza lo siguiente:
PROJECT_ID: el Google Cloud ID del proyectoLOCATION: la ubicación de tu proyectoINSTANCE_ID: el ID de tu instancia de Cloud Data FusionVERSION_NUMBER: la versión de Cloud Data Fusion en la que creas la instancia, por ejemplo,6.10.1TOPIC_ID: el ID del tema de Pub/Sub
Habilita la publicación de eventos en una instancia existente de Cloud Data Fusion
Actualiza el
EventPublishConfig
campo en una instancia existente de Cloud Data Fusion:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Reemplaza lo siguiente:
PROJECT_ID: el Google Cloud ID del proyectoLOCATION: la ubicación de tu proyectoINSTANCE_ID: el ID de tu instancia de Cloud Data FusionTOPIC_ID: el ID del tema de Pub/Sub
Quita la publicación de eventos de una instancia
Para quitar la publicación de eventos de una instancia, actualiza el valor enabled de la publicación de eventos a false:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Crea funciones para leer mensajes de Pub/Sub
Las funciones de Cloud Run pueden leer mensajes de Pub/Sub y realizar acciones en ellos, como reintentar canalizaciones con errores. Para crear una función de Cloud Run, haz lo siguiente:
En la Google Cloud consola, ve a la páginaFunciones de Cloud Run.
Haz clic en Crear función.
Ingresa un nombre de función y una región.
En el campo Tipo de activador, selecciona Cloud Pub/Sub.
Ingresa el ID del tema de Pub/Sub.
Haz clic en Siguiente.
Agrega funciones para leer los mensajes de Pub/Sub y realizar otras acciones. Por ejemplo, puedes agregar funciones para los siguientes casos de uso:
- Enviar alertas por fallas en la canalización
- Enviar alertas para KPIs, como el recuento de registros o la información de ejecución
- Reiniciar una canalización con errores que no se volvió a ejecutar
Para obtener ejemplos de funciones de Cloud Run, consulta la sección de casos de uso.
Haz clic en Implementar. Para obtener más información, consulta Implementa una función de Cloud Run.
Caso de uso: Documenta el estado de la canalización y vuelve a intentar las canalizaciones con errores
Las siguientes funciones de Cloud Run de ejemplo leen mensajes de Pub/Sub sobre el estado de ejecución de la canalización y, luego, vuelven a intentar las canalizaciones con errores en Cloud Data Fusion.
Estas funciones de ejemplo hacen referencia a los siguientes Google Cloud componentes:
- Google Cloud project: el proyecto en el que se crean las funciones de Cloud Run y los temas de Pub/Sub
- Tema de Pub/Sub: el tema de Pub/Sub vinculado a tu instancia de Cloud Data Fusion
- Instancia de Cloud Data Fusion: la instancia de Cloud Data Fusion en la que diseñas y ejecutas canalizaciones
- Tabla de BigQuery: la tabla de BigQuery que captura el estado de la canalización y los detalles de ejecución y de reejecución
- Función de Cloud Run: la función de Cloud Run en la que implementas el código que vuelve a intentar las canalizaciones con errores
En el siguiente ejemplo de función de Cloud Run, se leen los mensajes de Pub/Sub sobre los eventos de estado de Cloud Data Fusion.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")En la siguiente función de ejemplo, se crea y guarda una tabla de BigQuery, y se consultan los detalles de ejecución de la canalización.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))En la siguiente función de ejemplo, se buscan las canalizaciones que fallaron y si se volvieron a ejecutar en la última hora.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = TrueSi la canalización con errores no se ejecutó recientemente, la siguiente función de ejemplo la vuelve a ejecutar.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
¿Qué sigue?
- Obtén información para escribir funciones de Cloud Run.