Halaman ini menjelaskan cara membuat konektor kustom.
Sebelum memulai
Sebelum memulai, pastikan Anda memiliki hal berikut:
Periksa apakah penagihan diaktifkan di project Google Cloud Anda.
Instal dan lakukan inisialisasi CLI Google Cloud . Pastikan akun tersebut diautentikasi untuk project Anda.
Dapatkan akses administrator Discovery Engine untuk Google Cloud project Anda.
Dapatkan kredensial akses untuk sumber data pihak ketiga Anda (seperti kunci API atau autentikasi database).
Buat rencana pemetaan data yang jelas. Hal ini harus mencakup kolom mana yang akan diindeks dan cara merepresentasikan kontrol akses, termasuk identitas pihak ketiga.
Membuat konektor dasar
Bagian ini menunjukkan cara membuat konektor kustom dalam bahasa yang Anda pilih. Prinsip dan pola yang ditampilkan di sini berlaku untuk sistem eksternal apa pun. Cukup sesuaikan panggilan API dan transformasi data untuk sumber tertentu dalam bahasa yang Anda pilih guna membuat konektor dasar.
Mengambil data
Untuk memulai, ambil data dari sumber data pihak ketiga Anda. Dalam contoh ini, kami menunjukkan cara mengambil postingan menggunakan penomoran halaman. Untuk lingkungan produksi, sebaiknya gunakan pendekatan streaming untuk set data besar. Hal ini mencegah masalah memori yang dapat terjadi saat memuat semua data sekaligus.
Python
def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
#Fetch all posts from the given site.#
url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
posts: List[dict] = []
page = 1
while True:
resp = requests.get(
url,
params={"page": page, "per_page": per_page},
)
resp.raise_for_status()
batch = resp.json()
posts.extend(batch)
if len(batch) < per_page:
break
page += 1
return posts
Mentransformasi data
Untuk mengonversi data sumber ke format dokumen Discovery Engine, susun data tersebut seperti yang ditunjukkan dalam payload contoh berikut. Anda dapat menyertakan pasangan nilai kunci sebanyak yang diperlukan. Misalnya, Anda dapat menyertakan konten lengkap untuk penelusuran komprehensif. Atau, Anda dapat menyertakan kolom terstruktur untuk penelusuran berfaset, atau kombinasi keduanya.
Python
def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
# Convert WP posts into Discovery Engine Document messages.
docs: List[discoveryengine.Document] = []
for post in posts:
payload = {
"title": post.get("title", {}).get("rendered"),
"body": post.get("content", {}).get("rendered"),
"url": post.get("link"),
"author": post.get("author"),
"categories": post.get("categories"),
"tags": post.get("tags"),
"date": post.get("date"),
}
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
)
docs.append(doc)
return docs
Mengambil atau membuat penyimpanan identitas
Untuk mengelola identitas dan grup pengguna untuk kontrol akses, Anda harus mengambil atau membuat penyimpanan identitas. Fungsi ini mendapatkan penyimpanan identitas yang ada berdasarkan ID, project, dan lokasinya. Jika penyimpanan identitas tidak ada, penyimpanan identitas baru yang kosong akan dibuat dan ditampilkan.
Python
def get_or_create_ims_data_store(
project_id: str,
location: str,
identity_mapping_store_id: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Construct the parent resource name
parent_ims = client_ims.location_path(project=project_id, location=location)
try:
# Create the request object
name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
request = discoveryengine.GetIdentityMappingStoreRequest(
name=name,
)
return client_ims.get_identity_mapping_store(request=request)
except:
# Create the IdentityMappingStore object (it can be empty for basic creation)
identity_mapping_store = discoveryengine.IdentityMappingStore()
# Create the request object
request = discoveryengine.CreateIdentityMappingStoreRequest(
parent=parent_ims,
identity_mapping_store=identity_mapping_store,
identity_mapping_store_id=identity_mapping_store_id,
)
return client_ims.create_identity_mapping_store(request=request)
Fungsi get_or_create_ims_data_store menggunakan variabel kunci berikut:
project_id: ID Google Cloud project Anda.location: Google Cloud Lokasi untuk penyimpanan pemetaan identitas.identity_mapping_store_id: ID unik untuk penyimpanan identitas.client_ims: Instance discoveryengine.IdentityMappingStoreServiceClient yang digunakan untuk berinteraksi dengan Identity Store API.parent_ims: Nama resource lokasi induk, dibuat menggunakan client_ims.location_path.name: Nama lengkap resource penyimpanan pemetaan identitas, yang digunakan untuk GetIdentityMappingStoreRequest.
Menyerap pemetaan identitas ke dalam penyimpanan identitas
Untuk memuat entri pemetaan identitas ke penyimpanan identitas yang ditentukan, gunakan fungsi ini. Metode ini mengambil daftar entri pemetaan identitas dan memulai operasi impor inline. Hal ini sangat penting untuk membangun hubungan identitas pengguna, grup, dan eksternal yang diperlukan untuk kontrol akses dan personalisasi.
Python
def load_ims_data(
ims_store: discoveryengine.DataStore,
id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
"""Get the IMS data store."""
# Initialize the client
client_ims = discoveryengine.IdentityMappingStoreServiceClient()
# Create the InlineSource object
inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
identity_mapping_entries=id_mapping_data
)
# Create the main request object
request_ims = discoveryengine.ImportIdentityMappingsRequest(
identity_mapping_store=ims_store.name,
inline_source=inline_source,
)
try:
# Create the InlineSource object, which holds your list of entries
operation = client_ims.import_identity_mappings(
request=request_ims,
)
result = operation.result()
return result
except Exception as e:
print(f"IMS Load Error: {e}")
result = operation.result()
return result
Fungsi load_ims_data menggunakan variabel kunci berikut:
ims_store: Objek discoveryengine.DataStore yang merepresentasikan penyimpanan pemetaan identitas tempat data akan dimuat.id_mapping_data: Daftar objek discoveryengine.IdentityMappingEntry, yang masing-masing berisi identitas eksternal dan ID pengguna atau grup yang sesuai.result: Nilai yang ditampilkan dari discoveryengine.DataStore.
Buat penyimpanan data
Untuk menggunakan konektor kustom, Anda harus menginisialisasi penyimpanan data untuk konten Anda. Gunakan default_collection untuk konektor kustom. Parameter IndustryVertical menyesuaikan perilaku penyimpanan data untuk kasus penggunaan tertentu. GENERIC cocok untuk sebagian besar skenario. Namun, Anda dapat memilih nilai yang berbeda untuk industri tertentu, seperti MEDIA atau HEALTHCARE_FHIR. Konfigurasi nama tampilan dan properti lainnya agar sesuai dengan konvensi dan persyaratan penamaan project Anda.
Python
def get_or_create_data_store(
project_id: str,
location: str,
display_name: str,
data_store_id: str,
identity_mapping_store: str,
) -> discoveryengine.DataStore:
"""Get or create a DataStore."""
client = discoveryengine.DataStoreServiceClient()
ds_name = client.data_store_path(project_id, location, data_store_id)
try:
result = client.get_data_store(request={"name": ds_name})
return result
except:
parent = client.collection_path(project_id, location, "default_collection")
operation = client.create_data_store(
request={
"parent": parent,
"data_store": discoveryengine.DataStore(
display_name=display_name,
acl_enabled=True,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
identity_mapping_store=identity_mapping_store,
),
"data_store_id": data_store_id,
}
)
result = operation.result()
return result
Fungsi get_or_create_data_store menggunakan variabel kunci berikut:
project_id: ID Google Cloud project Anda.location: Google Cloud Lokasi penyimpanan data.display_name: Nama tampilan penyimpanan data yang dapat dibaca manusia.data_store_id: ID unik untuk penyimpanan data.identity_mapping_store: Nama resource penyimpanan pemetaan identitas yang akan diikat.result: Nilai yang ditampilkan dari discoveryengine.DataStore.
Mengupload dokumen secara langsung
Untuk mengirim dokumen langsung ke Discovery Engine, gunakan upload inline. Metode ini menggunakan mode rekonsiliasi inkremental secara default dan tidak mendukung mode rekonsiliasi penuh. Dalam mode inkremental, dokumen baru ditambahkan dan dokumen yang ada diperbarui, tetapi dokumen yang tidak lagi ada di sumber tidak dihapus. Mode rekonsiliasi penuh menyinkronkan penyimpanan data dengan data sumber Anda, termasuk menghapus dokumen yang tidak lagi ada di sumber.
Penyelesaian inkremental sangat ideal untuk sistem seperti CRM yang menangani perubahan data kecil yang sering. Daripada menyinkronkan seluruh database, kirim hanya perubahan tertentu, sehingga prosesnya lebih cepat dan efisien. Sinkronisasi penuh masih dapat dilakukan secara berkala untuk menjaga integritas data secara keseluruhan.
Python
def upload_documents_inline(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
documents: List[discoveryengine.Document],
) -> discoveryengine.ImportDocumentsMetadata:
"""Inline import of Document messages."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
documents=documents,
),
)
operation = client.import_documents(request=request)
operation.result()
result = operation.metadata
return result
Fungsi upload_documents_inline menggunakan variabel utama berikut:
project_id: ID Google Cloud project Anda.location: Google Cloud Lokasi penyimpanan data.data_store_id: ID penyimpanan data.branch_id: ID cabang dalam penyimpanan data (biasanya "0").documents: Daftar objek discoveryengine.Document yang akan diupload.result: Nilai yang ditampilkan dari jenis discoveryengine.ImportDocumentsMetadata.
Memvalidasi konektor Anda
Untuk memvalidasi bahwa konektor Anda berfungsi seperti yang diharapkan, lakukan uji coba untuk memastikan aliran data yang tepat dari sumber ke Discovery Engine.
Python
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "my-acl-ds-id1"
BRANCH_ID = "0"
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
try:
# Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")
RAW_IDENTITY_MAPPING_DATA = [
discoveryengine.IdentityMappingEntry(
external_identity="external_id_1",
user_id="testuser1@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
user_id="testuser2@example.com",
),
discoveryengine.IdentityMappingEntry(
external_identity="external_id_2",
group_id="testgroup1@example.com",
)
]
# Step #2: Load IMS Data
response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
print(
"\nStep #2: Load Data in IMS Store successful.", response
)
# Step #3: Create Entity Data Store & Bind IMS Data Store
data_store = get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
print("\nStep #3: Entity Data Store Create Result: ", data_store)
metadata = upload_documents_inline(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
)
print(f"Uploaded {metadata.success_count} documents inline.")
except gcp_exceptions.GoogleAPICallError as e:
print(f"\n--- API Call Failed ---")
print(f"Server Error Message: {e.message}")
print(f"Status Code: {e.code}")
except Exception as e:
print(f"An error occurred: {e}")
Pastikan kode konektor Anda menggunakan variabel utama berikut:
SITE: URL dasar sumber data pihak ketiga.PROJECT_ID: Project ID Google Cloud Anda.LOCATION: Google Cloud Lokasi untuk resource.IDENTITY_MAPPING_STORE_ID: ID unik untuk Identity Mapping Store Anda.DATA_STORE_ID: ID unik untuk penyimpanan data Anda.BRANCH_ID: ID cabang dalam penyimpanan data.posts: Menyimpan postingan yang diambil dari sumber pihak ketiga.docs: Menyimpan dokumen yang dikonversi dalam format discoveryengine.Document.ims_store: Objek discoveryengine.DataStore yang diambil atau dibuat untuk pemetaan identitas.RAW_IDENTITY_MAPPING_DATA: Daftar objek discoveryengine.IdentityMappingEntry.
Output yang diharapkan:
Shell
Fetched 20 posts and converted to 20 documents.
STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
Step #2: Load Data in IMS Store successful.
Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
display_name: "my-acl-datastore"
industry_vertical: GENERIC
create_time {
seconds: 1760906997
nanos: 192641000
}
default_schema_id: "default_schema"
acl_enabled: true
identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
Uploaded 20 documents inline.
Anda juga dapat melihat penyimpanan data di konsol Google pada tahap ini: Google Cloud
Buat konektor dengan Google Cloud Upload penyimpanan
Meskipun impor inline berfungsi dengan baik untuk pengembangan, konektor produksi harus menggunakan Google Cloud Storage untuk skalabilitas yang lebih baik dan mengaktifkan mode rekonsiliasi penuh. Pendekatan ini menangani set data besar secara efisien dan mendukung penghapusan otomatis dokumen yang tidak lagi ada di sumber data pihak ketiga.
Mengonversi dokumen ke JSONL
Untuk menyiapkan dokumen untuk impor massal ke Discovery Engine, konversikan dokumen ke format JSON Lines.
Python
def convert_documents_to_jsonl(
documents: List[discoveryengine.Document],
) -> str:
"""Serialize Document messages to JSONL."""
return "\n".join(
discoveryengine.Document.to_json(doc, indent=None)
for doc in documents
) + "\n"
Fungsi convert_documents_to_jsonl menggunakan variabel berikut:
documents: Daftar objek discoveryengine.Document yang akan dikonversi.
Upload ke Google Cloud Storage
Untuk mengaktifkan impor massal yang efisien, siapkan data Anda di Google Cloud Storage.
Python
def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
"""Upload JSONL content to Google Cloud Storage."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(jsonl, content_type="application/json")
return f"gs://{bucket_name}/{blob_name}"
Fungsi upload_jsonl_to_gcs menggunakan variabel kunci berikut:
jsonl: Konten string berformat JSONL yang akan diupload.bucket_name: Nama bucket Storage. Google Cloudblob_name: Nama blob (objek) dalam bucket yang ditentukan.
Mengimpor dari Google Cloud Storage dengan rekonsiliasi penuh
Untuk melakukan sinkronisasi data lengkap menggunakan mode rekonsiliasi penuh, gunakan metode ini. Hal ini memastikan penyimpanan data Anda mencerminkan sumber data pihak ketiga secara persis, dengan otomatis menghapus dokumen apa pun yang tidak ada lagi.
Python
def import_documents_from_gcs(
project_id: str,
location: str,
data_store_id: str,
branch_id: str,
gcs_uri: str,
) -> discoveryengine.ImportDocumentsMetadata:
"""Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(
project=project_id,
location=location,
data_store=data_store_id,
branch=branch_id,
)
gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=gcs_source,
reconciliation_mode=
discoveryengine.ImportDocumentsRequest
.ReconciliationMode.FULL,
)
operation = client.import_documents(request=request)
operation.result()
return operation.metadata
Fungsi import_documents_from_gcs menggunakan variabel kunci berikut:
project_id: ID Google Cloud project Anda.location: Google Cloud Lokasi penyimpanan data.data_store_id: ID penyimpanan data.branch_id: ID cabang dalam penyimpanan data (biasanya "0").gcs_uri: The Google Cloud Storage URI yang mengarah ke file JSONL.
Menguji upload Google Cloud Storage
Untuk memverifikasi alur kerja impor berbasis Google Cloud Storage, jalankan perintah berikut:
Python
BUCKET = "your-existing-bucket"
BLOB = "path-to-any-blob/wp/posts.jsonl"
SITE = "https://altostrat.com"
PROJECT_ID = "ucs-3p-connectors-testing"
LOCATION = "global"
IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
DATA_STORE_ID = "your-data-store-id"
BRANCH_ID = "0"
jsonl_payload = convert_documents_to_jsonl(docs)
gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
posts = fetch_posts(SITE)
docs = convert_posts_to_documents(posts)
print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
print("Uploaded to:", gcs_uri)
metadata = import_documents_from_gcs(
PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
)
print(f"Imported: {metadata.success_count} documents")
Variabel utama berikut digunakan dalam pengujian upload Google Cloud Storage:
BUCKET: Nama bucket Storage. Google CloudBLOB: Jalur ke blob dalam bucket.SITE: URL dasar sumber data pihak ketiga.PROJECT_ID: Project ID Google Cloud Anda.LOCATION: Google Cloud Lokasi untuk resource (misalnya, "global").IDENTITY_MAPPING_STORE_ID: ID unik untuk Identity Mapping Store Anda.DATA_STORE_ID: ID unik untuk penyimpanan data Anda.BRANCH_ID: ID cabang dalam penyimpanan data (biasanya "0").jsonl_payload: Dokumen yang dikonversi ke format JSONL.gcs_uri: The Google Cloud Storage URI dari file JSONL yang diupload.
Output yang diharapkan:
Shell
Fetched 20 posts and converted to 20 documents.
Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
Imported: 20 documents
Kelola izin
Untuk mengelola akses tingkat dokumen di lingkungan perusahaan, Gemini Enterprise mendukung Daftar Kontrol Akses (ACL) dan pemetaan identitas, yang membantu membatasi konten yang dapat dilihat pengguna.
Mengaktifkan ACL di penyimpanan data
Untuk mengaktifkan ACL saat membuat penyimpanan data, jalankan perintah berikut:
Python
# get_or_create_data_store()
"data_store": discoveryengine.DataStore(
display_name=data_store_id,
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
acl_enabled=True, # ADDED
)
Menambahkan ACL ke dokumen
Untuk menghitung dan menyertakan AclInfo saat mengubah dokumen, jalankan perintah berikut:
Python
# convert_posts_to_documents()
doc = discoveryengine.Document(
id=str(post["id"]),
json_data=json.dumps(payload),
acl_info=discoveryengine.Document.AclInfo(
readers=[{
"principals": [
{"user_id": "baklavainthebalkans@gmail.com"},
{"user_id": "cloudysanfrancisco@gmail.com"}
]
}]
),
)
Membuat konten tersedia untuk publik
Untuk membuat dokumen dapat diakses secara publik, tetapkan kolom readers sebagai berikut:
Python
readers=[{"idp_wide": True}]
Memvalidasi ACL
Untuk memvalidasi bahwa konfigurasi ACL Anda berfungsi seperti yang diharapkan, pertimbangkan hal berikut:
Lakukan penelusuran sebagai pengguna yang tidak memiliki akses ke dokumen.
Periksa struktur dokumen yang diupload di Cloud Storage dan bandingkan dengan referensi.
JSON
{
"id": "108",
"jsonData": "{...}",
"aclInfo": {
"readers": [
{
"principals": [
{ "userId": "baklavainthebalkans@gmail.com" },
{ "userId": "cloudysanfrancisco@gmail.com" }
],
"idpWide": false
}
]
}
}
Menggunakan pemetaan identitas
Gunakan pemetaan identitas untuk skenario berikut:
Sumber data pihak ketiga Anda menggunakan identitas non-Google
Anda ingin mereferensikan grup kustom (misalnya, wp-admins) dan bukan pengguna individual
API Anda hanya menampilkan nama grup
Anda perlu mengelompokkan pengguna secara manual untuk skala atau konsistensi
Untuk melakukan pemetaan identitas, ikuti langkah-langkah berikut:
- Buat dan tautkan penyimpanan data identitas.
Mengimpor identitas eksternal (Misalnya, external_group:wp-admins). Jangan sertakan awalan external_group: saat mengimpor, misalnya:
JSON
{ "externalIdentity": "wp-admins", "userId": "user@example.com" }Di info ACL dokumen Anda, tentukan ID entitas eksternal di
principal identifier. Saat mereferensikan grup kustom, gunakan awalanexternal_group:di kolomgroupId.Awalan
external_group:diperlukan untuk ID grup dalam info ACL dokumen selama impor, tetapi tidak digunakan saat mengimpor identitas ke penyimpanan pemetaan. Contoh dokumen dengan pemetaan identitas:JSON
{ "id": "108", "aclInfo": { "readers": [ { "principals": [ { "userId": "cloudysanfrancisco@gmail.com" }, { "groupId": "external_group:wp-admins" } ] } ] }, "structData": { "id": 108, "date": "2025-04-24T18:16:04", ... } }
Langkah berikutnya
- Untuk menyediakan antarmuka pengguna dalam mengkueri data Anda, buat aplikasi di Gemini Enterprise dan hubungkan ke penyimpanan data Konektor kustom yang ada.