שימוש במחבר Cloud Storage עם Apache Spark

במדריך הזה מוסבר איך להריץ קוד לדוגמה שמשתמש במחבר Cloud Storage עם Apache Spark.

Lightning Engine משפר את הקישוריות ל-Cloud Storage כדי לייעל את הביצועים של המנוע המקורי שלו. מחבר Cloud Storage המשופר מצמצם את פעולות המטא-נתונים כדי להוזיל עלויות, ומבצע אופטימיזציה של קבצים שמועברים ל-Cloud Storage כדי לשפר את הביצועים והאמינות של עומסי עבודה (workload) ב-Spark. כדי לבקש גישה מוקדמת לתכונה הזו בגרסת הפרה-אלפא, צריך למלא את טופס הגישה המוקדמת.

מטרות

כותבים משימה פשוטה של ספירת מילים ב-Spark ב-Java, ב-Scala או ב-Python, ואז מריצים את המשימה באשכול Managed Service for Apache Spark.

עלויות

במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:

  • Compute Engine
  • Managed Service for Apache Spark
  • Cloud Storage

כדי להעריך את ההוצאות בהתאם לתחזית השימוש שלכם, אתם יכולים להיעזר במחשבון העלויות.

משתמשים חדשים של Google Cloud ? יכול להיות שאתם זכאים לתקופת ניסיון בחינם.

לפני שמתחילים

כדי להתכונן להרצת הקוד במדריך הזה, פועלים לפי השלבים הבאים.

  1. מגדירים את הפרויקט. אם צריך, מגדירים פרויקט עם ממשקי ה-API של Managed Service for Apache Spark, ‏ Compute Engine ו-Cloud Storage מופעלים, ו-Google Cloud CLI מותקן במחשב המקומי.

    1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Verify that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    5. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    8. התקינו את ה-CLI של Google Cloud.

    9. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

    10. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

      gcloud init
    11. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    12. Verify that billing is enabled for your Google Cloud project.

    13. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    14. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    15. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    16. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    17. התקינו את ה-CLI של Google Cloud.

    18. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

    19. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

      gcloud init

  2. יצירת קטגוריה של Cloud Storage צריך Cloud Storage כדי לאחסן את נתוני ההדרכה. אם אין לכם קטגוריית יעד מוכנה לשימוש, אתם יכולים ליצור קטגוריית יעד חדשה בפרויקט.

    1. במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.

      כניסה לדף Buckets

    2. לוחצים על יצירה.
    3. ממלאים את פרטי הקטגוריה בדף Create a bucket. כדי לעבור לשלב הבא לוחצים על Continue.
      1. בקטע Get started (תחילת העבודה), מבצעים את הפעולות הבאות:
      2. בקטע Choose where to store your data, מבצעים את הפעולות הבאות:
        1. בוחרים סוג מיקום.
        2. בתפריט הנפתח Location type, בוחרים מיקום שבו יישמרו נתוני הקטגוריה באופן קבוע.
        3. כדי להגדיר שכפול בין מאגרי מידע, בוחרים באפשרות הוספת שכפול בין מאגרי מידע באמצעות Storage Transfer Service ופועלים לפי השלבים הבאים:

          הגדרה של רפליקציה בין מאגרי מידע

          1. בתפריט Bucket, בוחרים באפשרות הרצויה.
          2. בקטע הגדרות השכפול, לוחצים על הגדרה כדי להגדיר את ההגדרות של משימת השכפול.

            מופיעה החלונית Configure cross-bucket replication.

            • כדי לסנן אובייקטים לשכפול לפי קידומת של שם האובייקט, מזינים קידומת שרוצים לכלול או להחריג אובייקטים ממנה, ואז לוחצים על הוספת קידומת.
            • כדי להגדיר סוג אחסון לאובייקטים המשוכפלים, בוחרים סוג אחסון בתפריט סוג אחסון. אם מדלגים על השלב הזה, האובייקטים המשוכפלים ישתמשו בסוג האחסון של קטגוריית היעד כברירת מחדל.
            • לוחצים על סיום.
      3. בקטע Choose how to store your data, מבצעים את הפעולות הבאות:
        1. בוחרים default storage class לקטגוריה או Autoclass לניהול אוטומטי של סוג האחסון (storage class) של נתוני הקטגוריה.
        2. כדי להפעיל מרחב שמות היררכי, בקטע Optimize storage for data-intensive workloads, בוחרים באפשרות Enable hierarchical namespace on this bucket.
      4. בקטע Choose how to control access to objects, בוחרים אם הקטגוריה אוכפת public access prevention או לא, ואז בוחרים שיטת בקרת גישה לאובייקטים של הקטגוריה.
      5. בקטע Choose how to protect object data, מבצעים את הפעולות הבאות:
        • בוחרים באחת מהאפשרויות בקטע הגנה על נתונים שרוצים להגדיר לקטגוריה.
          • כדי להפעיל מחיקה עם יכולת שחזור, מסמנים את התיבה מדיניות מחיקה עם יכולת שחזור (לשחזור נתונים) ומציינים את מספר הימים שבהם רוצים לשמור אובייקטים אחרי המחיקה.
          • כדי להגדיר ניהול גרסאות של אובייקטים, מסמנים את התיבה ניהול גרסאות של אובייקטים (לשליטה בגרסאות) ומציינים את מספר הגרסאות המקסימלי לכל אובייקט ואת מספר הימים שאחריהם הגרסאות הלא עדכניות יפוגו.
          • כדי להפעיל את מדיניות שמירת הנתונים על אובייקטים וקטגוריות, לוחצים על תיבת הסימון שמירת נתונים (לצורך תאימות), ואז מבצעים את הפעולות הבאות:
            • כדי להפעיל את הנעילה של שמירת אובייקטים, מסמנים את התיבה הפעלת שמירת אובייקטים.
            • כדי להפעיל את נעילת הקטגוריה, מסמנים את תיבת הסימון הגדרת מדיניות שמירת נתונים בקטגוריה ובוחרים יחידת זמן ואת משך הזמן של תקופת השמירה.
        • כדי לבחור איך להצפין את נתוני האובייקט, מרחיבים את הקטע Data encryption () ובוחרים Data encryption method.
    4. לוחצים על יצירה.

  3. הגדרה של משתני סביבה מקומיים מגדירים משתני סביבה במחשב המקומי. מגדירים את Google Cloud project-id ואת השם של קטגוריית Cloud Storage שבה תשתמשו במדריך הזה. צריך גם לציין את השם והאזור של אשכול קיים או חדש של Managed Service for Apache Spark. בשלב הבא אפשר ליצור אשכול לשימוש במדריך הזה.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. יצירת אשכול של Managed Service for Apache Spark מריצים את הפקודה שלמטה כדי ליצור אשכול עם צומת יחיד של Managed Service for Apache Spark באזור Compute Engine שצוין.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. העתקת נתונים ציבוריים לקטגוריה של Cloud Storage. מעתיקים קטע טקסט של שייקספיר מנתונים ציבוריים לתיקייה input בקטגוריה שלכם ב-Cloud Storage:

    gcloud storage cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. מגדירים סביבת פיתוח של Java (Apache Maven),‏ Scala (SBT) או Python.

הכנת משימת ספירת המילים ב-Spark

בוחרים בכרטיסייה שלמטה כדי לפעול לפי השלבים להכנת חבילת עבודה או קובץ לשליחה לאשכול. אפשר להכין אחד מסוגי העבודות הבאים:

Java

  1. מעתיקים את הקובץ pom.xml למחשב המקומי. בקובץ pom.xml הבא מוגדרים יחסי תלות בספריית Scala ו-Spark, שמוגדר להם היקף provided כדי לציין שקלאסטר Managed Service for Apache Spark יספק את הספריות האלה בזמן הריצה. בקובץ pom.xml לא מצוין יחסי תלות ב-Cloud Storage כי המחבר מיישם את ממשק HDFS הרגיל. כשמשימת Spark ניגשת לקבצים באשכול Cloud Storage (קבצים עם מזהי URI שמתחילים ב-gs://), המערכת משתמשת באופן אוטומטי במחבר Cloud Storage כדי לגשת לקבצים ב-Cloud Storage. .major.minor
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. מעתיקים את הקוד WordCount.java שמופיע בהמשך למכונה המקומית.
    1. יוצרים קבוצה של ספריות עם הנתיב src/main/java/managed-spark/codelab:
      mkdir -p src/main/java/managed-spark/codelab
      
    2. מעתיקים את WordCount.java למכונה המקומית אל src/main/java/managed-spark/codelab:
      cp WordCount.java src/main/java/managed-spark/codelab
      

    WordCount.java הוא עבודת Spark ב-Java שקוראת קובצי טקסט מ-Cloud Storage, מבצעת ספירת מילים ואז כותבת את התוצאות של קובץ הטקסט ב-Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. יוצרים את החבילה.
    mvn clean package
    
    אם הבנייה תצליח, ייווצר target/word-count-1.0.jar.
  4. מעבירים את החבילה ל-Cloud Storage.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. מעתיקים את הקובץ build.sbt למחשב המקומי. בקובץ build.sbt הבא מוגדרים יחסי תלות בספריית Scala ו-Spark, שמוגדר להם היקף provided כדי לציין שקלאסטר Managed Service for Apache Spark יספק את הספריות האלה בזמן הריצה. בקובץ build.sbt לא מצוין יחסי תלות ב-Cloud Storage כי המחבר מיישם את ממשק HDFS הרגיל. כשמשימת Spark ניגשת לקובצי אשכול ב-Cloud Storage (קובצים עם מזהי URI שמתחילים ב-gs://), המערכת משתמשת באופן אוטומטי במחבר Cloud Storage כדי לגשת לקובצים ב-Cloud Storage.
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. מעתיקים את word-count.scala למחשב המקומי. זוהי משימת Spark ב-Java שקוראת קובצי טקסט מ-Cloud Storage, מבצעת ספירת מילים ואז כותבת את התוצאות של קובץ הטקסט ל-Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. יוצרים את החבילה.
    sbt clean package
    
    אם הבנייה תצליח, ייווצר target/scala-2.11/word-count_2.11-1.0.jar.
  4. מעבירים את החבילה ל-Cloud Storage.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. מעתיקים את word-count.py למחשב המקומי. זוהי משימת Spark ב-Python באמצעות PySpark שקוראת קובצי טקסט מ-Cloud Storage, מבצעת ספירת מילים ואז כותבת את התוצאות של קובץ הטקסט ל-Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

שליחת המשימה

מריצים את הפקודה gcloud הבאה כדי לשלוח את עבודת ספירת המילים לאשכול Managed Service for Apache Spark.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

צפייה בפלט

אחרי שהעבודה מסתיימת, מריצים את הפקודה הבאה ב-CLI של gcloud כדי לראות את הפלט של ספירת המילים.

gcloud storage cat gs://${BUCKET_NAME}/output/*

הפלט של ספירת המילים אמור להיראות כך:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

הסרת המשאבים

אחרי שמסיימים את המדריך, אפשר למחוק את המשאבים שנוצרו, כדי שהם יפסיקו להשתמש במכסה ולצבור חיובים. בסעיפים הבאים מוסבר איך למחוק או להשבית את המשאבים האלו.

מחיקת הפרויקט

הדרך הקלה ביותר לבטל את החיוב היא למחוק את הפרויקט שיצרתם בשביל המדריך הזה.

כדי למחוק את הפרויקט:

  1. במסוף Google Cloud , נכנסים לדף Manage resources.

    כניסה לדף Manage resources

  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על Delete.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבת הדו-שיח ולוחצים על Shut down.

מחיקת אשכול Managed Service for Apache Spark

במקום למחוק את הפרויקט, אפשר רק למחוק את האשכול בתוך הפרויקט.

מחיקת הקטגוריה של Cloud Storage

מסוףGoogle Cloud

  1. במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.

    כניסה לדף Buckets

  2. לוחצים על תיבת הסימון של הקטגוריה שרוצים למחוק.
  3. כדי למחוק את הקטגוריה, לוחצים על Delete ופועלים לפי ההוראות.

שורת הפקודה

    מוחקים את הקטגוריה:
    gcloud storage buckets delete BUCKET_NAME

המאמרים הבאים