יצירת אשכול של שירות מנוהל של Google Cloud ל-Apache Kafka

קלאסטר של שירות מנוהל ל-Apache Kafka מספק סביבה לאחסון ולעיבוד של זרמי הודעות שמסודרים לפי נושאים.

כדי ליצור אשכול, אפשר להשתמש במסוף Google Cloud , ב-Google Cloud CLI, בספריית הלקוח או ב-Managed Kafka API. אי אפשר להשתמש ב-API של Apache Kafka בקוד פתוח כדי ליצור אשכול.

לפני שמתחילים

חשוב לוודא שאתם מכירים את הנקודות הבאות:

התפקידים וההרשאות שנדרשים ליצירת אשכול

כדי לקבל את ההרשאות שנדרשות ליצירת אשכול, צריך לבקש מהאדמין להקצות לכם את תפקיד ה-IAM‏ Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות ליצירת אשכול. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי ליצור אשכול, צריך את ההרשאות הבאות:

  • יצירת אשכול: managedkafka.clusters.create

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

מאפיינים של אשכול שירות מנוהל ל-Apache Kafka

כשיוצרים או מעדכנים אשכול של שירות מנוהל ל-Apache Kafka, צריך לציין את המאפיינים הבאים.

שם האשכול

השם או המזהה של אשכול השירות המנוהל ל-Apache Kafka שאתם יוצרים. הנחיות לשמות של אשכולות זמינות במאמר הנחיות לשמות של משאבים בשירות מנוהל ל-Apache Kafka. השם של אשכול הוא קבוע.

מיקום

המיקום שבו אתם יוצרים את האשכול. המיקום חייב להיות באחד מהאזורים הנתמכים של Google Cloud . אי אפשר לשנות את המיקום של אשכול בשלב מאוחר יותר. רשימת המיקומים הזמינים מופיעה במאמר מיקומים של שירות מנוהל ל-Apache Kafka.

הגדרת הקיבולת

כדי להגדיר את הקיבולת, צריך להגדיר את מספר המעבדים הווירטואליים ואת כמות הזיכרון להגדרת Kafka. למידע נוסף על הגדרת הקיבולת של אשכול, אפשר לעיין במאמר בנושא תכנון הגודל של אשכול Kafka.

אלה המאפיינים להגדרת הקיבולת:

  • vCPUs: מספר המעבדים הווירטואליים באשכול. נדרשים לפחות 3 מעבדים וירטואליים לכל אשכול.

  • זיכרון: נפח הזיכרון שמוקצה לאשכול. צריך להקצות בין 1 ל-8‎ GiB לכל vCPU.

    לדוגמה, אם יוצרים אשכול עם 6 ליבות וירטואליות, הזיכרון המינימלי שאפשר להקצות לאשכול הוא 6GiB (‎1 GiB לכל ליבה וירטואלית), והמקסימלי הוא 48GiB (‎8 GiB לכל ליבה וירטואלית).

מידע נוסף על שינוי הזיכרון ומספר ליבות ה-CPU הווירטואליות אחרי יצירת אשכול זמין במאמר בנושא עדכון גודל האשכול.

הגדרת רשת

תצורת הרשת היא רשימה של רשתות משנה של VPC שאפשר לגשת אליהן מהאשכול. כדי ליצור הודעות או לצרוך אותן, הלקוחות צריכים להיות מסוגלים להגיע לאחת מתת-הרשתות האלה.

ריכזנו כאן כמה הנחיות להגדרת הרשת:

  • נדרשת לפחות רשת משנה אחת לאשכול. מספר החשבונות המקסימלי הוא עשרה.

  • מותר להשתמש ברשת משנה אחת בלבד לכל רשת עבור כל אשכול נתון.

  • כל רשת משנה צריכה להיות באותו אזור כמו האשכול. הפרויקט והרשת יכולים להיות שונים.

  • כתובות ה-IP של השרתים המתווכים ושל שרת האתחול מוקצות באופן אוטומטי בכל תת-רשת. בנוסף, נוצרים ערכי DNS לכתובות ה-IP האלה ברשתות ה-VPC המתאימות.

  • אם מוסיפים רשת משנה מפרויקט אחר, צריך להעניק הרשאות לחשבון השירות שמנוהל על ידי Google שמשויך לאשכול. למידע נוסף, קראו את המאמר בנושא חיבור אשכול בין פרויקטים.

אחרי שיוצרים את האשכול, אפשר לעדכן את רשימת רשתות המשנה. מידע נוסף על רשתות זמין במאמר הגדרת רשתות בשירות המנוהל ל-Apache Kafka.

תוויות

תוויות הן צמדי מפתח/ערך שעוזרים לכם לארגן ולזהות את המשאבים. התוויות מאפשרות לסווג משאבים לפי סביבה. לדוגמה, "env:production" ו-"owner:data-engineering".

אפשר לסנן ולחפש משאבים לפי התוויות שלהם. לדוגמה, נניח שיש לכם כמה אשכולות של שירות מנוהל ל-Apache Kafka למחלקות שונות. אתם יכולים להגדיר אשכולות ולחפש אותם באמצעות התווית "department:marketing" כדי למצוא במהירות את האשכול הרלוונטי.

הגדרות איזון מחדש

ההגדרה הזו קובעת אם השירות יאזן מחדש באופן אוטומטי את העותקים של המחיצות בין הברוקרים.

המצבים הזמינים הם:

  • איזון מחדש אוטומטי בהגדלת קנה מידה: כשהאפשרות הזו מופעלת, השירות מפעיל באופן אוטומטי איזון מחדש של העותקים כשמגדילים את קנה המידה של האשכול. המצב הזה עוזר לשמור על חלוקת עומס אחידה, אבל יכול להיות שתהיה השפעה זמנית על הביצועים במהלך פעולת האיזון מחדש.

  • No rebalance (ללא איזון מחדש): כשהאפשרות הזו מופעלת, השירות לא מבצע איזון מחדש של העותקים באופן אוטומטי.

הצפנה

בשירות המנוהל ל-Apache Kafka אפשר להצפין הודעות באמצעותGoogle-owned and Google-managed encryption keys (ברירת מחדל) או מפתחות הצפנה בניהול הלקוח (CMEK). כל ההודעות מוצפנות גם כשהן נשמרות וגם כשהן נשלחות. סוג ההצפנה של אשכול הוא קבוע.

Google-owned and Google-managed encryption keys משמשים כברירת מחדל. המפתחות האלה נוצרים, מנוהלים ומאוחסנים באופן מלא על ידי Google Cloud בתשתית שלו.

מפתחות CMEK הם מפתחות הצפנה שאתם מנהלים באמצעות Cloud Key Management Service. התכונה הזו מאפשרת לכם לקבל שליטה רבה יותר על המפתחות שמשמשים להצפנת נתונים במנוחה בשירותים נתמכים Google Cloud . השימוש ב-CMEK כרוך בעלויות נוספות שקשורות ל-Cloud Key Management Service. כשמשתמשים ב-CMEK, אוסף המפתחות צריך להיות באותו מיקום שבו נמצאים המשאבים שמשתמשים בו איתם. מידע נוסף זמין במאמר בנושא הגדרת הצפנה של הודעות.

הגדרת mTLS

אפשר להגדיר mTLS כאמצעי אימות חלופי שמשתמש באישורים של לקוחות. ההגדרות כוללות את הפריטים הבאים:

  • מאגרי CA: רשימה של מאגר אחד עד עשרה של Certificate Authority Service (CAS) שהאשכול סומך עליהם לצורך אימות לקוח.

  • כללי מיפוי של חשבון משתמש ב-SSL: מאפיין אופציונלי של ברוקר ssl.principal.mapping.rules, אבל מומלץ להשתמש בו כדי לפשט שמות ארוכים של חשבונות משתמשים באישור לשימוש ברשימות ACL של Kafka.

מידע נוסף על mTLS זמין במאמר הגדרת אימות mTLS.

יצירת אשכול

לפני שיוצרים אשכול, כדאי לעיין במסמכי המידע בנושא מאפייני אשכול.

יצירת אשכול אורכת בדרך כלל 20-30 דקות.

כדי ליצור אשכול, פועלים לפי השלבים הבאים:

המסוף

  1. נכנסים לדף Clusters במסוף Google Cloud .

    מעבר אל Clusters

  2. לוחצים על יצירה.

    ייפתח הדף Create Kafka cluster.

  3. בשדה שם האשכול, מזינים מחרוזת.

    מידע נוסף על מתן שם לאשכול זמין במאמר הנחיות למתן שם למשאב של שירות מנוהל ל-Apache Kafka.

  4. בקטע מיקום, מזינים מיקום נתמך.

    מידע נוסף על מיקומים נתמכים זמין במאמר מיקומים נתמכים של שירות מנוהל ל-Apache Kafka.

  5. בקטע Capacity configuration (הגדרת קיבולת), מזינים ערכים בשדות Memory (זיכרון) ו-vCPUs (מעבדים וירטואליים).

    מידע נוסף על קביעת הגודל של אשכול שירות מנוהל ל-Apache Kafka זמין במאמר תכנון הגודל של אשכול Kafka.

  6. בקטע Network configuration (הגדרת הרשת), מזינים את הפרטים הבאים:

    1. Project: הפרויקט שבו נמצאת רשת המשנה. רשת המשנה צריכה להיות באותו אזור שבו נמצא האשכול, אבל יכול להיות שהפרויקט יהיה שונה.
    2. רשת: הרשת שאליה מחוברת רשת המשנה.
    3. רשת משנה: שם תת-הרשת.
    4. נתיב URI של רשת משנה: השדה הזה מאוכלס באופן אוטומטי. אפשר גם להזין כאן את הנתיב של תת-הרשת. שם תת-הרשת צריך להיות בפורמט: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.
    5. לוחצים על סיום.
  7. אופציונלי: כדי להוסיף רשתות משנה נוספות, לוחצים על הוספת רשת משנה מחוברת.

    אפשר להוסיף רשתות משנה נוספות, עד ערך מקסימלי של עשר.

  8. אופציונלי: מגדירים מפתח הצפנה בניהול הלקוח (CMEK).

    1. בקטע הצפנה, בוחרים באפשרות מפתח Cloud KMS.

    2. בשדה Key type, בוחרים באפשרות Cloud KMS.

    3. בקטע Select a customer-managed key (בחירת מפתח בניהול הלקוח), בוחרים או מזינים את מפתח ה-CMEK.

      אם האפשרות הזו לא מוצגת, צריך לוודא שהפעלתם את Cloud KMS API בפרויקט.

    מידע נוסף זמין במאמר בנושא הגדרת הצפנה של הודעות.

  9. לוחצים על יצירה.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud managed-kafka clusters create:

    gcloud managed-kafka clusters create CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --encryption-key=ENCRYPTION_KEY \
        --async \
        --labels=LABELS
    

    מחליפים את מה שכתוב בשדות הבאים:

    • CLUSTER_ID: המזהה או השם של האשכול.

      מידע נוסף על מתן שם לאשכול זמין במאמר הנחיות למתן שם למשאב של שירות מנוהל ל-Apache Kafka.

    • LOCATION: המיקום של האשכול.

      מידע נוסף על מיקומים נתמכים זמין במאמר בנושא מיקומים של שירות מנוהל ל-Apache Kafka.

    • CPU: מספר ליבות ה-vCPU באשכול.

      מידע נוסף על קביעת הגודל של אשכול שירות מנוהל ל-Apache Kafka זמין במאמר תכנון הגודל של אשכול Kafka.

    • MEMORY: כמות הזיכרון של האשכול. צריך להשתמש ביחידות 'MB',‏ 'MiB',‏ 'GB',‏ 'GiB',‏ 'TB' או 'TiB'. לדוגמה, ‎"10GiB".

    • SUBNETS: רשימת תת-הרשתות להתחברות. כדי להפריד בין כמה ערכים של רשתות משנה, צריך להשתמש בפסיקים.

      הפורמט של רשת המשנה הוא projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

    • auto-rebalance: הפעלה של איזון מחדש אוטומטי של מחיצות בנושא בין ברוקרים, כשמספר המעבדים באשכול משתנה. ההגדרה הזו מופעלת כברירת מחדל.

    • ENCRYPTION_KEY: המזהה של מפתח ההצפנה בניהול הלקוח שבו רוצים להשתמש עבור האשכול.

      הפורמט הוא projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY.

    • --async: מאפשר למערכת לשלוח את בקשת היצירה ולהחזיר תגובה באופן מיידי, בלי לחכות לסיום הפעולה. בעזרת הדגל --async, אפשר להמשיך במשימות אחרות בזמן שיצירת האשכול מתבצעת ברקע. אם לא משתמשים בדגל, המערכת מחכה שהפעולה תסתיים לפני שהיא מחזירה תגובה. צריך לחכות עד שהעדכון של האשכול יסתיים לפני שממשיכים במשימות אחרות.

    • LABELS: תוויות לשיוך לאשכול.

      מידע נוסף על הפורמט של תוויות זמין במאמר בנושא תוויות.

    מקבלים תגובה שדומה לזו:

    Create request issued for: [CLUSTER_ID]
    Check operation [projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID] for status.
    

    שומרים את OPERATION_ID כדי לעקוב אחרי progress.

REST

לפני שמשתמשים בנתוני הבקשה, צריך להחליף את הנתונים הבאים:

  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud
  • LOCATION: המיקום של האשכול
  • CLUSTER_ID: מזהה האשכול
  • CPU_COUNT: מספר המעבדים הווירטואליים באשכול
  • MEMORY: כמות הזיכרון באשכול, בבייטים. דוגמה: 3221225472.
  • SUBNET_ID: מזהה תת-הרשת שאליה רוצים להתחבר. דוגמה: default.

ה-method של ה-HTTP וכתובת ה-URL:

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters?clusterId=CLUSTER_ID

תוכן בקשת JSON:

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

כדי לשלוח את הבקשה צריך להרחיב אחת מהאפשרויות הבאות:

אתם אמורים לקבל תגובת JSON שדומה לזו:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    "createTime": "CREATE_TIME",
    "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
    "verb": "create",
    "requestedCancellation": false,
    "apiVersion": "v1"
  },
  "done": false
}

Terraform

אפשר להשתמש במשאב של Terraform כדי ליצור אשכול.

resource "google_managed_kafka_cluster" "default" {
  project    = data.google_project.default.project_id # Replace this with your project ID in quotes
  cluster_id = "my-cluster-id"
  location   = "us-central1"
  capacity_config {
    vcpu_count   = 3
    memory_bytes = 3221225472
  }
  gcp_config {
    access_config {
      network_configs {
        subnet = google_compute_subnetwork.default.id
      }
    }
  }
}

כדי ללמוד איך להחיל הגדרות ב-Terraform או להסיר אותן, ראו פקודות בסיסיות ב-Terraform.

המשך

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API של שירות מנוהל ל-Apache Kafka Go.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, צריך להגדיר את Application Default Credentials‏(ADC). מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
	// cpu := 3
	// memoryBytes := 3221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)

	// Memory must be between 1 GiB and 8 GiB per CPU.
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   cpu,
		MemoryBytes: memoryBytes,
	}
	var networkConfig []*managedkafkapb.NetworkConfig
	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
		Subnet: subnet,
	})
	platformConfig := &managedkafkapb.Cluster_GcpConfig{
		GcpConfig: &managedkafkapb.GcpConfig{
			AccessConfig: &managedkafkapb.AccessConfig{
				NetworkConfigs: networkConfig,
			},
		},
	}
	rebalanceConfig := &managedkafkapb.RebalanceConfig{
		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
	}
	cluster := &managedkafkapb.Cluster{
		Name:            clusterPath,
		CapacityConfig:  capacityConfig,
		PlatformConfig:  platformConfig,
		RebalanceConfig: rebalanceConfig,
	}

	req := &managedkafkapb.CreateClusterRequest{
		Parent:    locationPath,
		ClusterId: clusterID,
		Cluster:   cluster,
	}
	op, err := client.CreateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
	return nil
}

Java

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של Managed Service for Apache Kafka Java API.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.AccessConfig;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.CreateClusterRequest;
import com.google.cloud.managedkafka.v1.GcpConfig;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.NetworkConfig;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.RebalanceConfig;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    int cpu = 3;
    long memoryBytes = 3221225472L; // 3 GiB
    createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
  }

  public static void createCluster(
      String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig =
        CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
    NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
    GcpConfig gcpConfig =
        GcpConfig.newBuilder()
            .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
    RebalanceConfig rebalanceConfig =
        RebalanceConfig.newBuilder()
            .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
            .build();
    Cluster cluster =
        Cluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setRebalanceConfig(rebalanceConfig)
            .build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.createClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {

      CreateClusterRequest request =
          CreateClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setClusterId(clusterId)
              .setCluster(cluster)
              .build();

      // The duration of this operation can vary considerably, typically taking between 10-40
      // minutes.
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.createClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
            currentOp.getName(),
            currentOp.isDone());
      }

      // NOTE: future.get() blocks completion until the operation is complete (isDone =  True)
      Cluster response = future.get();
      System.out.printf("Created cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
    }
  }
}

Python

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API בשפת Python של שירות מנוהל ל-Apache Kafka.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
# cpu = 3
# memory_bytes = 3221225472

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.vcpu_count = cpu
cluster.capacity_config.memory_bytes = memory_bytes
cluster.gcp_config.access_config.network_configs = [
    managedkafka_v1.NetworkConfig(subnet=subnet)
]
cluster.rebalance_config.mode = (
    managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
)

request = managedkafka_v1.CreateClusterRequest(
    parent=client.common_location_path(project_id, region),
    cluster_id=cluster_id,
    cluster=cluster,
)

try:
    operation = client.create_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # The duration of this operation can vary considerably, typically taking 10-40 minutes.
    # We can set a timeout of 3000s (50 minutes).
    response = operation.result(timeout=3000)
    print("Created cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

מעקב אחר פעולת יצירת האשכול

אפשר להריץ את הפקודה הבאה רק אם הפעלתם את ה-CLI של gcloud כדי ליצור את האשכול.

  • יצירת אשכול אורכת בדרך כלל 20-30 דקות. כדי לעקוב אחרי התקדמות היצירה של האשכול, הפקודה gcloud managed-kafka clusters create משתמשת בפעולה ארוכת טווח (LRO), שאפשר לעקוב אחריה באמצעות הפקודה הבאה:

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    מחליפים את מה שכתוב בשדות הבאים:

    • OPERATION_ID עם הערך של מזהה הפעולה מהקטע הקודם.
    • LOCATION עם ערך המיקום מהקטע הקודם.

פתרון בעיות

מידע על פתרון בעיות שקשורות ליצירת אשכול זמין במאמר בנושא שגיאות ביצירת אשכול Kafka.

מה השלב הבא?

Apache Kafka®‎ הוא סימן מסחרי רשום של The Apache Software Foundation או של השותפים העצמאיים שלה בארצות הברית או במדינות אחרות.