כתיבה והרצה של משימות Spark Scala ב-Dataproc

במדריך הזה מוסברות דרכים שונות ליצור עבודת Spark Scala ולשלוח אותה לאשכול Dataproc, כולל:

  • לכתוב ולבצע קומפילציה של אפליקציית Spark Scala 'Hello World' במחשב מקומי משורת הפקודה באמצעות Scala REPL (Read-Evaluate-Print-Loop או מתורגמן אינטראקטיבי) או כלי הבנייה SBT
  • חבילה של מחלקות Scala שעברו קומפילציה לקובץ jar עם מניפסט
  • שליחת קובץ ה-Scala jar למשימת Spark שפועלת באשכול Dataproc
  • בדיקת פלט של משימת Scala במסוף Google Cloud

במדריך הזה מוסבר גם איך:

  • לכתוב ולהריץ משימת MapReduce של Spark Scala‏ WordCount ישירות באשכול Dataproc באמצעות spark-shell REPL

  • להריץ דוגמאות של Apache Spark ו-Hadoop שמותקנות מראש באשכול

הגדרת פרויקט ב-Google Cloud Platform

אם עדיין לא עשיתם את זה:

  1. להגדרת פרויקט
  2. יצירת קטגוריה של Cloud Storage
  3. יצירת אשכול Dataproc

כתיבה והידור של קוד Scala באופן מקומי

כדי להתנסות בפעולה פשוטה במדריך הזה, כותבים אפליקציית Scala עם הפלט "Hello World" באמצעות Scala REPL או ממשק שורת הפקודה של SBT באופן מקומי במחשב הפיתוח.

שימוש ב-Scala

  1. מורידים את הקבצים הבינאריים של Scala מהדף Scala Install
  2. פורקים את הקובץ, מגדירים את משתנה הסביבה SCALA_HOME ומוסיפים אותו לנתיב, כמו שמוסבר בהוראות של Scala Install. לדוגמה:

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. הפעלת Scala REPL

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. מעתיקים את הקוד HelloWorld ומדביקים אותו ב-Scala REPL

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }

  5. שומרים HelloWorld.scala ויוצאים מ-REPL

    scala> :save HelloWorld.scala
    scala> :q
    

  6. קומפילציה עם scalac

    $ scalac HelloWorld.scala
    

  7. רשימה של קובצי .class שעברו קומפילציה

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

שימוש ב-SBT

  1. הורדת SBT

  2. יוצרים פרויקט בשם HelloWorld, כמו שמוצג בהמשך

    $ mkdir hello
    $ cd hello
    $ echo \
    'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
    HelloWorld.scala
    

  3. יוצרים קובץ תצורה sbt.build כדי להגדיר את artifactName (השם של קובץ ה-JAR שייווצר בהמשך) ל-HelloWorld.jar (ראו שינוי ארטיפקטים שמוגדרים כברירת מחדל)

    echo \
    'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    "HelloWorld.jar" }' > \
    build.sbt
    

  4. הפעלת SBT והרצת קוד

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. אורזים את הקוד בקובץ jar עם מניפסט שמציין את נקודת הכניסה של המחלקה הראשית (HelloWorld), ואז יוצאים.

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

יצירת קובץ JAR

יוצרים קובץ jar עם SBT או באמצעות הפקודה jar.

יצירת קובץ jar באמצעות SBT

הפקודה package של SBT יוצרת קובץ jar (ראו שימוש ב-SBT).

יצירת קובץ JAR באופן ידני

  1. משנים את הספרייה (cd) לספרייה שמכילה את קובצי HelloWorld*.class שעברו קומפילציה, ואז מריצים את הפקודה הבאה כדי לארוז את קובצי המחלקה בקובץ jar עם מניפסט שמציין את נקודת הכניסה של המחלקה הראשית (HelloWorld).
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

העתקת קובץ jar ל-Cloud Storage

  1. משתמשים ב-Google Cloud CLI כדי להעתיק את קובץ ה-JAR לקטגוריה ב-Cloud Storage בפרויקט
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

שליחת קובץ jar למשימת Spark ב-Dataproc

  1. משתמשים במסוףGoogle Cloud כדי לשלוח את קובץ ה-jar למשימת Dataproc Spark. ממלאים את השדות בדף שליחת משרה באופן הבא:

    • אשכול: בוחרים את שם האשכול מרשימת האשכולות
    • סוג העבודה: Spark
    • Main class or jar: מציינים את נתיב ה-URI של Cloud Storage לקובץ ה-jar של HelloWorld ‏ (gs://your-bucket-name/HelloWorld.jar).

      אם קובץ ה-JAR לא כולל מניפסט שמציין את נקודת הכניסה לקוד ("Main-Class: HelloWorld"), בשדה "Main class or jar" צריך לציין את השם של המחלקה הראשית ("HelloWorld"), ובשדה "Jar files" צריך לציין את נתיב ה-URI לקובץ ה-JAR (gs://your-bucket-name/HelloWorld.jar).

  2. לוחצים על שליחה כדי להתחיל את העבודה. אחרי שהמשימה מתחילה, היא מתווספת לרשימת המשימות.

  3. לוחצים על מזהה משימה כדי לפתוח את הדף משימות, שבו אפשר לראות את הפלט של מנהל ההתקן של המשימה.

כתיבה והפעלה של קוד Spark Scala באמצעות spark-shell REPL של האשכול

יכול להיות שתרצו לפתח אפליקציות Scala ישירות באשכול Dataproc. ‫Hadoop ו-Spark מותקנים מראש באשכולות Dataproc, והם מוגדרים עם מחבר Cloud Storage, שמאפשר לקוד שלכם לקרוא ולכתוב נתונים ישירות מ-Cloud Storage ואליו.

בדוגמה הזו מוסבר איך להתחבר באמצעות SSH לצומת הראשי של אשכול Dataproc בפרויקט, ואז להשתמש ב-REPL של spark-shell כדי ליצור ולהפעיל אפליקציית Scala wordcount mapreduce.

  1. התחברות ל-SSH לצומת הראשי של אשכול Dataproc

    1. עוברים לדף Clusters (אשכולות) של Dataproc בפרויקט ב Google Cloud מסוף, ואז לוחצים על שם האשכול.

    2. בדף הפרטים של האשכול, בוחרים בכרטיסייה VM Instances (מכונות וירטואליות), ואז לוחצים על האפשרות SSH שמופיעה משמאל לשורה עם שם האשכול.

      חלון דפדפן נפתח בספריית הבית בצומת הראשי

  2. מפעילים את spark-shell

    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    

  3. יצירת RDD (מערך נתונים מבוזר עמיד) מקטע טקסט של שייקספיר שנמצא ב-Cloud Storage ציבורי

    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    

  4. מריצים wordcount mapreduce על הטקסט, ואז מציגים את התוצאה wordcounts

    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
    (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    

  5. שומרים את המספרים ב-<bucket-name>/wordcounts-out ב-Cloud Storage, ואז יוצאים מ-scala-shell

    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    

  6. שימוש ב-CLI של gcloud כדי להציג את רשימת קובצי הפלט ואת תוכן הקובץ

    $ gcloud storage ls gs://bucket-name/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    

  7. בדיקת התוכן של gs://<bucket-name>/wordcounts-out/part-00000

    $ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

הרצת קוד לדוגמה שהותקן מראש

צומת הראשי של Dataproc מכיל קובצי jar שניתנים להרצה עם דוגמאות סטנדרטיות של Apache Hadoop ו-Spark.

סוג הצנצנת Master node /usr/lib/ location מקור GitHub Apache Docs
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar קישור למקור מדריך ל-MapReduce
Spark spark/lib/spark-examples.jar קישור למקור דוגמאות ל-Spark

שליחת דוגמאות לאשכול משורת הפקודה

אפשר לשלוח דוגמאות ממחשב הפיתוח המקומי באמצעות כלי שורת הפקודה gcloud של Google Cloud CLI (ראו שימוש במסוף לשליחת משימות מהמסוף). Google Cloud Google Cloud

דוגמה ל-Hadoop WordCount

gcloud dataproc jobs submit hadoop --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --class=org.apache.hadoop.examples.WordCount \
    -- URI of input file URI of output file

דוגמה ל-Spark WordCount

gcloud dataproc jobs submit spark --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- URI of input file

כיבוי האשכול

כדי להימנע מחיובים שוטפים, צריך להשבית את האשכול ולמחוק את המשאבים ב-Cloud Storage (קטגוריית Cloud Storage וקבצים) שבהם השתמשתם במדריך הזה.

כדי להשבית אשכול:

gcloud dataproc clusters delete cluster-name \
    --region=region

כדי למחוק את קובץ ה-jar של Cloud Storage:

gcloud storage rm gs://bucket-name/HelloWorld.jar

כדי למחוק קטגוריה ואת כל התיקיות והקבצים שבה, משתמשים בפקודה הבאה:

gcloud storage rm gs://bucket-name/ --recursive

המאמרים הבאים