開發用於匯入中繼資料的自訂連接器

本文提供參考範本,協助您建構自訂連接器,從 MySQL、SQL Server 和 Oracle 等第三方來源擷取中繼資料。您可以使用這個連接器,透過代管的連線管道,將中繼資料匯入 Knowledge Catalog (前身為 Dataplex Universal Catalog)。我們提供 Oracle Database Express Edition (XE) 的 Python 連接器範例,供您做為起點。您也可以使用 Java、Scala 或 R 開發連接器。

連結器的運作方式

連接器會從第三方資料來源擷取中繼資料,將中繼資料轉換為 Knowledge Catalog ImportItem 格式,並產生可由 Knowledge Catalog 匯入的中繼資料匯入檔案。

連接器是代管連線管道的一部分。受管理連線管道是自動化調度管理的工作流程,可用於匯入知識目錄中繼資料。代管連線管道會執行連接器,並在匯入工作流程中執行其他工作,例如執行中繼資料匯入作業及擷取記錄。

代管式連線管道會使用 Managed Service for Apache Spark 批次工作執行連接器。Managed Service for Apache Spark 提供無伺服器 Spark 執行環境。雖然您可以建構不使用 Spark 的連接器,但我們建議使用 Spark,因為這有助於提升連接器的效能。

連接器需求

連接器必須符合下列規定:

  • 連接器必須是 Artifact Registry 映像檔,且可在 Managed Service for Apache Spark 上執行。
  • 連接器產生的中繼資料檔案格式必須可供 Knowledge Catalog 中繼資料匯入工作 (metadataJobs.create API 方法) 匯入。如需詳細規定,請參閱「中繼資料匯入檔案」。
  • 連接器必須接受下列指令列引數,才能從管道接收資訊:

    指令列引數 管道提供的值
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    連接器會使用這些引數,在目標項目群組 projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID 中產生中繼資料,並寫入 Cloud Storage bucket gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID。每次執行管道時,系統都會在 bucket CLOUD_STORAGE_BUCKET_ID 中建立新資料夾 FOLDER_ID。連接器應將中繼資料匯入檔案寫入這個資料夾。

管道範本支援 PySpark 連接器。範本會假設驅動程式 (mainPythonFileUri) 是連接器映像檔中名為 main.py 的本機檔案。您可以修改其他情境的管道範本,例如 Spark 連接器、其他驅動程式 URI 或其他選項。

以下說明如何使用 PySpark 在中繼資料匯入檔案中建立匯入項目。

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

事前準備

本指南假設您熟悉 Python 和 PySpark。

查看下列資訊:

請執行下列操作。所有資源均須建立於相同位置。 Google Cloud

  1. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  2. 確認專案已啟用計費功能 Google Cloud

  3. 啟用 Dataplex、Managed Service for Apache Spark、Workflows 和 Artifact Registry API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. 安裝 Google Cloud CLI。

  5. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  6. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  7. 將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    更改下列內容:

    • PROJECT_ID:專案 ID。
    • USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com
    • ROLE:授予使用者帳戶的 IAM 角色。
  8. 設定驗證方法:

    1. 確認您具備「建立服務帳戶」身分與存取權管理角色 (roles/iam.serviceAccountCreator) 和「專案 IAM 管理員」角色 (roles/resourcemanager.projectIamAdmin)。瞭解如何授予角色
    2. 建立服務帳戶:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 換成服務帳戶的名稱。

    3. roles/owner IAM 角色授予服務帳戶:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      請替換下列項目:

      • SERVICE_ACCOUNT_NAME:服務帳戶名稱
      • PROJECT_ID:您建立服務帳戶的專案 ID
  9. 建立 Cloud Storage bucket,以儲存中繼資料匯入檔案。

  10. 在同一個專案中建立下列中繼資料資源。

    如需範例值,請參閱本文的「Oracle 來源的中繼資料資源範例」一節。

    1. 建立項目群組
    2. 為要匯入的項目建立自訂切面類型。使用命名慣例 SOURCE-ENTITY_TO_IMPORT

      舉例來說,如果是 Oracle 資料庫,請建立名為 oracle-database 的層面類型。

      您也可以視需要建立其他切面類型,儲存其他資訊。

    3. 為要匯入的資源建立自訂項目類型,並指派相關切面類型。使用命名慣例 SOURCE-ENTITY_TO_IMPORT

      舉例來說,如果是 Oracle 資料庫,請建立名為 oracle-database 的項目類型。將其連結至名為「oracle-database」的指標類型。

  11. 確認可從 Google Cloud 專案存取第三方來源。詳情請參閱「Managed Service for Apache Spark 網路設定」。

建立基本 Python 連接器

這個基本 Python 連接器範例會使用 Knowledge Catalog 用戶端程式庫類別,為 Oracle 資料來源建立頂層項目。然後為輸入欄位提供值。

連接器會建立中繼資料匯入檔案,其中包含下列項目:

  • instance 項目,項目類型為 projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance。這個項目代表 Oracle Database XE 系統。
  • database 項目,代表 Oracle Database XE 系統內的資料庫。

如要建構基本 Python 連接器,請按照下列步驟操作:

  1. 複製cloud-dataplex存放區

  2. 設定本機環境。建議您使用虛擬環境。

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    請使用 Python 的「有效」或「維護」版本。支援 Python 3.7 以上版本。

  3. 建立 Python 專案。

  4. 安裝必要項目:

    pip install -r requirements.txt
    

    安裝下列必要項目:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  5. 在專案根目錄中新增 main.py 管道檔案。

    from src import bootstrap
    
    
    if __name__ == '__main__':
        bootstrap.run()
    

    將程式碼部署至 Managed Service for Apache Spark 時,main.py 檔案會做為執行作業的進入點。建議您盡量減少 main.py 檔案中儲存的資訊量,並使用這個檔案呼叫連接器中定義的函式和類別,例如 src/bootstap.py 類別。

  6. 建立 src 資料夾,儲存連接器的多數邏輯。

  7. 使用 Python 類別更新 src/cmd_reader.py 檔案,以接受指令列引數。您可以使用 argeparse 模組執行這項操作。

    """Command line reader."""
    import argparse
    
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
    
        # Dataplex arguments
        parser.add_argument("--target_project_id", type=str, required=True,
            help="The name of the target Google Cloud project to import the metadata into.")
        parser.add_argument("--target_location_id", type=str, required=True,
            help="The target Google Cloud location where the metadata will be imported into.")
        parser.add_argument("--target_entry_group_id", type=str, required=True,
            help="The ID of the entry group to import metadata into. "
                 "The metadata will be imported into entry group with the following"
                 "full resource name: projects/${target_project_id}/"
                 "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
    
        # Oracle arguments
        parser.add_argument("--host_port", type=str, required=True,
            help="Oracle host and port number separated by the colon (:).")
        parser.add_argument("--user", type=str, required=True, help="Oracle User.")
        parser.add_argument("--password-secret", type=str, required=True,
            help="Secret resource name in the Secret Manager for the Oracle password.")
        parser.add_argument("--database", type=str, required=True,
            help="Source Oracle database.")
    
        # Google Cloud Storage arguments
        # It is assumed that the bucket is in the same region as the entry group
        parser.add_argument("--output_bucket", type=str, required=True,
            help="The Cloud Storage bucket to write the generated metadata import file.")
        parser.add_argument("--output_folder", type=str, required=True,
            help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
    
        return vars(parser.parse_known_args()[0])
    

    在正式環境中,建議您將密碼儲存在 Secret Manager

  8. 使用程式碼更新 src/constants.py 檔案,建立常數。

    """Constants that are used in the different files."""
    import enum
    
    SOURCE_TYPE = "oracle"
    
    # Symbols for replacement
    FORBIDDEN = "#"
    ALLOWED = "!"
    
    
    class EntryType(enum.Enum):
        """Types of Oracle entries."""
        INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
        DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
        DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
        TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
        VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  9. 使用方法更新 src/name_builder.py 檔案,建構您希望連接器為 Oracle 資源建立的中繼資料資源。請使用本文件「Oracle 來源的中繼資料資源範例」一節中說明的慣例。

    """Builds Dataplex hierarchy identifiers."""
    from typing import Dict
    from src.constants import EntryType, SOURCE_TYPE
    
    
    # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
    # In that case in names it is changed to C!!, and escaped with backticks in FQNs
    FORBIDDEN_SYMBOL = "#"
    ALLOWED_SYMBOL = "!"
    
    
    def create_fqn(config: Dict[str, str], entry_type: EntryType,
                   schema_name: str = "", table_name: str = ""):
        """Creates a fully qualified name or Dataplex v1 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = f"`{schema_name}`"
    
        if entry_type == EntryType.INSTANCE:
            # Requires backticks to escape column
            return f"{SOURCE_TYPE}:`{config['host_port']}`"
        if entry_type == EntryType.DATABASE:
            instance = create_fqn(config, EntryType.INSTANCE)
            return f"{instance}.{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}"
        if entry_type in [EntryType.TABLE, EntryType.VIEW]:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}.{table_name}"
        return ""
    
    
    def create_name(config: Dict[str, str], entry_type: EntryType,
                    schema_name: str = "", table_name: str = ""):
        """Creates a Dataplex v2 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if entry_type == EntryType.INSTANCE:
            name_prefix = (
                f"projects/{config['target_project_id']}/"
                f"locations/{config['target_location_id']}/"
                f"entryGroups/{config['target_entry_group_id']}/"
                f"entries/"
            )
            return name_prefix + config["host_port"].replace(":", "@")
        if entry_type == EntryType.DATABASE:
            instance = create_name(config, EntryType.INSTANCE)
            return f"{instance}/databases/{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_name(config, EntryType.DATABASE)
            return f"{database}/database_schemas/{schema_name}"
        if entry_type == EntryType.TABLE:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/tables/{table_name}"
        if entry_type == EntryType.VIEW:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/views/{table_name}"
        return ""
    
    
    def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                           parent_name: str = ""):
        """Generates a Dataplex v2 name of the parent."""
        if entry_type == EntryType.DATABASE:
            return create_name(config, EntryType.INSTANCE)
        if entry_type == EntryType.DB_SCHEMA:
            return create_name(config, EntryType.DATABASE)
        if entry_type == EntryType.TABLE:
            return create_name(config, EntryType.DB_SCHEMA, parent_name)
        return ""
    
    
    def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
        """Generates an entry aspect name."""
        last_segment = entry_type.value.split("/")[-1]
        return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
    

    由於 name_builder.py 檔案同時用於 Python 核心程式碼和 PySpark 核心程式碼,建議您將方法編寫為純函式,而非類別成員。

  10. 使用程式碼更新 src/top_entry_builder.py 檔案,以填入頂層項目資料。

    """Non-Spark approach for building the entries."""
    import dataclasses
    import json
    from typing import List, Dict
    
    import proto
    from google.cloud import dataplex_v1
    
    from src.constants import EntryType
    from src import name_builder as nb
    
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        """A template class for Import API."""
    
        entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    
    def _dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
    
        def convert(obj: object):
            if isinstance(obj, proto.Message):
                return proto.Message.to_dict(obj)
            return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    
    def _create_entry(config: Dict[str, str], entry_type: EntryType):
        """Creates an entry based on a Dataplex library."""
        entry = dataplex_v1.Entry()
        entry.name = nb.create_name(config, entry_type)
        entry.entry_type = entry_type.value.format(
            project=config["target_project_id"], location=config["target_location_id"]
        )
        entry.fully_qualified_name = nb.create_fqn(config, entry_type)
        entry.parent_entry = nb.create_parent_name(config, entry_type)
    
        aspect_key = nb.create_entry_aspect_name(config, entry_type)
    
        # Add mandatory aspect
        entry_aspect = dataplex_v1.Aspect()
        entry_aspect.aspect_type = aspect_key
        entry_aspect.data = {}
        entry.aspects[aspect_key] = entry_aspect
    
        return entry
    
    
    def _entry_to_import_item(entry: dataplex_v1.Entry):
        """Packs entry to import item, accepted by the API,"""
        import_item = ImportItem()
        import_item.entry = entry
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
    
    def create(config, entry_type: EntryType):
        """Creates an entry, packs it to Import Item and converts to json."""
        import_item = _entry_to_import_item(_create_entry(config, entry_type))
        return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
    
  11. 使用程式碼更新 src/bootstrap.py 檔案,產生中繼資料匯入檔案並執行連接器。

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    
  12. 在本機執行程式碼。

    系統會傳回名為 output.jsonl 的中繼資料匯入檔案。檔案有兩行,每行代表一個匯入項目。執行中繼資料匯入作業時,代管連線管道會讀取這個檔案。

  13. 選用:擴充先前的範例,使用 Knowledge Catalog 用戶端程式庫類別,為資料表、結構定義和檢視區塊建立匯入項目。您也可以在 Managed Service for Apache Spark 上執行 Python 範例。

    建議您建立使用 Spark 的連接器 (並在 Managed Service for Apache Spark 上執行),因為這樣可以提升連接器的效能。

建立 PySpark 連接器

本範例是以 PySpark DataFrame API 為基礎。您可以安裝 PySpark SQL,並在本機執行,然後再透過 Managed Service for Apache Spark 執行。如要在本機安裝及執行 PySpark,請使用 pip 安裝 PySpark 程式庫,但不需要安裝本機 Spark 叢集。

基於效能考量,這個範例不會使用 PySpark 程式庫的預先定義類別。這個範例會建立 DataFrame、將 DataFrame 轉換為 JSON 項目,然後以 JSON Lines 格式將輸出內容寫入中繼資料匯入檔案,以便匯入 Knowledge Catalog。

如要使用 PySpark 建構連接器,請按照下列步驟操作:

  1. 複製cloud-dataplex存放區

  2. 安裝 PySpark:

    pip install pyspark
    
  3. 安裝必要項目:

    pip install -r requirements.txt
    

    安裝下列必要項目:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  4. 使用程式碼更新 oracle_connector.py 檔案,從 Oracle 資料來源讀取資料並傳回 DataFrame。

    """Reads Oracle using PySpark."""
    from typing import Dict
    from pyspark.sql import SparkSession, DataFrame
    
    from src.constants import EntryType
    
    
    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
    
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # PySpark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor") \
                .config("spark.jars", SPARK_JAR_PATH) \
                .getOrCreate()
    
            self._config = config
            self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
    
        def _execute(self, query: str) -> DataFrame:
            """A generic method to execute any query."""
            return self._spark.read.format("jdbc") \
                .option("driver", "oracle.jdbc.OracleDriver") \
                .option("url", self._url) \
                .option("query", query) \
                .option("user", self._config["user"]) \
                .option("password", self._config["password"]) \
                .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Gets a list of columns in tables or views in a batch."""
            # Every line here is a column that belongs to the table or to the view.
            # This SQL gets data from ALL the tables in a given schema.
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Gets data for a table or a view."""
            # Dataset means that these entities can contain end user data.
            short_type = entry_type.name  # table or view, or the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    新增 SQL 查詢,傳回要匯入的中繼資料。查詢必須傳回下列資訊:

    • 資料庫結構定義
    • 屬於這些結構定義的資料表
    • 屬於這些資料表的資料欄,包括資料欄名稱、資料欄資料類型,以及資料欄是否可為空值或必填

    所有資料表和檢視區塊的所有資料欄,都會儲存在同一個系統資料表中。您可以使用 _get_columns 方法選取資料欄。 視您提供的參數而定,您可以分別為資料表或檢視畫面選取資料欄。

    注意事項:

    • 在 Oracle 中,資料庫結構定義由資料庫使用者擁有,且與該使用者同名。
    • 結構定義物件是使用者建立的邏輯結構,資料表或索引等物件可以保存資料,而檢視區塊或同義字等物件只包含定義。
    • ojdbc11.jar 檔案包含 Oracle JDBC 驅動程式
  5. 更新 src/entry_builder.py 檔案,加入用於套用 Spark 轉換的共用方法。

    """Creates entries with PySpark."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb
    
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Choose the metadata type based on Oracle native type."""
        if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
            return "NUMBER"
        if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
            return "STRING"
        if data_type == "DATE":
            return "DATETIME"
        return "OTHER"
    
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(
            F.lit(entry_aspect_name),
            F.named_struct(
                F.lit("aspect_type"),
                F.lit(entry_aspect_name),
                F.lit("data"),
                F.create_map()
                )
            )
    
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                         "entry_source", "aspects", "entry_type"]
    
        # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
        # and "aspects" string in "update_mask"
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    
    
    def build_schemas(config, df_raw_schemas):
        """Create a dataframe with database schemas from the list of usernames.
        Args:
            df_raw_schemas - a dataframe with only one column called USERNAME
        Returns:
            A dataframe with Dataplex-readable schemas.
        """
        entry_type = EntryType.DB_SCHEMA
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    
        # For schema, parent name is the name of the database
        parent_name =  nb.create_parent_name(config, entry_type)
    
        # Create user-defined function.
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                               StringType())
    
        # Fills the missed project and location into the entry type string
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Converts a list of schema names to the Dataplex-compatible form
        column = F.col("USERNAME")
        df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
        .drop(column)
    
        df = convert_to_import_items(df, [entry_aspect_name])
        return df
    
    
    def build_dataset(config, df_raw, db_schema, entry_type):
        """Build table entries from a flat list of columns.
        Args:
            df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                     and NULLABLE columns
            db_schema - parent database schema
            entry_type - entry type: table or view
        Returns:
            A dataframe with Dataplex-readable data of tables of views.
        """
        schema_key = "dataplex-types.global.schema"
    
        # The transformation below does the following
        # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
        # 2. Renames NULLABLE to mode
        # 3. Renames DATA_TYPE to dataType
        # 4. Creates metadataType column based on dataType column
        # 5. Renames COLUMN_NAME to name
        df = df_raw \
          .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
          .drop("NULLABLE") \
          .withColumnRenamed("DATA_TYPE", "dataType") \
          .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
          .withColumnRenamed("COLUMN_NAME", "name")
    
        # The transformation below aggregate fields, denormalizing the table
        # TABLE_NAME becomes top-level filed, and the rest is put into
        # the array type called "fields"
        aspect_columns = ["name", "mode", "dataType", "metadataType"]
        df = df.withColumn("columns", F.struct(aspect_columns))\
          .groupby('TABLE_NAME') \
          .agg(F.collect_list("columns").alias("fields"))
    
        # Create nested structured called aspects.
        # Fields are becoming a part of a `schema` struct
        # There is also an entry_aspect that is repeats entry_type as aspect_type
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
        df = df.withColumn("schema",
                           F.create_map(F.lit(schema_key),
                                        F.named_struct(
                                            F.lit("aspect_type"),
                                            F.lit(schema_key),
                                            F.lit("data"),
                                            F.create_map(F.lit("fields"),
                                                         F.col("fields")))
                                        )
                           )\
          .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
        .drop("fields")
    
        # Merge separate aspect columns into the one map called 'aspects'
        df = df.select(F.col("TABLE_NAME"),
                       F.map_concat("schema", "entry_aspect").alias("aspects"))
    
        # Define user-defined functions to fill the general information
        # and hierarchy names
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                         db_schema, x),
                                StringType())
    
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                       db_schema, x), StringType())
    
        parent_name = nb.create_parent_name(config, entry_type, db_schema)
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Fill the top-level fields
        column = F.col("TABLE_NAME")
        df = df.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_source", create_entry_source(column)) \
        .drop(column)
    
        df = convert_to_import_items(df, [schema_key, entry_aspect_name])
        return df
    

    注意事項:

    • 這些方法會建構連接器為 Oracle 資源建立的中繼資料資源。請使用本文件「Oracle 來源的中繼資料資源範例」一節中說明的慣例。
    • convert_to_import_items 方法適用於結構定義、資料表和檢視表。請確認連接器的輸出內容是一或多個可由 metadataJobs.create 方法處理的匯入項目,而非個別項目。
    • 即使在檢視區塊中,資料欄也稱為 TABLE_NAME
  6. 使用程式碼更新 bootstrap.py 檔案,產生中繼資料匯入檔案並執行連接器。

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    

    這個範例會將中繼資料匯入檔案儲存為單一 JSON Lines 檔案。您可以使用 DataFrameWriter 類別等 PySpark 工具,平行輸出批次的 JSON。

    連接器可以依任何順序將項目寫入中繼資料匯入檔案。

  7. 使用程式碼更新 gcs_uploader.py 檔案,將中繼資料匯入檔案上傳至 Cloud Storage 值區。

    """Sends files to GCP storage."""
    from typing import Dict
    from google.cloud import storage
    
    
    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to GCP bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["output_bucket"])
        folder = config["output_folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  8. 建構連接器映像檔。

    如果連接器包含多個檔案,或想使用預設 Docker 映像檔未納入的程式庫,就必須使用自訂容器。Managed Service for Apache Spark 會在 Docker 容器中執行工作負載。建立連接器的自訂 Docker 映像檔,並將映像檔儲存在 Artifact Registry 中。Managed Service for Apache Spark 會從 Artifact Registry 讀取映像檔。

    1. 建立 Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark

      使用 Conda 做為套件管理員。Managed Service for Apache Spark 會在執行階段將 pyspark 掛接至容器,因此您不需要在自訂容器映像檔中安裝 PySpark 依附元件。

    2. 建構自訂容器映像檔,並推送至 Artifact Registry。

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=<PROJECT_ID>
      
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to GCP container registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      由於一個映像檔可以有多個名稱,因此您可以使用 Docker 標記為映像檔指派別名。

  9. 在 Managed Service for Apache Spark 上執行連接器。 如要使用自訂容器映像檔提交 PySpark 批次工作,請執行 gcloud dataproc batches submit pyspark 指令

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    注意事項:

    • JAR 檔案是 Spark 的驅動程式。如要從 Oracle、MySQL 或 Postgres 讀取資料,您必須為 Apache Spark 提供特定套件。套件可位於 Cloud Storage 或容器內。如果 JAR 檔案位於容器內,路徑會類似 file:///path/to/file/driver.jar。在本範例中,JAR 檔案的路徑為 /opt/spark/jars/
    • PIPELINE_ARGUMENTS 是連接器的指令列引數。

    這個連接器會從 Oracle 資料庫擷取中繼資料、產生中繼資料匯入檔案,並將中繼資料匯入檔案儲存至 Cloud Storage bucket。

  10. 如要手動將中繼資料匯入檔案中的中繼資料匯入 Knowledge Catalog,請執行中繼資料工作。請使用 metadataJobs.create 方法

    1. 在指令列中新增環境變數,並為 curl 指令建立別名。

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. 呼叫 API 方法,並傳遞要匯入的項目類型和切面類型。

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      schema 切面類型是由 Knowledge Catalog 定義的全域切面類型。

      請注意,呼叫 API 方法時使用的面向類型名稱格式,與在連接器程式碼中使用的格式不同。

    3. 選用:使用 Cloud Logging 查看中繼資料工作的記錄。詳情請參閱「監控 Knowledge Catalog 記錄」。

設定管道自動化調度管理

前幾節說明如何建構範例連接器,以及手動執行連接器。

在正式環境中,您會使用 Workflows 等自動化調度平台,將連接器做為代管連線管道的一部分執行。

  1. 如要使用範例連接器執行代管連線管道,請按照使用工作流程匯入中繼資料的步驟操作。 請採取下列行動:

    • 在與連結器相同的 Google Cloud 位置建立工作流程。
    • 在工作流程定義檔案中,使用下列程式碼更新 submit_pyspark_extract_job 函式,透過您建立的連接器從 Oracle 資料庫擷取資料。

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • 在工作流程定義檔案中,使用下列程式碼更新 submit_import_job 函式,匯入項目。這個函式會呼叫 metadataJobs.create API 方法,執行中繼資料匯入工作。

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      提供您手動呼叫 API 方法時所加入的相同項目類型和切面類型。請注意,每個字串的結尾都沒有逗號。

    • 執行工作流程時,請提供下列執行階段引數:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. 選用:使用 Cloud Logging 查看代管連線管道的記錄。記錄檔酬載包含 Managed Service for Apache Spark 批次工作和中繼資料匯入工作的記錄檔連結 (如適用)。詳情請參閱「查看工作流程記錄」。

  3. 選用:如要提升受管理連線管道的安全性、效能和功能,請考慮採取下列做法:

    1. 使用 Secret Manager 儲存第三方資料來源的憑證。
    2. 使用 PySpark 將 JSON Lines 輸出內容平行寫入多個中繼資料匯入檔案。
    3. 使用前置字元將大檔案 (超過 100 MB) 分割成較小的檔案。
    4. 新增更多自訂層面,從來源擷取額外的業務和技術中繼資料。

Oracle 來源的中繼資料資源範例

範例連接器會從 Oracle 資料庫擷取中繼資料,並將中繼資料對應至相應的 Knowledge Catalog 中繼資料資源。

階層注意事項

知識目錄中的每個系統都有根項目,也就是系統的父項項目。通常根項目具有 instance 項目類型。下表顯示 Oracle 系統的項目類型和層面類型範例階層。舉例來說,oracle-database 項目類型會連結至同樣名為 oracle-database 的切面類型。

項目類型 ID 說明 連結的切面類型 ID
oracle-instance 匯入系統的根目錄。 oracle-instance
oracle-database Oracle 資料庫。 oracle-database
oracle-schema 資料庫結構定義。 oracle-schema
oracle-table 表格。

oracle-table

schema

oracle-view 檢視畫面。

oracle-view

schema

schema 切面類型是由 Knowledge Catalog 定義的全域切面類型。其中包含資料表、檢視區塊或其他有資料欄的實體中,各個欄位的說明。oracle-schema 自訂構面類型包含 Oracle 資料庫結構定義的名稱。

匯入項目欄位範例

連接器應遵循下列 Oracle 資源慣例。

  • 完整名稱:Oracle 資源的完整名稱使用下列命名範本。禁止使用的字元會以反引號逸出。

    資源 範本 範例
    執行個體

    SOURCE:ADDRESS

    使用系統的主機和通訊埠號碼或網域名稱。

    oracle:`localhost:1521`oracle:`myinstance.com`
    資料庫 SOURCEADDRESSDATABASE oracle:`localhost:1521`.xe
    結構定義 SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    資料表 SOURCEADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    查看 SOURCEADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • 項目名稱或項目 ID:Oracle 資源的項目使用下列命名範本。系統會將禁止使用的字元替換成允許使用的字元。資源使用前置字串 projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries

    資源 範本 範例
    執行個體 PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    資料庫 PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    結構定義 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    資料表 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    查看 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • 父項項目:如果項目不是系統的根項目,項目可以有父項項目欄位,說明其在階層中的位置。這個欄位應包含父項項目的名稱。建議您產生這個值。

    下表列出 Oracle 資源的父項項目。

    項目 上層項目
    執行個體 "" (空字串)
    資料庫 執行個體名稱
    結構定義 資料庫名稱
    資料表 結構定義名稱
    查看 結構定義名稱
  • 面向對應:面向對應必須至少包含一個描述要匯入實體的面向。以下是 Oracle 資料表的層面地圖範例。

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    您可以在 dataplex-types 專案的 global 位置,找到預先定義的層面類型 (例如 schema),這些類型會定義表格或檢視結構。

  • 層面鍵:層面鍵採用 PROJECT.LOCATION.ASPECT_TYPE 命名格式。下表列出 Oracle 資源的層面鍵範例。

    項目 範例層面鍵
    執行個體 example-project.us-central1.oracle-instance
    資料庫 example-project.us-central1.oracle-database
    結構定義 example-project.us-central1.oracle-schema
    資料表 example-project.us-central1.oracle-table
    查看 example-project.us-central1.oracle-view

後續步驟