בדף הזה מוסבר איך משתמשים ב-Datastream API כדי:
- יצירת מקורות נתונים
- קבלת מידע על מקורות נתונים ואובייקטים של מקורות נתונים
- עדכון הסטרימינג על ידי הפעלה, השהיה, חידוש ושינוי שלו, וגם על ידי הפעלה והפסקה של מילוי חוסרים באובייקטים של הסטרימינג
- שחזור של שידורים שנכשלו באופן סופי
- הפעלת סטרימינג של אובייקטים גדולים ב-Oracle Streams
- מחיקת מקורות נתונים
יש שתי דרכים להשתמש ב-Datastream API. אפשר לבצע קריאות ל-API בארכיטקטורת REST או להשתמש ב-Google Cloud CLI (CLI).
למידע כללי על שימוש ב-Google Cloud CLI לניהול מקורות נתונים של Datastream, אפשר לעיין במאמר מקורות נתונים של Datastream ב-CLI של gcloud.
יצירת מקור נתונים
בקטע הזה מוסבר איך ליצור זרם שמשמש להעברת נתונים מהמקור ליעד. הדוגמאות הבאות לא מקיפות, אלא מדגישות תכונות ספציפיות של Datastream. כדי לפתור את הבעיה הספציפית שלכם, תוכלו להיעזר בדוגמאות האלה ובמסמכי העזר של Datastream API.
בקטע הזה מפורטים תרחישי השימוש הבאים:
- הזרמה מ-Oracle ל-Cloud Storage
- הזרמה מ-MySQL ל-BigQuery
- הזרמה מ-PostgreSQL ל-BigQuery
- הגדרת קבוצת אובייקטים שייכללו במקור הנתונים
- מילוי חוסרים בכל האובייקטים שנכללים בזרם
- החרגת אובייקטים מהעדכונים
- החרגת אובייקטים מהמילוי החוזר
- הגדרת CMEK להצפנת נתונים במצב מנוחה
- הגדרת מצב כתיבה לשידור
- הזרמה לפרויקט אחר ב-BigQuery
- שידור נתונים לטבלאות Apache Iceberg
דוגמה 1: העברת אובייקטים ספציפיים ל-BigQuery
בדוגמה הזו נסביר איך:
- הזרמה מ-MySQL ל-BigQuery
- הכללת קבוצה של אובייקטים בזרם
- הגדרת מצב כתיבה לזרם כהוספה בלבד
- מילוי חוסרים בכל האובייקטים שנכללים בזרם
הבקשה הבאה היא לשליפת כל הטבלאות מ-schema1 ושתי טבלאות ספציפיות מ-schema2: tableA ו-tableC. האירועים נכתבים למערך נתונים ב-BigQuery.
הבקשה לא כוללת את הפרמטר customerManagedEncryptionKey, ולכן נעשה שימוש במערכת הפנימית לניהול מפתחות כדי להצפין את הנתונים במקום ב-CMEK. Google Cloud
הפרמטר backfillAll שמשויך לביצוע מילוי היסטורי (או תמונת מצב) מוגדר כמילון ריק ({}), כלומר Datastream ממלא את הנתונים ההיסטוריים מכל הטבלאות שנכללות בזרם.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
מידע נוסף על שימוש ב-gcloud ליצירת זרם זמין במסמכי התיעוד של Google Cloud SDK.
דוגמה 2: החרגה של אובייקטים ספציפיים ממקור נתונים מסוג PostgreSQL
בדוגמה הזו נסביר איך:
- הזרמה מ-PostgreSQL ל-BigQuery
- החרגת אובייקטים מהשידור
- החרגת אובייקטים ממילוי חוסרים
הקוד הבא מציג בקשה ליצירת זרם שמשמש להעברת נתונים ממסד נתונים של PostgreSQL כמקור ל-BigQuery. כשיוצרים מקור נתונים ממסד נתונים של PostgreSQL, צריך לציין בבקשה שני שדות נוספים שספציפיים ל-PostgreSQL:
-
replicationSlot: משבצת שכפול היא תנאי מוקדם להגדרת מסד נתונים של PostgreSQL לשכפול. צריך ליצור משבצת שכפול לכל מקור נתונים. -
publication: פרסום הוא קבוצה של טבלאות שרוצים לשכפל מהן שינויים. שם הפרסום חייב להיות קיים במסד הנתונים לפני שמתחילים להזרים נתונים. הפרסום חייב לכלול לפחות את הטבלאות שצוינו ברשימהincludeObjectsשל הזרם.
הפרמטר backfillAll שמשויך לביצוע מילוי היסטורי (או צילום מצב) מוגדר להחרגת טבלה אחת.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
מידע נוסף על שימוש ב-gcloud ליצירת זרם זמין במסמכי התיעוד של Google Cloud SDK.
דוגמה 3: ציון מצב כתיבה של הוספה בלבד לזרם
כשמבצעים סטרימינג ל-BigQuery, אפשר להגדיר את מצב הכתיבה: merge או appendOnly. מידע נוסף זמין במאמר בנושא הגדרת מצב כתיבה.
אם לא מציינים את מצב הכתיבה בבקשה ליצירת זרם, נעשה שימוש במצב ברירת המחדל merge.
הבקשה הבאה מראה איך להגדיר את המצב appendOnly כשיוצרים זרם מ-MySQL ל-BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
מידע נוסף על שימוש ב-gcloud ליצירת זרם זמין במסמכי התיעוד של Google Cloud SDK.
דוגמה 4: הזרמה לפרויקט אחר ב-BigQuery
אם יצרתם את משאבי Datastream בפרויקט אחד, אבל אתם רוצים להזרים נתונים לפרויקט אחר ב-BigQuery, אתם יכולים לעשות זאת באמצעות בקשה שדומה לבקשה הבאה.
אם מציינים sourceHierarchyDatasets כמערך הנתונים של היעד, צריך למלא את השדה projectId.
אם מציינים singleTargetDataset כמערך נתוני היעד, צריך למלא את השדה datasetId בפורמט projectId:datasetId.
REST
עבור sourceHierarchyDatasets:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream1 { "displayName": "My cross-project stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" }, "projectId": "myProjectId2" } } }, "backfillAll": {} }
עבור singleTargetDataset:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream2 { "displayName": "My cross-project stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "singleTargetDataset": { "datasetId": "myProjectId2:myDatasetId" }, } }, "backfillAll": {} }
gcloud
עבור sourceHierarchyDatasets:
datastream streams create crossProjectBqStream1 --location=us-central1 --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json --destination=destination-cp --bigquery-destination-config=source_hierarchy_cross_project_config.json --backfill-none
התוכן של קובץ התצורה source_hierarchy_cross_project_config.json:
{"sourceHierarchyDatasets": {"datasetTemplate": {"location": "us-central1", "datasetIdPrefix": "prefix_"}, "projectId": "myProjectId2"}}
עבור singleTargetDataset:
datastream streams create crossProjectBqStream --location=us-central1 --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json --destination=destination-cp --bigquery-destination-config=single_target_cross_project_config.json --backfill-none
התוכן של קובץ התצורה single_target_cross_project_config.json:
{"singleTargetDataset": {"datasetId": "myProjectId2:myDatastetId"}}
מידע נוסף על שימוש ב-gcloud ליצירת זרם זמין במסמכי התיעוד של Google Cloud SDK.
דוגמה 5: סטרימינג ליעד ב-Cloud Storage
בדוגמה הזו נסביר איך:
- הזרמה מ-Oracle ל-Cloud Storage
- הגדרת קבוצת אובייקטים שייכללו בסטרימינג
- הגדרת CMEK להצפנת נתונים במצב מנוחה
בדוגמה הבאה מוצגת בקשה ליצירת סטרימינג שכותב את האירועים לקטגוריה ב-Cloud Storage.
בדוגמה הזו של בקשה, האירועים נכתבים בפורמט הפלט JSON, ונוצר קובץ חדש כל 100MB או כל 30 שניות (החלפה של ערכי ברירת המחדל של 50MB ו-60 שניות).
בפורמט JSON, אפשר:
כוללים קובץ סכמת סוגים מאוחד בנתיב. כתוצאה מכך, Datastream כותב שני קבצים ל-Cloud Storage: קובץ נתונים בפורמט JSON וקובץ סכימה בפורמט Avro. קובץ הסכימה נקרא באותו שם כמו קובץ הנתונים, עם הסיומת
.schema.מפעילים דחיסת gzip כדי ש-Datastream ידחס את הקבצים שנכתבים ב-Cloud Storage.
השימוש בפרמטר backfillNone מציין בבקשה שרק שינויים שמתרחשים כרגע יועברו בסטרימינג ליעד, ללא מילוי חוסרים.
הבקשה מציינת את הפרמטר של מפתח ההצפנה בניהול הלקוח, שמאפשר לכם לשלוט במפתחות שמשמשים להצפנת נתונים באחסון בתוך Google Cloud פרויקט. הפרמטר מתייחס ל-CMEK ש-Datastream משתמש בו כדי להצפין נתונים שמוזרמים מהמקור ליעד. הוא גם מציין את אוסף המפתחות של ה-CMEK.
מידע נוסף על מחזיקי מפתחות זמין במאמר משאבי Cloud KMS. מידע נוסף על הגנה על הנתונים באמצעות מפתחות הצפנה זמין במאמר בנושא Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
מידע נוסף על שימוש ב-gcloud ליצירת זרם זמין במסמכי התיעוד של Google Cloud SDK.
דוגמה 6: הזרמה לטבלת Apache Iceberg
בדוגמה הזו נסביר איך להגדיר מקור נתונים לשכפול נתונים ממסד נתונים של MySQL לטבלת Apache Iceberg במצב append-only.
לפני שיוצרים את הבקשה, צריך לוודא שביצעתם את השלבים הבאים:
- צריכה להיות לכם קטגוריה של Cloud Storage שבה אתם רוצים לאחסן את הנתונים
- יצירת קישור למשאבים ב-Cloud
- הענקת גישה של הקישור למשאבים ב-Cloud לקטגוריה של Cloud Storage
אחרי כן, תוכלו להשתמש בבקשה הבאה כדי ליצור את עדכוני התוכן:
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlIcebergStream { "displayName": "MySQL to Apache Iceberg stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlIcebergCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "my-mysql-database" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" , "bigqueryDestinationConfig": { "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": { "datasetId": "my-project-id:my-bigquery-dataset-id" }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
datastream streams create mysqlIcebergStream --location=us-central1 --display-name=mysql-to-bl-stream --source=source--mysql-source-config=mysql_source_config.json --destination=destination --bigquery-destination-config=bl_config.json --backfill-none
התוכן של קובץ הגדרות המקור mysql_source_config.json:
{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{ "database":"my-mysql-database"}]}}
התוכן של קובץ התצורה bl_config.json:
{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder","connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }
Terraform
resource "google_datastream_stream" "stream" { stream_id = "mysqlBlStream" location = "us-central1" display_name = "MySQL to Apache Iceberg stream" source_config { source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp" mysql_source_config { include_objects { mysql_databases { database = "my-mysql-database" } } } } destination_config { destination_connection_profile ="projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" bigquery_destination_config { single_target_dataset { dataset_id = "my-project-id:my-bigquery-dataset-id" } blmt_config { bucket = "my-gcs-bucket-name" table_format = "ICEBERG" file_format = "PARQUET" connection_name = "my-project-id.us-central1.my-bigquery-connection-name" root_path = "my/folder" } append_only {} } } backfill_none {} }
אימות ההגדרה של מקור נתונים
לפני שיוצרים מקור נתונים, אפשר לאמת את ההגדרה שלו. כך תוכלו לוודא שכל בדיקות האימות יעברו בהצלחה, ושהזרם יפעל בהצלחה כשהוא ייווצר.
במהלך אימות של שידור נבדקים הדברים הבאים:
- האם המקור מוגדר בצורה נכונה כדי לאפשר ל-Datastream להזרים ממנו נתונים.
- האם אפשר לחבר את הזרם גם למקור וגם ליעד.
- ההגדרה מקצה לקצה של מקור הנתונים.
כדי לאמת שידור, מוסיפים את &validate_only=true לכתובת ה-URL לפני גוף הבקשה:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
אחרי שתשלחו את הבקשה, יוצגו לכם בדיקות האימות ש-Datastream מריץ למקור וליעד, וגם אם הבדיקות עברו או נכשלו. אם בדיקת אימות מסוימת נכשלת, מופיע מידע שמסביר למה היא נכשלה ומה צריך לעשות כדי לפתור את הבעיה.
לדוגמה, נניח שיש לכם מפתח הצפנה בניהול הלקוח (CMEK) שאתם רוצים ש-Datastream ישתמש בו כדי להצפין נתונים שמוזרמים מהמקור ליעד. במסגרת האימות של הזרם, Datastream יוודא שהמפתח קיים ושיש ל-Datastream הרשאות להשתמש במפתח. אם אחד מהתנאים האלה לא מתקיים, כשמאמתים את הפיד תופיע הודעת השגיאה הבאה:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
כדי לפתור את הבעיה, צריך לוודא שהמפתח שסיפקתם קיים, ושלחשבון השירות של Datastream יש הרשאה cloudkms.cryptoKeys.get למפתח.
אחרי שתבצעו את התיקונים המתאימים, תגישו שוב את הבקשה כדי לוודא שכל בדיקות האימות יעברו בהצלחה. בדוגמה הקודמת, הבדיקה של CMEK_VALIDATE_PERMISSIONS לא תחזיר יותר הודעת שגיאה, אלא סטטוס של PASSED.
קבלת מידע על שידור
בדוגמה הבאה אפשר לראות בקשה לאחזור מידע על שידור. המידע כולל:
- השם של מקור הנתונים (מזהה ייחודי)
- שם ידידותי למשתמש בשביל הסטרימינג (שם לתצוגה)
- חותמות זמן של מועד היצירה והעדכון האחרון של הזרם
- מידע על פרופילי החיבור של מקור ומקור היעד שמשויכים למקור הנתונים
- מצב השידור
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
התגובה תופיע כך:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
מידע נוסף על שימוש ב-gcloud כדי לאחזר מידע על הזרם זמין במסמכי התיעוד של Google Cloud SDK.
שידורים חיים
בדוגמה הבאה מוצגת בקשה לאחזור רשימה של כל הזרמים בפרויקט ובמיקום שצוינו.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
מידע נוסף על שימוש ב-gcloud כדי לאחזר מידע על כל הזרמים זמין במסמכי התיעוד של Google Cloud SDK.
הצגת רשימת האובייקטים של סטרימינג
בדוגמה הבאה מוצגת בקשה לאחזור מידע על כל האובייקטים בסטרימינג.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
מידע נוסף על שימוש ב-gcloud כדי לאחזר מידע על כל האובייקטים בזרם מופיע במסמכי התיעוד של Google Cloud SDK.
רשימת האובייקטים שמוחזרת עשויה להיראות כך:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
מידע נוסף על שימוש ב-gcloud כדי להציג רשימה של אובייקטים של זרם מופיע במסמכי התיעוד של Google Cloud SDK.
התחלת שידור
בדוגמה הבאה מוצגת בקשה להפעלת סטרימינג.
אם משתמשים בפרמטר updateMask בבקשה, רק השדות שמציינים צריכים להיכלל בגוף הבקשה. כדי להתחיל להזרים נתונים, משנים את הערך בשדה state מ-CREATED ל-RUNNING.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
מידע נוסף על שימוש ב-gcloud כדי להתחיל את הסטרימינג זמין במסמכי התיעוד של Google Cloud SDK.
השהיית שידור
בדוגמה הבאה מוצגת בקשה להשהיית סטרימינג פעיל.
בדוגמה הזו, השדה שצוין לפרמטר updateMask הוא השדה state. כשמשהים את הסטרימינג, המצב שלו משתנה מRUNNING לPAUSED.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
מידע נוסף על שימוש ב-gcloud להשהיית הסטרימינג זמין במסמכי התיעוד של Google Cloud SDK.
המשך שידור
בדוגמה הבאה מוצגת בקשה להמשכת שידור שהושהה.
בדוגמה הזו, השדה שצוין לפרמטר updateMask הוא השדה state. כשמפעילים מחדש את הסטרימינג, הסטטוס שלו משתנה מPAUSED לRUNNING.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
מידע נוסף על שימוש ב-gcloud כדי להפעיל מחדש את הסטרימינג זמין במסמכי התיעוד של Google Cloud SDK.
שחזור שידור
אפשר לשחזר שידור חי שנכשל באופן סופי באמצעות השיטה RunStream. לכל סוג של מסד נתונים של מקור יש הגדרה משלו לגבי הפעולות האפשריות לשחזור הזרם. מידע נוסף זמין במאמר בנושא שחזור סטרימינג.
שחזור של סטרימינג ממקור MySQL או Oracle
בדוגמאות הקוד הבאות אפשר לראות בקשות לשחזור של זרם ממקור MySQL או Oracle ממיקומים שונים בקובץ היומן:
REST
שחזור של שידור מהמיקום הנוכחי. זוהי אפשרות ברירת המחדל:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
שחזור שידור מהמיקום הזמין הבא:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
שחזור שידור מהמיקום האחרון:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
שחזור של מקור נתונים ממיקום ספציפי (שכפול מבוסס-binlog של MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
מחליפים את מה שכתוב בשדות הבאים:
- NAME_OF_THE_LOG_FILE: השם של קובץ היומן שממנו רוצים לשחזר את הזרם
- POSITION: המיקום בקובץ היומן שממנו רוצים לשחזר את הזרם. אם לא מציינים את הערך, Datastream מאחזר את הנתונים מהסטרימינג מתחילת הקובץ.
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
שחזור של מקור נתונים ממיקום ספציפי (שכפול מבוסס-GTID של MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
מחליפים את GTID_SET במזהה עסקה גלובלי אחד או יותר או בטווח של מזהי עסקאות גלובליים שמהם רוצים לשחזר את הנתונים.
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3" } } } }
שחזור של מקור נתונים ממיקום ספציפי (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
"cdcStrategy": {
"specificStartPosition": {
"oracleScnPosition": {
"scn": 234234
}
}
}
}
מידע נוסף על אפשרויות השחזור זמין במאמר שחזור של שידור.
gcloud
אין תמיכה בשחזור של סטרימינג באמצעות gcloud.
שחזור של זרם ממקור PostgreSQL
בדוגמת הקוד הבאה מוצגת בקשה לשחזור של זרם ממקור PostgreSQL. במהלך השחזור, הסטרימינג מתחיל לקרוא ממספר הרצף הראשון ביומן (LSN) במשבצת השכפול שהוגדרה לסטרימינג.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
אם רוצים לשנות את משבצת השכפול, קודם מעדכנים את הסטרים עם השם החדש של משבצת השכפול:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
אין תמיכה בשחזור של סטרימינג באמצעות gcloud.
שחזור של שידור ממקור SQL Server
בדוגמאות הקוד הבאות אפשר לראות בקשות לדוגמה לשחזור של זרם ממקור SQL Server.
REST
שחזור שידור מהמיקום הזמין הראשון:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
שחזור של זרם ממספר רצף יומן מועדף:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
"cdcStrategy": {
"specificStartPosition": {
"sqlServerLsnPosition": {
"lsn": 0000123C:0000BA78:0004
}
}
}
}
gcloud
אין תמיכה בשחזור של סטרימינג באמצעות gcloud.
איך מתחילים או ממשיכים שידור ממיקום ספציפי
אפשר להתחיל שידור או להמשיך שידור שהושהה ממיקום ספציפי במקורות של MySQL, Oracle ו-SQL Server. האפשרות הזו יכולה להיות שימושית כשרוצים לבצע מילוי חוזר באמצעות כלי חיצוני, או להתחיל CDC ממיקום שמציינים:
- במקור MySQL, צריך לציין מיקום ב-binlog או קבוצת GTID.
- במקור Oracle, צריך לציין מספר שינוי מערכת (SCN) בקובץ יומן Redo.
- במקור SQL Server, צריך לציין מספר רצף ביומן (LSN) בטבלת שינויים.
בדוגמת הקוד הבאה מוצגות בקשות להפעלה או להמשך של סטרימינג שכבר נוצר ממיקום ספציפי.
התחלה או המשכה של שידור ממיקום ספציפי ב-binlog (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
מחליפים את מה שכתוב בשדות הבאים:
- NAME_OF_THE_LOG_FILE: השם של קובץ היומן שממנו רוצים להתחיל את הסטרימינג.
- POSITION: המיקום בקובץ היומן שממנו רוצים להתחיל את הסטרימינג. אם לא מציינים את הערך, Datastream מתחיל לקרוא מההתחלה של הקובץ.
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
אין תמיכה בהתחלה או בהמשך של סטרימינג ממיקום ספציפי באמצעות gcloud. מידע על שימוש ב-gcloud כדי להתחיל או לחדש סטרימינג זמין במסמכי התיעוד של Cloud SDK.
התחלה או המשך של סטרימינג ממערך GTID ספציפי (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
מחליפים את GTID_SET במזהה עסקה גלובלי אחד או יותר או בטווחים של מזהי עסקאות גלובליים שמהם רוצים להתחיל או להמשיך את הסטרימינג.
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7" } } } }
gcloud
אין תמיכה בהתחלה או בהמשך של סטרימינג ממיקום ספציפי באמצעות gcloud. מידע על שימוש ב-gcloud כדי להתחיל או לחדש סטרימינג זמין במסמכי התיעוד של Cloud SDK.
להתחיל או לחדש שידור ממספר שינוי מערכת ספציפי בקובץ יומן Redo (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
"cdcStrategy": {
"specificStartPosition": {
"oracleScnPosition": {
"scn": 123123
}
}
}
}
gcloud
אי אפשר להתחיל או לחדש סטרימינג ממיקום ספציפי באמצעות gcloud. מידע על שימוש ב-gcloud כדי להתחיל סטרימינג זמין במסמכי התיעוד של Cloud SDK.
התחלה או חידוש של סטרימינג ממספר רצף יומן ספציפי (SQL Server):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": "lsn" } } } }
מחליפים את lsn במספר הרצף של היומן בטבלת השינויים שממנו רוצים להתחיל את הסטרימינג. חובה למלא את השדה הזה.
לדוגמה:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
"cdcStrategy": {
"specificStartPosition": {
"sqlServerLsnPosition": {
"lsn": "0000123C0000BA780004"
}
}
}
}
מידע נוסף מופיע במאמרי העזרה של Datastream API.
gcloud
אי אפשר להתחיל או לחדש סטרימינג ממיקום ספציפי באמצעות gcloud. מידע על שימוש ב-gcloud כדי להתחיל סטרימינג זמין במסמכי התיעוד של Cloud SDK.
שינוי של מקור נתונים
הקוד הבא מראה בקשה לעדכון הגדרות סיבוב הקובץ של סטרימינג, כך שהקובץ יסתובב כל 75MB או כל 45 שניות.
בדוגמה הזו, השדות שצוינו לפרמטר updateMask כוללים את השדות fileRotationMb ו-fileRotationInterval, שמיוצגים על ידי הדגלים destinationConfig.gcsDestinationConfig.fileRotationMb ו-destinationConfig.gcsDestinationConfig.fileRotationInterval, בהתאמה.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
הקוד הבא מציג בקשה לכלול קובץ סכימה של סוגים מאוחדים בנתיב של קבצים ש-Datastream כותב ל-Cloud Storage. כתוצאה מכך, Datastream כותב שני קבצים: קובץ נתוני JSON וקובץ סכימת Avro.
בדוגמה הזו, השדה שצוין הוא השדה jsonFileFormat, שמיוצג על ידי הדגל destinationConfig.gcsDestinationConfig.jsonFileFormat.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
הקוד הבא מציג בקשה ל-Datastream לשכפל נתונים קיימים, בנוסף לשינויים שמתבצעים בנתונים, ממסד הנתונים של המקור ליעד.
בקטע oracleExcludedObjects של הקוד מוצגות הטבלאות והסכימות שהגיבוי שלהן ליעד מוגבל.
בדוגמה הזו, כל הטבלאות והסכימות ימולאו מחדש, חוץ מטבלה A בסכימה 3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
מידע נוסף על שימוש ב-gcloud כדי לשנות את הסטרים זמין במאמרי העזרה של Google Cloud SDK.
הפעלת מילוי חוסר (backfill) לאובייקט של סטרימינג
מקור נתונים ב-Datastream יכול למלא נתונים היסטוריים, וגם להזרים שינויים שוטפים ליעד. שינויים שמתבצעים באופן שוטף תמיד יועברו בסטרימינג ממקור ליעד. עם זאת, אתם יכולים לציין אם אתם רוצים שהנתונים ההיסטוריים יועברו בסטרימינג.
אם רוצים להזרים נתונים היסטוריים מהמקור ליעד, צריך להשתמש בפרמטר backfillAll.
בנוסף, אפשר להשתמש ב-Datastream כדי להזרים נתונים היסטוריים רק מטבלאות מסוימות במסד הנתונים. כדי לעשות את זה, משתמשים בפרמטר backfillAll ומחריגים את הטבלאות שלא רוצים לקבל לגביהן נתונים היסטוריים.
אם רוצים להזרים ליעד רק שינויים שמתבצעים באופן שוטף, צריך להשתמש בפרמטר backfillNone. אם רוצים ש-Datastream יעביר תמונת מצב של כל הנתונים הקיימים ממקור היעד, צריך להפעיל מילוי חוסרים באופן ידני לאובייקטים שמכילים את הנתונים האלה.
סיבה נוספת להפעלת מילוי חוסרים באובייקט היא אם הנתונים לא מסונכרנים בין המקור ליעד. לדוגמה, משתמש יכול למחוק נתונים ביעד בטעות, והנתונים יאבדו. במקרה הזה, הפעלת מילוי חוסרים לאובייקט משמשת כ "מנגנון איפוס" כי כל הנתונים מועברים ליעד בבת אחת. כתוצאה מכך, הנתונים מסונכרנים בין המקור ליעד.
כדי להפעיל מילוי חוסרים באובייקט של זרם, צריך קודם לאחזר מידע על האובייקט.
לכל אובייקט יש OBJECT_ID שמזהה אותו באופן ייחודי. משתמשים ב-OBJECT_ID כדי להפעיל מילוי חוסרים בשידור.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
מידע נוסף על שימוש ב-gcloud כדי להפעיל מילוי חוסרים לאובייקט של הזרם זמין במסמכי Google Cloud SDK.
הפסקת מילוי חוסר (backfill) של אובייקט בסטרימינג
אחרי הפעלת מילוי חוסר (backfill) לאובייקט של סטרימינג, אפשר להפסיק את מילוי החוסר לאובייקט. לדוגמה, אם משתמש משנה סכימת מסד נתונים, יכול להיות שהסכימה או הנתונים ייפגמו. אתם לא רוצים שהסכימה או הנתונים האלה יועברו בסטרימינג ליעד, ולכן אתם מפסיקים את המילוי החוזר של האובייקט.
אפשר גם להפסיק את המילוי החוזר של אובייקט לצורך איזון עומסים. מקור נתונים יכול להריץ כמה מילויים חוזרים במקביל. יכול להיות שזה יוסיף עומס על המקור. אם העומס משמעותי, צריך להפסיק את מילוי החוסרים של כל אובייקט, ואז להתחיל את מילוי החוסרים של האובייקטים, אחד אחרי השני.
כדי להפסיק את מילוי החוסרים באובייקט של מקור נתונים, צריך לשלוח בקשה לאחזור מידע על כל האובייקטים של מקור הנתונים. לכל אובייקט שמוחזר יש OBJECT_ID שמזהה אותו באופן ייחודי. משתמשים בלחצן OBJECT_ID כדי להפסיק את מילוי החסר בשידור.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
מידע נוסף על שימוש ב-gcloud כדי להפסיק את מילוי החוסרים באובייקט של הזרם זמין במסמכי Google Cloud SDK.
שינוי המספר המקסימלי של משימות CDC בו-זמניות
הקוד הבא מראה איך מגדירים את המספר המקסימלי של משימות מקבילות של לכידת נתוני שינוי (CDC) ל-7 בסטרימינג של MySQL.
בדוגמה הזו, השדה שצוין לפרמטר updateMask הוא השדה maxConcurrentCdcTasks. אם מגדירים את הערך ל-7, משנים את מספר משימות ה-CDC המקסימליות שפועלות בו-זמנית מהערך הקודם ל-7. אפשר להשתמש בערכים מ-0 עד 50 (כולל). אם לא מגדירים את הערך, או אם מגדירים אותו כ-0, המערכת מגדירה את ברירת המחדל של 5 משימות לזרם.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
מידע נוסף על השימוש ב-gcloud זמין במסמכי התיעוד של Google Cloud SDK.
שינוי המספר המקסימלי של משימות מילוי שטחי פרסום בו-זמניות
הקוד הבא מראה איך להגדיר את המספר המקסימלי של משימות מקבילות של מילוי חוסרים לזרם MySQL ל-25.
בדוגמה הזו, השדה שצוין לפרמטר updateMask הוא השדה maxConcurrentBackfillTasks. אם מגדירים את הערך ל-25, משנים את מספר המשימות המקסימלי של מילוי שטחי פרסום פנויים במקביל מהערך הקודם ל-25. אפשר להשתמש בערכים מ-0 עד 50 (כולל). אם לא מגדירים את הערך, או אם מגדירים אותו כ-0, המערכת מגדירה את ברירת המחדל של 16 משימות לזרם.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
מידע נוסף על השימוש ב-gcloud זמין במסמכי התיעוד של Google Cloud SDK.
הפעלת סטרימינג של אובייקטים גדולים למקורות Oracle
אפשר להפעיל סטרימינג של אובייקטים גדולים, כמו אובייקטים בינאריים גדולים (BLOB), אובייקטים גדולים של תווים (CLOB) ואובייקטים גדולים של תווים לאומיים (NCLOB) לסטרימינג עם מקורות Oracle. הדגל streamLargeObjects מאפשר לכם לכלול אובייקטים גדולים בזרמים חדשים וקיימים. הדגל מוגדר ברמת הזרם, ולא צריך לציין את העמודות של סוגי נתונים של אובייקטים גדולים.
בדוגמה הבאה מוצג אופן היצירה של זרם שמאפשר להזרים אובייקטים גדולים.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
מידע נוסף על שימוש ב-gcloud לעדכון של זרם זמין במאמרי העזרה של Google Cloud SDK.
מחיקת מקור נתונים
הקוד הבא מראה בקשה למחיקת סטרימינג.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
מידע נוסף על שימוש ב-gcloud כדי למחוק את הזרם זמין במסמכי התיעוד של Google Cloud SDK.
המאמרים הבאים
- איך משתמשים ב-Datastream API כדי לנהל פרופילים של חיבורים
- איך משתמשים ב-Datastream API כדי לנהל הגדרות של קישוריות פרטית
- מידע נוסף על השימוש ב-Datastream API זמין במאמרי העזרה.