設定 Pub/Sub 通知

本文說明如何設定註記例項更新的通知。

容器分析會針對自動掃描發現的安全漏洞和其他中繼資料,透過 Pub/Sub 發出通知。系統建立或更新註記或例項時,就會向每個 API 版本的對應主題發布訊息。請使用與您目前 API 版本相應的主題。

事前準備

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Enable the Container Analysis API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  4. 安裝 Google Cloud CLI。

  5. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  6. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  8. Enable the Container Analysis API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  9. 安裝 Google Cloud CLI。

  10. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  11. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  12. 瞭解如何為專案中的中繼資料設定存取權控管。如果您只使用 Artifact Analysis 容器掃描作業建立的例項中繼資料,請略過這個步驟。

建立 Pub/Sub 主題

啟動 Artifact Analysis API 後,Artifact Analysis 會自動建立 Pub/Sub 主題,主題 ID 如下:

  • container-analysis-notes-v1
  • container-analysis-occurrences-v1

如果不慎刪除或遺失主題,您可以自行加入。舉例來說,如果貴機構設有 Google Cloud組織政策限制,規定必須使用客戶自行管理的加密金鑰 (CMEK) 進行加密,主題可能就會遺失。如果 Pub/Sub API 位於這項限制的拒絕清單中,服務就無法使用Google-owned and Google-managed encryption keys自動建立主題。

如要使用 Google-owned and Google-managed encryption keys建立主題:

控制台

  1. 前往 Google Cloud 控制台的 Pub/Sub 主題頁面。

    開啟 Pub/Sub 主題頁面

  2. 按一下 [Create Topic] (建立主題)

  3. 輸入主題 ID:

    container-analysis-notes-v1
    

    讓名稱與 URI 相符:

    projects/PROJECT_ID/topics/container-analysis-notes-v1
    

    其中 PROJECT_ID 是您的 Google Cloud 專案 ID

  4. 點選「建立」

  5. 輸入主題 ID:

    container-analysis-occurrences-v1
    

    讓名稱與 URI 相符:

    projects/PROJECT_ID/topics/container-analysis-occurrences-v1
    

gcloud

在您的殼層或終端機視窗中執行下列指令:

gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-notes-v1
gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-occurrences-v1

如要進一步瞭解 gcloud pubsub topics 指令,請參閱topics 說明文件

如要建立採用 CMEK 加密的主題,請參閱 Pub/Sub 的主題加密操作說明

只要系統建立或更新註記或例項,就會向相關主題發布訊息,但您也必須建立 Pub/Sub 訂閱項目,才能監聽事件及接收 Pub/Sub 服務傳送的訊息。

建立 Pub/Sub 訂閱項目

如要監聽事件,請建立與主題相關聯的 Pub/Sub 訂閱項目:

控制台

  1. 前往Google Cloud 控制台的 Pub/Sub 訂閱項目頁面。

    開啟 Pub/Sub 訂閱項目頁面

  2. 按一下 [Create Subscription] (建立訂閱項目)

  3. 輸入訂閱方案名稱。例如「notes」

  4. 輸入註記的主題 URI:

    projects/PROJECT_ID/topics/container-analysis-notes-v1
    

    其中 PROJECT_ID 是您的 Google Cloud 專案 ID

  5. 點選「建立」

  6. 使用 URI 替例項建立另一個訂閱項目:

    projects/PROJECT_ID/topics/container-analysis-occurrences-v1
    

gcloud

如要接收 Pub/Sub 事件,您必須先建立與 container-analysis-occurrences-v1 主題相關聯的訂閱項目:

gcloud pubsub subscriptions create \
    --topic container-analysis-occurrences-v1 occurrences

您之後可以使用新訂閱提取有關例項的訊息:

gcloud pubsub subscriptions pull \
    --auto-ack occurrences

Java

如要瞭解如何安裝及使用 Artifact Analysis 的用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Java API 參考文件

如要向 Artifact Analysis 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.lang.InterruptedException;
import java.util.concurrent.TimeUnit;

public class Subscriptions {
  // Handle incoming Occurrences using a Cloud Pub/Sub subscription
  public static int pubSub(String subId, long timeoutSeconds, String projectId)
      throws InterruptedException {
    // String subId = "my-occurrence-subscription";
    // long timeoutSeconds = 20;
    // String projectId = "my-project-id";
    Subscriber subscriber = null;
    MessageReceiverExample receiver = new MessageReceiverExample();

    try {
      // Subscribe to the requested Pub/Sub channel
      ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);
      subscriber = Subscriber.newBuilder(subName, receiver).build();
      subscriber.startAsync().awaitRunning();
      // Sleep to listen for messages
      TimeUnit.SECONDS.sleep(timeoutSeconds);
    } finally {
      // Stop listening to the channel
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
    // Print and return the number of Pub/Sub messages received
    System.out.println(receiver.messageCount);
    return receiver.messageCount;
  }

  // Custom class to handle incoming Pub/Sub messages
  // In this case, the class will simply log and count each message as it comes in
  static class MessageReceiverExample implements MessageReceiver {
    public int messageCount = 0;

    @Override
    public synchronized void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      // Every time a Pub/Sub message comes in, print it and count it
      System.out.println("Message " + messageCount + ": " + message.getData().toStringUtf8());
      messageCount += 1;
      // Acknowledge the message
      consumer.ack();
    }
  }

  // Creates and returns a Pub/Sub subscription object listening to the Occurrence topic
  public static Subscription createOccurrenceSubscription(String subId, String projectId) 
      throws IOException, StatusRuntimeException, InterruptedException {
    // This topic id will automatically receive messages when Occurrences are added or modified
    String topicId = "container-analysis-occurrences-v1";
    TopicName topicName = TopicName.of(projectId, topicId);
    SubscriptionName subName = SubscriptionName.of(projectId, subId);

    SubscriptionAdminClient client = SubscriptionAdminClient.create();
    PushConfig config = PushConfig.getDefaultInstance();
    Subscription sub = client.createSubscription(subName, topicName, config, 0);
    return sub;
  }
}

Go

如要瞭解如何安裝及使用 Artifact Analysis 的用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Go API 參考文件

如要向 Artifact Analysis 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。


import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	pubsub "cloud.google.com/go/pubsub"
)

// occurrencePubsub handles incoming Occurrences using a Cloud Pub/Sub subscription.
func occurrencePubsub(w io.Writer, subscriptionID string, timeout time.Duration, projectID string) (int, error) {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	// timeout := time.Duration(20) * time.Second
	ctx := context.Background()

	var mu sync.Mutex
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return -1, fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Subscribe to the requested Pub/Sub channel.
	sub := client.Subscription(subscriptionID)
	count := 0

	// Listen to messages for 'timeout' seconds.
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		count = count + 1
		fmt.Fprintf(w, "Message %d: %q\n", count, string(msg.Data))
		msg.Ack()
		mu.Unlock()
	})
	if err != nil {
		return -1, fmt.Errorf("sub.Receive: %w", err)
	}
	// Print and return the number of Pub/Sub messages received.
	fmt.Fprintln(w, count)
	return count, nil
}

// createOccurrenceSubscription creates a new Pub/Sub subscription object listening to the Occurrence topic.
func createOccurrenceSubscription(subscriptionID, projectID string) error {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// This topic id will automatically receive messages when Occurrences are added or modified
	topicID := "container-analysis-occurrences-v1"
	topic := client.Topic(topicID)
	config := pubsub.SubscriptionConfig{Topic: topic}
	_, err = client.CreateSubscription(ctx, subscriptionID, config)
	return fmt.Errorf("client.CreateSubscription: %w", err)
}

Node.js

如要瞭解如何安裝及使用 Artifact Analysis 的用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Node.js API 參考文件

如要向 Artifact Analysis 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample
 */
// const projectId = 'your-project-id', // Your GCP Project ID
// const subscriptionId = 'my-sub-id', // A user-specified subscription to the 'container-analysis-occurrences-v1' topic
// const timeoutSeconds = 30 // The number of seconds to listen for the new Pub/Sub Messages

// Import the pubsub library and create a client, topic and subscription
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub({projectId});
const subscription = pubsub.subscription(subscriptionId);

// Handle incoming Occurrences using a Cloud Pub/Sub subscription
let count = 0;
const messageHandler = message => {
  count++;
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`Polled ${count} occurrences`);
}, timeoutSeconds * 1000);

Ruby

如要瞭解如何安裝及使用 Artifact Analysis 的用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Ruby API 參考文件

如要向 Artifact Analysis 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

# subscription_id = "A user-specified identifier for the new subscription"
# timeout_seconds = "The number of seconds to listen for new Pub/Sub messages"
# project_id      = "Your Google Cloud project ID"

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new project_id: project_id
subscription_admin = pubsub.subscription_admin
subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path("container-analysis-occurrences-v1")

subscriber = pubsub.subscriber subscription.name
count = 0
listener = subscriber.listen do |received_message|
  count += 1
  # Process incoming occurrence here
  puts "Message #{count}: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Wait for incoming occurrences
sleep timeout_seconds
listener.stop.wait!

subscription_admin.delete_subscription subscription: subscription.name

# Print and return the total number of Pub/Sub messages received
puts "Total Messages Received: #{count}"
count

Python

如要瞭解如何安裝及使用 Artifact Analysis 的用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Python API 參考文件

如要向 Artifact Analysis 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import time

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SubscriberClient
from google.cloud.pubsub_v1.subscriber.message import Message


def pubsub(subscription_id: str, timeout_seconds: int, project_id: str) -> int:
    """Respond to incoming occurrences using a Cloud Pub/Sub subscription."""
    # subscription_id := 'my-occurrences-subscription'
    # timeout_seconds = 20
    # project_id = 'my-gcp-project'

    client = SubscriberClient()
    subscription_name = client.subscription_path(project_id, subscription_id)
    receiver = MessageReceiver()
    client.subscribe(subscription_name, receiver.pubsub_callback)

    # listen for 'timeout' seconds
    for _ in range(timeout_seconds):
        time.sleep(1)
    # print and return the number of pubsub messages received
    print(receiver.msg_count)
    return receiver.msg_count


class MessageReceiver:
    """Custom class to handle incoming Pub/Sub messages."""

    def __init__(self) -> None:
        # initialize counter to 0 on initialization
        self.msg_count = 0

    def pubsub_callback(self, message: Message) -> None:
        # every time a pubsub message comes in, print it and count it
        self.msg_count += 1
        print(f"Message {self.msg_count}: {message.data}")
        message.ack()


def create_occurrence_subscription(subscription_id: str, project_id: str) -> bool:
    """Creates a new Pub/Sub subscription object listening to the
    Container Analysis Occurrences topic."""
    # subscription_id := 'my-occurrences-subscription'
    # project_id = 'my-gcp-project'

    topic_id = "container-analysis-occurrences-v1"
    client = SubscriberClient()
    topic_name = f"projects/{project_id}/topics/{topic_id}"
    subscription_name = client.subscription_path(project_id, subscription_id)
    success = True
    try:
        client.create_subscription({"name": subscription_name, "topic": topic_name})
    except AlreadyExists:
        # if subscription already exists, do nothing
        pass
    else:
        success = False
    return success

訂閱者應用程式只會收到訂閱項目建立後發布到主題的訊息。

Pub/Sub 酬載採用 JSON 格式,其結構定義如下所示:

注意:

{
    "name": "projects/PROJECT_ID/notes/NOTE_ID",
    "kind": "NOTE_KIND",
    "notificationTime": "NOTIFICATION_TIME",
}

例項:

{
    "name": "projects/PROJECT_ID/occurrences/OCCURRENCE_ID",
    "kind": "NOTE_KIND",
    "notificationTime": "NOTIFICATION_TIME",
}

其中:

  • NOTE_KINDNoteKind 中的一個值。
  • NOTIFICATION_TIME 是採用 RFC 3339 UTC Zulu 格式的時間戳記,精確度達奈秒單位。

查看詳細資料

如要進一步瞭解附註或事件,可以存取儲存在 Artifact Analysis 中的中繼資料。舉例來說,您可以要求提供特定事件的所有詳細資料。請參閱「調查安全漏洞」一文中的操作說明。

後續步驟