רכיב Flink אופציונלי ב-Dataproc

אתם יכולים להפעיל רכיבים נוספים כמו Flink כשאתם יוצרים אשכול Dataproc באמצעות התכונה Optional components (רכיבים אופציונליים). בדף הזה מוסבר איך ליצור אשכול Dataproc עם הפעלת הרכיב האופציונלי Apache Flink (אשכול Flink), ולאחר מכן להריץ עבודות Flink באשכול.

אפשר להשתמש באשכול Flink כדי:

  1. הפעלת משימות Flink באמצעות משאב Dataproc Jobs מתוך מסוף Google Cloud , Google Cloud CLI או Dataproc API.

  2. מריצים משימות Flink באמצעות flink CLI שפועל בצומת הראשי של אשכול Flink.

  3. הרצת משימות Apache Beam ב-Flink

  4. הרצת Flink באשכול עם Kerberos

אתם יכולים להשתמש במסוף Google Cloud , ב-Google Cloud CLI או ב-Dataproc API כדי ליצור אשכול Dataproc שרכיב Flink מופעל בו.

המלצה: כדאי להשתמש באשכול של מכונות וירטואליות עם מאסטר אחד ורכיב Flink. אשכולות במצב זמינות גבוהה של Dataproc (עם 3 מכונות וירטואליות ראשיות) לא תומכים במצב זמינות גבוהה של Flink.

  • משתמשים בקישור Flink History Server בComponent Gateway כדי לראות את Flink History Server שפועל באשכול Flink.
  • משתמשים ב-YARN ResourceManager link ב-Component Gateway כדי להציג את ממשק האינטרנט של Flink Job Manager שפועל באשכול Flink .
  • יוצרים Dataproc Persistent History Server כדי לראות קבצים של היסטוריית משימות Flink שנכתבו על ידי אשכולות Flink קיימים ומחוקים.

אפשר להריץ משימות Flink באמצעות המשאב Jobs של Dataproc מGoogle Cloud מסוף Google Cloud, מ-Google Cloud CLI או מ-Dataproc API.

המסוף

כדי לשלוח משימת ספירת מילים לדוגמה ב-Flink מהמסוף:

  1. פותחים את הדף Dataproc Submit a job במסוףGoogle Cloud בדפדפן.

  2. ממלאים את השדות בדף שליחת משרה:

    1. בוחרים את שם האשכול מתוך רשימת האשכולות.
    2. מגדירים את Job type (סוג העבודה) לערך Flink.
    3. מגדירים את Main class or jar (הכיתה או קובץ ה-JAR הראשיים) ל-org.apache.flink.examples.java.wordcount.WordCount.
    4. מגדירים את Jar files לערך file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// מציין קובץ שנמצא באשכול. ‫Dataproc התקין את WordCount.jar כשהוא יצר את אשכול Flink.
      • אפשר להזין בשדה הזה גם נתיב של Cloud Storage‏ (gs://BUCKET/JARFILE) או נתיב של מערכת קבצים מבוזרת של Hadoop‏ (HDFS)‏ (hdfs://PATH_TO_JAR).
  3. לוחצים על שליחה.

    • הפלט של מניע המשרה מוצג בדף Job details (פרטי המשרה).
    • עבודות Flink מופיעות בדף Jobs של Dataproc במסוף Google Cloud .
    • כדי להפסיק או למחוק משימה, לוחצים על הפסקה או על מחיקה בדף משימות או בדף פרטי המשימה.

gcloud

כדי לשלוח משימת Flink לאשכול Dataproc Flink, מריצים את הפקודה gcloud dataproc jobs submit של ה-CLI של gcloud באופן מקומי בחלון טרמינל או ב-Cloud Shell.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

הערות:

  • CLUSTER_NAME: מציינים את השם של אשכול Dataproc Flink שאליו רוצים לשלוח את המשימה.
  • REGION: מציינים אזור של Compute Engine שבו נמצא האשכול.
  • MAIN_CLASS: מציינים את המחלקה main של אפליקציית Flink, למשל:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: מציינים את קובץ ה-JAR של אפליקציית Flink. אתם יכולים לציין:
    • קובץ jar שהותקן באשכול, באמצעות הקידומת file:///` :
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • קובץ jar ב-Cloud Storage: gs://BUCKET/JARFILE
    • קובץ jar ב-HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: אפשר להוסיף ארגומנטים של המשימה אחרי המקף הכפול (--).

  • אחרי ששולחים את העבודה, הפלט של מנהל העבודה מוצג בטרמינל המקומי או ב-Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

בקטע הזה נסביר איך לשלוח משימת Flink לאשכול Dataproc Flink באמצעות Dataproc jobs.submit API.

לפני שמשתמשים בנתוני הבקשה, צריך להחליף את הנתונים הבאים:

  • PROJECT_ID: Google Cloud מזהה הפרויקט
  • REGION: cluster region
  • CLUSTER_NAME: מציינים את השם של אשכול Dataproc Flink שאליו רוצים לשלוח את העבודה

ה-method של ה-HTTP וכתובת ה-URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

תוכן בקשת JSON:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

כדי לשלוח את הבקשה צריך להרחיב אחת מהאפשרויות הבאות:

אתם אמורים לקבל תגובת JSON שדומה לזו:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
Google Cloud
  • עבודות Flink מופיעות בדף Jobs של Dataproc במסוף Google Cloud .
  • כדי להפסיק או למחוק עבודה, אפשר ללחוץ על Stop או על Delete בדף Jobs או Job details במסוף Google Cloud .

במקום להריץ משימות Flink באמצעות משאב Dataproc Jobs, אפשר להריץ משימות Flink בצומת הראשי של אשכול Flink באמצעות CLI של flink.

בקטעים הבאים מתוארות דרכים שונות להפעלת משימת flink CLI באשכול Dataproc Flink.

  1. מתחברים ל-SSH של הצומת הראשי: משתמשים בכלי SSH כדי לפתוח חלון טרמינל במכונה הווירטואלית הראשית של האשכול.

  2. מגדירים את נתיב המחלקה: מאתחלים את נתיב המחלקה של Hadoop מחלון הטרמינל של SSH במכונה הווירטואלית הראשית של מאסטר אשכולות Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. הפעלת משימות Flink: אפשר להפעיל משימות Flink במצבי פריסה שונים ב-YARN: מצב אפליקציה, מצב לכל משימה ומצב סשן.

    1. מצב אפליקציה: מצב האפליקציה של Flink נתמך בגרסה 2.0 של Dataproc ואילך. במצב הזה, קוד ה-method של העבודה main() מופעל ב-YARN Job Manager. האשכול מושבת אחרי שהעבודה מסתיימת.

      דוגמה לשליחת משרה:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      הצגת רשימה של משרות פעילות:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      כדי לבטל משימה שפועלת:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. מצב לכל משימה: במצב הזה של Flink, קוד ה-method של המשימה main() מופעל בצד הלקוח.

      דוגמה לשליחת משרה:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. מצב סשן: מתחילים סשן Flink YARN שפועל לאורך זמן, ואז שולחים משימה אחת או יותר לסשן.

      1. מתחילים סשן: אפשר להתחיל סשן של Flink באחת מהדרכים הבאות:

        1. יוצרים אשכול Flink ומוסיפים את הדגל --metadata flink-start-yarn-session=true לפקודה gcloud dataproc clusters create (ראו יצירת אשכול Dataproc Flink). אם הדגל הזה מופעל, אחרי יצירת האשכול, Dataproc מריץ את הפקודה /usr/bin/flink-yarn-daemon כדי להתחיל סשן Flink באשכול.

          מזהה אפליקציית YARN של הסשן נשמר ב-/tmp/.yarn-properties-${USER}. אפשר להציג את המזהה באמצעות הפקודה yarn application -list.

        2. מריצים את הסקריפט של Flink‏ yarn-session.sh, שמותקן מראש במכונה הווירטואלית הראשית של מאסטר האשכולות, עם הגדרות בהתאמה אישית:

          דוגמה עם הגדרות מותאמות אישית:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. מריצים את סקריפט העטיפה של Flink עם הגדרות ברירת המחדל:/usr/bin/flink-yarn-daemon

          . /usr/bin/flink-yarn-daemon
          
      2. שליחת עבודה לסשן: מריצים את הפקודה הבאה כדי לשלוח עבודת Flink לסשן.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: כתובת ה-URL, כולל המארח והיציאה, של מכונת ה-VM הראשית של Flink שבה מופעלים הג'ובים. מסירים את התו http:// prefix מכתובת ה-URL. כתובת ה-URL הזו מופיעה בפלט פקודה כשמתחילים סשן של Flink. כדי להציג את כתובת ה-URL הזו בשדה Tracking-URL, מריצים את הפקודה הבאה:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. הצגת רשימת המשימות בהפעלה: כדי להציג רשימת משימות של Flink בהפעלה, מבצעים אחת מהפעולות הבאות:

        • מריצים את הפקודה flink list בלי ארגומנטים. הפקודה מחפשת את מזהה האפליקציה של YARN בסשן ב-/tmp/.yarn-properties-${USER}.

        • מקבלים את מזהה אפליקציית YARN של הסשן מ-/tmp/.yarn-properties-${USER} או מהפלט של yarn application -list, ואז מריצים את <code>flink list -yid YARN_APPLICATION_ID.

        • מריצים את flink list -m FLINK_MASTER_URL.

      4. הפסקת סשן: כדי להפסיק את הסשן, צריך לקבל את מזהה האפליקציה של YARN של הסשן מ-/tmp/.yarn-properties-${USER} או מהפלט של yarn application -list, ואז להריץ אחת מהפקודות הבאות:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

אפשר להריץ משימות של Apache Beam ב-Dataproc באמצעות FlinkRunner.

אפשר להריץ משימות Beam ב-Flink בדרכים הבאות:

  1. משימות Java Beam
  2. משרות ניידות של Beam

משימות Java Beam

אורזים את עבודות ה-Beam בקובץ JAR. מספקים את קובץ ה-JAR בחבילה עם יחסי התלות שנדרשים להפעלת העבודה.

בדוגמה הבאה מריצים משימת Java Beam מצומת הראשי של אשכול Dataproc.

  1. יוצרים אשכול Dataproc עם רכיב Flink מופעל.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: גרסת התמונה של האשכול, שקובעת את גרסת Flink שמותקנת באשכול (לדוגמה, אפשר לראות את גרסאות הרכיבים של Apache Flink שמפורטות עבור ארבע גרסאות התמונה האחרונות של 2.0.x).
    • --region: אזור נתמך ב-Dataproc.
    • --enable-component-gateway: הפעלת גישה לממשק המשתמש של Flink Job Manager.
    • --scopes: מאפשר גישה לממשקי API על ידי האשכול Google Cloud (ראו שיטות מומלצות לשימוש בהיקפים). ההגדרה cloud-platform מופעלת כברירת מחדל (אין צורך לכלול את הגדרת הדגל הזו) כשיוצרים אשכול שמשתמש בגרסה 2.1 של תמונת Dataproc או בגרסה מתקדמת יותר.
  2. משתמשים בכלי SSH כדי לפתוח חלון טרמינל בצומת הראשי של אשכול Flink.

  3. מפעילים סשן Flink YARN בצומת הראשי של אשכול Dataproc.

    . /usr/bin/flink-yarn-daemon
    

    חשוב לשים לב לגרסת Flink באשכול Dataproc.

    flink --version
    
  4. במחשב המקומי, יוצרים את הדוגמה הקנונית של ספירת מילים ב-Beam ב-Java.

    בוחרים גרסת Beam שתואמת לגרסת Flink באשכול Dataproc. אפשר לעיין בטבלה תאימות של גרסאות Flink שבה מפורטת התאימות של גרסאות Beam-Flink.

    פותחים את קובץ ה-POM שנוצר. בודקים את גרסת Beam Flink runner שצוינה בתג <flink.artifact.name>. אם גרסת Beam Flink runner בשם ארטיפקט Flink לא תואמת לגרסת Flink באשכול, צריך לעדכן את מספר הגרסה כך שיהיה תואם.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. אריזה של דוגמה לספירת מילים.

    mvn package -Pflink-runner
    
  6. מעלים את קובץ ה-JAR הארוז, word-count-beam-bundled-0.1.jar (~135 MB) לצומת הראשי של אשכול Dataproc. אתם יכולים להשתמש ב-gcloud storage cp כדי להעביר קבצים מהר יותר אל אשכול Dataproc מ-Cloud Storage.

    1. במסוף המקומי, יוצרים קטגוריה של Cloud Storage ומעלים את קובץ ה-JAR הגדול.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. בצומת הראשי של Dataproc, מורידים את קובץ ה-JAR הגדול.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. מריצים את משימת Java Beam בצומת הראשי של אשכול Dataproc.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. בודקים שהתוצאות נכתבו לקטגוריה של Cloud Storage.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. מפסיקים את סשן Flink YARN.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

משרות Beam ניידות

כדי להריץ משימות Beam שנכתבו ב-Python, ב-Go ובשפות נתמכות אחרות, אפשר להשתמש ב-FlinkRunner וב-PortableRunner כמו שמתואר בדף Flink Runner של Beam (אפשר גם לעיין בתוכנית הדרך של Portability Framework).

בדוגמה הבאה מריצים עבודת Beam ניידת ב-Python מצומת הראשי של אשכול Dataproc.

  1. יוצרים אשכול Dataproc עם הרכיבים Flink ו-Docker מופעלים.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    הערות:

    • --optional-components: Flink ו-Docker.
    • --image-version: גרסת התמונה של האשכול, שקובעת את גרסת Flink שמותקנת באשכול (לדוגמה, אפשר לראות את גרסאות הרכיבים של Apache Flink שמופיעות עבור ארבע גרסאות התמונה האחרונות של 2.0.x).
    • --region: אזור Dataproc זמין.
    • --enable-component-gateway: הפעלת גישה לממשק המשתמש של Flink Job Manager.
    • --scopes: מאפשרים גישה לממשקי API על ידי האשכול (ראו שיטות מומלצות לשימוש בהיקפים). Google Cloud ההגדרה cloud-platform מופעלת כברירת מחדל (אין צורך לכלול את הגדרת הדגל הזו) כשיוצרים אשכול שמשתמש בגרסה 2.1 של תמונת Dataproc או בגרסה מתקדמת יותר.
  2. משתמשים ב-CLI של gcloud באופן מקומי או ב-Cloud Shell כדי ליצור קטגוריה של Cloud Storage. תציינו את BUCKET_NAME כשמריצים תוכנית לדוגמה לספירת מילים.

    gcloud storage buckets create BUCKET_NAME
    
  3. בחלון טרמינל במכונת ה-VM של האשכול, מתחילים סשן Flink YARN. רושמים את כתובת ה-URL של Flink master, הכתובת של Flink master שבה מבוצעות המשימות. תציינו את FLINK_MASTER_URL כשמריצים תוכנית לדוגמה לספירת מילים.

    . /usr/bin/flink-yarn-daemon
    

    מציגים את גרסת Flink שמופעלת באשכול Dataproc ורושמים אותה. תציינו את FLINK_VERSION כשמריצים תוכנית לדוגמה לספירת מילים.

    flink --version
    
  4. מתקינים את ספריות Python שנדרשות לעבודה בצומת הראשי של האשכול.

  5. מתקינים גרסת Beam שתואמת לגרסת Flink באשכול.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. מריצים את הדוגמה של ספירת המילים בצומת הראשי של מאסטר אשכולות (cluster master).

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    הערות:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, כמו שצוין קודם.
    • --flink_master: FLINK_MASTER_URL, כמו שצוין קודם.
    • --flink_submit_uber_jar: שימוש ב-uber JAR להרצת משימת Beam.
    • --output: BUCKET_NAME, נוצר מוקדם יותר.
  7. מוודאים שהתוצאות נכתבו לדלי.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. מפסיקים את סשן Flink YARN.

    1. מוצאים את מזהה האפליקציה.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

רכיב Dataproc Flink תומך באשכולות עם Kerberos. כדי לשלוח עבודת Flink או להפעיל אשכול Flink, צריך טיקט קרברוס תקף. כברירת מחדל, כרטיס Kerberos תקף למשך שבעה ימים.

ממשק האינטרנט של Flink Job Manager זמין בזמן שמופעלת משימת Flink או אשכול של סשן Flink. כדי להשתמש בממשק האינטרנט:

  1. יצירת אשכול Dataproc Flink
  2. אחרי יצירת האשכול, לוחצים על Component Gateway YARN ResourceManager link בכרטיסייה Web Interface בדף Cluster details במסוף Google Cloud .
  3. בממשק המשתמש של YARN Resource Manager, מזהים את הרשומה של אפליקציית אשכול Flink. בהתאם לסטטוס ההשלמה של העבודה, יופיע קישור ל-ApplicationMaster או ל-History.
  4. לגבי עבודת סטרימינג שפועלת במשך זמן רב, לוחצים על הקישור ApplicationManager כדי לפתוח את לוח הבקרה של Flink. לגבי עבודה שהושלמה, לוחצים על הקישור History כדי לראות את פרטי העבודה.