תבנית של קובצי טקסט ב-Cloud Storage ל-Pub/Sub (סטרימינג)

התבנית הזו יוצרת צינור להזרמת נתונים שבודק באופן רציף אם יש קובצי טקסט חדשים שהועלו ל-Cloud Storage, קורא כל קובץ שורה אחר שורה ומפרסם מחרוזות בנושא Pub/Sub. התבנית מפרסמת רשומות בקובץ מופרד בתו שורה חדשה שמכיל רשומות JSON או קובץ CSV בנושא Pub/Sub לעיבוד בזמן אמת. אפשר להשתמש בתבנית הזו כדי להפעיל מחדש נתונים ב-Pub/Sub.

הצינור פועל ללא הגבלת זמן, וצריך להפסיק אותו ידנית באמצעות הפקודה 'ביטול' ולא באמצעות הפקודה 'התרוקנות', כי הוא משתמש בטרנספורמציה 'Watch', שהיא 'SplittableDoFn' שלא תומכת בהתרוקנות.

כרגע, מרווח הזמן בין בדיקות קבוע ומוגדר ל-10 שניות. בתבנית הזו לא מוגדרת חותמת זמן ברשומות הנפרדות, ולכן זמן האירוע שווה לזמן הפרסום במהלך ההפעלה. אם אתם מסתמכים על זמן מדויק של אירוע לצורך עיבוד, לא מומלץ להשתמש בצינור הזה.

הדרישות לגבי צינורות עיבוד נתונים

  • קבצי הקלט חייבים להיות בפורמט JSON או CSV שמופרדים בתו שורה חדשה. רשומות שמשתרעות על כמה שורות בקובצי המקור עלולות לגרום לבעיות בהמשך התהליך, כי כל שורה בקבצים מתפרסמת כהודעה ב-Pub/Sub.
  • נושא Pub/Sub חייב להתקיים לפני ההרצה.
  • צינור העיבוד פועל ללא הגבלה וצריך להפסיק אותו באופן ידני.

פרמטרים של תבניות

פרמטרים נדרשים

  • inputFilePattern: דפוס קובץ הקלט לקריאה. לדוגמה, gs://bucket-name/files/*.json.
  • outputTopic: נושא הקלט של Pub/Sub שאליו ייכתבו הנתונים. השם צריך להיות בפורמט projects/<PROJECT_ID>/topics/<TOPIC_NAME>. לדוגמה, projects/your-project-id/topics/your-topic-name.

הפעלת התבנית

המסוף

  1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
  2. כניסה לדף Create job from template
  3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
  4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

    רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

  5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
  7. אופציונלי: כדי לעבור מעיבוד של כל נתון בדיוק פעם אחת אל מצב סטרימינג של כל נתון לפחות פעם אחת, בוחרים באפשרות לפחות פעם אחת.
  8. לוחצים על הפעלת העבודה.

gcloud

במעטפת או בטרמינל, מריצים את התבנית:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

מחליפים את מה שכתוב בשדות הבאים:

  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • STAGING_LOCATION: המיקום של קבצים מקומיים להעברה (לדוגמה, gs://your-bucket/staging)
  • TOPIC_NAME: השם של נושא ה-Pub/Sub
  • BUCKET_NAME: שם הקטגוריה של Cloud Storage
  • FILE_PATTERN: תבנית ה-glob של הקובץ לקריאה מקטגוריה של Cloud Storage (לדוגמה, path/*.csv)

API

כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • STAGING_LOCATION: המיקום של קבצים מקומיים להעברה (לדוגמה, gs://your-bucket/staging)
  • TOPIC_NAME: השם של נושא ה-Pub/Sub
  • BUCKET_NAME: שם הקטגוריה של Cloud Storage
  • FILE_PATTERN: תבנית ה-glob של הקובץ לקריאה מקטגוריה של Cloud Storage (לדוגמה, path/*.csv)

המאמרים הבאים