רכיב Flink אופציונלי של Managed Service for Apache Spark

אתם יכולים להפעיל רכיבים נוספים כמו Flink כשאתם יוצרים אשכול של Managed Service for Apache Spark באמצעות התכונה רכיבים אופציונליים. בדף הזה מוסבר איך ליצור אשכול של Managed Service for Apache Spark עם הפעלת הרכיב האופציונלי Apache Flink (אשכול Flink), ולאחר מכן להריץ משימות Flink באשכול.

אפשר להשתמש באשכול Flink כדי:

  1. הפעלת משימות Flink באמצעות המשאב Jobs ממסוף Google Cloud , מ-Google Cloud CLI או מ-Dataproc API.

  2. מריצים משימות Flink באמצעות flink CLI שפועל בצומת הראשי של אשכול Flink.

  3. הרצת משימות Apache Beam ב-Flink

  4. הפעלה של Flink באשכול עם Kerberos

אפשר להשתמש במסוף Google Cloud , ב-Google Cloud CLI או ב-Dataproc API כדי ליצור אשכול שרכיב Flink מופעל בו.

המלצה: כדאי להשתמש באשכול של מכונות וירטואליות עם מאסטר אחד ורכיב Flink. אשכולות במצב זמינות גבוהה של Managed Service for Apache Spark (עם 3 מכונות וירטואליות ראשיות) לא תומכים במצב זמינות גבוהה של Flink.

אפשר להריץ משימות Flink באמצעות המשאב Managed Service for Apache Spark Jobs מGoogle Cloud console,‏ Google Cloud CLI או Dataproc API.

המסוף

כדי לשלוח משימת ספירת מילים לדוגמה ב-Flink מהמסוף:

  1. פותחים את הדף Submit a job של Managed Service for Apache Spark במסוףGoogle Cloud בדפדפן.

  2. ממלאים את השדות בדף שליחת משרה:

    1. בוחרים את שם האשכול מתוך רשימת האשכולות.
    2. מגדירים את סוג העבודה לערך Flink.
    3. מגדירים את Main class or jar (הכיתה או קובץ ה-JAR הראשיים) ל-org.apache.flink.examples.java.wordcount.WordCount.
    4. מגדירים את Jar files לערך file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// מציין קובץ שנמצא באשכול. ‫Managed Service for Apache Spark התקין את WordCount.jar כשנוצר אשכול Flink.
      • בשדה הזה אפשר גם להזין נתיב של Cloud Storage‏ (gs://BUCKET/JARFILE) או נתיב של מערכת קבצים מבוזרת של Hadoop‏ (HDFS)‏ (hdfs://PATH_TO_JAR).
  3. לוחצים על שליחה.

    • הפלט של מנהל המשימות מוצג בדף Job details (פרטי המשימה).
    • משימות Flink מופיעות בדף Jobs של Managed Service for Apache Spark במסוף Google Cloud .
    • לוחצים על הפסקה או על מחיקה בדף משימות או פרטי המשימה כדי להפסיק או למחוק משימה.

gcloud

כדי לשלוח משימת Flink לאשכול של Managed Service for Apache Spark Flink, מריצים את הפקודה gcloud dataproc jobs submit של ה-CLI של gcloud באופן מקומי בחלון טרמינל או ב-Cloud Shell.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

הערות:

  • CLUSTER_NAME: מציינים את השם של אשכול Managed Service for Apache Spark Flink שאליו רוצים לשלוח את העבודה.
  • REGION: מציינים אזור של Compute Engine שבו נמצא האשכול.
  • MAIN_CLASS: מציינים את המחלקה main של אפליקציית Flink, למשל:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: מציינים את קובץ ה-JAR של אפליקציית Flink. אתם יכולים לציין:
    • קובץ jar שהותקן באשכול, באמצעות הקידומת file:///` :
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • קובץ jar ב-Cloud Storage: gs://BUCKET/JARFILE
    • קובץ jar ב-HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: אפשר להוסיף ארגומנטים של המשימה אחרי הקו המקף הכפול (--).

  • אחרי ששולחים את העבודה, הפלט של מנהל העבודה מוצג בטרמינל המקומי או ב-Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

בקטע הזה מוסבר איך לשלוח משימת Flink לאשכול Managed Service for Apache Spark Flink באמצעות Managed Service for Apache Spark API‏ jobs.submit.

לפני שמשתמשים בנתוני הבקשה, צריך להחליף את הנתונים הבאים:

  • PROJECT_ID: Google Cloud מזהה הפרויקט
  • REGION: cluster region
  • CLUSTER_NAME: מציינים את השם של אשכול Managed Service for Apache Spark Flink שאליו רוצים לשלוח את המשימה

ה-method של ה-HTTP וכתובת ה-URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

תוכן בקשת JSON:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

כדי לשלוח את הבקשה צריך להרחיב אחת מהאפשרויות הבאות:

אתם אמורים לקבל תגובת JSON שדומה לזו:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
Google Cloud
  • משימות Flink מופיעות בדף Jobs של Managed Service for Apache Spark במסוף Google Cloud .
  • כדי להפסיק או למחוק עבודה, אפשר ללחוץ על Stop או על Delete בדף Jobs או Job details במסוף Google Cloud .

במקום להריץ משימות Flink באמצעות משאב Jobs של Managed Service for Apache Spark, אפשר להריץ משימות Flink בצומת דרייבר של אשכול Flink באמצעות flink CLI.

בקטעים הבאים מתוארות דרכים שונות להפעלת משימת flink CLI באשכול Managed Service for Apache Spark Flink.

  1. התחברות באמצעות SSH לצומת הראשי: משתמשים בכלי SSH כדי לפתוח חלון טרמינל במכונה הווירטואלית הראשית של האשכול.

  2. מגדירים את classpath: מאתחלים את classpath של Hadoop מחלון הטרמינל של SSH במכונה הווירטואלית הראשית של מאסטר אשכולות Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. הפעלת משימות Flink: אפשר להפעיל משימות Flink במצבי פריסה שונים ב-YARN: מצב אפליקציה, מצב לכל משימה ומצב סשן.

    1. מצב אפליקציה: מצב האפליקציה של Flink נתמך על ידי Managed Service for Apache Spark מגרסה 2.0 ואילך. במצב הזה, קוד ה-method של העבודה main() מופעל ב-YARN Job Manager. האשכול מושבת אחרי שהעבודה מסתיימת.

      דוגמה לשליחת משרה:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      הצגת רשימה של משרות פעילות:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      כדי לבטל עבודה שפועלת:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. מצב לכל עבודה: במצב הזה של Flink, קוד ה-method של העבודה main() מופעל בצד הלקוח.

      דוגמה לשליחת משרה:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. מצב סשן: מתחילים סשן Flink YARN שפועל לאורך זמן, ואז שולחים משימה אחת או יותר לסשן.

      1. התחלת סשן: אפשר להתחיל סשן Flink באחת מהדרכים הבאות:

        1. יוצרים אשכול Flink ומוסיפים את הדגל --metadata flink-start-yarn-session=true לפקודה gcloud dataproc clusters create (ראו יצירת אשכול Dataproc Flink). אם הדגל הזה מופעל, אחרי יצירת האשכול, Managed Service for Apache Spark מפעיל את הפקודה /usr/bin/flink-yarn-daemon כדי להתחיל סשן Flink באשכול.

          מזהה האפליקציה של YARN בסשן נשמר ב-/tmp/.yarn-properties-${USER}. אפשר להציג את המזהה באמצעות הפקודה yarn application -list.

        2. מריצים את הסקריפט Flink yarn-session.sh שמותקן מראש במכונה הווירטואלית הראשית של מאסטר האשכולות, עם הגדרות בהתאמה אישית:

          דוגמה עם הגדרות מותאמות אישית:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. מריצים את סקריפט העטיפה של Flink עם הגדרות ברירת המחדל:/usr/bin/flink-yarn-daemon

          . /usr/bin/flink-yarn-daemon
          
      2. שליחת משימה לסשן: מריצים את הפקודה הבאה כדי לשלוח משימת Flink לסשן.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: כתובת ה-URL, כולל המארח והיציאה, של מכונת ה-VM הראשית של Flink שבה מופעלים העבודות. מסירים את התו http:// prefix מכתובת ה-URL. כתובת ה-URL הזו מופיעה בפלט פקודה כשמתחילים סשן של Flink. כדי להציג את כתובת ה-URL הזו בשדה Tracking-URL, מריצים את הפקודה הבאה:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. הצגת רשימת המשימות בהפעלה: כדי להציג רשימה של משימות Flink בהפעלה, מבצעים אחת מהפעולות הבאות:

        • מריצים את הפקודה flink list בלי ארגומנטים. הפקודה מחפשת את מזהה האפליקציה של YARN בסשן ב-/tmp/.yarn-properties-${USER}.

        • מקבלים את מזהה אפליקציית YARN של הסשן מ-/tmp/.yarn-properties-${USER} או מהפלט של yarn application -list, ואז מריצים את הפקודה <code>flink list -yid YARN_APPLICATION_ID.

        • מריצים את flink list -m FLINK_MASTER_URL.

      4. הפסקת סשן: כדי להפסיק את הסשן, צריך לקבל את מזהה האפליקציה של YARN של הסשן מ-/tmp/.yarn-properties-${USER} או מהפלט של yarn application -list, ואז להריץ אחת מהפקודות הבאות:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

אפשר להריץ משימות של Apache Beam ב-Managed Service for Apache Spark באמצעות FlinkRunner.

אפשר להריץ משימות Beam ב-Flink בדרכים הבאות:

  1. משימות Java Beam
  2. משרות ניידות של Beam

משימות Java Beam

אורזים את עבודות ה-Beam בקובץ JAR. מספקים את קובץ ה-JAR בחבילה עם יחסי התלות שנדרשים להפעלת העבודה.

בדוגמה הבאה מריצים משימת Java Beam מצומת הראשי של אשכול Managed Service for Apache Spark.

  1. יוצרים אשכול Managed Service for Apache Spark עם רכיב Flink מופעל.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: גרסת התמונה של האשכול, שקובעת את גרסת Flink שמותקנת באשכול (לדוגמה, אפשר לראות את גרסאות הרכיבים של Apache Flink שמופיעות עבור ארבע גרסאות התמונה האחרונות של 2.0.x).
    • --region: אזור נתמך של Managed Service for Apache Spark.
    • --enable-component-gateway: מאפשר גישה לממשק המשתמש של Flink Job Manager.
    • --scopes: מאפשר גישה לממשקי API על ידי האשכול Google Cloud (ראו שיטות מומלצות לשימוש בהיקפים). ההגדרה cloud-platform מופעלת כברירת מחדל (אין צורך לכלול את הגדרת הדגל הזו) כשיוצרים אשכול שמשתמש בגרסה 2.1 ואילך של תמונת Managed Service for Apache Spark.
  2. משתמשים בכלי SSH כדי לפתוח חלון טרמינל בצומת הראשי של אשכול Flink.

  3. מפעילים סשן Flink YARN בצומת מאסטר האשכולות של אשכול Managed Service for Apache Spark.

    . /usr/bin/flink-yarn-daemon
    

    שימו לב לגרסת Flink באשכול Managed Service for Apache Spark.

    flink --version
    
  4. במחשב המקומי, יוצרים את הדוגמה הקנונית של ספירת מילים ב-Beam ב-Java.

    בוחרים גרסת Beam שתואמת לגרסת Flink באשכול Managed Service for Apache Spark. אפשר לעיין בטבלה Flink Version Compatibility שמפרטת את התאימות של גרסאות Beam-Flink.

    פותחים את קובץ ה-POM שנוצר. בודקים את גרסת Beam Flink runner שצוינה בתג <flink.artifact.name>. אם גרסת Beam Flink runner בשם הארטיפקט של Flink לא תואמת לגרסת Flink באשכול, צריך לעדכן את מספר הגרסה כך שיהיה תואם.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. דוגמה לאריזה של ספירת המילים.

    mvn package -Pflink-runner
    
  6. מעלים את קובץ ה-JAR המאוגד, word-count-beam-bundled-0.1.jar (~135 MB) לצומת הראשי של אשכול Managed Service for Apache Spark. אתם יכולים להשתמש ב-gcloud storage cp כדי להעביר קבצים מהר יותר אל אשכול Managed Service for Apache Spark מ-Cloud Storage.

    1. במסוף המקומי, יוצרים קטגוריה של Cloud Storage ומעלים את קובץ ה-JAR הגדול.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. בצומת הראשי של Managed Service for Apache Spark, מורידים את קובץ ה-JAR הגדול.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. מריצים את משימת Java Beam בצומת הדרייבר של אשכול Managed Service for Apache Spark.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. בודקים שהתוצאות נכתבו לקטגוריה של Cloud Storage.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. מפסיקים את סשן Flink YARN.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

משרות Beam ניידות

כדי להריץ משימות Beam שנכתבו ב-Python, ב-Go ובשפות נתמכות אחרות, אפשר להשתמש ב-FlinkRunner וב-PortableRunner כמו שמתואר בדף Flink Runner של Beam (אפשר גם לעיין בתוכנית הדרך של מסגרת הניוד).

בדוגמה הבאה מריצים משימת Beam ניידת ב-Python מהצומת הראשי של אשכול Managed Service for Apache Spark.

  1. יוצרים אשכול Managed Service for Apache Spark עם הרכיבים Flink ו-Docker מופעלים.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    הערות:

    • --optional-components: Flink ו-Docker.
    • --image-version: גרסת התמונה של האשכול, שקובעת את גרסת Flink שמותקנת באשכול (לדוגמה, אפשר לראות את גרסאות הרכיבים של Apache Flink שמופיעות ב-2.0.x image release versions האחרונות וארבע הגרסאות הקודמות).
    • --region: אזור זמין של Managed Service for Apache Spark.
    • --enable-component-gateway: הפעלת גישה לממשק המשתמש של Flink Job Manager.
    • --scopes: מאפשרים גישה לממשקי API על ידי האשכול Google Cloud (ראו שיטות מומלצות לשימוש בהיקפים). ההגדרה cloud-platform מופעלת כברירת מחדל (אין צורך לכלול את הגדרת הדגל הזו) כשיוצרים אשכול שמשתמש בגרסה 2.1 ואילך של תמונת Managed Service for Apache Spark.
  2. משתמשים ב-CLI של gcloud באופן מקומי או ב-Cloud Shell כדי ליצור קטגוריה של Cloud Storage. תציינו את BUCKET_NAME כשמריצים תוכנית לדוגמה לספירת מילים.

    gcloud storage buckets create BUCKET_NAME
    
  3. בחלון טרמינל במכונת ה-VM של האשכול, מתחילים סשן Flink YARN. רושמים את כתובת ה-URL של Flink master, הכתובת של Flink master שבה מבוצעות משימות. תציינו את FLINK_MASTER_URL כשמריצים תוכנית לדוגמה לספירת מילים.

    . /usr/bin/flink-yarn-daemon
    

    מציגים את גרסת Flink שמופעלת באשכול Managed Service for Apache Spark ומציינים אותה. תציינו את FLINK_VERSION כשמריצים תוכנית לדוגמה לספירת מילים.

    flink --version
    
  4. מתקינים את ספריות Python שנדרשות לעבודה במאסטר אשכולות (cluster master).

  5. מתקינים גרסת Beam שתואמת לגרסת Flink באשכול.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. מריצים את הדוגמה של ספירת המילים בצומת הראשי של האשכול.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    הערות:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, כמו שצוין קודם.
    • --flink_master: FLINK_MASTER_URL, כמו שצוין קודם.
    • --flink_submit_uber_jar: שימוש ב-uber JAR להרצת משימת Beam.
    • --output: BUCKET_NAME, נוצר קודם.
  7. מוודאים שהתוצאות נכתבו לדלי.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. מפסיקים את סשן Flink YARN.

    1. מוצאים את מזהה האפליקציה.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

רכיב Managed Service for Apache Spark Flink תומך באשכולות עם Kerberos. כדי לשלוח עבודת Flink ולשמור אותה או כדי להפעיל אשכול Flink, צריך כרטיס Kerberos תקף. כברירת מחדל, כרטיס Kerberos תקף למשך שבעה ימים.

ממשק האינטרנט של Flink Job Manager זמין בזמן שהפעלתם משימת Flink או אשכול של סשן Flink. כדי להשתמש בממשק האינטרנט:

  1. יצירת אשכול של Managed Service for Apache Spark Flink
  2. אחרי יצירת האשכול, לוחצים על Component Gateway YARN ResourceManager link בכרטיסייה Web Interface בדף Cluster details במסוף Google Cloud .
  3. בממשק המשתמש של YARN Resource Manager, מזהים את הרשומה של אפליקציית אשכול Flink. בהתאם לסטטוס ההשלמה של העבודה, יופיע קישור ל-ApplicationMaster או ל-History.
  4. כדי לפתוח את לוח הבקרה של Flink עבור משימת סטרימינג שפועלת במשך זמן רב, לוחצים על הקישור ApplicationManager. כדי לראות את פרטי המשימה שהושלמה, לוחצים על הקישור History.