הגדרת אפשרויות של צינור עיבוד נתונים ב-Dataflow

בדף הזה מוסבר איך להגדיר אפשרויות של צינור עיבוד נתונים לעבודות Dataflow. אפשרויות צינור העיבוד האלה מגדירות איך ואיפה צינור העיבוד פועל, ובאילו משאבים הוא משתמש.

ההרצה של צינור הנתונים נפרדת מההרצה של תוכנית Apache Beam. תוכנית Apache Beam שכתבתם יוצרת צינור עיבוד נתונים להפעלה מושהית. המשמעות היא שהתוכנית יוצרת סדרה של שלבים שכל רכיב Apache Beam נתמך יכול להפעיל. הפעלות תואמות כוללות את הפעלת Dataflow ב-Google Cloud ואת הפעלת צינור הנתונים ישירות בסביבה מקומית.

אפשר להעביר פרמטרים למשימת Dataflow בזמן הריצה. מידע נוסף על הגדרת אפשרויות של צינורות בזמן הריצה זמין במאמר הגדרת אפשרויות של צינורות.

שימוש באפשרויות של צינורות עיבוד נתונים עם Apache Beam SDKs

אפשר להשתמש בערכות ה-SDK הבאות כדי להגדיר אפשרויות של צינורות עיבוד נתונים לעבודות Dataflow:

  • ‫Apache Beam SDK ל-Python
  • ‫Apache Beam SDK for Java
  • ‫Apache Beam SDK for Go

כדי להשתמש בערכות ה-SDK, צריך להגדיר את רכיב ההפעלה של צינור הנתונים ופרמטרים אחרים של ההפעלה באמצעות המחלקה PipelineOptions של Apache Beam SDK.

יש שתי שיטות להגדרת אפשרויות של צינורות:

  • אפשר להגדיר אפשרויות של פייפליין באופן פרוגרמטי על ידי ציון רשימה של אפשרויות פייפליין.
  • מגדירים את האפשרויות של צינור עיבוד הנתונים ישירות בשורת הפקודה כשמריצים את קוד צינור עיבוד הנתונים.

הגדרת אפשרויות של צינור עיבוד נתונים באופן פרוגרמטי

אפשר להגדיר אפשרויות של צינורות באופן פרוגרמטי על ידי יצירה ושינוי של אובייקט PipelineOptions.

Java

יוצרים אובייקט PipelineOptions באמצעות ה-method‏ PipelineOptionsFactory.fromArgs.

לדוגמה, ראו את הקטע הפעלה בדוגמה של Dataflow בדף הזה.

Python

יוצרים אובייקט PipelineOptions.

לדוגמה, ראו את הקטע הפעלה בדוגמה של Dataflow בדף הזה.

המשך

הגדרת אפשרויות של צינור עיבוד נתונים באופן פרוגרמטי באמצעות PipelineOptions לא נתמכת ב-Apache Beam SDK ל-Go. צריך להשתמש בארגומנטים של שורת הפקודה של Go.

לדוגמה, ראו את הקטע הפעלה בדוגמה של Dataflow בדף הזה.

הגדרת אפשרויות של צינור עיבוד הנתונים בשורת הפקודה

אפשר להגדיר אפשרויות של צינור עיבוד נתונים באמצעות ארגומנטים של שורת פקודה.

Java

התחביר בדוגמה הבאה הוא מצינור העיבוד WordCount במדריך ל-Java.

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud
  • BUCKET_NAME: שם הקטגוריה של Cloud Storage
  • REGION: a Dataflow region, us-central1

Python

התחביר בדוגמה הבאה הוא מצינור העיבוד WordCount במדריך ל-Python.

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

מחליפים את מה שכתוב בשדות הבאים:

  • DATAFLOW_REGION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: europe-west1

    הדגל --region מבטל את האזור שמוגדר כברירת מחדל בשרת המטא-נתונים, בלקוח המקומי או במשתני הסביבה.

  • STORAGE_BUCKET: שם הקטגוריה ב-Cloud Storage

  • PROJECT_ID: מזהה הפרויקט Google Cloud

המשך

התחביר בדוגמה הבאה הוא מצינור העיבוד WordCount במדריך Go.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

מחליפים את מה שכתוב בשדות הבאים:

  • BUCKET_NAME: שם הקטגוריה של Cloud Storage

  • PROJECT_ID: מזהה הפרויקט Google Cloud

  • DATAFLOW_REGION: האזור שבו רוצים לפרוס את משימת Dataflow. לדוגמה, europe-west1. הדגל --region מבטל את ברירת המחדל של האזור שמוגדר בשרת המטא-נתונים, בלקוח המקומי או במשתני הסביבה.

הגדרת אפשרויות ניסיוניות של צינור עיבוד הנתונים

ב-SDK של Java,‏ Python ו-Go,‏ experiments pipeline option מאפשר להשתמש בתכונות ניסיוניות של Dataflow או בתכונות שזמינות בגרסת טרום-GA.

הגדרה פרוגרמטית

כדי להגדיר את האפשרות experiments באופן פרוגרמטי, משתמשים בתחביר הבא.

Java

באובייקט PipelineOptions, כוללים את האפשרות experiments באמצעות התחביר הבא. בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.

options.setExperiments("streaming_boot_disk_size_gb=80")

דוגמה ליצירת אובייקט PipelineOptions מופיעה בקטע הפעלת דוגמה ב-Dataflow בדף הזה.

Python

באובייקט PipelineOptions, כוללים את האפשרות experiments באמצעות התחביר הבא. בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

דוגמה ליצירת אובייקט PipelineOptions מופיעה בקטע הפעלת דוגמה ב-Dataflow בדף הזה.

המשך

הגדרת אפשרויות של צינור עיבוד נתונים באופן פרוגרמטי באמצעות PipelineOptions לא נתמכת ב-Apache Beam SDK ל-Go. צריך להשתמש בארגומנטים של שורת הפקודה של Go.

הגדרה בשורת הפקודה

כדי להגדיר את האפשרות experiments בשורת הפקודה, משתמשים בתחביר הבא.

Java

בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.

--experiments=streaming_boot_disk_size_gb=80

Python

בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.

--experiments=streaming_boot_disk_size_gb=80

המשך

בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.

--experiments=streaming_boot_disk_size_gb=80

הגדרה בתבנית

כדי להפעיל תכונה ניסיונית כשמריצים תבנית Dataflow, משתמשים בדגל --additional-experiments.

תבנית מותאמת אישית

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

תבנית Flex

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

גישה לאובייקט של אפשרויות הצינור

כשיוצרים אובייקט Pipeline בתוכנית Apache Beam, מעבירים את PipelineOptions. כששירות Dataflow מפעיל את צינור עיבוד הנתונים, הוא שולח עותק של PipelineOptions לכל עובד.

Java

כדי לגשת אל PipelineOptions בתוך מופע DoFn של טרנספורמציה ParDo, משתמשים בשיטה ProcessContext.getPipelineOptions.

Python

התכונה הזו לא נתמכת ב-Apache Beam SDK ל-Python.

המשך

אפשר לגשת לאפשרויות של צינורות באמצעות beam.PipelineOptions.

הפעלה ב-Dataflow

מריצים את המשימה במשאבים מנוהלים באמצעות שירות ההרצה של Dataflow. Google Cloud כשמריצים את הפייפליין באמצעות Dataflow, נוצרת משימת Dataflow שמשתמשת במשאבי Compute Engine ו-Cloud Storage בפרויקט Google Cloud. מידע על הרשאות ב-Dataflow זמין במאמר אבטחה והרשאות ב-Dataflow.

משימות של Dataflow משתמשות ב-Cloud Storage כדי לאחסן קבצים זמניים במהלך ההרצה של צינור עיבוד הנתונים. כדי להימנע מחיוב על עלויות אחסון מיותרות, צריך להשבית את התכונה 'מחיקה רכה' בדליים שמשמשים את משימות ה-Dataflow לאחסון זמני. מידע נוסף זמין במאמר השבתת מחיקה רכה.

הגדרת אפשרויות חובה

כדי להריץ את צינור עיבוד הנתונים באמצעות Dataflow, מגדירים את האפשרויות הבאות של צינור עיבוד הנתונים:

Java

  • project: מזהה הפרויקט ב- Google Cloud .
  • runner: מפעיל הפייפליין שמריץ את הפייפליין. כדי להפעיל אתGoogle Cloud , צריך להגדיר את הערך DataflowRunner.
  • gcpTempLocation: נתיב ב-Cloud Storage שבו Dataflow יכול לאחסן את רוב הקבצים הזמניים. הבאקט שצוין צריך להיות קיים.

    אם לא מציינים את gcpTempLocation, ‏ Dataflow משתמש בערך של האפשרות tempLocation. אם לא מציינים אף אחת מהאפשרויות האלה, Dataflow יוצר קטגוריה חדשה ב-Cloud Storage.

Python

  • project: מזהה הפרויקט ב- Google Cloud .
  • region: האזור של משימת Dataflow.
  • runner: מפעיל הפייפליין שמריץ את הפייפליין. כדי להפעיל אתGoogle Cloud , צריך להגדיר את הערך DataflowRunner.
  • temp_location: נתיב ב-Cloud Storage שבו Dataflow יכול לאחסן קבצים זמניים של משימות שנוצרו במהלך ההרצה של צינור עיבוד הנתונים.

המשך

  • project: מזהה הפרויקט ב- Google Cloud .
  • region: האזור של משימת Dataflow.
  • runner: מפעיל הפייפליין שמריץ את הפייפליין. כדי להפעיל אתGoogle Cloud , צריך להגדיר את הערך dataflow.
  • staging_location: נתיב ב-Cloud Storage שבו Dataflow יכול לאחסן קבצים זמניים של משימות שנוצרו במהלך ההרצה של צינור עיבוד הנתונים.

הגדרת אפשרויות של צינור עיבוד נתונים באופן פרוגרמטי

בדוגמת הקוד הבאה אפשר לראות איך יוצרים פייפליין באמצעות הגדרה פרוגרמטית של הרצת הפייפליין ואפשרויות נדרשות אחרות להרצת הפייפליין באמצעות Dataflow.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

המשך

ה-SDK של Apache Beam ל-Go משתמש בארגומנטים בשורת הפקודה של Go. משתמשים ב-flag.Set() כדי להגדיר ערכי דגלים.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

אחרי שיוצרים את צינור עיבוד הנתונים, מציינים את כל פעולות הקריאה, השינוי והכתיבה של צינור עיבוד הנתונים ומריצים אותו.

שימוש באפשרויות של צינורות משורת הפקודה

בדוגמה הבאה מוצג אופן השימוש באפשרויות של פייפליין שצוינו בשורת הפקודה. בדוגמה הזו לא מוגדרות אפשרויות הפייפליין באופן פרוגרמטי.

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

משתמשים במודול argparse של Python כדי לנתח אפשרויות של שורת פקודה.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

המשך

משתמשים בחבילה Go flag package כדי לנתח אפשרויות של שורת פקודה. צריך לנתח את האפשרויות לפני שמתקשרים אל beam.Init(). בדוגמה הזו, output היא אפשרות בשורת הפקודה.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

אחרי שיוצרים את צינור עיבוד הנתונים, מציינים את כל פעולות הקריאה, השינוי והכתיבה של צינור עיבוד הנתונים, ואז מריצים אותו.

מצבי הפעלה של אמצעי הבקרה

כשתוכנית Apache Beam מפעילה צינור עיבוד נתונים בשירות כמו Dataflow, התוכנית יכולה להפעיל את צינור עיבוד הנתונים באופן אסינכרוני, או לחסום עד להשלמת צינור עיבוד הנתונים. אפשר לשנות את ההתנהגות הזו באמצעות ההנחיות הבאות.

Java

כשתוכנית Apache Beam Java מריצה צינור עיבוד נתונים בשירות כמו Dataflow, היא בדרך כלל מופעלת באופן אסינכרוני. כדי להריץ צינור ולהמתין עד שהעבודה תושלם, צריך להגדיר את DataflowRunner ככלי להרצת צינורות ולהפעיל את pipeline.run().waitUntilFinish() באופן מפורש.

כשמשתמשים ב-DataflowRunner וקוראים ל-waitUntilFinish() באובייקט PipelineResult שמוחזר מ-pipeline.run(), צינור עיבוד הנתונים מופעל ב- Google Cloud , אבל הקוד המקומי מחכה לסיום העבודה בענן ומחזיר את האובייקט הסופי DataflowPipelineJob. בזמן שהמשימה פועלת, שירות Dataflow מדפיס עדכונים לגבי סטטוס המשימה והודעות במסוף בזמן ההמתנה.

Python

כשתוכנית Python של Apache Beam מריצה צינור נתונים בשירות כמו Dataflow, בדרך כלל היא מופעלת באופן אסינכרוני. כדי לחסום עד להשלמת הצינור, משתמשים בשיטה wait_until_finish() של האובייקט PipelineResult, שמוחזר מהשיטה run() של הרץ.

המשך

כשתוכנית Apache Beam Go מריצה צינור עיבוד נתונים ב-Dataflow, היא סינכרונית כברירת מחדל ונחסמת עד לסיום צינור עיבוד הנתונים. אם אתם לא רוצים לחסום, יש לכם שתי אפשרויות:

  1. מפעילים את המשימה בשגרה של Go.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. משתמשים בדגל --async של שורת הפקודה, שנמצא בחבילה jobopts.

כדי לראות את פרטי ההפעלה, לעקוב אחרי ההתקדמות ולאמת את סטטוס השלמת העבודה, אפשר להשתמש בממשק המעקב של Dataflow או בממשק שורת הפקודה של Dataflow.

שימוש במקורות סטרימינג

Java

אם צינור הנתונים קורא ממקור נתונים לא מוגבל, כמו Pub/Sub, צינור הנתונים מופעל אוטומטית במצב סטרימינג.

Python

אם צינור עיבוד הנתונים משתמש במקור נתונים לא מוגבל, כמו Pub/Sub, צריך להגדיר את האפשרות streaming כ-true.

המשך

אם צינור הנתונים קורא ממקור נתונים לא מוגבל, כמו Pub/Sub, צינור הנתונים מופעל אוטומטית במצב סטרימינג.

עבודות סטרימינג משתמשות בסוג מכונה של Compute Engine עם 2 או יותר vCPU כברירת מחדל.

הפעלה באופן מקומי

במקום להריץ את צינור הנתונים במשאבי ענן מנוהלים, אפשר להריץ אותו באופן מקומי. להרצה מקומית יש יתרונות מסוימים לבדיקה, לניפוי באגים או להרצת הפייפליין על מערכי נתונים קטנים. לדוגמה, בהרצה מקומית לא תהיה תלות בשירות Dataflow המרוחק ובפרויקט Google Cloud המשויך.

כשמשתמשים בהרצה מקומית, צריך להריץ את צינור עיבוד הנתונים עם מערכי נתונים קטנים מספיק כדי להיכנס לזיכרון המקומי. אפשר ליצור מערך נתונים קטן בזיכרון באמצעות טרנספורמציה מסוג Create, או להשתמש בטרנספורמציה מסוג Read כדי לעבוד עם קבצים קטנים מקומיים או מרוחקים. בדרך כלל, הרצה מקומית מספקת דרך מהירה וקלה יותר לבצע בדיקות וניפוי באגים עם פחות תלות חיצונית, אבל היא מוגבלת על ידי הזיכרון שזמין בסביבה המקומית.

בדוגמת הקוד הבאה מוצג אופן הבנייה של צינור עיבוד נתונים שפועל בסביבה המקומית.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

המשך

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

אחרי שיוצרים את צינור העיבוד, מריצים אותו.

יצירת אפשרויות מותאמות אישית של פייפליין

בנוסף לאפשרויות הרגילות PipelineOptions, אתם יכולים להוסיף אפשרויות מותאמות אישית. בנוסף, שורת הפקודה של Apache Beam יכולה לנתח אפשרויות מותאמות אישית באמצעות ארגומנטים של שורת פקודה שצוינו באותו פורמט.

Java

כדי להוסיף אפשרויות משלכם, צריך להגדיר ממשק עם שיטות getter ו-setter לכל אפשרות, כמו בדוגמה הבאה:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

כדי להוסיף אפשרויות משלכם, משתמשים בשיטה add_argument() (שמתנהגת בדיוק כמו מודול argparse הרגיל של Python), כמו בדוגמה הבאה:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

המשך

כדי להוסיף אפשרויות משלכם, משתמשים בחבילת Go flag כמו בדוגמה הבאה:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

אפשר גם לציין תיאור שמופיע כשמשתמש מעביר את --help כארגומנט בשורת הפקודה, וערך ברירת מחדל.

Java

כדי להגדיר את התיאור ואת ערך ברירת המחדל, משתמשים בהערות, באופן הבא:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

מומלץ לרשום את הממשק באמצעות PipelineOptionsFactory ואז להעביר את הממשק כשיוצרים את אובייקט PipelineOptions. כשרושמים את הממשק ב-PipelineOptionsFactory, ‏ --help יכול למצוא את ממשק האפשרויות המותאם אישית ולהוסיף אותו לפלט של הפקודה --help. ‫PipelineOptionsFactory מוודא שהאפשרויות המותאמות אישית תואמות לכל שאר האפשרויות הרשומות.

בדוגמת הקוד הבאה אפשר לראות איך רושמים את ממשק האפשרויות המותאם אישית באמצעות PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

עכשיו אפשר להשתמש ב---myCustomOption=value כארגומנט בשורת הפקודה בצינור.

Python

מגדירים את התיאור ואת ערך ברירת המחדל באופן הבא:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

המשך

מגדירים את התיאור ואת ערך ברירת המחדל באופן הבא:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)