Développer un connecteur personnalisé pour l'importation de métadonnées

Ce document fournit un modèle de référence pour créer un connecteur personnalisé permettant d'extraire les métadonnées de sources tierces, telles que MySQL, SQL Server et Oracle. Vous pouvez utiliser ce connecteur pour importer des métadonnées dans Dataplex Universal Catalog via un pipeline de connectivité gérée. Un exemple de connecteur Python pour Oracle Database Express Edition (XE) est inclus pour vous aider à démarrer. Vous pouvez également développer des connecteurs à l'aide de Java, Scala ou R.

Fonctionnement des connecteurs

Un connecteur extrait les métadonnées d'une source de données tierce, les transforme au format ImportItem de Dataplex Universal Catalog et génère des fichiers d'importation de métadonnées qui peuvent être importés par Dataplex Universal Catalog.

Le connecteur fait partie d'un pipeline de connectivité gérée. Un pipeline de connectivité gérée est un workflow orchestré que vous utilisez pour importer des métadonnées Dataplex Universal Catalog. Il exécute le connecteur et effectue d'autres tâches dans le workflow d'importation, comme exécuter un job d'importation de métadonnées et capturer des journaux.

Le pipeline de connectivité gérée exécute le connecteur à l'aide d'un job par lot Google Cloud Serverless pour Apache Spark. Serverless pour Apache Spark fournit un environnement d'exécution Spark sans serveur. Bien que vous puissiez créer un connecteur qui n'utilise pas Spark, nous vous recommandons d'utiliser Spark, car il peut améliorer les performances de votre connecteur.

Exigences concernant le connecteur

Le connecteur doit répondre aux exigences suivantes :

  • Le connecteur doit être une image Artifact Registry pouvant être exécutée sur Serverless pour Apache Spark.
  • Il doit générer des fichiers de métadonnées dans un format pouvant être importé par un job d'importation de métadonnées Dataplex Universal Catalog (méthode API metadataJobs.create). Pour en savoir plus sur les exigences, consultez la section Fichier d'importation de métadonnées.
  • Le connecteur doit accepter les arguments de ligne de commande suivants pour recevoir des informations du pipeline :

    Argument de ligne de commande Valeur fournie par le pipeline
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    Le connecteur utilise ces arguments pour générer des métadonnées dans un groupe d'entrées cible projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID et pour écrire dans un bucket Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Chaque exécution du pipeline crée un dossier FOLDER_ID dans le bucket CLOUD_STORAGE_BUCKET_ID. Le connecteur doit écrire les fichiers d'importation de métadonnées dans ce dossier.

Les modèles de pipeline sont compatibles avec les connecteurs PySpark. Les modèles supposent que le pilote (mainPythonFileUri) est un fichier local sur l'image du connecteur nommée main.py. Vous pouvez modifier les modèles de pipeline pour d'autres scénarios, tels qu'un connecteur Spark, un autre URI de pilote ou d'autres options.

Voici comment utiliser PySpark pour créer un élément d'importation dans le fichier d'importation de métadonnées.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Avant de commencer

Dans ce guide, nous partons du principe que vous connaissez bien Python et PySpark.

Consultez les informations suivantes :

Suivez les étapes ci-dessous. Créez toutes les ressources au même emplacement Google Cloud.

  1. Créez ou sélectionnez un projet Google Cloud .

    Rôles requis pour sélectionner ou créer un projet

    • Sélectionnez un projet : la sélection d'un projet ne nécessite pas de rôle IAM spécifique. Vous pouvez sélectionner n'importe quel projet pour lequel un rôle vous a été attribué.
    • Créer un projet : pour créer un projet, vous devez disposer du rôle Créateur de projet (roles/resourcemanager.projectCreator), qui contient l'autorisation resourcemanager.projects.create. Découvrez comment attribuer des rôles.
    • Créez un projet Google Cloud  :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud .

  2. Vérifiez que la facturation est activée pour votre projet Google Cloud .

  3. Activez les API Dataplex, Dataproc, Workflows et Artifact Registry :

    Rôles requis pour activer les API

    Pour activer les API, vous avez besoin du rôle IAM Administrateur Service Usage (roles/serviceusage.serviceUsageAdmin), qui contient l'autorisation serviceusage.services.enable. Découvrez comment attribuer des rôles.

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Installez la Google Cloud CLI.

  5. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

  6. Pour initialiser la gcloud CLI, exécutez la commande suivante :

    gcloud init
  7. Attribuez des rôles à votre compte utilisateur. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet
    • USER_IDENTIFIER : identifiant de votre compte d'utilisateur. Par exemple, myemail@example.com.
    • ROLE : rôle IAM que vous accordez à votre compte utilisateur.
  8. Configurez l'authentification :

    1. Assurez-vous de disposer des rôles IAM "Créateur de compte de service" (roles/iam.serviceAccountCreator) et "Administrateur IAM du projet" (roles/resourcemanager.projectIamAdmin). Découvrez comment attribuer des rôles.
    2. Créez le compte de service :

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Remplacez SERVICE_ACCOUNT_NAME par le nom que vous souhaitez donner au compte de service.

    3. Attribuez le rôle IAM roles/owner au compte de service.

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Remplacez les éléments suivants :

      • SERVICE_ACCOUNT_NAME : nom du compte de service.
      • PROJECT_ID : ID du projet dans lequel vous avez créé le compte de service.
  9. Créez un bucket Cloud Storage pour stocker les fichiers d'importation de métadonnées.

  10. Créez les ressources de métadonnées suivantes dans le même projet.

    Pour obtenir des exemples de valeurs, consultez la section Exemples de ressources de métadonnées pour une source Oracle de ce document.

    1. Créez un groupe d'entrées.
    2. Créez des types d'aspects personnalisés pour les entrées que vous souhaitez importer. Utilisez la convention d'attribution de noms SOURCE-ENTITY_TO_IMPORT.

      Par exemple, pour une base de données Oracle, créez un type d'aspect nommé oracle-database.

      Vous pouvez également créer d'autres types d'aspects pour stocker d'autres informations.

    3. Créez des types d'entrées personnalisés pour les ressources que vous souhaitez importer, puis attribuez-leur les types d'aspects appropriés. Utilisez la convention d'attribution de noms SOURCE-ENTITY_TO_IMPORT.

      Par exemple, pour une base de données Oracle, créez un type d'entrée nommé oracle-database. Associez-le au type d'aspect nommé oracle-database.

  11. Assurez-vous que votre source tierce est accessible depuis votre projet Google Cloud . Pour en savoir plus, consultez Configuration du réseau Serverless pour Apache Spark.

Créer un connecteur Python de base

L'exemple de connecteur Python de base crée des entrées de premier niveau pour une source de données Oracle à l'aide des classes de la bibliothèque cliente Dataplex Universal Catalog. Vous fournissez ensuite les valeurs des champs d'entrée.

Le connecteur crée un fichier d'importation de métadonnées avec les entrées suivantes :

  • Une entrée instance, avec le type d'entrée projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Cette entrée représente un système Oracle Database XE.
  • Une entrée database, qui représente une base de données dans le système Oracle Database XE.

Pour créer un connecteur Python de base, suivez ces étapes :

  1. Clonez le dépôt cloud-dataplex.

  2. Configurez un environnement local. Nous vous recommandons d'utiliser un environnement virtuel.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    Utilisez les versions active ou de maintenance de Python. Les versions 3.7 et ultérieures de Python sont compatibles.

  3. Créez un projet Python.

  4. Installez les éléments requis :

    pip install -r requirements.txt
    

    Les éléments requis suivants sont installés :

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  5. Ajoutez un fichier de pipeline main.py à la racine du projet.

    from src import bootstrap
    
    
    if __name__ == '__main__':
        bootstrap.run()
    

    Lorsque vous déployez votre code sur Serverless pour Apache Spark, le fichier main.py sert de point d'entrée pour l'exécution. Nous vous recommandons de réduire au minimum la quantité d'informations stockées dans le fichier main.py. Utilisez ce fichier pour appeler des fonctions et des classes définies dans votre connecteur, comme la classe src/bootstap.py.

  6. Créez un dossier src pour stocker la majeure partie de la logique de votre connecteur.

  7. Mettez à jour le fichier src/cmd_reader.py avec une classe Python pour accepter les arguments de ligne de commande. Pour cela, vous pouvez utiliser le module argeparse.

    """Command line reader."""
    import argparse
    
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
    
        # Dataplex arguments
        parser.add_argument("--target_project_id", type=str, required=True,
            help="The name of the target Google Cloud project to import the metadata into.")
        parser.add_argument("--target_location_id", type=str, required=True,
            help="The target Google Cloud location where the metadata will be imported into.")
        parser.add_argument("--target_entry_group_id", type=str, required=True,
            help="The ID of the entry group to import metadata into. "
                 "The metadata will be imported into entry group with the following"
                 "full resource name: projects/${target_project_id}/"
                 "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
    
        # Oracle arguments
        parser.add_argument("--host_port", type=str, required=True,
            help="Oracle host and port number separated by the colon (:).")
        parser.add_argument("--user", type=str, required=True, help="Oracle User.")
        parser.add_argument("--password-secret", type=str, required=True,
            help="Secret resource name in the Secret Manager for the Oracle password.")
        parser.add_argument("--database", type=str, required=True,
            help="Source Oracle database.")
    
        # Google Cloud Storage arguments
        # It is assumed that the bucket is in the same region as the entry group
        parser.add_argument("--output_bucket", type=str, required=True,
            help="The Cloud Storage bucket to write the generated metadata import file.")
        parser.add_argument("--output_folder", type=str, required=True,
            help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
    
        return vars(parser.parse_known_args()[0])
    

    Dans les environnements de production, nous vous recommandons de stocker le mot de passe dans Secret Manager.

  8. Mettez à jour le fichier src/constants.py avec du code permettant de créer des constantes.

    """Constants that are used in the different files."""
    import enum
    
    SOURCE_TYPE = "oracle"
    
    # Symbols for replacement
    FORBIDDEN = "#"
    ALLOWED = "!"
    
    
    class EntryType(enum.Enum):
        """Types of Oracle entries."""
        INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
        DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
        DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
        TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
        VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  9. Mettez à jour le fichier src/name_builder.py avec les méthodes permettant de compiler les ressources de métadonnées que vous souhaitez que le connecteur crée pour vos ressources Oracle. Utilisez les conventions décrites dans la section Exemples de ressources de métadonnées pour une source Oracle de ce document.

    """Builds Dataplex hierarchy identifiers."""
    from typing import Dict
    from src.constants import EntryType, SOURCE_TYPE
    
    
    # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
    # In that case in names it is changed to C!!, and escaped with backticks in FQNs
    FORBIDDEN_SYMBOL = "#"
    ALLOWED_SYMBOL = "!"
    
    
    def create_fqn(config: Dict[str, str], entry_type: EntryType,
                   schema_name: str = "", table_name: str = ""):
        """Creates a fully qualified name or Dataplex v1 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = f"`{schema_name}`"
    
        if entry_type == EntryType.INSTANCE:
            # Requires backticks to escape column
            return f"{SOURCE_TYPE}:`{config['host_port']}`"
        if entry_type == EntryType.DATABASE:
            instance = create_fqn(config, EntryType.INSTANCE)
            return f"{instance}.{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}"
        if entry_type in [EntryType.TABLE, EntryType.VIEW]:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}.{table_name}"
        return ""
    
    
    def create_name(config: Dict[str, str], entry_type: EntryType,
                    schema_name: str = "", table_name: str = ""):
        """Creates a Dataplex v2 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if entry_type == EntryType.INSTANCE:
            name_prefix = (
                f"projects/{config['target_project_id']}/"
                f"locations/{config['target_location_id']}/"
                f"entryGroups/{config['target_entry_group_id']}/"
                f"entries/"
            )
            return name_prefix + config["host_port"].replace(":", "@")
        if entry_type == EntryType.DATABASE:
            instance = create_name(config, EntryType.INSTANCE)
            return f"{instance}/databases/{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_name(config, EntryType.DATABASE)
            return f"{database}/database_schemas/{schema_name}"
        if entry_type == EntryType.TABLE:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/tables/{table_name}"
        if entry_type == EntryType.VIEW:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/views/{table_name}"
        return ""
    
    
    def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                           parent_name: str = ""):
        """Generates a Dataplex v2 name of the parent."""
        if entry_type == EntryType.DATABASE:
            return create_name(config, EntryType.INSTANCE)
        if entry_type == EntryType.DB_SCHEMA:
            return create_name(config, EntryType.DATABASE)
        if entry_type == EntryType.TABLE:
            return create_name(config, EntryType.DB_SCHEMA, parent_name)
        return ""
    
    
    def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
        """Generates an entry aspect name."""
        last_segment = entry_type.value.split("/")[-1]
        return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
    

    Étant donné que le fichier name_builder.py est utilisé à la fois pour le code Python principal et pour le code PySpark principal, nous vous recommandons d'écrire les méthodes en tant que fonctions pures, plutôt qu'en tant que membres d'une classe.

  10. Mettez à jour le fichier src/top_entry_builder.py avec du code permettant d'ajouter des données aux entrées de premier niveau.

    """Non-Spark approach for building the entries."""
    import dataclasses
    import json
    from typing import List, Dict
    
    import proto
    from google.cloud import dataplex_v1
    
    from src.constants import EntryType
    from src import name_builder as nb
    
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        """A template class for Import API."""
    
        entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    
    def _dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
    
        def convert(obj: object):
            if isinstance(obj, proto.Message):
                return proto.Message.to_dict(obj)
            return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    
    def _create_entry(config: Dict[str, str], entry_type: EntryType):
        """Creates an entry based on a Dataplex library."""
        entry = dataplex_v1.Entry()
        entry.name = nb.create_name(config, entry_type)
        entry.entry_type = entry_type.value.format(
            project=config["target_project_id"], location=config["target_location_id"]
        )
        entry.fully_qualified_name = nb.create_fqn(config, entry_type)
        entry.parent_entry = nb.create_parent_name(config, entry_type)
    
        aspect_key = nb.create_entry_aspect_name(config, entry_type)
    
        # Add mandatory aspect
        entry_aspect = dataplex_v1.Aspect()
        entry_aspect.aspect_type = aspect_key
        entry_aspect.data = {}
        entry.aspects[aspect_key] = entry_aspect
    
        return entry
    
    
    def _entry_to_import_item(entry: dataplex_v1.Entry):
        """Packs entry to import item, accepted by the API,"""
        import_item = ImportItem()
        import_item.entry = entry
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
    
    def create(config, entry_type: EntryType):
        """Creates an entry, packs it to Import Item and converts to json."""
        import_item = _entry_to_import_item(_create_entry(config, entry_type))
        return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
    
  11. Mettez à jour le fichier src/bootstrap.py avec du code permettant de générer le fichier d'importation de métadonnées et d'exécuter le connecteur.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    
  12. Exécutez le code en local.

    Un fichier d'importation de métadonnées nommé output.jsonl est renvoyé. Il comporte deux lignes, chacune représentant un élément d'importation. Le pipeline de connectivité gérée lit ce fichier lors de l'exécution du job d'importation de métadonnées.

  13. (Facultatif) Étendez l'exemple précédent pour utiliser les classes de la bibliothèque cliente Dataplex Universal Catalog afin de créer des éléments d'importation pour les tables, les schémas et les vues. Vous pouvez également exécuter l'exemple Python sur Serverless pour Apache Spark.

    Nous vous recommandons de créer un connecteur qui utilise Spark (et s'exécute sur Serverless pour Apache Spark), car cela peut améliorer ses performances.

Créer un connecteur PySpark

Cet exemple est basé sur l'API PySpark DataFrame. Vous pouvez installer PySpark SQL et l'exécuter en local avant de l'exécuter sur Serverless pour Apache Spark. Si vous installez et exécutez PySpark en local, installez la bibliothèque PySpark à l'aide de pip. En revanche, vous n'avez pas besoin d'installer un cluster Spark local.

Pour des raisons de performances, cet exemple n'utilise pas de classes prédéfinies de la bibliothèque PySpark. L'exemple crée plutôt des DataFrames, les convertit en entrées JSON, puis écrit la sortie dans un fichier d'importation de métadonnées au format JSON Lines qui peut être importé dans Dataplex Universal Catalog.

Pour créer un connecteur à l'aide de PySpark, suivez ces étapes :

  1. Clonez le dépôt cloud-dataplex.

  2. Installez PySpark :

    pip install pyspark
    
  3. Installez les éléments requis :

    pip install -r requirements.txt
    

    Les éléments requis suivants sont installés :

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  4. Mettez à jour le fichier oracle_connector.py avec du code permettant de lire les données d'une source de données Oracle et de renvoyer des DataFrames.

    """Reads Oracle using PySpark."""
    from typing import Dict
    from pyspark.sql import SparkSession, DataFrame
    
    from src.constants import EntryType
    
    
    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
    
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # PySpark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor") \
                .config("spark.jars", SPARK_JAR_PATH) \
                .getOrCreate()
    
            self._config = config
            self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
    
        def _execute(self, query: str) -> DataFrame:
            """A generic method to execute any query."""
            return self._spark.read.format("jdbc") \
                .option("driver", "oracle.jdbc.OracleDriver") \
                .option("url", self._url) \
                .option("query", query) \
                .option("user", self._config["user"]) \
                .option("password", self._config["password"]) \
                .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Gets a list of columns in tables or views in a batch."""
            # Every line here is a column that belongs to the table or to the view.
            # This SQL gets data from ALL the tables in a given schema.
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Gets data for a table or a view."""
            # Dataset means that these entities can contain end user data.
            short_type = entry_type.name  # table or view, or the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    Ajoutez des requêtes SQL pour renvoyer les métadonnées que vous souhaitez importer. Les requêtes doivent renvoyer les informations suivantes :

    • Les schémas de base de données
    • Les tables appartenant à ces schémas
    • Les colonnes appartenant à ces tables, y compris les noms des colonnes, leur type de données, et si elles peuvent être nulles ou sont obligatoires

    Toutes les colonnes de toutes les tables et vues sont stockées dans la même table système. Vous pouvez sélectionner des colonnes à l'aide de la méthode _get_columns. En fonction des paramètres que vous fournissez, vous pouvez sélectionner des colonnes pour les tables ou pour les vues séparément.

    Veuillez noter les points suivants :

    • Dans Oracle, un schéma de base de données appartient à un utilisateur de base de données et porte le même nom que cet utilisateur.
    • Les objets de schéma sont des structures logiques créées par les utilisateurs. Les objets tels que les tables ou les index peuvent contenir des données, tandis que les objets tels que les vues ou les synonymes consistent uniquement en une définition.
    • Le fichier ojdbc11.jar contient le pilote Oracle JDBC.
  5. Mettez à jour le fichier src/entry_builder.py avec des méthodes partagées pour appliquer les transformations Spark.

    """Creates entries with PySpark."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb
    
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Choose the metadata type based on Oracle native type."""
        if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
            return "NUMBER"
        if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
            return "STRING"
        if data_type == "DATE":
            return "DATETIME"
        return "OTHER"
    
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(
            F.lit(entry_aspect_name),
            F.named_struct(
                F.lit("aspect_type"),
                F.lit(entry_aspect_name),
                F.lit("data"),
                F.create_map()
                )
            )
    
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                         "entry_source", "aspects", "entry_type"]
    
        # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
        # and "aspects" string in "update_mask"
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    
    
    def build_schemas(config, df_raw_schemas):
        """Create a dataframe with database schemas from the list of usernames.
        Args:
            df_raw_schemas - a dataframe with only one column called USERNAME
        Returns:
            A dataframe with Dataplex-readable schemas.
        """
        entry_type = EntryType.DB_SCHEMA
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    
        # For schema, parent name is the name of the database
        parent_name =  nb.create_parent_name(config, entry_type)
    
        # Create user-defined function.
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                               StringType())
    
        # Fills the missed project and location into the entry type string
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Converts a list of schema names to the Dataplex-compatible form
        column = F.col("USERNAME")
        df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
        .drop(column)
    
        df = convert_to_import_items(df, [entry_aspect_name])
        return df
    
    
    def build_dataset(config, df_raw, db_schema, entry_type):
        """Build table entries from a flat list of columns.
        Args:
            df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                     and NULLABLE columns
            db_schema - parent database schema
            entry_type - entry type: table or view
        Returns:
            A dataframe with Dataplex-readable data of tables of views.
        """
        schema_key = "dataplex-types.global.schema"
    
        # The transformation below does the following
        # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
        # 2. Renames NULLABLE to mode
        # 3. Renames DATA_TYPE to dataType
        # 4. Creates metadataType column based on dataType column
        # 5. Renames COLUMN_NAME to name
        df = df_raw \
          .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
          .drop("NULLABLE") \
          .withColumnRenamed("DATA_TYPE", "dataType") \
          .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
          .withColumnRenamed("COLUMN_NAME", "name")
    
        # The transformation below aggregate fields, denormalizing the table
        # TABLE_NAME becomes top-level filed, and the rest is put into
        # the array type called "fields"
        aspect_columns = ["name", "mode", "dataType", "metadataType"]
        df = df.withColumn("columns", F.struct(aspect_columns))\
          .groupby('TABLE_NAME') \
          .agg(F.collect_list("columns").alias("fields"))
    
        # Create nested structured called aspects.
        # Fields are becoming a part of a `schema` struct
        # There is also an entry_aspect that is repeats entry_type as aspect_type
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
        df = df.withColumn("schema",
                           F.create_map(F.lit(schema_key),
                                        F.named_struct(
                                            F.lit("aspect_type"),
                                            F.lit(schema_key),
                                            F.lit("data"),
                                            F.create_map(F.lit("fields"),
                                                         F.col("fields")))
                                        )
                           )\
          .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
        .drop("fields")
    
        # Merge separate aspect columns into the one map called 'aspects'
        df = df.select(F.col("TABLE_NAME"),
                       F.map_concat("schema", "entry_aspect").alias("aspects"))
    
        # Define user-defined functions to fill the general information
        # and hierarchy names
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                         db_schema, x),
                                StringType())
    
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                       db_schema, x), StringType())
    
        parent_name = nb.create_parent_name(config, entry_type, db_schema)
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Fill the top-level fields
        column = F.col("TABLE_NAME")
        df = df.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_source", create_entry_source(column)) \
        .drop(column)
    
        df = convert_to_import_items(df, [schema_key, entry_aspect_name])
        return df
    

    Veuillez noter les points suivants :

    • Les méthodes compilent les ressources de métadonnées que le connecteur crée pour vos ressources Oracle. Utilisez les conventions décrites dans la section Exemples de ressources de métadonnées pour une source Oracle de ce document.
    • La méthode convert_to_import_items s'applique aux schémas, aux tables et aux vues. Assurez-vous que la sortie du connecteur est constituée d'un ou de plusieurs éléments d'importation pouvant être traités par la méthode metadataJobs.create, et non d'entrées individuelles.
    • Même dans une vue, la colonne est appelée TABLE_NAME.
  6. Mettez à jour le fichier bootstrap.py avec du code permettant de générer le fichier d'importation de métadonnées et d'exécuter le connecteur.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    

    Cet exemple enregistre le fichier d'importation de métadonnées en tant que fichier JSON Lines unique. Vous pouvez utiliser des outils PySpark tels que la classe DataFrameWriter pour générer des lots de JSON en parallèle.

    Le connecteur peut écrire des entrées dans le fichier d'importation de métadonnées dans n'importe quel ordre.

  7. Mettez à jour le fichier gcs_uploader.py avec du code permettant d'importer le fichier de métadonnées dans un bucket Cloud Storage.

    """Sends files to GCP storage."""
    from typing import Dict
    from google.cloud import storage
    
    
    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to GCP bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["output_bucket"])
        folder = config["output_folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  8. Créez l'image du connecteur.

    Si votre connecteur contient plusieurs fichiers ou si vous souhaitez utiliser des bibliothèques qui ne sont pas incluses dans l'image Docker par défaut, vous devez utiliser un conteneur personnalisé. Serverless pour Apache Spark exécute les charges de travail dans des conteneurs Docker. Créez une image Docker personnalisée du connecteur et stockez-la dans Artifact Registry. Serverless pour Apache Spark lit l'image depuis Artifact Registry.

    1. Créez un fichier Dockerfile :

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark

      Utilisez Conda comme gestionnaire de paquets. Serverless pour Apache Spark installe pyspark dans le conteneur au moment de l'exécution. Vous n'avez donc pas besoin d'installer de dépendances PySpark dans l'image de votre conteneur personnalisé.

    2. Créez l'image du conteneur personnalisé et transmettez-la à Artifact Registry.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=<PROJECT_ID>
      
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to GCP container registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      Étant donné qu'une image peut avoir plusieurs noms, vous pouvez utiliser le tag Docker pour lui attribuer un alias.

  9. Exécutez le connecteur sur Serverless pour Apache Spark. Pour envoyer un job par lot PySpark à l'aide de l'image du conteneur personnalisé, exécutez la commande gcloud dataproc batches submit pyspark.

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    Veuillez noter les points suivants :

    • Les fichiers JAR sont des pilotes pour Spark. Pour lire des données depuis Oracle, MySQL ou Postgres, vous devez fournir un package spécifique à Apache Spark. Le package peut se trouver dans Cloud Storage ou dans le conteneur. Si le fichier JAR se trouve dans le conteneur, son chemin d'accès est semblable à file:///path/to/file/driver.jar. Dans cet exemple, le chemin d'accès au fichier JAR est /opt/spark/jars/.
    • PIPELINE_ARGUMENTS correspond aux arguments de ligne de commande du connecteur.

    Le connecteur extrait les métadonnées de la base de données Oracle, génère un fichier d'importation de métadonnées et l'enregistre dans un bucket Cloud Storage.

  10. Pour importer manuellement les métadonnées du fichier d'importation de métadonnées dans Dataplex Universal Catalog, exécutez un job de métadonnées. Exécutez la méthode metadataJobs.create.

    1. Dans la ligne de commande, ajoutez des variables d'environnement et créez un alias pour la commande curl.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. Appelez la méthode API en transmettant les types d'entrées et d'aspects que vous souhaitez importer.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      Le type d'aspect schema est un type d'aspect global défini par Dataplex Universal Catalog.

      Notez que le format que vous utilisez pour les noms de types d'aspect lorsque vous appelez la méthode API est différent de celui que vous utilisez dans le code du connecteur.

    3. (Facultatif) Utilisez Cloud Logging pour afficher les journaux du job de métadonnées. Pour en savoir plus, consultez Surveiller les journaux Dataplex Universal Catalog.

Configurer l'orchestration du pipeline

Les sections précédentes ont montré comment créer un exemple de connecteur et l'exécuter manuellement.

Dans un environnement de production, vous exécutez le connecteur dans un pipeline de connectivité gérée, à l'aide d'une plate-forme d'orchestration telle que Workflows.

  1. Pour exécuter un pipeline de connectivité gérée avec l'exemple de connecteur, suivez la procédure indiquée dans Importer des métadonnées à l'aide de Workflows. Voici ce que vous devez faire :

    • Créez le workflow au même emplacement Google Cloud que le connecteur.
    • Dans le fichier de définition du workflow, mettez à jour la fonction submit_pyspark_extract_job avec le code suivant pour extraire les données de la base de données Oracle à l'aide du connecteur que vous avez créé.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • Dans le fichier de définition du workflow, mettez à jour la fonction submit_import_job avec le code suivant pour importer les entrées. La fonction appelle la méthode API metadataJobs.create pour exécuter un job d'importation de métadonnées.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      Indiquez les mêmes types d'entrées et d'aspects que ceux que vous avez inclus lorsque vous avez appelé la méthode API manuellement. Notez l'absence de virgule à la fin de chaque chaîne.

    • Lorsque vous exécutez le workflow, fournissez les arguments d'exécution suivants :

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. (Facultatif) Utilisez Cloud Logging pour afficher les journaux du pipeline de connectivité gérée. La charge utile de journal inclut un lien vers les journaux du job par lot Serverless pour Apache Spark et du job d'importation de métadonnées, le cas échéant. Pour en savoir plus, consultez Afficher les journaux de workflow.

  3. (Facultatif) Pour améliorer la sécurité, les performances et les fonctionnalités de votre pipeline de connectivité gérée, vous pouvez effectuer les opérations suivantes :

    1. Utilisez Secret Manager pour stocker les identifiants de votre source de données tierce.
    2. Utilisez PySpark pour écrire la sortie JSON Lines dans plusieurs fichiers d'importation de métadonnées en parallèle.
    3. Utilisez un préfixe pour diviser les fichiers volumineux (plus de 100 Mo) en fichiers plus petits.
    4. Ajoutez d'autres aspects personnalisés qui capturent des métadonnées métier et techniques supplémentaires à partir de votre source.

Exemples de ressources de métadonnées pour une source Oracle

L'exemple de connecteur extrait les métadonnées d'une base de données Oracle et les mappe aux ressources de métadonnées Dataplex Universal Catalog correspondantes.

Éléments à prendre en compte concernant la hiérarchie

Chaque système de Dataplex Universal Catalog possède une entrée racine qui est l'entrée parente du système. L'entrée racine est généralement de type instance. Le tableau suivant présente un exemple de hiérarchie des types d'entrées et des types d'aspects pour un système Oracle. Par exemple, le type d'entrée oracle-database est associé à un type d'aspect également nommé oracle-database.

ID du type d'entrée Description ID du type d'aspect associé
oracle-instance Racine du système importé. oracle-instance
oracle-database Base de données Oracle. oracle-database
oracle-schema Schéma de base de données. oracle-schema
oracle-table Table.

oracle-table

schema

oracle-view Vue.

oracle-view

schema

Le type d'aspect schema est un type d'aspect global défini par Dataplex Universal Catalog. Il contient une description des champs d'une table, d'une vue ou d'une autre entité comportant des colonnes. Le type d'aspect personnalisé oracle-schema contient le nom du schéma de base de données Oracle.

Exemples de champs d'éléments d'importation

Le connecteur doit utiliser les conventions suivantes pour les ressources Oracle.

  • Noms complets : les noms complets des ressources Oracle utilisent le modèle d'attribution de noms suivant. Les caractères non autorisés sont échappés avec des accents graves.

    Ressource Modèle Exemple
    Instance

    SOURCE : ADDRESS

    Utilisez l'hôte et le numéro de port ou le nom de domaine du système.

    oracle:`localhost:1521` ou oracle:`myinstance.com`
    Base de données SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    Schéma SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    Table SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    Vue SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • Noms ou ID des entrées : les entrées pour les ressources Oracle utilisent le modèle d'attribution de noms suivant. Les caractères non autorisés sont remplacés par un caractère autorisé. Les ressources utilisent le préfixe projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

    Ressource Modèle Exemple
    Instance PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Base de données PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Schéma PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Table PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    Vue PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Entrées parentes : si une entrée n'est pas une entrée racine du système, elle peut comporter un champ d'entrée parente qui décrit sa position dans la hiérarchie. Le champ doit contenir le nom de l'entrée parente. Nous vous recommandons de générer cette valeur.

    Le tableau suivant présente les entrées parentes pour les ressources Oracle.

    Entrée Entrée parente
    Instance "" (chaîne vide)
    Base de données Nom de l'instance
    Schéma Nom de la base de données
    Table Nom du schéma
    Vue Nom du schéma
  • Carte des aspects : la carte des aspects doit contenir au moins un aspect qui décrit l'entité à importer. Voici un exemple de carte des aspects pour une table Oracle.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    Vous trouverez des types d'aspects prédéfinis (comme schema) qui définissent la structure de la table ou de la vue dans le projet dataplex-types, à l'emplacement global.

  • Clés d'aspect : les clés d'aspect utilisent le format d'attribution de noms PROJECT.LOCATION.ASPECT_TYPE. Le tableau suivant présente des exemples de clés d'aspect pour les ressources Oracle.

    Entrée Exemple de clé d'aspect
    Instance example-project.us-central1.oracle-instance
    Base de données example-project.us-central1.oracle-database
    Schéma example-project.us-central1.oracle-schema
    Tableau example-project.us-central1.oracle-table
    Vue example-project.us-central1.oracle-view

Étapes suivantes