このページでは、パイプラインのステータスなどの Cloud Data Fusion パイプライン イベントを Pub/Sub トピックに公開する方法について説明します。また、Pub/Sub メッセージを処理して、失敗したパイプラインの特定と再試行などのアクションを行う Cloud Run functions の作成方法についても説明します。
始める前に
必要なロール
Cloud Data Fusion サービス アカウントに Pub/Sub トピックにパイプライン イベントをパブリッシュするために必要な
権限を確実に付与するには、
Pub/Sub トピックを作成するプロジェクトに対する
Pub/Sub パブリッシャー (roles/pubsub.publisher)IAM ロールを Cloud Data Fusion サービス アカウントに付与するよう管理者に依頼してください。
管理者は、カスタムロールや他の事前定義ロールから、Cloud Data Fusion サービス アカウントに必要な権限を付与することもできます。
Cloud Data Fusion インスタンスでイベント公開を管理する
バージョン 6.7.0 以降の REST API を使用して、新規および既存の Cloud Data Fusion インスタンスでイベント公開を管理できます。
新しいインスタンスでイベントを公開する
新しいインスタンスを作成し、EventPublishConfig フィールドを追加します。新しいインスタンスに必要なフィールドの詳細については、
インスタンス リソース
のリファレンスをご覧ください。
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
以下を置き換えます。
PROJECT_ID: Google Cloud プロジェクト IDLOCATION: プロジェクトの場所INSTANCE_ID: Cloud Data Fusion インスタンスの ID。VERSION_NUMBER: インスタンスを作成する Cloud Data Fusion のバージョン(例:6.10.1)TOPIC_ID: Pub/Sub トピックの ID。
既存の Cloud Data Fusion インスタンスでイベント公開を有効にする
既存の Cloud Data Fusion インスタンスで
EventPublishConfig
フィールドを更新します。
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
以下を置き換えます。
PROJECT_ID: Google Cloud プロジェクト IDLOCATION: プロジェクトの場所INSTANCE_ID: Cloud Data Fusion インスタンスの ID。TOPIC_ID: Pub/Sub トピックの ID。
インスタンスからイベント公開を削除する
インスタンスからイベント公開を削除するには、イベント公開の enabled 値を false に更新します。
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Pub/Sub メッセージを読み取る関数を作成する
Cloud Run functions は、Pub/Sub メッセージを読み取って、失敗したパイプラインの再試行などのアクションを実行できます。Cloud Run functions を作成するには、次のようにします。
コンソールで、Cloud Run functions ページに移動します。 Google Cloud
[関数を作成] をクリックします。
関数の名前とリージョンを入力します。
[トリガー] フィールドで [Cloud Pub/Sub] を選択します。
Pub/Sub トピック ID を入力します。
[次へ] をクリックします。
Pub/Sub メッセージを読み取ってその他のアクションを実行する関数を追加します。たとえば、次のユースケースの関数を追加できます。
- パイプラインの障害のアラートを送信します。
- レコード数や実行情報などの KPI のアラートを送信します。
- 再実行されていない失敗したパイプラインを再起動します。
Cloud Run functions の例については、ユースケースのセクションをご覧ください。
[デプロイ] をクリックします。 詳細については、Cloud Run 関数 をデプロイするをご覧ください。
ユースケース: パイプラインのステータスを記録し、失敗したパイプラインを再試行する
次の Cloud Run functions の例では、パイプラインの実行ステータスに関する Pub/Sub メッセージを読み取り、Cloud Data Fusion で失敗したパイプラインを再試行します。
これらの関数の例は、次の Google Cloud コンポーネントを参照しています。
- Google Cloud プロジェクト: Cloud Run functions と Pub/Sub トピックが作成されるプロジェクト
- Pub/Sub トピック: Cloud Data Fusion インスタンスにリンクされている Pub/Sub トピック
- Cloud Data Fusion インスタンス: パイプラインを設計して実行する Cloud Data Fusion インスタンス
- BigQuery テーブル: パイプラインのステータスと実行の詳細を キャプチャする BigQuery テーブル
- Cloud Run function: 失敗したパイプラインを再試行するコードをデプロイする Cloud Run function
次の Cloud Run function の例は、Cloud Data Fusion ステータス イベントに関する Pub/Sub メッセージを読み取ります。
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")次の関数の例では、BigQuery テーブルを作成して保存し、パイプラインの実行の詳細をクエリします。
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))次の関数の例では、失敗したパイプラインと、過去 1 時間以内にそれらが再実行されたかどうかを確認します。
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True失敗したパイプラインが最近実行されていない場合、次の関数の例では、失敗したパイプラインを再実行します。
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
次のステップ
- Cloud Run の関数を作成する方法を学習します。