监控流水线状态

本页面介绍如何将 Cloud Data Fusion 流水线事件(例如流水线状态)发布到 Pub/Sub 主题。此外,本页面还介绍了如何创建 Cloud Run functions 来处理 Pub/Sub 消息并执行操作,例如识别和重试失败的流水线。

准备工作

所需角色

为了确保 Cloud Data Fusion 服务账号具有将流水线事件发布到 Pub/Sub 主题所需的 权限, 请让您的管理员向 Pub/Sub Publisher (roles/pubsub.publisher) IAM 角色授予 Cloud Data Fusion 服务账号,该服务账号对您创建 Pub/Sub 主题的项目具有服务账号。

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您的管理员还可以通过自定义 角色或其他预定义 角色向 Cloud Data Fusion 服务账号 授予所需的权限。

管理 Cloud Data Fusion 实例中的事件发布

您可以使用 6.7.0 及更高版本中的 REST API 管理新的和现有的 Cloud Data Fusion 实例中的事件发布。

在新实例中发布事件

创建一个新实例并添加 EventPublishConfig 字段。如需详细了解新实例的必填字段,请参阅 实例资源 参考文档。

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

替换以下内容:

  • PROJECT_ID: Google Cloud 项目 ID
  • LOCATION:项目的位置
  • INSTANCE_ID:Cloud Data Fusion 实例的 ID
  • VERSION_NUMBER:您在其中创建实例的 Cloud Data Fusion 版本,例如 6.10.1
  • TOPIC_ID:Pub/Sub 主题的 ID

在现有 Cloud Data Fusion 实例中启用事件发布

更新现有 Cloud Data Fusion 实例中的 EventPublishConfig 字段:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

替换以下内容:

  • PROJECT_ID: Google Cloud 项目 ID
  • LOCATION:项目 的位置
  • INSTANCE_ID:Cloud Data Fusion 实例的 ID
  • TOPIC_ID:Pub/Sub 主题的 ID

从实例中移除事件发布

如需从实例中移除事件发布,请将事件发布的 enabled 值更新为 false

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

创建用于读取 Pub/Sub 消息的函数

Cloud Run functions 可以读取 Pub/Sub 消息并对其执行操作,例如重试失败的流水线。如需创建 Cloud Run functions,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往 Cloud Run functions 页面。

    前往 Cloud Run functions

  2. 点击创建函数

  3. 输入函数名称和区域。

  4. 触发器类型 字段中,选择 Cloud Pub/Sub

  5. 输入 Pub/Sub 主题 ID。

  6. 点击下一步

  7. 添加函数以读取 Pub/Sub 消息并执行其他操作。例如,您可以为以下用例添加函数:

    • 发送流水线失败的提醒。
    • 发送 KPI 的提醒,例如记录数或运行信息。
    • 重启尚未重新运行的失败流水线。

    如需查看 Cloud Run functions 示例,请参阅 用例部分。

  8. 点击部署 。 如需了解详情,请参阅部署 Cloud Run 函数

用例:记录流水线状态并重试失败的流水线

以下示例 Cloud Run functions 会读取有关流水线运行状态的 Pub/Sub 消息,然后在 Cloud Data Fusion 中重试失败的流水线。

这些示例函数引用了以下 Google Cloud 组件:

  • Google Cloud **project**:创建 Cloud Run functions 和 Pub/Sub 主题的 项目
  • Pub/Sub topic:与 您的 Cloud Data Fusion 实例关联的 Pub/Sub 主题
  • Cloud Data Fusion instance:您在其中设计和执行流水线的 Cloud Data Fusion 实例
  • BigQuery table:捕获流水线状态以及运行和重新运行详细信息的 BigQuery 表
  • Cloud Run function:您在其中部署 用于重试失败流水线的代码的 Cloud Run functions
  1. 以下 Cloud Run functions 示例会读取有关 Cloud Data Fusion 状态事件的 Pub/Sub 消息。

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. 以下示例函数会创建并保存 BigQuery 表,并查询流水线运行详细信息。

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. 以下示例函数会检查失败的流水线以及这些流水线在过去一小时内是否重新运行。

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. 如果失败的流水线最近未运行,则以下示例函数会重新运行失败的流水线。

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

后续步骤