תבנית Datastream to MySQL or PostgreSQL (Stream)‎

התבנית Datastream to SQL היא צינור עיבוד נתונים של סטרימינג שקורא נתונים מ-Datastream ומשכפל אותם לכל מסד נתונים של MySQL או PostgreSQL. התבנית קוראת נתונים מ-Cloud Storage באמצעות התראות Pub/Sub ומשכפלת את הנתונים האלה לטבלאות רפליקה של SQL. מציינים את הפרמטר gcsPubSubSubscription כדי לקרוא נתונים מהתראות Pub/Sub, או את הפרמטר inputFilePattern כדי לקרוא נתונים ישירות מקבצים ב-Cloud Storage.

התבנית לא תומכת בשפת הגדרת נתונים (DDL) ומניחה שכל הטבלאות כבר קיימות במסד הנתונים. השכפול משתמש בטרנספורמציות עם שמירת מצב ב-Dataflow כדי לסנן נתונים לא עדכניים ולהבטיח עקביות בנתונים לא מסודרים. לדוגמה, אם גרסה עדכנית יותר של שורה כבר עברה, המערכת מתעלמת מגרסה של אותה שורה שמגיעה באיחור. שפת הטיפול בנתונים (DML) שמופעלת היא ניסיון מיטבי לשכפל בצורה מושלמת את נתוני המקור לנתוני היעד. הפקודות של DML שמופעלות פועלות לפי הכללים הבאים:

  • אם קיים מפתח ראשי, פעולות של הוספה ועדכון משתמשות בתחביר upsert (כלומר, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • אם קיימים מפתחות ראשיים, המחיקות משוכפלות כ-DML של מחיקה.
  • אם לא קיים מפתח ראשי, גם פעולות הוספה וגם פעולות עדכון מוכנסות לטבלה.
  • אם לא קיימים מפתחות ראשיים, המערכת מתעלמת ממחיקות.

אם אתם משתמשים בכלי השירות של Oracle ל-Postgres, מוסיפים ROWID ב-SQL כמפתח ראשי אם לא קיים כזה.

הדרישות לגבי צינורות עיבוד נתונים

  • מקור נתונים ב-Datastream שמוכן לשכפול נתונים או שכבר משכפל נתונים.
  • התראות Pub/Sub ל-Cloud Storage מופעלות עבור נתוני Datastream.
  • מסד נתונים של PostgreSQL הועבר עם הסכימה הנדרשת.
  • מוגדרת גישה לרשת בין העובדים של Dataflow לבין PostgreSQL.

פרמטרים של תבניות

פרמטרים נדרשים

  • inputFilePattern: מיקום הקובץ של קובצי Datastream ב-Cloud Storage ליצירת רפליקה. מיקום הקובץ הזה הוא בדרך כלל נתיב השורש של הסטרימינג.
  • databaseHost: מארח ה-SQL שאליו מתחברים.
  • databaseUser: משתמש ה-SQL עם כל ההרשאות הנדרשות לכתיבה לכל הטבלאות בשכפול.
  • databasePassword: הסיסמה של משתמש ה-SQL.

פרמטרים אופציונליים

  • gcsPubSubSubscription: מינוי Pub/Sub עם התראות על קבצים ב-Datastream. לדוגמה, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat: הפורמט של קובץ הפלט שנוצר על ידי Datastream. לדוגמה, avro או json. ברירת המחדל היא avro.
  • streamName: השם או התבנית של הזרם שצריך לבצע בו סקר כדי לקבל מידע על הסכימה. ערך ברירת המחדל הוא {_metadata_stream}.
  • rfcStartDateTime: תאריך ושעה להתחלה שמשמשים לאחזור מ-Cloud Storage (https://tools.ietf.org/html/rfc3339). ברירת המחדל היא: 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl: כתובת הבסיס של Datastream API. ברירת המחדל היא: https://datastream.googleapis.com/.
  • databaseType: סוג מסד הנתונים שאליו רוצים לכתוב (לדוגמה, Postgres). ברירת המחדל היא: postgres.
  • databasePort: יציאת מסד הנתונים של SQL שאליה מתחברים. ערך ברירת המחדל הוא 5432.
  • databaseName: השם של מסד הנתונים מסוג SQL שאליו רוצים להתחבר. ערך ברירת המחדל הוא postgres.
  • defaultCasing: מתג לשינוי התנהגות ברירת המחדל של האותיות בטבלה. לדוגמה,(כלומר, LOWERCASE = mytable -> mytable,‏ UPPERCASE = mytable -> MYTABLECAMEL = my_table -> myTable,‏ SNAKE = myTable -> my_table. ברירת המחדל היא: LOWERCASE.
  • columnCasing: מתג להגדרת רישיות של שם עמודת היעד. אותיות קטנות (ברירת מחדל): my_column -> my_column. אותיות רישיות: my_column -> MY_COLUMN. ‫CAMEL: my_column -> myColumn. ‫SNAKE: myColumn -> my_column.
  • schemaMap: מיפוי של מפתח/ערכים שמשמשים להכתבת שינויים בסכימה ובשם הטבלה. דוגמאות: סכימה לסכימה (SCHEMA1:SCHEMA2), טבלה לטבלה (SCHEMA1.table1:SCHEMA2.TABLE1) או כמה מיפויים באמצעות התו '|' כמפריד (לדוגמה, schema1.source:schema2.target|schema3.source:schema4.target). ברירת המחדל היא ריק.
  • customConnectionString: מחרוזת חיבור אופציונלית שתשמש במקום מחרוזת מסד הנתונים שמוגדרת כברירת מחדל.
  • numThreads: קובע את המקביליות של המפתח בשלב Format to DML, באופן ספציפי, הערך מועבר אל Reshuffle.withNumBuckets. ברירת המחדל היא 100.
  • databaseLoginTimeout: הזמן הקצוב לתפוגה בשניות לניסיונות התחברות למסד נתונים. כך אפשר למנוע מצב שבו החיבור נתקע כשכמה עובדים מנסים להתחבר בו-זמנית.
  • orderByIncludesIsDeleted: הגדרות המיון של הנתונים צריכות לכלול תעדוף של נתונים שלא נמחקו. ברירת המחדל היא: false.
  • datastreamSourceType: עקיפה של זיהוי סוג המקור לנתוני CDC של Datastream. אם מציינים ערך, המערכת תשתמש בו במקום להסיק את סוג המקור מהשדה read_method. ערכים תקינים כוללים 'mysql',‏ 'postgresql',‏ 'oracle' וכו'. הפרמטר הזה שימושי כשהשדה read_method מכיל את הערך 'cdc' ולא ניתן לקבוע את סוג המקור בפועל באופן אוטומטי.
  • deadLetterQueueDirectory: הנתיב שבו Dataflow משתמש כדי לכתוב את הפלט של תור ההודעות המתות. הנתיב הזה לא יכול להיות זהה לנתיב של פלט הקובץ של Datastream. ברירת המחדל היא empty.
  • dlqRetryMinutes: מספר הדקות בין ניסיונות חוזרים של DLQ. ברירת המחדל היא 10.
  • dlqMaxRetries: מספר הניסיונות המקסימלי לביצוע חוזר של רשומה שנכשלה מ-DLQ לפני סימונה ככשל קבוע. ברירת המחדל היא 5.
  • schemaCacheRefreshMinutes: מספר הדקות שבהן סכימות של טבלאות יישמרו במטמון. ברירת המחדל היא 1440 (24 שעות).
  • runMode: סוג מצב ההרצה, רגיל או עם retryDLQ. ברירת המחדל היא: רגיל.

הפעלת התבנית

המסוף

  1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
  2. כניסה לדף Create job from template
  3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
  4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

    רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

  5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Cloud Datastream to SQL template.
  6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
  7. לוחצים על הפעלת העבודה.

gcloud

במעטפת או בטרמינל, מריצים את התבנית:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: הנתיב ב-Cloud Storage לנתוני Datastream. לדוגמה: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: המינוי ל-Pub/Sub שממנו קוראים את הקבצים שהשתנו. לדוגמה: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: כתובת ה-IP של מארח ה-SQL.
  • DATABASE_USER: משתמש ה-SQL.
  • DATABASE_PASSWORD: סיסמת ה-SQL שלכם.

API

כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: הנתיב ב-Cloud Storage לנתוני Datastream. לדוגמה: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: המינוי ל-Pub/Sub שממנו קוראים את הקבצים שהשתנו. לדוגמה: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: כתובת ה-IP של מארח ה-SQL.
  • DATABASE_USER: משתמש ה-SQL.
  • DATABASE_PASSWORD: סיסמת ה-SQL שלכם.

המאמרים הבאים