עיבוד של שינוי בשידור חי ב-Bigtable

במדריך הזה מוסבר איך לפרוס צינור נתונים ב-Dataflow לסטרימינג בזמן אמת של שינויים במסד נתונים שמקורם בסטרימינג של שינויים בטבלה ב-Bigtable. הפלט של צינור העיבוד נכתב לסדרה של קבצים ב-Cloud Storage.

מוצגת דוגמה למערך נתונים של אפליקציה להאזנה למוזיקה. במדריך הזה תלמדו איך לעקוב אחרי שירים שהאזינו להם, ואז לדרג את חמשת השירים המובילים בתקופה מסוימת.

המדריך הזה מיועד למשתמשים טכניים שמכירים את כתיבת הקוד ואת פריסת צינורות עיבוד הנתונים ב- Google Cloud.

מטרות

במדריך הזה מוסבר איך:

  • יוצרים טבלת Bigtable עם הפעלה של שידור שינויים.
  • פריסת צינור ב-Dataflow שמבצע טרנספורמציה של נתוני הסטרים של השינויים ומייצא אותם.
  • צפייה בתוצאות של צינור עיבוד הנתונים.

עלויות

במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:

כדי להעריך את ההוצאות בהתאם לתחזית השימוש שלכם, אתם יכולים להיעזר במחשבון העלויות.

משתמשים חדשים של Google Cloud ? יכול להיות שאתם זכאים לתקופת ניסיון בחינם.

כשמסיימים את המשימות שמתוארות במסמך הזה אפשר למחוק את המשאבים שיצרתם כדי להימנע מחיובים נוספים. מידע נוסף זמין בקטע הסרת המשאבים.

לפני שמתחילים

    נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.

    התקינו את ה-CLI של Google Cloud. אחר כך, אתחלו את ה-CLI של Google Cloud באמצעות הפקודה הבאה:

    gcloud init

    אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

    יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

    מוודאים שהחיוב מופעל בפרויקט Google Cloud .

    מפעילים את Dataflow,‏ Cloud Bigtable API,‏ Cloud Bigtable Admin API ו-Cloud Storage APIs:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com

    התקינו את ה-CLI של Google Cloud. אחר כך, אתחלו את ה-CLI של Google Cloud באמצעות הפקודה הבאה:

    gcloud init

    אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

    יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

    מוודאים שהחיוב מופעל בפרויקט Google Cloud .

    מפעילים את Dataflow,‏ Cloud Bigtable API,‏ Cloud Bigtable Admin API ו-Cloud Storage APIs:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  1. מעדכנים ומתקינים את cbt CLI .
    gcloud components update
    gcloud components install cbt

הכנת הסביבה

קבל את הקוד

משכפלים את המאגר שמכיל את הקוד לדוגמה. אם כבר הורדתם את המאגר הזה, צריך למשוך כדי לקבל את הגרסה העדכנית.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

יצירת קטגוריה

  • יוצרים קטגוריה של Cloud Storage:
    gcloud storage buckets create gs://BUCKET_NAME
    מחליפים את BUCKET_NAME בשם קטגוריה שעומד בקריטריונים לשמות של קטגוריות.
  • יצירת מכונה של Bigtable

    אפשר להשתמש במכונה קיימת לצורך המדריך הזה או ליצור מכונה עם הגדרות ברירת המחדל באזור שקרוב אליכם.

    צור טבלה

    אפליקציית הדוגמה עוקבת אחרי השירים שהמשתמשים מאזינים להם ומאחסנת את אירועי ההאזנה ב-Bigtable. יוצרים טבלה עם הפעלה של שינוי נתונים בזמן אמת, עם קבוצת עמודות אחת (cf) ועמודה אחת (song), ומשתמשים במזהי משתמשים כמפתחות שורה.

    יוצרים את הטבלה.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט שבו אתם משתמשים
    • BIGTABLE_INSTANCE_ID: המזהה של המופע שיכיל את הטבלה החדשה

    הפעלת הפייפליין

    צינור הנתונים הזה מבצע את השינויים הבאים בזרם השינויים:

    1. קריאת השינוי בשידור החי
    2. קבלת שם השיר
    3. מקבץ את אירועי ההאזנה לשירים בחלונות של N שניות
    4. סופר את חמשת השירים המובילים
    5. הפלט של התוצאות

    מריצים את הפייפליין.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    מחליפים את BIGTABLE_REGION במזהה האזור שבו נמצא מופע Bigtable, לדוגמה, us-east5.

    הסבר על צינור עיבוד הנתונים

    קטעי הקוד הבאים מצינור עיבוד הנתונים יכולים לעזור לכם להבין את הקוד שאתם מריצים.

    קריאת השינויים בשידור החי

    הקוד בדוגמה הזו מגדיר את סטרימינג המקור עם הפרמטרים של מופע וטבלה ספציפיים של Bigtable.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    איך מקבלים את שם השיר

    כשמאזינים לשיר, שם השיר נכתב במשפחת העמודות cf ובמסווג העמודות song, כך שהקוד מחלץ את הערך מהמוטציה של זרם השינויים ומעביר אותו לשלב הבא בצינור עיבוד הנתונים.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    ספירת חמשת השירים המובילים

    אפשר להשתמש בפונקציות המובנות של Beam‏ Count ו-Top.of כדי לקבל את חמשת השירים המובילים בחלון הנוכחי.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    הצגת התוצאות

    הצינור הזה כותב את התוצאות גם לפלט רגיל וגם לקבצים. במקרה של קבצים, המערכת מחלקת את הכתיבה לקבוצות של 10 אלמנטים או לקטעים של דקה אחת.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    הצגת הפייפליין

    1. נכנסים לדף Dataflow במסוף Google Cloud .

      מעבר אל Dataflow

    2. לוחצים על המשימה שהשם שלה מתחיל ב-song-rank.

    3. בתחתית המסך, לוחצים על הצגה כדי לפתוח את חלונית היומנים.

    4. לוחצים על Worker logs (יומני עובדים) כדי לעקוב אחרי יומני הפלט של מקור הנתונים לשינויים.

    כתיבה בסטרימינג

    משתמשים ב-cbtCLI כדי לכתוב מספר של האזנות לשירים עבור משתמשים שונים בטבלה song-rank. התכונה הזו נועדה לכתוב נתונים במשך כמה דקות כדי לדמות האזנה לשירים בסטרימינג לאורך זמן.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    צפייה בפלט

    קוראים את הפלט ב-Cloud Storage כדי לראות את השירים הכי פופולריים.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    פלט לדוגמה:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    הסרת המשאבים

    כדי להימנע מחיובים בחשבון Google Cloud בגלל השימוש במשאבים שנעשה במסגרת המדריך הזה, אפשר למחוק את הפרויקט שמכיל את המשאבים, או להשאיר את הפרויקט ולמחוק את המשאבים בנפרד.

    מחיקת הפרויקט

      כדי למחוק Google Cloud פרויקט:

      gcloud projects delete PROJECT_ID

    מחיקת משאבים בודדים

    1. מוחקים את הקטגוריה ואת הקבצים.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. משביתים את עדכוני התוכן בטבלה.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. מוחקים את הטבלה song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. עוצרים את צינור עיבוד הנתונים של שינוי הנתונים.

      1. מציגים את רשימת המשרות כדי לקבל את מזהה המשרה.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. מבטלים את המשימה.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        מחליפים את JOB_ID במזהה המשימה שמוצג אחרי הפקודה הקודמת.

    המאמרים הבאים