יצירת הודעות Avro באמצעות מאגר הסכימות

במאמר הזה מוסבר איך לפתח אפליקציית Java producer שמשתמשת במאגר סכימות (גרסת Preview) כדי ליצור הודעות Apache Avro. האפליקציה כותבת את ההודעות לאשכול של שירות מנוהל ל-Apache Kafka.

לפני שמתחילים

לפני שמתחילים את המדריך הזה, צריך ליצור אשכול חדש של שירות מנוהל ל-Apache Kafka. אם כבר יש לכם אשכול, אתם יכולים לדלג על השלב הזה.

איך יוצרים אשכול

המסוף

  1. עוברים אל הדף שירות מנוהל ל-Apache Kafka > Clusters.

    מעבר אל Clusters

  2. לוחצים על יצירה.
  3. בתיבה שם האשכול, מזינים שם לאשכול.
  4. ברשימה Region, בוחרים מיקום לאשכול.
  5. בקטע Network configuration (תצורת רשת), מגדירים את רשת המשנה שבה אפשר לגשת לאשכול:
    1. בקטע Project (פרויקט), בוחרים את הפרויקט.
    2. בקטע רשת, בוחרים את רשת ה-VPC.
    3. בקטע רשת משנה, בוחרים את רשת המשנה.
    4. לוחצים על סיום.
  6. לוחצים על יצירה.

אחרי שלוחצים על יצירה, מצב האשכול הוא Creating. כשהאשכול מוכן, הסטטוס שלו הוא Active.

gcloud

כדי ליצור אשכול Kafka, מריצים את הפקודה managed-kafka clusters create.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

מחליפים את מה שכתוב בשדות הבאים:

  • KAFKA_CLUSTER: שם לאשכול Kafka
  • REGION: המיקום של האשכול
  • PROJECT_ID: מזהה הפרויקט
  • SUBNET_NAME: תת-הרשת שבה רוצים ליצור את האשכול, לדוגמה default

מידע על מיקומים נתמכים זמין במאמר בנושא מיקומים של שירות מנוהל ל-Apache Kafka.

הפקודה פועלת באופן אסינכרוני ומחזירה מזהה פעולה:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

כדי לעקוב אחרי ההתקדמות של פעולת היצירה, משתמשים בפקודה gcloud managed-kafka operations describe:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

כשהאשכול מוכן, הפלט מהפקודה הזו כולל את הרשומה state: ACTIVE. מידע נוסף זמין במאמר בנושא מעקב אחרי פעולת יצירת האשכול.

התפקידים הנדרשים

כדי לקבל את ההרשאות שדרושות ליצירה ולהגדרה של מכונת VM של לקוח, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בפרויקט:

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

הגדרת מכונה וירטואלית של לקוח

יוצרים מכונה וירטואלית (VM) של Linux ב-Compute Engine שיכולה לגשת לאשכול Kafka. כשמגדירים את המכונה הווירטואלית, צריך להגדיר את האפשרויות הבאות:

  • אזור. יוצרים את המכונה הווירטואלית באותו אזור שבו נמצא אשכול Kafka.

  • Subnet. יוצרים את המכונה הווירטואלית באותה רשת VPC כמו רשת המשנה שבה השתמשתם בהגדרת אשכול Kafka. מידע נוסף זמין במאמר בנושא הצגת רשתות המשנה של אשכול.

  • היקפי הרשאות. מקצים את https://www.googleapis.com/auth/cloud-platform היקף הגישה למכונה הווירטואלית. היקף ההרשאות הזה מאשר למכונה הווירטואלית לשלוח בקשות אל Managed Kafka API.

בשלבים הבאים מוסבר איך מגדירים את האפשרויות האלה.

המסוף

  1. נכנסים לדף Create an instance במסוף Google Cloud .

    יצירת מופע

  2. בחלונית Machine configuration:

    1. בשדה Name, מציינים שם למכונה. מידע נוסף זמין במאמר מוסכמות למתן שמות למשאבים.

    2. ברשימה Region, בוחרים את אותו אזור כמו אשכול Kafka.

    3. ברשימה Zone, בוחרים אזור.

  3. בתפריט הניווט, לוחצים על Networking (רשת). בחלונית Networking שמופיעה, מבצעים את הפעולות הבאות:

    1. עוברים לקטע Network interfaces.

    2. כדי להרחיב את ממשק ברירת המחדל של הרשת, לוחצים על החץ .

    3. בשדה Network, בוחרים את רשת ה-VPC.

    4. ברשימה Subnetwork, בוחרים את רשת המשנה.

    5. לוחצים על סיום.

  4. בתפריט הניווט, לוחצים על אבטחה. בחלונית Security שמופיעה, מבצעים את הפעולות הבאות:

    1. בקטע Access scopes בוחרים באפשרות Set access for each API.

    2. ברשימת היקפי הגישה, מחפשים את הרשימה הנפתחת Cloud Platform ובוחרים באפשרות Enabled.

  5. לוחצים על יצירה כדי ליצור את המכונה הווירטואלית.

gcloud

כדי ליצור את מכונת ה-VM, משתמשים בפקודה gcloud compute instances create.

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

מחליפים את מה שכתוב בשדות הבאים:

  • VM_NAME: השם של ה-VM
  • PROJECT_ID: מזהה הפרויקט
  • REGION: האזור שבו יצרתם את אשכול Kafka, לדוגמה us-central1
  • SUBNET: תת-רשת באותה רשת VPC כמו תת-הרשת שבה השתמשתם בתצורת האשכול
  • ZONE: אזור באזור שבו יצרתם את האשכול, למשל us-central1-c

מידע נוסף על יצירת מכונה וירטואלית זמין במאמר בנושא יצירת מופע של מכונה וירטואלית ברשת משנה ספציפית.

מתן תפקידים ב-IAM

מקצים את התפקידים הבאים בניהול זהויות והרשאות גישה (IAM) לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine:

  • Managed Kafka Client‏ (roles/managedkafka.client)
  • אדמין במרשם הסכימות (roles/managedkafka.schemaRegistryAdmin)
  • יצירת אסימונים בחשבון שירות (roles/iam.serviceAccountTokenCreator)
  • יצירת אסימונים מסוג OpenID בחשבון שירות (roles/iam.serviceAccountOpenIdTokenCreator)

המסוף

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. מחפשים את השורה של חשבון השירות של Compute Engine שמוגדר כברירת מחדל ולוחצים על עריכת החשבון הראשי.

  3. לוחצים על Add another role ובוחרים את התפקיד Managed Kafka Client. חוזרים על השלב הזה עבור התפקידים Schema Registry Admin,‏ Service Account Token Creator ו-Service Account OpenID Token Creator.

  4. לוחצים על Save.

gcloud

כדי להקצות תפקידים ב-IAM, משתמשים בפקודה gcloud projects add-iam-policy-binding.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט

  • PROJECT_NUMBER: מספר הפרויקט

כדי לקבל את מספר הפרויקט, מריצים את הפקודה gcloud projects describe:

gcloud projects describe PROJECT_ID

מידע נוסף זמין במאמר איך מוצאים את השם, המספר והמזהה של הפרויקט.

התחברות למכונה הווירטואלית

משתמשים ב-SSH כדי להתחבר למופע של המכונה הווירטואלית.

המסוף

  1. נכנסים לדף VM instances.

    כניסה לדף VM instances

  2. ברשימת המכונות הווירטואליות, מוצאים את שם המכונה הווירטואלית ולוחצים על SSH.

gcloud

כדי להתחבר למכונה הווירטואלית, משתמשים בפקודה gcloud compute ssh.

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

מחליפים את מה שכתוב בשדות הבאים:

  • VM_NAME: השם של ה-VM
  • PROJECT_ID: מזהה הפרויקט
  • ZONE: האזור שבו יצרתם את המכונה הווירטואלית

יכול להיות שיהיה צורך בהגדרה נוספת כדי להשתמש ב-SSH בפעם הראשונה. מידע נוסף זמין במאמר מידע על חיבורי SSH.

הגדרת פרויקט Apache Maven

מריצים את הפקודות הבאות בסשן ה-SSH כדי להגדיר פרויקט Maven.

  1. מתקינים את Java ואת Maven באמצעות הפקודה:

    sudo apt-get install maven openjdk-17-jdk
    
  2. מגדירים פרויקט Apache Maven.

    כדי ליצור חבילה com.google.example בספרייה שנקראת demo, משתמשים בפקודה הבאה:

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    

הגדרת הסכימה וההטמעה שלה ב-Java

בדוגמה הזו, הודעה מייצגת 'משתמש' שיש לו שם ומזהה אופציונלי. הוא תואם לסכימת Avro עם שני שדות: שדה חובה name מסוג string ומספר שלם אופציונלי id. כדי להשתמש בסכימה הזו בתוכנית Java, תצטרכו גם ליצור הטמעה של Java של אובייקט שתואם לסכימה הזו.

  1. עוברים לספריית הפרויקט באמצעות cd demo.

  2. יוצרים את התיקיות לאחסון קובצי הסכימה בקוד:

    mkdir -p src/main/avro
    
  3. כדי ליצור את הגדרת סכימת Avro, מדביקים את הקוד הבא בקובץ שנקרא src/main/avro/User.avsc:

    {
      "namespace": "com.google.example",
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "id",  "type": ["int", "null"]}
      ]
    }
    
  4. כדי להגדיר את פרויקט Maven לשימוש בתוסף ליצירת קוד Avro Java, מוסיפים את הקוד הבא לצומת build של pom.xml.. שימו לב: יכול להיות שב-pom.xml יש צמתי plugins אחרים בתוך הצומת pluginManagement. בשלב הזה לא משנים את הצומת pluginManagement. הצומת plugins צריך להיות באותה רמה כמו pluginManagement.

    <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
    </plugins>
    
  5. מוסיפים את Avro כתלות על ידי הוספת השורה הבאה לסוף הצומת project/dependencies של pom.xml. הערה: לתג pom.xml כבר יש צומת dependencies בתוך התג dependencyManagement. בשלב הזה לא משנים את הצומת dependencyManagement.

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    
  6. יצירת מקורות Java

      mvn generate-sources
    
  7. מריצים את הפקודה הבאה כדי לוודא שקובץ המקור של ההטמעה נוצר. המקור הוא קובץ מחלקת Java שמיישם בנאים, רכיבי גישה, סדרות וביטול סדרות לאובייקטים של User. תשתמשו במחלקה הזו בקוד של היוצר.

    cat src/main/java/com/google/example/User.java
    

מידע נוסף על Apache Avro זמין במדריך לתחילת העבודה עם Apache Avro.

יצירת לקוח של יצרן

בקטע הזה מוסבר איך לכתוב, ליצור ולהפעיל לקוח של יצרן.

הטמעה של ה-Producer

היוצר משתמש ב-KafkaAvroSerializer.java כדי לקודד הודעות ולנהל את הסכימות שלהן. הסריאליזטור מתחבר אוטומטית למאגר הסכימות, רושם את הסכימה תחת נושא, מאחזר את המזהה שלה ואז מבצע סריאליזציה של ההודעה באמצעות Avro. עדיין צריך להגדיר את ה-producer ואת ה-serializer.

  1. כדי ליצור את מחלקת לקוח היצרן, מדביקים את הקוד הבא בקובץ חדש בשם src/main/java/com/google/example/UserProducer.java

    
    package com.google.example;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    
      public class UserProducer {
    
        private static Properties configure() throws Exception {
            Properties p = new Properties();
            p.load(new java.io.FileReader("client.properties"));
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            return p;
        }
    
        public static void main(String[] args) throws Exception {
            Properties p = configure();
    
            KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
            final User u = new User("SchemaEnthusiast", 42);
            final String topicName = "newUsers";
            ProducerRecord<String, User>  message =
              new ProducerRecord<String, User>(topicName, "", u);
            producer.send(message, new SendCallback());
            producer.close();
        }
      }
    
  2. מגדירים את מחלקת הקריאה החוזרת ב-src/main/java/com/google/example/SendCallback.java:

    package com.google.example;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
    }
    
  3. כדי לקמפל את הקוד הזה, צריך את חבילת org.apache.kafka.clients ואת קוד הסריאליזציה. ארטיפקט ה-Maven של הסריאליזציה מופץ דרך מאגר מותאם אישית. כדי להגדיר את המאגר הזה, מוסיפים את הצומת הבא לצומת project של pom.xml:

      <repositories>
        <repository>
          <id>confluent</id>
          <name>Confluent</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
    
  4. מוסיפים את הטקסט הבא לצומת dependencies בקובץ pom.xml:

       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.8.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
  5. כדי לוודא שכל התלות נפתרה בצורה תקינה, צריך לקמפל את הלקוח:

    mvn compile
    

יצירת מאגר סכימות

כדי ליצור מאגר סכימות, מריצים את הפקודה הבאה:

gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION

מחליפים את מה שכתוב בשדות הבאים:

  • REGISTRY_ID: מזהה ייחודי של מאגר הסכימות החדש. החלק הזה הוא חלק משם המשאב של הרשם. השם צריך להתחיל באות, לכלול רק אותיות (a-z, A-Z), מספרים (0-9) וקווים תחתונים (_), ולהיות באורך של 63 תווים או פחות.

  • REGION: Google Cloud האזור שבו ייווצר מאגר הסכימות. המיקום הזה חייב להיות זהה לאזור של אשכול Kafka או האשכולות שמשתמשים במאגר הזה.

הגדרת הסכימה שיצרת עדיין לא הועלתה למאגר. לקוח היצרן עושה זאת בפעם הראשונה שהוא פועל בשלבים הבאים.

הגדרה והרצה של המפיק

בשלב הזה, המפיק לא יפעל כי הוא לא מוגדר באופן מלא. כדי להגדיר את ה-producer, צריך לספק את ההגדרה של Kafka ושל מאגר הסכימות.

  1. יוצרים קובץ בשם client.properties באותה תיקייה שבה נמצא הקובץ pom.xml ומוסיפים לו את התוכן הבא:

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
    schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
    bearer.auth.credentials.source=CUSTOM
    bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    מוסיפים את יחסי התלות של Kafka ואת המטפל באימות של מאגר הסכימות לפרויקט Maven על ידי הוספת הקוד הבא לצומת dependencies של pom.xml מעל התלות kafka-avro-serializer:

      <dependency>
          <groupId>com.google.cloud.hosted.kafka</groupId>
          <artifactId>managed-kafka-auth-login-handler</artifactId>
          <version>1.0.6</version>
          <exclusions>
            <exclusion>
              <groupId>io.confluent</groupId>
              <artifactId>kafka-schema-registry-client</artifactId>
            </exclusion>
        </exclusions>
      </dependency>
    

    אם אתם רוצים לראות את ההטמעה של handler האימות של מאגר הסכימות המותאמות אישית, תוכלו לעיין במחלקה GcpBearerAuthCredentialProvider.

  2. קומפילציה והרצה של לקוח המפיק:

    mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    אם הכול ילך כשורה, תראו את הפלט Produced a message successfully שנוצר על ידי המחלקה SendCallback.

בדיקת הפלט

  1. בודקים שהסכימה User נרשמה תחת שם נושא שנגזר משמות הנושא והסכימה:

    SR_DOMAIN=https://managedkafka.googleapis.com
    SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
    SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects
    
    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)"\
      $SR_HOST
    

    הפלט של הפקודה הזו אמור להיראות כך:

    ["newUsers-value"]
    
  2. בודקים שהסכימה שרשומה במאגר זהה ל-User:

    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      $SR_HOST/newUsers-value/versions/1
    

    הפלט של הפקודה אמור להיראות כך:

    {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    }
    

הסרת המשאבים

כדי לא לצבור חיובים לחשבון Google Cloud על המשאבים שבהם השתמשתם בדף הזה, פועלים לפי השלבים הבאים:

המסוף

  1. מוחקים את מכונת ה-VM.

    1. נכנסים לדף VM instances.

      כניסה לדף VM instances

    2. בוחרים את המכונה הווירטואלית ולוחצים על מחיקה.

  2. מחיקת מאגר הסכימות.

    1. עוברים לדף Schema registries.

      כניסה למרשמי סכימות

    2. לוחצים על השם של מאגר הסכימות.

    3. לוחצים על Delete.

  3. מוחקים את אשכול Kafka.

    1. עוברים לדף שירות מנוהל ל-Apache Kafka > Clusters (אשכולות).

      מעבר אל Clusters

    2. בוחרים את אשכול Kafka ולוחצים על מחיקה.

gcloud

  1. כדי למחוק את מכונת ה-VM, משתמשים בפקודה gcloud compute instances delete.

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. כדי למחוק את מאגר הסכימות, משתמשים בפקודה /sdk/gcloud/reference/managed-kafka/schema-registries/delete.

    gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
      --location=REGION
    
  3. כדי למחוק את אשכול Kafka, משתמשים בפקודה gcloud managed-kafka clusters delete.

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

המאמרים הבאים

Apache Kafka®‎ ו-Apache Avro הם סימנים מסחריים רשומים של The Apache Software Foundation או של חברות הבת שלה בארצות הברית ו/או במדינות אחרות.
Confluent הוא סימן מסחרי רשום של Confluent, Inc.