Membuat konektor kustom

Halaman ini menjelaskan cara membuat konektor kustom.

Sebelum memulai

Sebelum memulai, pastikan Anda memiliki hal berikut:

  • Periksa apakah penagihan diaktifkan di project Google Cloud Anda.

  • Instal dan lakukan inisialisasi CLI Google Cloud . Pastikan akun tersebut diautentikasi untuk project Anda.

  • Dapatkan akses administrator Discovery Engine untuk Google Cloud project Anda.

  • Dapatkan kredensial akses untuk sumber data pihak ketiga Anda (seperti kunci API atau autentikasi database).

  • Buat rencana pemetaan data yang jelas. Hal ini harus mencakup kolom mana yang akan diindeks dan cara merepresentasikan kontrol akses, termasuk identitas pihak ketiga.

Membuat konektor dasar

Bagian ini menunjukkan cara membuat konektor kustom dalam bahasa yang Anda pilih. Prinsip dan pola yang ditampilkan di sini berlaku untuk sistem eksternal apa pun. Cukup sesuaikan panggilan API dan transformasi data untuk sumber tertentu dalam bahasa yang Anda pilih guna membuat konektor dasar.

Mengambil data

Untuk memulai, ambil data dari sumber data pihak ketiga Anda. Dalam contoh ini, kami menunjukkan cara mengambil postingan menggunakan penomoran halaman. Untuk lingkungan produksi, sebaiknya gunakan pendekatan streaming untuk set data besar. Hal ini mencegah masalah memori yang dapat terjadi saat memuat semua data sekaligus.

Python

    def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
        #Fetch all posts from the given site.#
        url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
        posts: List[dict] = []
        page = 1
        while True:
            resp = requests.get(
                url,
                params={"page": page, "per_page": per_page},
            )
            resp.raise_for_status()
            batch = resp.json()
            posts.extend(batch)
            if len(batch) < per_page:
                break
            page += 1
        return posts

Mentransformasi data

Untuk mengonversi data sumber ke format dokumen Discovery Engine, susun data tersebut seperti yang ditunjukkan dalam payload contoh berikut. Anda dapat menyertakan pasangan nilai kunci sebanyak yang diperlukan. Misalnya, Anda dapat menyertakan konten lengkap untuk penelusuran komprehensif. Atau, Anda dapat menyertakan kolom terstruktur untuk penelusuran berfaset, atau kombinasi keduanya.

Python

    def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
        # Convert WP posts into Discovery Engine Document messages.
        docs: List[discoveryengine.Document] = []
        for post in posts:
            payload = {
                "title": post.get("title", {}).get("rendered"),
                "body": post.get("content", {}).get("rendered"),
                "url": post.get("link"),
                "author": post.get("author"),
                "categories": post.get("categories"),
                "tags": post.get("tags"),
                "date": post.get("date"),
            }
            doc = discoveryengine.Document(
                id=str(post["id"]),
                json_data=json.dumps(payload),
            )
            docs.append(doc)
        return docs

Mengambil atau membuat penyimpanan identitas

Untuk mengelola identitas dan grup pengguna untuk kontrol akses, Anda harus mengambil atau membuat penyimpanan identitas. Fungsi ini mendapatkan penyimpanan identitas yang ada berdasarkan ID, project, dan lokasinya. Jika penyimpanan identitas tidak ada, penyimpanan identitas baru yang kosong akan dibuat dan ditampilkan.

Python

    def get_or_create_ims_data_store(
        project_id: str,
        location: str,
        identity_mapping_store_id: str,
    ) -> discoveryengine.DataStore:
      """Get or create a DataStore."""
      # Initialize the client
      client_ims = discoveryengine.IdentityMappingStoreServiceClient()
      # Construct the parent resource name
      parent_ims = client_ims.location_path(project=project_id, location=location)

      try:
        # Create the request object
        name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
        request = discoveryengine.GetIdentityMappingStoreRequest(
            name=name,
        )
        return client_ims.get_identity_mapping_store(request=request)
      except:
        # Create the IdentityMappingStore object (it can be empty for basic creation)
        identity_mapping_store = discoveryengine.IdentityMappingStore()
        # Create the request object
        request = discoveryengine.CreateIdentityMappingStoreRequest(
            parent=parent_ims,
            identity_mapping_store=identity_mapping_store,
            identity_mapping_store_id=identity_mapping_store_id,
        )
        return client_ims.create_identity_mapping_store(request=request)

Fungsi get_or_create_ims_data_store menggunakan variabel kunci berikut:

  • project_id: ID Google Cloud project Anda.
  • location: Google Cloud Lokasi untuk penyimpanan pemetaan identitas.
  • identity_mapping_store_id: ID unik untuk penyimpanan identitas.
  • client_ims: Instance discoveryengine.IdentityMappingStoreServiceClient yang digunakan untuk berinteraksi dengan Identity Store API.
  • parent_ims: Nama resource lokasi induk, dibuat menggunakan client_ims.location_path.
  • name: Nama lengkap resource penyimpanan pemetaan identitas, yang digunakan untuk GetIdentityMappingStoreRequest.

Menyerap pemetaan identitas ke dalam penyimpanan identitas

Untuk memuat entri pemetaan identitas ke penyimpanan identitas yang ditentukan, gunakan fungsi ini. Metode ini mengambil daftar entri pemetaan identitas dan memulai operasi impor inline. Hal ini sangat penting untuk membangun hubungan identitas pengguna, grup, dan eksternal yang diperlukan untuk kontrol akses dan personalisasi.

Python

def load_ims_data(
    ims_store: discoveryengine.DataStore,
    id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
  """Get the IMS data store."""
  # Initialize the client
  client_ims = discoveryengine.IdentityMappingStoreServiceClient()

  #  Create the InlineSource object
  inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
      identity_mapping_entries=id_mapping_data
  )

  # Create the main request object
  request_ims = discoveryengine.ImportIdentityMappingsRequest(
      identity_mapping_store=ims_store.name,
      inline_source=inline_source,
  )

  try:
    # Create the InlineSource object, which holds your list of entries
    operation = client_ims.import_identity_mappings(
        request=request_ims,
    )
    result = operation.result()
    return result

  except Exception as e:
    print(f"IMS Load Error: {e}")
    result = operation.result()
    return result

Fungsi load_ims_data menggunakan variabel kunci berikut:

  • ims_store: Objek discoveryengine.DataStore yang merepresentasikan penyimpanan pemetaan identitas tempat data akan dimuat.
  • id_mapping_data: Daftar objek discoveryengine.IdentityMappingEntry, yang masing-masing berisi identitas eksternal dan ID pengguna atau grup yang sesuai.
  • result: Nilai yang ditampilkan dari discoveryengine.DataStore.

Buat penyimpanan data

Untuk menggunakan konektor kustom, Anda harus menginisialisasi penyimpanan data untuk konten Anda. Gunakan default_collection untuk konektor kustom. Parameter IndustryVertical menyesuaikan perilaku penyimpanan data untuk kasus penggunaan tertentu. GENERIC cocok untuk sebagian besar skenario. Namun, Anda dapat memilih nilai yang berbeda untuk industri tertentu, seperti MEDIA atau HEALTHCARE_FHIR. Konfigurasi nama tampilan dan properti lainnya agar sesuai dengan konvensi dan persyaratan penamaan project Anda.

Python

def get_or_create_data_store(
    project_id: str,
    location: str,
    display_name: str,
    data_store_id: str,
    identity_mapping_store: str,
) -> discoveryengine.DataStore:
  """Get or create a DataStore."""
  client = discoveryengine.DataStoreServiceClient()
  ds_name = client.data_store_path(project_id, location, data_store_id)
  try:
    result = client.get_data_store(request={"name": ds_name})
    return result
  except:
    parent = client.collection_path(project_id, location, "default_collection")
    operation = client.create_data_store(
        request={
            "parent": parent,
            "data_store": discoveryengine.DataStore(
                display_name=display_name,
                acl_enabled=True,
                industry_vertical=discoveryengine.IndustryVertical.GENERIC,
                identity_mapping_store=identity_mapping_store,
            ),
            "data_store_id": data_store_id,
        }
    )
    result = operation.result()
    return result

Fungsi get_or_create_data_store menggunakan variabel kunci berikut:

  • project_id: ID Google Cloud project Anda.
  • location: Google Cloud Lokasi penyimpanan data.
  • display_name: Nama tampilan penyimpanan data yang dapat dibaca manusia.
  • data_store_id: ID unik untuk penyimpanan data.
  • identity_mapping_store: Nama resource penyimpanan pemetaan identitas yang akan diikat.
  • result: Nilai yang ditampilkan dari discoveryengine.DataStore.

Mengupload dokumen secara langsung

Untuk mengirim dokumen langsung ke Discovery Engine, gunakan upload inline. Metode ini menggunakan mode rekonsiliasi inkremental secara default dan tidak mendukung mode rekonsiliasi penuh. Dalam mode inkremental, dokumen baru ditambahkan dan dokumen yang ada diperbarui, tetapi dokumen yang tidak lagi ada di sumber tidak dihapus. Mode rekonsiliasi penuh menyinkronkan penyimpanan data dengan data sumber Anda, termasuk menghapus dokumen yang tidak lagi ada di sumber.

Penyelesaian inkremental sangat ideal untuk sistem seperti CRM yang menangani perubahan data kecil yang sering. Daripada menyinkronkan seluruh database, kirim hanya perubahan tertentu, sehingga prosesnya lebih cepat dan efisien. Sinkronisasi penuh masih dapat dilakukan secara berkala untuk menjaga integritas data secara keseluruhan.

Python

    def upload_documents_inline(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        documents: List[discoveryengine.Document],
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Inline import of Document messages."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
                documents=documents,
            ),
        )
        operation = client.import_documents(request=request)
        operation.result()
        result = operation.metadata
        return result

Fungsi upload_documents_inline menggunakan variabel utama berikut:

  • project_id: ID Google Cloud project Anda.
  • location: Google Cloud Lokasi penyimpanan data.
  • data_store_id: ID penyimpanan data.
  • branch_id: ID cabang dalam penyimpanan data (biasanya "0").
  • documents: Daftar objek discoveryengine.Document yang akan diupload.
  • result: Nilai yang ditampilkan dari jenis discoveryengine.ImportDocumentsMetadata.

Memvalidasi konektor Anda

Untuk memvalidasi bahwa konektor Anda berfungsi seperti yang diharapkan, lakukan uji coba untuk memastikan aliran data yang tepat dari sumber ke Discovery Engine.

Python

    SITE = "https://altostrat.com"
    PROJECT_ID = "ucs-3p-connectors-testing"
    LOCATION = "global"
    IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
    DATA_STORE_ID = "my-acl-ds-id1"
    BRANCH_ID = "0"

    posts = fetch_posts(SITE)
    docs = convert_posts_to_documents(posts)
    print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")

    try:
      # Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
      ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
      print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")

      RAW_IDENTITY_MAPPING_DATA = [
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_1",
              user_id="testuser1@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              user_id="testuser2@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              group_id="testgroup1@example.com",
          )
      ]

      # Step #2: Load IMS Data
      response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
      print(
          "\nStep #2: Load Data in IMS Store successful.", response
      )

      # Step #3: Create Entity Data Store & Bind IMS Data Store
      data_store =  get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
      print("\nStep #3: Entity Data Store Create Result: ", data_store)

      metadata = upload_documents_inline(
          PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
      )
      print(f"Uploaded {metadata.success_count} documents inline.")

    except gcp_exceptions.GoogleAPICallError as e:
      print(f"\n--- API Call Failed ---")
      print(f"Server Error Message: {e.message}")
      print(f"Status Code: {e.code}")

    except Exception as e:
      print(f"An error occurred: {e}")

Pastikan kode konektor Anda menggunakan variabel utama berikut:

  • SITE: URL dasar sumber data pihak ketiga.
  • PROJECT_ID: Project ID Google Cloud Anda.
  • LOCATION: Google Cloud Lokasi untuk resource.
  • IDENTITY_MAPPING_STORE_ID: ID unik untuk Identity Mapping Store Anda.
  • DATA_STORE_ID: ID unik untuk penyimpanan data Anda.
  • BRANCH_ID: ID cabang dalam penyimpanan data.
  • posts: Menyimpan postingan yang diambil dari sumber pihak ketiga.
  • docs: Menyimpan dokumen yang dikonversi dalam format discoveryengine.Document.
  • ims_store: Objek discoveryengine.DataStore yang diambil atau dibuat untuk pemetaan identitas.
  • RAW_IDENTITY_MAPPING_DATA: Daftar objek discoveryengine.IdentityMappingEntry.

Output yang diharapkan:

Shell

  Fetched 20 posts and converted to 20 documents.
  STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
  Step #2: Load Data in IMS Store successful.
  Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
  display_name: "my-acl-datastore"
  industry_vertical: GENERIC
  create_time {
    seconds: 1760906997
    nanos: 192641000
  }
  default_schema_id: "default_schema"
  acl_enabled: true
  identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
  Uploaded 20 documents inline.

Anda juga dapat melihat penyimpanan data di konsol Google pada tahap ini: Google Cloud

Penyimpanan data konektor kustom
Penyimpanan data konektor kustom.

Buat konektor dengan Google Cloud Upload penyimpanan

Meskipun impor inline berfungsi dengan baik untuk pengembangan, konektor produksi harus menggunakan Google Cloud Storage untuk skalabilitas yang lebih baik dan mengaktifkan mode rekonsiliasi penuh. Pendekatan ini menangani set data besar secara efisien dan mendukung penghapusan otomatis dokumen yang tidak lagi ada di sumber data pihak ketiga.

Mengonversi dokumen ke JSONL

Untuk menyiapkan dokumen untuk impor massal ke Discovery Engine, konversikan dokumen ke format JSON Lines.

Python

    def convert_documents_to_jsonl(
        documents: List[discoveryengine.Document],
    ) -> str:
        """Serialize Document messages to JSONL."""
        return "\n".join(
            discoveryengine.Document.to_json(doc, indent=None)
            for doc in documents
        ) + "\n"

Fungsi convert_documents_to_jsonl menggunakan variabel berikut:

  • documents: Daftar objek discoveryengine.Document yang akan dikonversi.

Upload ke Google Cloud Storage

Untuk mengaktifkan impor massal yang efisien, siapkan data Anda di Google Cloud Storage.

Python

    def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
        """Upload JSONL content to Google Cloud Storage."""
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_string(jsonl, content_type="application/json")
        return f"gs://{bucket_name}/{blob_name}"

Fungsi upload_jsonl_to_gcs menggunakan variabel kunci berikut:

  • jsonl: Konten string berformat JSONL yang akan diupload.
  • bucket_name: Nama bucket Storage. Google Cloud
  • blob_name: Nama blob (objek) dalam bucket yang ditentukan.

Mengimpor dari Google Cloud Storage dengan rekonsiliasi penuh

Untuk melakukan sinkronisasi data lengkap menggunakan mode rekonsiliasi penuh, gunakan metode ini. Hal ini memastikan penyimpanan data Anda mencerminkan sumber data pihak ketiga secara persis, dengan otomatis menghapus dokumen apa pun yang tidak ada lagi.

Python

    def import_documents_from_gcs(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        gcs_uri: str,
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            gcs_source=gcs_source,
            reconciliation_mode=
                discoveryengine.ImportDocumentsRequest
                .ReconciliationMode.FULL,
        )
        operation = client.import_documents(request=request)
        operation.result()
        return operation.metadata

Fungsi import_documents_from_gcs menggunakan variabel kunci berikut:

  • project_id: ID Google Cloud project Anda.
  • location: Google Cloud Lokasi penyimpanan data.
  • data_store_id: ID penyimpanan data.
  • branch_id: ID cabang dalam penyimpanan data (biasanya "0").
  • gcs_uri: The Google Cloud Storage URI yang mengarah ke file JSONL.

Menguji upload Google Cloud Storage

Untuk memverifikasi alur kerja impor berbasis Google Cloud Storage, jalankan perintah berikut:

Python

  BUCKET = "your-existing-bucket"
  BLOB = "path-to-any-blob/wp/posts.jsonl"
  SITE = "https://altostrat.com"
  PROJECT_ID = "ucs-3p-connectors-testing"
  LOCATION = "global"
  IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
  DATA_STORE_ID = "your-data-store-id"
  BRANCH_ID = "0"
  jsonl_payload = convert_documents_to_jsonl(docs)
  gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
  posts = fetch_posts(SITE)
  docs = convert_posts_to_documents(posts)
  print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
  print("Uploaded to:", gcs_uri)

  metadata = import_documents_from_gcs(
      PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
  )
  print(f"Imported: {metadata.success_count} documents")

Variabel utama berikut digunakan dalam pengujian upload Google Cloud Storage:

  • BUCKET: Nama bucket Storage. Google Cloud
  • BLOB: Jalur ke blob dalam bucket.
  • SITE: URL dasar sumber data pihak ketiga.
  • PROJECT_ID: Project ID Google Cloud Anda.
  • LOCATION: Google Cloud Lokasi untuk resource (misalnya, "global").
  • IDENTITY_MAPPING_STORE_ID: ID unik untuk Identity Mapping Store Anda.
  • DATA_STORE_ID: ID unik untuk penyimpanan data Anda.
  • BRANCH_ID: ID cabang dalam penyimpanan data (biasanya "0").
  • jsonl_payload: Dokumen yang dikonversi ke format JSONL.
  • gcs_uri: The Google Cloud Storage URI dari file JSONL yang diupload.

Output yang diharapkan:

Shell

    Fetched 20 posts and converted to 20 documents.
    Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
    Imported: 20 documents

Kelola izin

Untuk mengelola akses tingkat dokumen di lingkungan perusahaan, Gemini Enterprise mendukung Daftar Kontrol Akses (ACL) dan pemetaan identitas, yang membantu membatasi konten yang dapat dilihat pengguna.

Mengaktifkan ACL di penyimpanan data

Untuk mengaktifkan ACL saat membuat penyimpanan data, jalankan perintah berikut:

Python

  # get_or_create_data_store()
  "data_store": discoveryengine.DataStore(
      display_name=data_store_id,
      industry_vertical=discoveryengine.IndustryVertical.GENERIC,
      acl_enabled=True, # ADDED
  )

Menambahkan ACL ke dokumen

Untuk menghitung dan menyertakan AclInfo saat mengubah dokumen, jalankan perintah berikut:

Python

  # convert_posts_to_documents()
  doc = discoveryengine.Document(
      id=str(post["id"]),
      json_data=json.dumps(payload),
      acl_info=discoveryengine.Document.AclInfo(
          readers=[{
              "principals": [
                  {"user_id": "baklavainthebalkans@gmail.com"},
                  {"user_id": "cloudysanfrancisco@gmail.com"}
              ]
          }]
      ),
  )

Membuat konten tersedia untuk publik

Untuk membuat dokumen dapat diakses secara publik, tetapkan kolom readers sebagai berikut:

Python

  readers=[{"idp_wide": True}]

Memvalidasi ACL

Untuk memvalidasi bahwa konfigurasi ACL Anda berfungsi seperti yang diharapkan, pertimbangkan hal berikut:

  • Lakukan penelusuran sebagai pengguna yang tidak memiliki akses ke dokumen.

  • Periksa struktur dokumen yang diupload di Cloud Storage dan bandingkan dengan referensi.

JSON

  {
    "id": "108",
    "jsonData": "{...}",
    "aclInfo": {
      "readers": [
        {
          "principals": [
            { "userId": "baklavainthebalkans@gmail.com" },
            { "userId": "cloudysanfrancisco@gmail.com" }
          ],
          "idpWide": false
        }
      ]
    }
  }

Menggunakan pemetaan identitas

Gunakan pemetaan identitas untuk skenario berikut:

  • Sumber data pihak ketiga Anda menggunakan identitas non-Google

  • Anda ingin mereferensikan grup kustom (misalnya, wp-admins) dan bukan pengguna individual

  • API Anda hanya menampilkan nama grup

  • Anda perlu mengelompokkan pengguna secara manual untuk skala atau konsistensi

Untuk melakukan pemetaan identitas, ikuti langkah-langkah berikut:

  1. Buat dan tautkan penyimpanan data identitas.
  2. Mengimpor identitas eksternal (Misalnya, external_group:wp-admins). Jangan sertakan awalan external_group: saat mengimpor, misalnya:

    JSON

      {
        "externalIdentity": "wp-admins",
        "userId": "user@example.com"
      }
    
  3. Di info ACL dokumen Anda, tentukan ID entitas eksternal di principal identifier. Saat mereferensikan grup kustom, gunakan awalan external_group: di kolom groupId.

  4. Awalan external_group: diperlukan untuk ID grup dalam info ACL dokumen selama impor, tetapi tidak digunakan saat mengimpor identitas ke penyimpanan pemetaan. Contoh dokumen dengan pemetaan identitas:

    JSON

      {
        "id": "108",
        "aclInfo": {
          "readers": [
            {
              "principals": [
                {
                  "userId": "cloudysanfrancisco@gmail.com"
                },
                {
                  "groupId": "external_group:wp-admins"
                }
              ]
            }
          ]
        },
        "structData": {
          "id": 108,
          "date": "2025-04-24T18:16:04",
          ...
        }
      }
    

Langkah berikutnya

Library klien Gemini Enterprise