Apache Flink

Apache Flink 整合功能會收集用戶端、JobManager 和 TaskManager 的記錄,並將其剖析為 JSON 酬載。結果包含來源、層級和訊息的欄位。

如要進一步瞭解 Flink,請參閱 Apache Flink 說明文件

必要條件

如要收集 Flink 遙測資料,請安裝 Ops Agent

  • 如要使用指標,請安裝 2.18.1 以上版本。
  • 如要查看記錄,請安裝 2.17.0 以上版本。

這項整合作業支援 Flink 1.12.5、1.13.6 和 1.14.4 版。

設定 Flink 適用的作業套件代理程式

按照設定作業套件代理程式指南操作,新增必要元素,從 Flink 執行個體收集遙測資料,然後重新啟動代理程式

範例設定

下列指令會建立設定,收集及擷取 Flink 的遙測資料:

# Configures Ops Agent to collect telemetry from the app. You must restart the agent for the configuration to take effect.

set -e

# Check if the file exists
if [ ! -f /etc/google-cloud-ops-agent/config.yaml ]; then
  # Create the file if it doesn't exist.
  sudo mkdir -p /etc/google-cloud-ops-agent
  sudo touch /etc/google-cloud-ops-agent/config.yaml
fi

# Create a back up of the existing file so existing configurations are not lost.
sudo cp /etc/google-cloud-ops-agent/config.yaml /etc/google-cloud-ops-agent/config.yaml.bak

# Configure the Ops Agent.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
metrics:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
logging:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
EOF

如要讓這些變更生效,請重新啟動 Ops Agent:

Linux

  1. 如要重新啟動代理程式,請在執行個體上執行下列指令:
    sudo systemctl restart google-cloud-ops-agent
    
  2. 如要確定代理程式已重新啟動,請執行下列指令,並驗證「指標代理程式」和「Logging 代理程式」元件是否已啟動:
    sudo systemctl status "google-cloud-ops-agent*"
    

Windows

  1. 使用遠端桌面協定或類似工具連線至執行個體,並登入 Windows。
  2. 以滑鼠右鍵按一下 PowerShell 圖示,然後選取「以系統管理員身分執行」,以管理員權限開啟 PowerShell 終端機。
  3. 如要重新啟動代理程式,請執行下列 PowerShell 指令:
    Restart-Service google-cloud-ops-agent -Force
    
  4. 如要確認代理程式已重新啟動,請執行下列指令,並驗證「指標代理程式」和「Logging 代理程式」元件是否已啟動:
    Get-Service google-cloud-ops-agent*
    

如要從 Flink 擷取記錄,您必須為 Flink 產生的記錄建立接收器,然後為新的接收器建立管道。

如要設定 flink 記錄的接收器,請指定下列欄位:

欄位 預設 說明
exclude_paths 要從 include_paths 比對的集合中排除的檔案系統路徑模式清單。
include_paths [/opt/flink/log/flink-*-standalonesession-*.log, /opt/flink/log/flink-*-taskexecutor-*.log, /opt/flink/log/flink-*-client-*.log] 要讀取的檔案系統路徑清單,方法是追蹤每個檔案。路徑中可以使用萬用字元 (*)。
record_log_file_path false 如果設為 true,輸出記錄項目中就會顯示記錄記錄的來源檔案路徑,做為 agent.googleapis.com/log_file_path 標籤的值。使用萬用字元時,系統只會記錄取得記錄的檔案路徑。
type 這個值必須是 flink
wildcard_refresh_interval 60s include_paths 中萬用字元檔案路徑的重新整理間隔。以時間長度表示,例如 30s2m。在記錄吞吐量較高的情況下,記錄檔的輪替速度會比預設間隔更快,這時這項屬性就可能派上用場。

記錄內容

logName 是從設定中指定的接收器 ID 衍生而來。LogEntry 內的詳細欄位如下。

flink 記錄的 LogEntry 包含下列欄位:

欄位 類型 說明
jsonPayload.level 字串 記錄項目層級
jsonPayload.message 字串 記錄訊息,包括詳細的堆疊追蹤 (如有提供)
jsonPayload.source 字串 記錄項目的來源 Java 類別
severity 字串 (LogSeverity) 記錄項目層級 (已翻譯)。

如要從 Flink 擷取指標,請為 Flink 產生的指標建立接收器,然後為新的接收器建立管道。

這個接收器不支援在設定中使用多個執行個體,例如監控多個端點。所有這類執行個體都會寫入相同的時間序列,Cloud Monitoring 無法區分這些執行個體。

如要為 flink 指標設定接收器,請指定下列欄位:

欄位 預設 說明
collection_interval 60s 時間長度值,例如 30s5m
endpoint http://localhost:8081 Flink 公開的網址。
type 這個值必須是 flink

監控的內容

下表列出作業套件代理程式從 Flink 執行個體收集的指標。

指標類型
類型
受監控的資源
標籤
workload.googleapis.com/flink.job.checkpoint.count
CUMULATIVEINT64
gce_instance
checkpoint
host_name
job_name
workload.googleapis.com/flink.job.checkpoint.in_progress
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.size
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.time
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.restart.count
CUMULATIVEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.jvm.class_loader.classes_loaded
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.load
GAUGEDOUBLE
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.time
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.gc.collections.count
CUMULATIVEINT64
gce_instance
garbage_collector_name
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.gc.collections.time
CUMULATIVEINT64
gce_instance
garbage_collector_name
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.direct.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.direct.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.threads.count
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.total
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.operator.record.count
CUMULATIVEINT64
gce_instance
host_name
job_name
operator_name
record
subtask_index
task_name
taskmanager_id
workload.googleapis.com/flink.operator.watermark.output
GAUGEINT64
gce_instance
host_name
job_name
operator_name
subtask_index
task_name
taskmanager_id
workload.googleapis.com/flink.task.record.count
CUMULATIVEINT64
gce_instance
host_name
job_name
record
subtask_index
task_name
taskmanager_id

驗證設定

本節說明如何確認您已正確設定 Flink 接收器。作業套件代理程式可能需要一到兩分鐘,才會開始收集遙測資料。

如要確認 Flink 記錄是否已傳送至 Cloud Logging,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Logs Explorer」頁面:

    前往「Logs Explorer」(記錄檔探索工具)

    如果您是使用搜尋列尋找這個頁面,請選取子標題為「Logging」的結果

  2. 在編輯器中輸入下列查詢,然後按一下「執行查詢」
    resource.type="gce_instance"
    log_id("flink")
    

如要確認 Flink 指標是否已傳送至 Cloud Monitoring,請執行下列操作:

  1. 前往 Google Cloud 控制台的 「Metrics Explorer」頁面:

    前往 Metrics Explorer

    如果您是使用搜尋列尋找這個頁面,請選取子標題為「Monitoring」的結果

  2. 在查詢建構工具窗格的工具列中,選取名稱為  MQL PromQL 的按鈕。
  3. 確認已在「Language」(語言) 切換按鈕中選取「PromQL」。語言切換按鈕位於同一工具列,可供你設定查詢格式。
  4. 在編輯器中輸入下列查詢,然後按一下「執行查詢」
    {"workload.googleapis.com/flink.jvm.memory.heap.used", monitored_resource="gce_instance"}
    

查看資訊主頁

如要查看 Flink 指標,您必須設定圖表或資訊主頁。Flink 整合功能包含一或多個資訊主頁。 設定整合功能後,Ops Agent 就會開始收集指標資料,並自動安裝所有資訊主頁。

您也可以查看資訊主頁的靜態預覽畫面,不必安裝整合功能。

如要查看已安裝的資訊主頁,請按照下列步驟操作:

  1. 在 Google Cloud 控制台中,前往「資訊主頁」頁面:

    前往「Dashboards」(資訊主頁)

    如果您是使用搜尋列尋找這個頁面,請選取子標題為「Monitoring」的結果

  2. 選取「資訊主頁清單」分頁,然後選擇「整合」類別。
  3. 按一下要查看的資訊主頁名稱。

如果您已設定整合功能,但尚未安裝資訊主頁,請檢查作業套件代理程式是否正在執行。如果資訊主頁中的圖表沒有指標資料,資訊主頁安裝作業就會失敗。作業套件代理程式開始收集指標後,系統就會為您安裝資訊主頁。

如要查看資訊主頁的靜態預覽畫面,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的 「Integrations」(整合) 頁面

    前往「整合」

    如果您是使用搜尋列尋找這個頁面,請選取子標題為「Monitoring」的結果

  2. 按一下「Compute Engine」部署平台篩選器。
  3. 找出 Flink 的項目,然後按一下「查看詳細資料」
  4. 選取「資訊主頁」分頁標籤,即可查看靜態預覽畫面。如果已安裝資訊主頁,請點選「查看資訊主頁」前往。

如要進一步瞭解 Cloud Monitoring 中的資訊主頁,請參閱「資訊主頁和圖表」。

如要進一步瞭解如何使用「整合」頁面,請參閱「管理整合」一文。

安裝快訊政策

警告政策會指示 Cloud Monitoring 在發生指定情況時通知您。Flink 整合功能包含一或多項警告政策,供您使用。您可以在 Monitoring 的「整合」頁面中查看及安裝這些快訊政策。

如要查看可用快訊政策的說明並安裝,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的 「Integrations」(整合) 頁面

    前往「整合」

    如果您是使用搜尋列尋找這個頁面,請選取子標題為「Monitoring」的結果

  2. 找出 Flink 的項目,然後按一下「查看詳細資料」
  3. 選取「快訊」分頁標籤。這個分頁會說明可用的快訊政策,並提供安裝介面。
  4. 安裝快訊政策。快訊政策需要知道在快訊觸發時要將通知傳送至何處,因此需要您提供安裝資訊。如要安裝快訊政策,請按照下列步驟操作:
    1. 從可用的警報政策清單中,選取要安裝的政策。
    2. 在「設定通知」部分,選取一或多個通知管道。您可以選擇停用通知管道,但這麼做的話,快訊政策會以無聲方式觸發。您可以在「監控」中查看狀態,但不會收到任何通知。

      如要進一步瞭解通知管道,請參閱「管理通知管道」。

    3. 按一下「建立政策」

如要進一步瞭解 Cloud Monitoring 中的快訊政策,請參閱「快訊簡介」。

如要進一步瞭解如何使用「整合」頁面,請參閱「管理整合」一文。

後續步驟

如要逐步瞭解如何使用 Ansible 安裝作業套件代理程式、設定第三方應用程式,以及安裝範例資訊主頁,請觀看「 安裝作業套件代理程式以排解第三方應用程式的問題」影片。