בדף הזה מוסבר איך להגדיר אפשרויות של צינור עיבוד נתונים לעבודות Dataflow. אפשרויות צינור עיבוד הנתונים האלה קובעות איך ואיפה צינור עיבוד הנתונים יפעל ובאילו משאבים הוא ישתמש.
ההרצה של צינור עיבוד הנתונים נפרדת מההרצה של תוכנית Apache Beam. תוכנית Apache Beam שכתבתם יוצרת צינור עיבוד נתונים להפעלה מושהית. המשמעות היא שהתוכנית יוצרת סדרה של שלבים שכל רכיב Apache Beam נתמך יכול להפעיל. בין הרצים התואמים נכללים Dataflow runner ב-Google Cloud ו-direct runner שמבצע את צינור הנתונים ישירות בסביבה מקומית.
אפשר להעביר פרמטרים למשימת Dataflow בזמן הריצה. מידע נוסף על הגדרת אפשרויות של צינורות בזמן ריצה זמין במאמר הגדרת אפשרויות של צינורות.
שימוש באפשרויות של צינורות עיבוד נתונים עם Apache Beam SDKs
אפשר להשתמש בערכות ה-SDK הבאות כדי להגדיר אפשרויות של צינור עיבוד נתונים לעבודות Dataflow:
- Apache Beam SDK ל-Python
- Apache Beam SDK for Java
- Apache Beam SDK for Go
כדי להשתמש בערכות ה-SDK, צריך להגדיר את רכיב ההפעלה של צינור הנתונים ופרמטרים אחרים של ההפעלה באמצעות המחלקה PipelineOptions של Apache Beam SDK.
יש שתי שיטות להגדרת אפשרויות של צינורות:
- אפשר להגדיר אפשרויות של צינורות באופן פרוגרמטי על ידי ציון רשימה של אפשרויות של צינורות.
- מגדירים את האפשרויות של צינור עיבוד הנתונים ישירות בשורת הפקודה כשמריצים את הקוד של צינור עיבוד הנתונים.
הגדרת אפשרויות של צינור עיבוד הנתונים באופן פרוגרמטי
אפשר להגדיר אפשרויות של צינורות באופן פרוגרמטי על ידי יצירה ושינוי של אובייקט PipelineOptions.
Java
יוצרים אובייקט PipelineOptions באמצעות השיטה PipelineOptionsFactory.fromArgs.
לדוגמה, ראו את הקטע הפעלת דוגמה ב-Dataflow בדף הזה.
Python
יוצרים אובייקט PipelineOptions.
לדוגמה, ראו את הקטע הפעלת דוגמה ב-Dataflow בדף הזה.
Go
הגדרת אפשרויות של צינור עיבוד נתונים באופן פרוגרמטי באמצעות PipelineOptions לא נתמכת ב-Apache Beam SDK ל-Go. צריך להשתמש בארגומנטים של שורת הפקודה של Go.
לדוגמה, ראו את הקטע הפעלת דוגמה ב-Dataflow בדף הזה.
הגדרת אפשרויות של צינור עיבוד הנתונים בשורת הפקודה
אפשר להגדיר אפשרויות של צינור עיבוד נתונים באמצעות ארגומנטים של שורת פקודה.
Java
התחביר בדוגמה הבאה הוא מצינור העיבוד WordCount במדריך ל-Java.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט ב- Google Cloud -
BUCKET_NAME: שם הקטגוריה של Cloud Storage -
REGION: a Dataflow region,us-central1
Python
תחביר הדוגמה הבא הוא מצינור העיבוד WordCount במדריך Python.
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
מחליפים את מה שכתוב בשדות הבאים:
DATAFLOW_REGION: האזור שבו רוצים לפרוס את משימת Dataflow, לדוגמה:europe-west1הדגל
--regionמבטל את האזור שמוגדר כברירת מחדל בשרת המטא-נתונים, בלקוח המקומי או במשתני הסביבה.
STORAGE_BUCKET: שם הקטגוריה של Cloud Storage
PROJECT_ID: מזהה הפרויקט Google Cloud
Go
התחביר בדוגמה הבאה הוא מצינור העיבוד WordCount במדריך Go.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/מחליפים את מה שכתוב בשדות הבאים:
BUCKET_NAME: שם הקטגוריה של Cloud Storage
PROJECT_ID: מזהה הפרויקט Google Cloud
DATAFLOW_REGION: האזור שבו רוצים לפרוס את משימת Dataflow. לדוגמה,europe-west1. הדגל--regionמבטל את האזור שמוגדר כברירת מחדל בשרת המטא-נתונים, בלקוח המקומי או במשתני הסביבה.
הגדרת אפשרויות ניסיוניות של צינור עיבוד הנתונים
ב-SDK של Java, Python ו-Go, experiments
pipeline option מאפשר להשתמש בתכונות ניסיוניות או בתכונות של Dataflow שזמינות בשלב טרום-GA.
הגדרה פרוגרמטית
כדי להגדיר את האפשרות experiments באופן פרוגרמטי, משתמשים בתחביר הבא.
Java
באובייקט PipelineOptions, כוללים את האפשרות experiments באמצעות התחביר הבא.
בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.
options.setExperiments("streaming_boot_disk_size_gb=80")
דוגמה ליצירת אובייקט PipelineOptions מופיעה בקטע הפעלת דוגמה ב-Dataflow בדף הזה.
Python
באובייקט PipelineOptions, כוללים את האפשרות experiments באמצעות התחביר הבא.
בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
דוגמה ליצירת אובייקט PipelineOptions מופיעה בקטע הפעלת דוגמה ב-Dataflow בדף הזה.
Go
הגדרת אפשרויות של צינור עיבוד נתונים באופן פרוגרמטי באמצעות PipelineOptions לא נתמכת ב-Apache Beam SDK ל-Go. צריך להשתמש בארגומנטים של שורת הפקודה של Go.
הגדרה בשורת הפקודה
כדי להגדיר את האפשרות experiments בשורת הפקודה, משתמשים בתחביר הבא.
Java
בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.
--experiments=streaming_boot_disk_size_gb=80
Python
בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.
--experiments=streaming_boot_disk_size_gb=80
Go
בדוגמה הזו, גודל דיסק האתחול מוגדר ל-80GB באמצעות דגל הניסוי.
--experiments=streaming_boot_disk_size_gb=80
הגדרה בתבנית
כדי להפעיל תכונה ניסיונית כשמריצים תבנית Dataflow, משתמשים בדגל --additional-experiments.
תבנית מותאמת אישית
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
תבנית Flex
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
גישה לאובייקט של אפשרויות הצינור
כשיוצרים אובייקט Pipeline בתוכנית Apache Beam, מעבירים PipelineOptions. כששירות Dataflow מפעיל את צינור עיבוד הנתונים, הוא שולח עותק של PipelineOptions לכל עובד.
Java
אפשר לגשת אל PipelineOptions בתוך מופע של טרנספורמציה ParDo DoFn באמצעות השיטה ProcessContext.getPipelineOptions.
Python
התכונה הזו לא נתמכת ב-Apache Beam SDK ל-Python.
Go
אפשר לגשת לאפשרויות של צינורות באמצעות beam.PipelineOptions.
הפעלה ב-Dataflow
מריצים את המשימה במשאבים מנוהלים Google Cloud באמצעות שירות ההרצה של Dataflow. הפעלת צינור הנתונים באמצעות Dataflow יוצרת משימת Dataflow, שמשתמשת במשאבי Compute Engine ו-Cloud Storage בפרויקט Google Cloud. מידע על הרשאות ב-Dataflow זמין במאמר אבטחה והרשאות ב-Dataflow.
משימות של Dataflow משתמשות ב-Cloud Storage כדי לאחסן קבצים זמניים במהלך ההרצה של צינור עיבוד הנתונים. כדי להימנע מחיוב על עלויות אחסון מיותרות, צריך להשבית את התכונה 'מחיקה עם יכולת שחזור' בדליים שמשמשים את משימות ה-Dataflow לאחסון זמני. מידע נוסף זמין במאמר בנושא השבתת מחיקה עם יכולת שחזור.
הגדרת אפשרויות חובה
כדי להריץ את צינור עיבוד הנתונים באמצעות Dataflow, מגדירים את האפשרויות הבאות של צינור עיבוד הנתונים:
Java
-
project: מזהה הפרויקט ב- Google Cloud . -
runner: הרכיב להרצת צינורות עיבוד נתונים שמבצע את צינור עיבוד הנתונים. כדי להפעיל אתGoogle Cloud , צריך להגדיר את הערךDataflowRunner.
gcpTempLocation: נתיב ב-Cloud Storage שבו Dataflow יכול לאחסן את רוב הקבצים הזמניים. הבאקט שצוין צריך להיות קיים.אם לא מציינים את
gcpTempLocation, Dataflow משתמש בערך של האפשרותtempLocation. אם לא מציינים אף אחת מהאפשרויות האלה, Dataflow יוצר קטגוריה חדשה ב-Cloud Storage.
Python
-
project: מזהה הפרויקט ב- Google Cloud . -
region: האזור של משימת Dataflow. -
runner: הרכיב להרצת צינורות עיבוד נתונים שמבצע את צינור עיבוד הנתונים. כדי להפעיל אתGoogle Cloud , צריך להגדיר את הערךDataflowRunner. -
temp_location: נתיב ב-Cloud Storage שבו Dataflow יכול לאחסן קבצים זמניים של משימות שנוצרו במהלך ההפעלה של צינור עיבוד הנתונים.
Go
-
project: מזהה הפרויקט ב- Google Cloud . -
region: האזור של משימת Dataflow. -
runner: הרכיב להרצת צינורות עיבוד נתונים שמבצע את צינור עיבוד הנתונים. כדי להפעיל אתGoogle Cloud , צריך להגדיר את הערךdataflow. -
staging_location: נתיב ב-Cloud Storage שבו Dataflow יכול לאחסן קבצים זמניים של משימות שנוצרו במהלך ההפעלה של צינור עיבוד הנתונים.
הגדרת אפשרויות של צינור עיבוד הנתונים באופן פרוגרמטי
בדוגמת הקוד הבאה אפשר לראות איך יוצרים צינור עיבוד נתונים באמצעות הגדרה פרוגרמטית של הרצת הצינור ואפשרויות נדרשות אחרות להרצת הצינור באמצעות Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Go
ה-SDK של Apache Beam ל-Go משתמש בארגומנטים של שורת הפקודה של Go. משתמשים ב-flag.Set() כדי להגדיר ערכי דגלים.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
אחרי שיוצרים את צינור עיבוד הנתונים, מציינים את כל פעולות הקריאה, ההמרות והכתיבה של צינור עיבוד הנתונים ומריצים אותו.
שימוש באפשרויות של צינורות משורת הפקודה
בדוגמה הבאה מוצג אופן השימוש באפשרויות של פייפליין שצוינו בשורת הפקודה. בדוגמה הזו לא מוגדרות אפשרויות הצינור באופן פרוגרמטי.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
משתמשים במודול argparse של Python כדי לנתח אפשרויות של שורת פקודה.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Go
משתמשים בחבילה Go flag כדי לנתח אפשרויות של שורת פקודה. צריך לנתח את האפשרויות לפני שמתקשרים אל
beam.Init(). בדוגמה הזו, output היא אפשרות בשורת הפקודה.
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
אחרי שיוצרים את צינור עיבוד הנתונים, מציינים את כל פעולות הקריאה, ההמרות והכתיבה של צינור עיבוד הנתונים, ואז מריצים את צינור עיבוד הנתונים.
מצבי הפעלה של אמצעי הבקרה
כשתוכנית Apache Beam מפעילה צינור עיבוד נתונים בשירות כמו Dataflow, התוכנית יכולה להפעיל את צינור עיבוד הנתונים באופן אסינכרוני, או לחסום עד להשלמת צינור עיבוד הנתונים. אפשר לשנות את ההתנהגות הזו באמצעות ההנחיות הבאות.
Java
כשתוכנית Java של Apache Beam מריצה צינור עיבוד נתונים בשירות כמו Dataflow, היא בדרך כלל מופעלת באופן אסינכרוני. כדי להריץ צינור ולהמתין עד שהעבודה תושלם, צריך להגדיר את DataflowRunner ככלי להרצת צינורות ולקרוא באופן מפורש ל-pipeline.run().waitUntilFinish().
כשמשתמשים ב-DataflowRunner ומפעילים את waitUntilFinish() באובייקט PipelineResult שמוחזר מ-pipeline.run(), צינור עיבוד הנתונים מופעל ב- Google Cloud , אבל הקוד המקומי מחכה לסיום העבודה בענן ומחזיר את האובייקט הסופי DataflowPipelineJob. בזמן שהמשימה פועלת, שירות Dataflow מדפיס עדכונים לגבי סטטוס המשימה והודעות במסוף בזמן ההמתנה.
Python
כשתוכנית Python של Apache Beam מריצה צינור נתונים בשירות כמו Dataflow, היא בדרך כלל מופעלת באופן אסינכרוני. כדי לחסום עד לסיום הצינור, משתמשים בשיטה wait_until_finish() של האובייקט PipelineResult, שמוחזר מהשיטה run() של הרץ.
Go
כשתוכנית Apache Beam Go מריצה צינור עיבוד נתונים ב-Dataflow, היא סינכרונית כברירת מחדל ונחסמת עד לסיום צינור עיבוד הנתונים. אם אתם לא רוצים לחסום, יש לכם שתי אפשרויות:
מפעילים את המשימה בשגרה של Go.
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.משתמשים בדגל
--asyncשל שורת הפקודה, שנמצא בחבילהjobopts.
כדי לראות את פרטי ההרצה, לעקוב אחרי ההתקדמות ולאמת את סטטוס השלמת העבודה, אפשר להשתמש בממשק המעקב של Dataflow או בממשק שורת הפקודה של Dataflow.
שימוש במקורות סטרימינג
Java
אם צינור הנתונים קורא ממקור נתונים לא מוגבל, כמו Pub/Sub, הוא יפעל אוטומטית במצב סטרימינג.
Python
אם צינור עיבוד הנתונים משתמש במקור נתונים לא מוגבל, כמו Pub/Sub, צריך להגדיר את האפשרות streaming כ-true.
Go
אם צינור הנתונים קורא ממקור נתונים לא מוגבל, כמו Pub/Sub, הוא יפעל אוטומטית במצב סטרימינג.
עבודות סטרימינג משתמשות בסוג מכונה של Compute Engine בגרסה n1-standard-2 ומעלה כברירת מחדל.
הפעלה באופן מקומי
במקום להריץ את צינור העיבוד במשאבי ענן מנוהלים, אפשר להריץ אותו באופן מקומי. להרצה מקומית יש יתרונות מסוימים כשבודקים, מנפים באגים או מריצים את צינור עיבוד הנתונים על מערכי נתונים קטנים. לדוגמה, בהרצה מקומית לא תהיה תלות בשירות Dataflow המרוחק ובפרויקט Google Cloud המשויך.
כשמשתמשים בהרצה מקומית, צריך להריץ את צינור עיבוד הנתונים עם מערכי נתונים קטנים מספיק כדי להיכנס לזיכרון המקומי. אפשר ליצור מערך נתונים קטן בזיכרון באמצעות טרנספורמציה מסוג Create, או להשתמש בטרנספורמציה מסוג Read כדי לעבוד עם קבצים קטנים מקומיים או מרוחקים. בדרך כלל, הרצה מקומית מספקת דרך מהירה וקלה יותר לבצע בדיקות וניפוי באגים עם פחות תלות חיצונית, אבל היא מוגבלת על ידי הזיכרון שזמין בסביבה המקומית.
בדוגמה הבאה של קוד מוצג אופן ההגדרה של צינור עיבוד נתונים שפועל בסביבה המקומית.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
אחרי שיוצרים את צינור הנתונים, מפעילים פתרונות חכמים.
יצירת אפשרויות מותאמות אישית של פייפליין
בנוסף לאפשרויות הרגילות PipelineOptions, אתם יכולים להוסיף אפשרויות מותאמות אישית. בנוסף, שורת הפקודה של Apache Beam יכולה לנתח אפשרויות מותאמות אישית באמצעות ארגומנטים של שורת הפקודה שצוינו באותו פורמט.
Java
כדי להוסיף אפשרויות משלכם, צריך להגדיר ממשק עם שיטות getter ו-setter לכל אפשרות, כמו בדוגמה הבאה:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
כדי להוסיף אפשרויות משלכם, משתמשים בשיטה add_argument() (שמתנהגת בדיוק כמו מודול argparse הרגיל של Python), כמו בדוגמה הבאה:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Go
כדי להוסיף אפשרויות משלכם, משתמשים בחבילת Go flag כמו בדוגמה הבאה:
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
אפשר גם לציין תיאור שמופיע כשמשתמש מעביר את --help כארגומנט בשורת הפקודה, וערך ברירת מחדל.
Java
כדי להגדיר את התיאור ואת ערך ברירת המחדל, משתמשים בהערות, באופן הבא:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
מומלץ לרשום את הממשק באמצעות PipelineOptionsFactory
ואז להעביר את הממשק כשיוצרים את אובייקט PipelineOptions. כשרושמים את הממשק ב-PipelineOptionsFactory, הפקודה --help יכולה למצוא את ממשק האפשרויות המותאמות אישית ולהוסיף אותו לפלט של הפקודה --help. PipelineOptionsFactory מוודא שהאפשרויות המותאמות אישית תואמות לכל שאר האפשרויות הרשומות.
בדוגמת הקוד הבאה אפשר לראות איך רושמים את ממשק האפשרויות המותאם אישית באמצעות PipelineOptionsFactory:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
עכשיו אפשר להשתמש ב---myCustomOption=value כארגומנט בשורת הפקודה בצינור.
Python
מגדירים את התיאור ואת ערך ברירת המחדל באופן הבא:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Go
מגדירים את התיאור ואת ערך ברירת המחדל באופן הבא:
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)