פיתוח מחבר מותאם אישית לייבוא מטא-נתונים

במסמך הזה מובאת תבנית הפניה ליצירת מחבר בהתאמה אישית לחילוץ מטא-נתונים ממקורות צד שלישי, כמו MySQL,‏ SQL Server ו-Oracle. אפשר להשתמש במחבר הזה כדי לייבא מטא-נתונים אל Dataplex Universal Catalog דרך צינור קישוריות מנוהל. דוגמה למחבר Python ל-Oracle Database Express Edition ‏ (XE) כלולה כנקודת התחלה. אפשר גם לפתח מחברים באמצעות Java,‏ Scala או R.

איך מחברים עובדים

מחבר חולץ מטא-נתונים ממקור נתונים של צד שלישי, משנה את המטא-נתונים לפורמט של Dataplex Universal Catalog ImportItem ומייצר קבצים של ייבוא מטא-נתונים שאפשר לייבא באמצעות Dataplex Universal Catalog.

המחבר הוא חלק מצינור קישוריות מנוהל. צינור קישוריות מנוהל הוא תהליך עבודה מתואם שמשמש לייבוא מטא-נתונים של Dataplex Universal Catalog. צינור הקישוריות המנוהל מפעיל את המחבר ומבצע משימות אחרות בתהליך הייבוא, כמו הפעלת משימת ייבוא של מטא-נתונים ותיעוד יומנים.

צינור הקישוריות המנוהל מפעיל את המחבר באמצעות משימה באצווה של Google Cloud Serverless for Apache Spark. ‫Serverless ל-Apache Spark מספק סביבת הפעלה של Spark ללא שרת. אפשר ליצור מחבר שלא משתמש ב-Spark, אבל מומלץ להשתמש ב-Spark כי הוא יכול לשפר את הביצועים של המחבר.

דרישות לגבי מחברים

המחבר צריך לעמוד בדרישות הבאות:

  • המחבר חייב להיות תמונה ב-Artifact Registry שאפשר להריץ ב-Serverless for Apache Spark.
  • המחבר צריך ליצור קובצי מטא-נתונים בפורמט שאפשר לייבא באמצעות משימת ייבוא מטא-נתונים של Dataplex Universal Catalog (השיטה metadataJobs.createAPI). דרישות מפורטות מופיעות במאמר בנושא ייבוא קובץ מטא נתונים.
  • המחבר צריך לקבל את הארגומנטים הבאים של שורת הפקודה כדי לקבל מידע מהצינור:

    ארגומנט בשורת הפקודה הערך שהצינור מספק
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    המחבר משתמש בארגומנטים האלה כדי ליצור מטא-נתונים בקבוצת רשומות יעד projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID, וכדי לכתוב לקטגוריה של Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. בכל הפעלה של צינור הנתונים נוצרת תיקייה חדשה FOLDER_ID בדלי CLOUD_STORAGE_BUCKET_ID. המחבר אמור לכתוב קבצים של ייבוא מטא-נתונים לתיקייה הזו.

תבניות הצינור תומכות במחברי PySpark. התבניות מניחות שהדרייבר (mainPythonFileUri) הוא קובץ מקומי בתמונת המחבר בשם main.py. אפשר לשנות את תבניות הצינור לתרחישים אחרים, כמו מחבר Spark, מזהה משאבים אחיד (URI) אחר של מנהל התקן או אפשרויות אחרות.

כך משתמשים ב-PySpark כדי ליצור פריט ייבוא בקובץ ייבוא המטא-נתונים.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

לפני שמתחילים

במדריך הזה אנחנו יוצאים מנקודת הנחה שאתם מכירים את Python ואת PySpark.

כדאי לבדוק את המידע הבא:

צריך לבצע את הפעולות הבאות. יוצרים את כל המשאבים באותו Google Cloud מיקום.

  1. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  6. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  8. Set up authentication:

    1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
    2. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    3. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. יוצרים קטגוריה של Cloud Storage לאחסון קובצי ייבוא של מטא-נתונים.

  10. יוצרים את משאבי המטא-נתונים הבאים באותו פרויקט.

    דוגמאות לערכים מופיעות בקטע דוגמאות למשאבי מטא-נתונים למקור Oracle במסמך הזה.

    1. יצירת קבוצה של רשומות
    2. יוצרים סוגים מותאמים אישית של היבטים לרשומות שרוצים לייבא. משתמשים במוסכמה למתן שמות SOURCE-ENTITY_TO_IMPORT.

      לדוגמה, במסד נתונים של Oracle, יוצרים סוג היבט בשם oracle-database.

      אפשר גם ליצור סוגי מאפיינים נוספים כדי לאחסן מידע אחר.

    3. יוצרים סוגי רשומות מותאמים אישית למשאבים שרוצים לייבא, ומקצים להם את סוגי ההיבטים הרלוונטיים. משתמשים במוסכמה למתן שמות SOURCE-ENTITY_TO_IMPORT.

      לדוגמה, במסד נתונים של Oracle, יוצרים סוג רשומה בשם oracle-database. מקשרים אותו לסוג ההיבט שנקרא oracle-database.

  11. מוודאים שאפשר לגשת למקור של הצד השלישי מ Google Cloud הפרויקט. מידע נוסף זמין במאמר בנושא הגדרת רשת ל-Serverless for Apache Spark.
  12. יצירת מחבר Python בסיסי

    מחבר Python בסיסי לדוגמה יוצר רשומות ברמה העליונה למקור נתונים של Oracle באמצעות מחלקות של ספריית הלקוח Dataplex Universal Catalog. לאחר מכן מציינים את הערכים בשדות של הרשומה.

    המחבר יוצר קובץ ייבוא של מטא-נתונים עם הרשומות הבאות:

    • רשומה מסוג instance, עם סוג רשומה projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. הרשומה הזו מייצגת מערכת Oracle Database XE.
    • רשומה של database שמייצגת מסד נתונים בתוך מערכת Oracle Database XE.

    כדי ליצור מחבר Python בסיסי:

    1. משכפלים את מאגר cloud-dataplex.

    2. מגדירים סביבה מקומית. מומלץ להשתמש בסביבה וירטואלית.

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      משתמשים בגרסאות active או maintenance של Python. יש תמיכה ב-Python גרסה 3.7 ואילך.

    3. יוצרים פרויקט Python.

    4. דרישות להתקנה:

      pip install -r requirements.txt
      

      הדרישות הבאות מותקנות:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. מוסיפים קובץ של צינור עיבוד נתונים main.py בתיקיית הבסיס של הפרויקט.

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      כשפורסים את הקוד ב-Serverless for Apache Spark, הקובץ main.pyמשמש כנקודת הכניסה להרצה. מומלץ לצמצם את כמות המידע שמאוחסן בקובץ main.py. אפשר להשתמש בקובץ הזה כדי להפעיל פונקציות ומחלקות שמוגדרות במחבר, כמו המחלקה src/bootstap.py.

    6. יוצרים תיקייה src כדי לאחסן את רוב הלוגיקה של המחבר.

    7. מעדכנים את הקובץ src/cmd_reader.py באמצעות מחלקה של Python כדי לקבל ארגומנטים של שורת פקודה. אפשר להשתמש במודול argeparse כדי לעשות את זה.

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      בסביבות ייצור, מומלץ לאחסן את הסיסמה בSecret Manager.

    8. מעדכנים את הקובץ src/constants.py עם קוד ליצירת קבועים.

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
      
    9. מעדכנים את הקובץ src/name_builder.py בשיטות ליצירת משאבי המטא-נתונים שרוצים שמחבר יצור עבור משאבי Oracle. משתמשים במוסכמות שמתוארות בקטע משאבי מטא-נתונים לדוגמה למקור Oracle במסמך הזה.

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape column
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      קובץ name_builder.py משמש גם לקוד הליבה של Python וגם לקוד הליבה של PySpark, ולכן מומלץ לכתוב את השיטות כפונקציות טהורות, ולא כחברים במחלקה.

    10. מעדכנים את הקובץ src/top_entry_builder.py עם קוד כדי למלא את הרשומות ברמה העליונה בנתונים.

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. מעדכנים את הקובץ src/bootstrap.py עם קוד כדי ליצור את קובץ ייבוא המטא-נתונים ומריצים את המחבר.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. מריצים את הקוד באופן מקומי.

      מוחזר קובץ ייבוא של מטא-נתונים בשם output.jsonl. בקובץ יש שתי שורות, כל אחת מייצגת פריט לייבוא. צינור הקישוריות המנוהל קורא את הקובץ הזה כשמריצים את משימת ייבוא המטא-נתונים.

    13. אופציונלי: אפשר להרחיב את הדוגמה הקודמת כדי להשתמש במחלקות של ספריית הלקוח של Dataplex Universal Catalog כדי ליצור פריטי ייבוא לטבלאות, לסכימות ולתצוגות. אפשר גם להריץ את דוגמת ה-Python ב-Serverless for Apache Spark.

      מומלץ ליצור מחבר שמשתמש ב-Spark (ומופעל ב-Serverless for Apache Spark), כי הוא יכול לשפר את הביצועים של המחבר.

    יצירת מחבר PySpark

    הדוגמה הזו מבוססת על PySpark DataFrame API. אפשר להתקין את PySpark SQL ולהריץ אותו באופן מקומי לפני שמריצים אותו ב-Serverless for Apache Spark. אם מתקינים ומריצים את PySpark באופן מקומי, צריך להתקין את ספריית PySpark באמצעות pip, אבל לא צריך להתקין אשכול Spark מקומי.

    מסיבות שקשורות לביצועים, בדוגמה הזו לא נעשה שימוש במחלקות מוגדרות מראש מהספרייה של PySpark. במקום זאת, בדוגמה נוצרים אובייקטים מסוג DataFrame, הם מומרים לרשומות JSON, והפלט נכתב לקובץ ייבוא של מטא-נתונים בפורמט JSON Lines שאפשר לייבא ל-Dataplex Universal Catalog.

    כדי ליצור מחבר באמצעות PySpark:

    1. משכפלים את מאגר cloud-dataplex.

    2. מתקינים את PySpark:

      pip install pyspark
      
    3. דרישות להתקנה:

      pip install -r requirements.txt
      

      הדרישות הבאות מותקנות:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. מעדכנים את הקובץ oracle_connector.py עם קוד לקריאת נתונים ממקור נתונים של Oracle ולהחזרת DataFrames.

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      מוסיפים שאילתות SQL כדי להחזיר את המטא-נתונים שרוצים לייבא. השאילתות צריכות להחזיר את הפרטים הבאים:

      • סכימות של מסדי נתונים
      • טבלאות ששייכות לסכימות האלה
      • עמודות ששייכות לטבלאות האלה, כולל שם העמודה, סוג הנתונים בעמודה והאם העמודה יכולה להכיל ערך null או שהיא נדרשת

      כל העמודות של כל הטבלאות והתצוגות מאוחסנות באותה טבלת מערכת. אפשר לבחור עמודות באמצעות השיטה _get_columns. בהתאם לפרמטרים שאתם מספקים, אתם יכולים לבחור עמודות לטבלאות או לתצוגות בנפרד.

      שימו לב לנקודות הבאות:

      • ב-Oracle, סכימת מסד נתונים נמצאת בבעלות של משתמש מסד נתונים, והשם שלה זהה לשם של המשתמש.
      • אובייקטים של סכימה הם מבנים לוגיים שנוצרים על ידי משתמשים. אובייקטים כמו טבלאות או אינדקסים יכולים להכיל נתונים, ואובייקטים כמו תצוגות או מילים נרדפות מכילים רק הגדרה.
      • הקובץ ojdbc11.jar מכיל את Oracle JDBC driver.
    5. מעדכנים את הקובץ src/entry_builder.py בשיטות משותפות להחלת טרנספורמציות של Spark.

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregate fields, denormalizing the table
          # TABLE_NAME becomes top-level filed, and the rest is put into
          # the array type called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create nested structured called aspects.
          # Fields are becoming a part of a `schema` struct
          # There is also an entry_aspect that is repeats entry_type as aspect_type
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      שימו לב לנקודות הבאות:

      • השיטות יוצרות את משאבי המטא-נתונים שהמחבר יוצר עבור משאבי Oracle. משתמשים במוסכמות שמתוארות בקטע משאבי מטא-נתונים לדוגמה למקור Oracle במסמך הזה.
      • השיטה convert_to_import_items חלה על סכימות, טבלאות ותצוגות. צריך לוודא שהפלט של המחבר הוא פריט ייבוא אחד או יותר שאפשר לעבד באמצעות השיטה metadataJobs.create, ולא רשומות נפרדות.
      • גם בתצוגה, העמודה נקראת TABLE_NAME.
    6. מעדכנים את הקובץ bootstrap.py עם קוד כדי ליצור את קובץ ייבוא המטא-נתונים ומריצים את המחבר.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      בדוגמה הזו, קובץ ייבוא המטא-נתונים נשמר כקובץ JSON Lines יחיד. אפשר להשתמש בכלים של PySpark כמו המחלקה DataFrameWriter כדי להפיק קבוצות של JSON במקביל.

      המחבר יכול לכתוב רשומות לקובץ ייבוא המטא-נתונים בכל סדר.

    7. מעדכנים את הקובץ gcs_uploader.py עם קוד להעלאת קובץ ייבוא המטא-נתונים לקטגוריה של Cloud Storage.

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. יוצרים את תמונת המחבר.

      אם המחבר מכיל כמה קבצים, או אם רוצים להשתמש בספריות שלא נכללות ב קובץ אימג' של Docker שמוגדר כברירת מחדל, צריך להשתמש בקונטיינר בהתאמה אישית. ‫Serverless ל-Apache Spark מריץ עומסי עבודה (workloads) בתוך קונטיינרים של Docker. יוצרים קובץ אימג' מותאם אישית של Docker למחבר ומאחסנים את קובץ האימג' ב-Artifact Registry. ‫Serverless for Apache Spark קורא את התמונה מ-Artifact Registry.

      1. יוצרים Dockerfile:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        משתמשים ב-Conda כמנהל החבילות. ‫Serverless for Apache Spark מבצעת mount של pyspark בקונטיינר בזמן הריצה, כך שלא צריך להתקין תלות ב-PySpark בקובץ האימג' של הקונטיינר המותאם אישית.

      2. יוצרים את קובץ האימג' של הקונטיינר בהתאמה אישית ומעבירים אותו בדחיפה ל-Artifact Registry.

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to GCP container registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        לתמונה אחת יכולים להיות כמה שמות, ולכן אפשר להשתמש בתג Docker כדי להקצות לתמונה כינוי.

    9. מריצים את המחבר ב-Serverless for Apache Spark. כדי לשלוח משימה באצווה של PySpark באמצעות קובץ האימג' של הקונטיינר בהתאמה אישית, מריצים את הפקודה gcloud dataproc batches submit pyspark.

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      שימו לב לנקודות הבאות:

      • קובצי ה-JAR הם דרייברים ל-Spark. כדי לקרוא מ-Oracle, מ-MySQL או מ-Postgres, צריך לספק ל-Apache Spark חבילה ספציפית. החבילה יכולה להיות ממוקמת ב-Cloud Storage או בתוך הקונטיינר. אם קובץ ה-JAR נמצא בתוך המאגר, הנתיב דומה ל-file:///path/to/file/driver.jar. בדוגמה הזו, הנתיב לקובץ ה-JAR הוא /opt/spark/jars/.
      • PIPELINE_ARGUMENTS הם הארגומנטים בשורת הפקודה של המחבר.

      המחבר מחלץ מטא-נתונים ממסד הנתונים של Oracle, יוצר קובץ לייבוא מטא-נתונים ושומר את הקובץ הזה בקטגוריה של Cloud Storage.

    10. כדי לייבא ידנית את המטא-נתונים בקובץ ייבוא המטא-נתונים אל Dataplex Universal Catalog, מריצים משימת מטא-נתונים. משתמשים בשיטה metadataJobs.create.

      1. בשורת הפקודה, מוסיפים משתני סביבה ויוצרים כינוי לפקודת curl.

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. קוראים ל-method של ה-API ומעבירים את סוגי הרשומות ואת סוגי ההיבטים שרוצים לייבא.

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        המאפיין schema הוא מאפיין גלובלי שמוגדר על ידי Dataplex Universal Catalog.

        שימו לב: הפורמט שבו משתמשים בשמות של סוגי היבטים כשקוראים לשיטת ה-API שונה מהפורמט שבו משתמשים בקוד של מחבר.

      3. אופציונלי: אפשר להשתמש ב-Cloud Logging כדי לראות את היומנים של משימת המטא-נתונים. מידע נוסף זמין במאמר בנושא מעקב אחרי יומנים של Dataplex Universal Catalog.

    הגדרת תזמור של צינורות עיבוד נתונים

    בקטעים הקודמים הראינו איך ליצור מחבר לדוגמה ולהריץ אותו באופן ידני.

    בסביבת ייצור, מפעילים את המחבר כחלק מצינור קישוריות מנוהל, באמצעות פלטפורמת תזמור כמו Workflows.

    1. כדי להריץ צינור קישוריות מנוהל באמצעות מחבר לדוגמה, פועלים לפי השלבים לייבוא מטא-נתונים באמצעות Workflows. צריך לבצע את הפעולות הבאות:

      • יוצרים את תהליך העבודה באותו Google Cloud מיקום שבו נמצא המחבר.
      • בקובץ ההגדרה של תהליך העבודה, מעדכנים את הפונקציה submit_pyspark_extract_job באמצעות הקוד הבא כדי לחלץ נתונים ממסד הנתונים של Oracle באמצעות המחבר שיצרתם.

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • בקובץ הגדרת תהליך העבודה, מעדכנים את הפונקציה submit_import_job עם הקוד הבא כדי לייבא את הרשומות. הפונקציה מפעילה את שיטת metadataJobs.create API כדי להריץ עבודת ייבוא של מטא-נתונים.

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        צריך לספק את אותם סוגי רשומות וסוגי היבטים שכללתם כשקראתם לשיטת ה-API באופן ידני. שימו לב שאין פסיק בסוף כל מחרוזת.

      • כשמריצים את תהליך העבודה, מספקים את הארגומנטים הבאים של זמן הריצה:

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. אופציונלי: אפשר להשתמש ב-Cloud Logging כדי להציג יומנים של צינור הנתונים של הקישוריות המנוהלת. המטען הייעודי (Payload) של היומן כולל קישור ליומנים של משימת אצווה של Serverless for Apache Spark ומשימת ייבוא המטא-נתונים, בהתאם. מידע נוסף זמין במאמר בנושא הצגת יומני זרימת עבודה.

    3. אופציונלי: כדי לשפר את האבטחה, הביצועים והפונקציונליות של צינור הקישוריות המנוהל, כדאי לבצע את הפעולות הבאות:

      1. משתמשים ב-Secret Manager כדי לאחסן את פרטי הכניסה למקור הנתונים של צד שלישי.
      2. משתמשים ב-PySpark כדי לכתוב את הפלט של JSON Lines לכמה קבצים של ייבוא מטא-נתונים במקביל.
      3. כדי לפצל קבצים גדולים (מעל 100 MB) לקבצים קטנים יותר, צריך להשתמש בקידומת.
      4. מוסיפים עוד היבטים מותאמים אישית שכוללים מטא-נתונים עסקיים וטכניים נוספים מהמקור.

    דוגמאות למשאבי מטא-נתונים למקור Oracle

    מחבר לדוגמה מחלץ מטא-נתונים ממסד נתונים של Oracle וממפה את המטא-נתונים למשאבי מטא-נתונים תואמים ב-Dataplex Universal Catalog.

    שיקולים לגבי ההיררכיה

    לכל מערכת ב-Dataplex Universal Catalog יש רשומת בסיס שהיא רשומת ההורה של המערכת. בדרך כלל, רשומת הבסיס היא מסוג instance. בטבלה הבאה מוצגת היררכיה לדוגמה של סוגי רשומות וסוגי היבטים במערכת Oracle. לדוגמה, סוג הרשומה oracle-database מקושר לסוג מאפיין שנקרא גם oracle-database.

    מזהה סוג רשומה תיאור מזהה סוג ההיבט המקושר
    oracle-instance הבסיס של המערכת המיובאת. oracle-instance
    oracle-database מסד הנתונים של Oracle. oracle-database
    oracle-schema סכימת מסד הנתונים. oracle-schema
    oracle-table טבלה.

    oracle-table

    schema

    oracle-view תצוגה.

    oracle-view

    schema

    המאפיין schema הוא מאפיין גלובלי שמוגדר על ידי Dataplex Universal Catalog. הוא מכיל תיאור של השדות בטבלה, בתצוגה או בישות אחרת שיש לה עמודות. המאפיין oracle-schema custom aspect type מכיל את השם של סכמת מסד הנתונים של Oracle.

    דוגמה לייבוא של שדות פריטים

    המחבר צריך להשתמש במוסכמות הבאות למשאבי Oracle.

    • שמות שמוגדרים במלואם: שמות שמוגדרים במלואם למשאבי Oracle משתמשים בתבנית השמות הבאה. תווים אסורים מסומנים בתו בריחה (escape) של גרש הפוך.

      משאב תבנית דוגמה
      Instance

      SOURCE:ADDRESS

      משתמשים במארח ובמספר היציאה או בשם הדומיין של המערכת.

      oracle:`localhost:1521` או oracle:`myinstance.com`
      מסד נתונים SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      סכימה SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
      טבלה SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      הצגה SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • שמות של רשומות או מזהים של רשומות: רשומות של משאבי Oracle משתמשים בתבנית השמות הבאה. תווים אסורים מוחלפים בתו מותר. המשאבים משתמשים בקידומת projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

      משאב תבנית דוגמה
      Instance HOST_PORT/PREFIX projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      מסד נתונים PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      סכימה PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      טבלה PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      הצגה PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • רשומות הורה: אם רשומה מסוימת היא לא רשומת הבסיס של המערכת, יכול להיות שיהיה לה שדה של רשומת הורה שמתאר את המיקום שלה בהיררכיה. השדה צריך להכיל את השם של רשומת ההורה. מומלץ ליצור את הערך הזה.

      בטבלה הבאה מפורטים רשומות ההורה של משאבי Oracle.

      הערך רשומה של הורה
      Instance "" (מחרוזת ריקה)
      מסד נתונים שם המכונה
      סכימה שם מסד הנתונים
      טבלה שם הסכימה
      הצגה שם הסכימה
    • מפת היבטים: מפת ההיבטים צריכה להכיל לפחות היבט אחד שמתאר את הישות לייבוא. הנה דוגמה למפת היבטים של טבלת Oracle.

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      אפשר למצוא סוגי היבטים מוגדרים מראש (כמו schema) שמגדירים את המבנה של הטבלה או התצוגה בפרויקט dataplex-types, במיקום global.

    • מפתחות מאפיינים: מפתחות מאפיינים משתמשים בפורמט השמות PROJECT.LOCATION.ASPECT_TYPE. בטבלה הבאה מוצגות דוגמאות למפתחות מאפיינים של משאבי Oracle.

      הערך דוגמה למקש יחס גובה-רוחב
      Instance example-project.us-central1.oracle-instance
      מסד נתונים example-project.us-central1.oracle-database
      סכימה example-project.us-central1.oracle-schema
      טבלה example-project.us-central1.oracle-table
      הצגה example-project.us-central1.oracle-view

    המאמרים הבאים