Desarrolla un conector personalizado para la importación de metadatos

En este documento, se proporciona una plantilla de referencia para compilar un conector personalizado que permita extraer metadatos de fuentes externas, como MySQL, SQL Server y Oracle. Puedes usar este conector para importar metadatos a Knowledge Catalog (anteriormente, Dataplex Universal Catalog) a través de una canalización de conectividad administrada. Se incluye un ejemplo de conector de Python para Oracle Database Express Edition (XE) como punto de partida. También puedes desarrollar conectores con Java, Scala o R.

Cómo funcionan los conectores

Un conector extrae metadatos de una fuente de datos externa, los transforma al formato de Knowledge Catalog ImportItem y genera archivos de importación de metadatos que Knowledge Catalog puede importar.

El conector forma parte de una canalización de conectividad administrada. Una canalización de conectividad administrada es un flujo de trabajo organizado que se usa para importar metadatos del Catálogo de conocimiento. La canalización de conectividad administrada ejecuta el conector y realiza otras tareas en el flujo de trabajo de importación, como ejecutar un trabajo de importación de metadatos y capturar registros.

La canalización de conectividad administrada ejecuta el conector con un trabajo por lotes de Managed Service for Apache Spark. El servicio administrado para Apache Spark proporciona un entorno de ejecución de Spark sin servidores. Si bien puedes compilar un conector que no use Spark, te recomendamos que lo uses porque puede mejorar el rendimiento de tu conector.

Requisitos del conector

El conector tiene los siguientes requisitos:

  • El conector debe ser una imagen de Artifact Registry que se pueda ejecutar en Managed Service for Apache Spark.
  • El conector debe generar archivos de metadatos en un formato que pueda importar un trabajo de importación de metadatos de Knowledge Catalog (el método de la API de metadataJobs.create). Para conocer los requisitos detallados, consulta Archivo de importación de metadatos.
  • El conector debe aceptar los siguientes argumentos de línea de comandos para recibir información de la canalización:

    Argumento de la línea de comandos Valor que proporciona la canalización
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    El conector usa estos argumentos para generar metadatos en un grupo de entrada de destino projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID y para escribir en un bucket de Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Cada ejecución de la canalización crea una carpeta nueva FOLDER_ID en el bucket CLOUD_STORAGE_BUCKET_ID. El conector debe escribir archivos de importación de metadatos en esta carpeta.

Las plantillas de canalización admiten conectores de PySpark. Las plantillas suponen que el controlador (mainPythonFileUri) es un archivo local en la imagen del conector llamado main.py. Puedes modificar las plantillas de canalización para otros casos, como un conector de Spark, un URI de controlador diferente o cualquier otra opción.

A continuación, se explica cómo usar PySpark para crear un elemento de importación en el archivo de importación de metadatos.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Antes de comenzar

En esta guía, se supone que conoces Python y PySpark.

Revisa la siguiente información:

Haz lo siguiente: Crea todos los recursos en la misma ubicación Google Cloud.

  1. Crea o selecciona un Google Cloud proyecto.

    Roles necesarios para seleccionar o crear un proyecto

    • Selecciona un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
    • Crear un proyecto: Para crear un proyecto, necesitas el rol de Creador de proyectos (roles/resourcemanager.projectCreator), que contiene el permiso resourcemanager.projects.create. Obtén más información para otorgar roles.
    • Crea un proyecto de Google Cloud :

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto Google Cloud que estás creando.

    • Selecciona el proyecto Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre de tu Google Cloud proyecto.

  2. Verifica que la facturación esté habilitada para tu proyecto de Google Cloud .

  3. Habilita las APIs de Dataplex, Managed Service for Apache Spark, Workflows y Artifact Registry:

    Roles necesarios para habilitar las APIs

    Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin), que contiene el permiso serviceusage.services.enable. Obtén más información para otorgar roles.

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Instala Google Cloud CLI.

  5. Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.

  6. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  7. Otorga roles a tu cuenta de usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Reemplaza lo siguiente:

    • PROJECT_ID: ID del proyecto
    • USER_IDENTIFIER: Es el identificador de tu cuenta de usuario de . Por ejemplo, myemail@example.com.
    • ROLE: Es el rol de IAM que otorgas a tu cuenta de usuario.
  8. Configura la autenticación:

    1. Asegúrate de tener los roles de IAM de creador de cuentas de servicio (roles/iam.serviceAccountCreator) y administrador de IAM del proyecto (roles/resourcemanager.projectIamAdmin). Obtén más información para otorgar roles.
    2. Crea la cuenta de servicio:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Reemplaza SERVICE_ACCOUNT_NAME por un nombre para la cuenta de servicio.

    3. Otorga el rol de IAM roles/owner a la cuenta de servicio.

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Reemplaza lo siguiente:

      • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio
      • PROJECT_ID: el ID del proyecto en el que creaste la cuenta de servicio
  9. Crea un bucket de Cloud Storage para almacenar los archivos de importación de metadatos.

  10. Crea los siguientes recursos de metadatos en el mismo proyecto.

    Para ver ejemplos de valores, consulta la sección Ejemplos de recursos de metadatos para una fuente de Oracle de este documento.

    1. Crea un grupo de entrada.
    2. Crea tipos de aspectos personalizados para las entradas que deseas importar. Usa la convención de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Por ejemplo, para una base de datos de Oracle, crea un tipo de aspecto llamado oracle-database.

      De manera opcional, puedes crear tipos de aspectos adicionales para almacenar otra información.

    3. Crea tipos de entrada personalizados para los recursos que deseas importar y asígnale los tipos de aspecto relevantes. Usa la convención de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Por ejemplo, para una base de datos de Oracle, crea un tipo de entrada llamado oracle-database. Vincúlalo al tipo de aspecto llamado oracle-database.

  11. Asegúrate de que se pueda acceder a tu fuente externa desde tu proyecto de Google Cloud . Para obtener más información, consulta Configuración de red de Managed Service for Apache Spark.

Crea un conector básico de Python

El conector básico de Python de ejemplo crea entradas de nivel superior para una fuente de datos de Oracle con las clases de la biblioteca cliente de Knowledge Catalog. Luego, debes proporcionar los valores para los campos de entrada.

El conector crea un archivo de importación de metadatos con las siguientes entradas:

  • Una entrada instance, con el tipo de entrada projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Esta entrada representa un sistema de Oracle Database XE.
  • Una entrada database, que representa una base de datos dentro del sistema Oracle Database XE

Para compilar un conector básico de Python, haz lo siguiente:

  1. Clona el repositorio cloud-dataplex.

  2. Configura un entorno local. Te recomendamos que uses un entorno virtual.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    Usa las versiones activas o en mantenimiento de Python. Se admiten las versiones 3.7 y posteriores de Python.

  3. Crea un proyecto de Python.

  4. Requisitos de instalación:

    pip install -r requirements.txt
    

    Se instalan los siguientes requisitos:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  5. Agrega un archivo de canalización main.py en la raíz del proyecto.

    from src import bootstrap
    
    
    if __name__ == '__main__':
        bootstrap.run()
    

    Cuando implementas tu código en el servicio administrado para Apache Spark, el archivo main.py sirve como punto de entrada para la ejecución. Te recomendamos que minimices la cantidad de información que se almacena en el archivo main.py. Usa este archivo para llamar a funciones y clases que se definen dentro de tu conector, como la clase src/bootstap.py.

  6. Crea una carpeta src para almacenar la mayor parte de la lógica de tu conector.

  7. Actualiza el archivo src/cmd_reader.py con una clase de Python para aceptar argumentos de la línea de comandos. Puedes usar el módulo argeparse para hacerlo.

    """Command line reader."""
    import argparse
    
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
    
        # Dataplex arguments
        parser.add_argument("--target_project_id", type=str, required=True,
            help="The name of the target Google Cloud project to import the metadata into.")
        parser.add_argument("--target_location_id", type=str, required=True,
            help="The target Google Cloud location where the metadata will be imported into.")
        parser.add_argument("--target_entry_group_id", type=str, required=True,
            help="The ID of the entry group to import metadata into. "
                 "The metadata will be imported into entry group with the following"
                 "full resource name: projects/${target_project_id}/"
                 "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
    
        # Oracle arguments
        parser.add_argument("--host_port", type=str, required=True,
            help="Oracle host and port number separated by the colon (:).")
        parser.add_argument("--user", type=str, required=True, help="Oracle User.")
        parser.add_argument("--password-secret", type=str, required=True,
            help="Secret resource name in the Secret Manager for the Oracle password.")
        parser.add_argument("--database", type=str, required=True,
            help="Source Oracle database.")
    
        # Google Cloud Storage arguments
        # It is assumed that the bucket is in the same region as the entry group
        parser.add_argument("--output_bucket", type=str, required=True,
            help="The Cloud Storage bucket to write the generated metadata import file.")
        parser.add_argument("--output_folder", type=str, required=True,
            help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
    
        return vars(parser.parse_known_args()[0])
    

    En los entornos de producción, te recomendamos que almacenes la contraseña en Secret Manager.

  8. Actualiza el archivo src/constants.py con código para crear constantes.

    """Constants that are used in the different files."""
    import enum
    
    SOURCE_TYPE = "oracle"
    
    # Symbols for replacement
    FORBIDDEN = "#"
    ALLOWED = "!"
    
    
    class EntryType(enum.Enum):
        """Types of Oracle entries."""
        INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
        DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
        DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
        TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
        VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  9. Actualiza el archivo src/name_builder.py con métodos para compilar los recursos de metadatos que deseas que el conector cree para tus recursos de Oracle. Usa las convenciones que se describen en la sección Recursos de metadatos de ejemplo para una fuente de Oracle de este documento.

    """Builds Dataplex hierarchy identifiers."""
    from typing import Dict
    from src.constants import EntryType, SOURCE_TYPE
    
    
    # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
    # In that case in names it is changed to C!!, and escaped with backticks in FQNs
    FORBIDDEN_SYMBOL = "#"
    ALLOWED_SYMBOL = "!"
    
    
    def create_fqn(config: Dict[str, str], entry_type: EntryType,
                   schema_name: str = "", table_name: str = ""):
        """Creates a fully qualified name or Dataplex v1 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = f"`{schema_name}`"
    
        if entry_type == EntryType.INSTANCE:
            # Requires backticks to escape column
            return f"{SOURCE_TYPE}:`{config['host_port']}`"
        if entry_type == EntryType.DATABASE:
            instance = create_fqn(config, EntryType.INSTANCE)
            return f"{instance}.{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}"
        if entry_type in [EntryType.TABLE, EntryType.VIEW]:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}.{table_name}"
        return ""
    
    
    def create_name(config: Dict[str, str], entry_type: EntryType,
                    schema_name: str = "", table_name: str = ""):
        """Creates a Dataplex v2 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if entry_type == EntryType.INSTANCE:
            name_prefix = (
                f"projects/{config['target_project_id']}/"
                f"locations/{config['target_location_id']}/"
                f"entryGroups/{config['target_entry_group_id']}/"
                f"entries/"
            )
            return name_prefix + config["host_port"].replace(":", "@")
        if entry_type == EntryType.DATABASE:
            instance = create_name(config, EntryType.INSTANCE)
            return f"{instance}/databases/{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_name(config, EntryType.DATABASE)
            return f"{database}/database_schemas/{schema_name}"
        if entry_type == EntryType.TABLE:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/tables/{table_name}"
        if entry_type == EntryType.VIEW:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/views/{table_name}"
        return ""
    
    
    def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                           parent_name: str = ""):
        """Generates a Dataplex v2 name of the parent."""
        if entry_type == EntryType.DATABASE:
            return create_name(config, EntryType.INSTANCE)
        if entry_type == EntryType.DB_SCHEMA:
            return create_name(config, EntryType.DATABASE)
        if entry_type == EntryType.TABLE:
            return create_name(config, EntryType.DB_SCHEMA, parent_name)
        return ""
    
    
    def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
        """Generates an entry aspect name."""
        last_segment = entry_type.value.split("/")[-1]
        return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
    

    Dado que el archivo name_builder.py se usa tanto para el código principal de Python como para el código principal de PySpark, te recomendamos que escribas los métodos como funciones puras, en lugar de como miembros de una clase.

  10. Actualiza el archivo src/top_entry_builder.py con código para completar las entradas de nivel superior con datos.

    """Non-Spark approach for building the entries."""
    import dataclasses
    import json
    from typing import List, Dict
    
    import proto
    from google.cloud import dataplex_v1
    
    from src.constants import EntryType
    from src import name_builder as nb
    
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        """A template class for Import API."""
    
        entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    
    def _dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
    
        def convert(obj: object):
            if isinstance(obj, proto.Message):
                return proto.Message.to_dict(obj)
            return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    
    def _create_entry(config: Dict[str, str], entry_type: EntryType):
        """Creates an entry based on a Dataplex library."""
        entry = dataplex_v1.Entry()
        entry.name = nb.create_name(config, entry_type)
        entry.entry_type = entry_type.value.format(
            project=config["target_project_id"], location=config["target_location_id"]
        )
        entry.fully_qualified_name = nb.create_fqn(config, entry_type)
        entry.parent_entry = nb.create_parent_name(config, entry_type)
    
        aspect_key = nb.create_entry_aspect_name(config, entry_type)
    
        # Add mandatory aspect
        entry_aspect = dataplex_v1.Aspect()
        entry_aspect.aspect_type = aspect_key
        entry_aspect.data = {}
        entry.aspects[aspect_key] = entry_aspect
    
        return entry
    
    
    def _entry_to_import_item(entry: dataplex_v1.Entry):
        """Packs entry to import item, accepted by the API,"""
        import_item = ImportItem()
        import_item.entry = entry
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
    
    def create(config, entry_type: EntryType):
        """Creates an entry, packs it to Import Item and converts to json."""
        import_item = _entry_to_import_item(_create_entry(config, entry_type))
        return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
    
  11. Actualiza el archivo src/bootstrap.py con código para generar el archivo de importación de metadatos y ejecutar el conector.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    
  12. Ejecuta el código de manera local.

    Se devuelve un archivo de importación de metadatos llamado output.jsonl. El archivo tiene dos líneas, cada una de las cuales representa un elemento de importación. La canalización de conectividad administrada lee este archivo cuando ejecuta el trabajo de importación de metadatos.

  13. Opcional: Extiende el ejemplo anterior para usar las clases de la biblioteca cliente de Knowledge Catalog y crear elementos de importación para tablas, esquemas y vistas. También puedes ejecutar el ejemplo de Python en Managed Service para Apache Spark.

    Te recomendamos que crees un conector que use Spark (y que se ejecute en Managed Service para Apache Spark), ya que puede mejorar el rendimiento de tu conector.

Crea un conector de PySpark

Este ejemplo se basa en la API de DataFrame de PySpark. Puedes instalar PySpark SQL y ejecutarlo de forma local antes de ejecutarlo en Managed Service para Apache Spark. Si instalas y ejecutas PySpark de forma local, instala la biblioteca de PySpark con pip, pero no es necesario que instales un clúster de Spark local.

Por motivos de rendimiento, este ejemplo no usa clases predefinidas de la biblioteca de PySpark. En cambio, el ejemplo crea DataFrames, los convierte en entradas JSON y, luego, escribe el resultado en un archivo de importación de metadatos en formato de líneas JSON que se puede importar a Knowledge Catalog.

Para compilar un conector con PySpark, haz lo siguiente:

  1. Clona el repositorio cloud-dataplex.

  2. Instala PySpark:

    pip install pyspark
    
  3. Requisitos de instalación:

    pip install -r requirements.txt
    

    Se instalan los siguientes requisitos:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  4. Actualiza el archivo oracle_connector.py con código para leer datos de una fuente de datos de Oracle y devolver DataFrames.

    """Reads Oracle using PySpark."""
    from typing import Dict
    from pyspark.sql import SparkSession, DataFrame
    
    from src.constants import EntryType
    
    
    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
    
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # PySpark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor") \
                .config("spark.jars", SPARK_JAR_PATH) \
                .getOrCreate()
    
            self._config = config
            self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
    
        def _execute(self, query: str) -> DataFrame:
            """A generic method to execute any query."""
            return self._spark.read.format("jdbc") \
                .option("driver", "oracle.jdbc.OracleDriver") \
                .option("url", self._url) \
                .option("query", query) \
                .option("user", self._config["user"]) \
                .option("password", self._config["password"]) \
                .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Gets a list of columns in tables or views in a batch."""
            # Every line here is a column that belongs to the table or to the view.
            # This SQL gets data from ALL the tables in a given schema.
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Gets data for a table or a view."""
            # Dataset means that these entities can contain end user data.
            short_type = entry_type.name  # table or view, or the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    Agrega consultas de SQL para devolver los metadatos que deseas importar. Las búsquedas deben devolver la siguiente información:

    • Esquemas de bases de datos
    • Tablas que pertenecen a estos esquemas
    • Columnas que pertenecen a estas tablas, incluidos el nombre y el tipo de datos de la columna, y si la columna admite valores nulos o es obligatoria

    Todas las columnas de todas las tablas y vistas se almacenan en la misma tabla del sistema. Puedes seleccionar columnas con el método _get_columns. Según los parámetros que proporciones, puedes seleccionar columnas para las tablas o para las vistas por separado.

    Ten en cuenta lo siguiente:

    • En Oracle, un esquema de base de datos es propiedad de un usuario de la base de datos y tiene el mismo nombre que ese usuario.
    • Los objetos de esquema son estructuras lógicas que crean los usuarios. Los objetos, como las tablas o los índices, pueden contener datos, y los objetos, como las vistas o los sinónimos, constan solo de una definición.
    • El archivo ojdbc11.jar contiene el controlador JDBC de Oracle.
  5. Actualiza el archivo src/entry_builder.py con métodos compartidos para aplicar transformaciones de Spark.

    """Creates entries with PySpark."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb
    
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Choose the metadata type based on Oracle native type."""
        if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
            return "NUMBER"
        if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
            return "STRING"
        if data_type == "DATE":
            return "DATETIME"
        return "OTHER"
    
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(
            F.lit(entry_aspect_name),
            F.named_struct(
                F.lit("aspect_type"),
                F.lit(entry_aspect_name),
                F.lit("data"),
                F.create_map()
                )
            )
    
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                         "entry_source", "aspects", "entry_type"]
    
        # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
        # and "aspects" string in "update_mask"
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    
    
    def build_schemas(config, df_raw_schemas):
        """Create a dataframe with database schemas from the list of usernames.
        Args:
            df_raw_schemas - a dataframe with only one column called USERNAME
        Returns:
            A dataframe with Dataplex-readable schemas.
        """
        entry_type = EntryType.DB_SCHEMA
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    
        # For schema, parent name is the name of the database
        parent_name =  nb.create_parent_name(config, entry_type)
    
        # Create user-defined function.
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                               StringType())
    
        # Fills the missed project and location into the entry type string
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Converts a list of schema names to the Dataplex-compatible form
        column = F.col("USERNAME")
        df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
        .drop(column)
    
        df = convert_to_import_items(df, [entry_aspect_name])
        return df
    
    
    def build_dataset(config, df_raw, db_schema, entry_type):
        """Build table entries from a flat list of columns.
        Args:
            df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                     and NULLABLE columns
            db_schema - parent database schema
            entry_type - entry type: table or view
        Returns:
            A dataframe with Dataplex-readable data of tables of views.
        """
        schema_key = "dataplex-types.global.schema"
    
        # The transformation below does the following
        # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
        # 2. Renames NULLABLE to mode
        # 3. Renames DATA_TYPE to dataType
        # 4. Creates metadataType column based on dataType column
        # 5. Renames COLUMN_NAME to name
        df = df_raw \
          .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
          .drop("NULLABLE") \
          .withColumnRenamed("DATA_TYPE", "dataType") \
          .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
          .withColumnRenamed("COLUMN_NAME", "name")
    
        # The transformation below aggregate fields, denormalizing the table
        # TABLE_NAME becomes top-level filed, and the rest is put into
        # the array type called "fields"
        aspect_columns = ["name", "mode", "dataType", "metadataType"]
        df = df.withColumn("columns", F.struct(aspect_columns))\
          .groupby('TABLE_NAME') \
          .agg(F.collect_list("columns").alias("fields"))
    
        # Create nested structured called aspects.
        # Fields are becoming a part of a `schema` struct
        # There is also an entry_aspect that is repeats entry_type as aspect_type
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
        df = df.withColumn("schema",
                           F.create_map(F.lit(schema_key),
                                        F.named_struct(
                                            F.lit("aspect_type"),
                                            F.lit(schema_key),
                                            F.lit("data"),
                                            F.create_map(F.lit("fields"),
                                                         F.col("fields")))
                                        )
                           )\
          .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
        .drop("fields")
    
        # Merge separate aspect columns into the one map called 'aspects'
        df = df.select(F.col("TABLE_NAME"),
                       F.map_concat("schema", "entry_aspect").alias("aspects"))
    
        # Define user-defined functions to fill the general information
        # and hierarchy names
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                         db_schema, x),
                                StringType())
    
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                       db_schema, x), StringType())
    
        parent_name = nb.create_parent_name(config, entry_type, db_schema)
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Fill the top-level fields
        column = F.col("TABLE_NAME")
        df = df.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_source", create_entry_source(column)) \
        .drop(column)
    
        df = convert_to_import_items(df, [schema_key, entry_aspect_name])
        return df
    

    Ten en cuenta lo siguiente:

    • Los métodos compilan los recursos de metadatos que el conector crea para tus recursos de Oracle. Usa las convenciones que se describen en la sección Recursos de metadatos de ejemplo para una fuente de Oracle de este documento.
    • El método convert_to_import_items se aplica a esquemas, tablas y vistas. Asegúrate de que el conector genere uno o más elementos de importación que el método metadataJobs.create pueda procesar, no entradas individuales.
    • Incluso en una vista, la columna se llama TABLE_NAME.
  6. Actualiza el archivo bootstrap.py con código para generar el archivo de importación de metadatos y ejecutar el conector.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    

    En este ejemplo, el archivo de importación de metadatos se guarda como un solo archivo de líneas JSON. Puedes usar herramientas de PySpark, como la clase DataFrameWriter, para generar lotes de JSON en paralelo.

    El conector puede escribir entradas en el archivo de importación de metadatos en cualquier orden.

  7. Actualiza el archivo gcs_uploader.py con código para subir el archivo de importación de metadatos a un bucket de Cloud Storage.

    """Sends files to GCP storage."""
    from typing import Dict
    from google.cloud import storage
    
    
    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to GCP bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["output_bucket"])
        folder = config["output_folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  8. Compila la imagen del conector.

    Si tu conector contiene varios archivos o si quieres usar bibliotecas que no se incluyen en la imagen de Docker predeterminada, debes usar un contenedor personalizado. Managed Service for Apache Spark ejecuta cargas de trabajo dentro de contenedores de Docker. Crea una imagen de Docker personalizada del conector y almacénala en Artifact Registry. Managed Service for Apache Spark lee la imagen de Artifact Registry.

    1. Crea un Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark

      Usa Conda como administrador de paquetes. Managed Service for Apache Spark activa pyspark en el contenedor en el tiempo de ejecución, por lo que no es necesario que instales dependencias de PySpark en tu imagen de contenedor personalizada.

    2. Compila la imagen de contenedor personalizada y envíala a Artifact Registry.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=<PROJECT_ID>
      
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to GCP container registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      Como una imagen puede tener varios nombres, puedes usar la etiqueta de Docker para asignarle un alias.

  9. Ejecuta el conector en el servicio administrado para Apache Spark. Para enviar un trabajo por lotes de PySpark con la imagen de contenedor personalizada, ejecuta el comando gcloud dataproc batches submit pyspark.

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    Ten en cuenta lo siguiente:

    • Los archivos JAR son controladores para Spark. Para leer datos de Oracle, MySQL o Postgres, debes proporcionar un paquete específico de Apache Spark. El paquete puede ubicarse en Cloud Storage o dentro del contenedor. Si el archivo JAR está dentro del contenedor, la ruta de acceso es similar a file:///path/to/file/driver.jar. En este ejemplo, la ruta de acceso al archivo JAR es /opt/spark/jars/.
    • PIPELINE_ARGUMENTS son los argumentos de la línea de comandos para el conector.

    El conector extrae metadatos de la base de datos de Oracle, genera un archivo de importación de metadatos y guarda este archivo en un bucket de Cloud Storage.

  10. Para importar manualmente los metadatos del archivo de importación de metadatos a Knowledge Catalog, ejecuta un trabajo de metadatos. Usa el método metadataJobs.create.

    1. En la línea de comandos, agrega variables de entorno y crea un alias para el comando curl.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. Llama al método de la API y pasa los tipos de entrada y los tipos de aspecto que deseas importar.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      El tipo de aspecto schema es un tipo de aspecto global que define Knowledge Catalog.

      Ten en cuenta que el formato que usas para los nombres de los tipos de aspectos cuando llamas al método de la API es diferente del formato que usas en el código del conector.

    3. Opcional: Usa Cloud Logging para ver los registros del trabajo de metadatos. Para obtener más información, consulta Supervisa los registros del Catálogo de conocimiento.

Configura la organización de canalizaciones

En las secciones anteriores, se mostró cómo compilar un conector de ejemplo y ejecutarlo de forma manual.

En un entorno de producción, ejecutas el conector como parte de una canalización de conectividad administrada, con una plataforma de organización como Workflows.

  1. Para ejecutar una canalización de conectividad administrada con el conector de ejemplo, sigue los pasos para importar metadatos con Workflows. Haz lo siguiente:

    • Crea el flujo de trabajo en la misma Google Cloud ubicación que el conector.
    • En el archivo de definición del flujo de trabajo, actualiza la función submit_pyspark_extract_job con el siguiente código para extraer datos de la base de datos de Oracle con el conector que creaste.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • En el archivo de definición del flujo de trabajo, actualiza la función submit_import_job con el siguiente código para importar las entradas. La función llama al método de la API metadataJobs.create para ejecutar un trabajo de importación de metadatos.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      Proporciona los mismos tipos de entrada y tipos de aspectos que incluiste cuando llamaste al método de la API de forma manual. Ten en cuenta que no hay una coma al final de cada cadena.

    • Cuando ejecutes el flujo de trabajo, proporciona los siguientes argumentos de tiempo de ejecución:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. Opcional: Usa Cloud Logging para ver los registros de la canalización de conectividad administrada. La carga útil del registro incluye un vínculo a los registros del trabajo por lotes de Managed Service for Apache Spark y el trabajo de importación de metadatos, según corresponda. Para obtener más información, consulta Cómo ver los registros de flujo de trabajo.

  3. Opcional: Para mejorar la seguridad, el rendimiento y la funcionalidad de tu canalización de conectividad administrada, considera hacer lo siguiente:

    1. Usa Secret Manager para almacenar las credenciales de tu fuente de datos externa.
    2. Usa PySpark para escribir el resultado de líneas JSON en varios archivos de importación de metadatos en paralelo.
    3. Usa un prefijo para dividir los archivos grandes (más de 100 MB) en archivos más pequeños.
    4. Agrega más aspectos personalizados que capturen metadatos técnicos y empresariales adicionales de tu fuente.

Ejemplo de recursos de metadatos para una fuente de Oracle

El conector de ejemplo extrae metadatos de una base de datos de Oracle y los asigna a los recursos de metadatos correspondientes del Catálogo de conocimiento.

Consideraciones sobre la jerarquía

Todos los sistemas de Knowledge Catalog tienen una entrada raíz que es la entrada principal del sistema. Por lo general, la entrada raíz tiene un tipo de entrada instance. En la siguiente tabla, se muestra la jerarquía de ejemplo de los tipos de entrada y los tipos de aspectos para un sistema de Oracle. Por ejemplo, el tipo de entrada oracle-database está vinculado a un tipo de aspecto que también se llama oracle-database.

ID del tipo de entrada Descripción ID del tipo de aspecto vinculado
oracle-instance Es la raíz del sistema importado. oracle-instance
oracle-database Es la base de datos de Oracle. oracle-database
oracle-schema Es el esquema de la base de datos. oracle-schema
oracle-table Una mesa

oracle-table

schema

oracle-view Una vista

oracle-view

schema

El tipo de aspecto schema es un tipo de aspecto global que define Knowledge Catalog. Contiene una descripción de los campos de una tabla, una vista o cualquier otra entidad que tenga columnas. El tipo de aspecto personalizado oracle-schema contiene el nombre del esquema de la base de datos de Oracle.

Ejemplo de campos de elementos de importación

El conector debe usar las siguientes convenciones para los recursos de Oracle.

  • Nombres completamente calificados: Los nombres completamente calificados para los recursos de Oracle usan la siguiente plantilla de nombres. Los caracteres prohibidos se marcan como escape con acentos graves.

    Recurso Plantilla Ejemplo
    Instancia

    SOURCE:ADDRESS

    Usa el host y el número de puerto o el nombre de dominio del sistema.

    oracle:`localhost:1521` o oracle:`myinstance.com`
    Base de datos SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    Esquema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    Tabla SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    Ver SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • Nombres o IDs de entrada: Las entradas para los recursos de Oracle usan la siguiente plantilla de nombres. Los caracteres prohibidos se reemplazan por un carácter permitido. Los recursos usan el prefijo projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

    Recurso Plantilla Ejemplo
    Instancia PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Base de datos PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Esquema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Tabla PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    Ver PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Entradas principales: Si una entrada no es una entrada raíz para el sistema, puede tener un campo de entrada principal que describa su posición en la jerarquía. El campo debe contener el nombre de la entrada principal. Te recomendamos que generes este valor.

    En la siguiente tabla, se muestran las entradas principales para los recursos de Oracle.

    Entrada Entrada principal
    Instancia "" (string vacía)
    Base de datos Nombre de la instancia
    Esquema Nombre de la base de datos
    Tabla Nombre del esquema
    Ver Nombre del esquema
  • Mapa de aspectos: El mapa de aspectos debe contener al menos un aspecto que describa la entidad que se importará. Este es un ejemplo de un mapa de aspectos para una tabla de Oracle.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    Puedes encontrar tipos de aspectos predefinidos (como schema) que definen la estructura de la tabla o la vista en el proyecto dataplex-types, en la ubicación global.

  • Claves de aspecto: Las claves de aspecto usan el formato de nomenclatura PROJECT.LOCATION.ASPECT_TYPE. En la siguiente tabla, se muestran ejemplos de claves de aspecto para los recursos de Oracle.

    Entrada Ejemplo de clave de aspecto
    Instancia example-project.us-central1.oracle-instance
    Base de datos example-project.us-central1.oracle-database
    Esquema example-project.us-central1.oracle-schema
    Tabla example-project.us-central1.oracle-table
    Ver example-project.us-central1.oracle-view

¿Qué sigue?