Streaming perubahan Bigtable ke template Vector Search

Template ini membuat pipeline streaming untuk melakukan streaming data Bigtable mengubah rekaman dan menuliskannya ke Vertex AI Vector Search menggunakan Dataflow Runner V2.

Persyaratan pipeline

  • Instance sumber Bigtable harus ada.
  • Tabel sumber Bigtable harus ada, dan tabel harus mengaktifkan aliran perubahan.
  • Profil aplikasi Bigtable harus ada.
  • Jalur indeks Vector Search harus ada.

Parameter template

Parameter yang diperlukan

  • embeddingColumn: Nama kolom yang sepenuhnya memenuhi syarat tempat embedding disimpan. Dalam format cf:col.
  • embeddingByteSize: Ukuran byte setiap entri dalam array embedding. Gunakan 4 untuk Float, dan 8 untuk Double. Nilai defaultnya adalah: 4.
  • vectorSearchIndex: Indeks Penelusuran Vektor tempat perubahan akan di-streaming, dalam format 'projects/{projectID}/locations/{region}/indexes/{indexID}' (tanpa spasi di awal atau akhir) Misalnya, projects/123/locations/us-east1/indexes/456.
  • bigtableChangeStreamAppProfile: ID profil aplikasi Bigtable. Profil aplikasi harus menggunakan perutean cluster tunggal dan mengizinkan transaksi baris tunggal.
  • bigtableReadInstanceId: ID instance Bigtable sumber.
  • bigtableReadTableId: ID tabel Bigtable sumber.

Parameter opsional

  • bigtableMetadataTableTableId: ID tabel yang digunakan untuk membuat tabel metadata.
  • crowdingTagColumn: Nama kolom yang sepenuhnya memenuhi syarat tempat tag keramaian disimpan. Dalam format cf:col.
  • allowRestrictsMappings: Nama kolom yang sepenuhnya memenuhi syarat yang dipisahkan koma dari kolom yang harus digunakan sebagai pembatasan allow, dengan aliasnya. Dalam format cf:col->alias.
  • denyRestrictsMappings: Nama kolom yang sepenuhnya memenuhi syarat dan dipisahkan koma dari kolom yang harus digunakan sebagai pembatasan deny, dengan aliasnya. Dalam format cf:col->alias.
  • intNumericRestrictsMappings: Nama kolom yang sepenuhnya memenuhi syarat yang dipisahkan koma dari kolom yang harus digunakan sebagai numeric_restricts bilangan bulat, dengan aliasnya. Dalam format cf:col->alias.
  • floatNumericRestrictsMappings: Nama kolom yang sepenuhnya memenuhi syarat dan dipisahkan koma dari kolom yang harus digunakan sebagai numeric_restricts float (4 byte), dengan aliasnya. Dalam format cf:col->alias.
  • doubleNumericRestrictsMappings: Nama kolom yang sepenuhnya memenuhi syarat dan dipisahkan koma dari kolom yang harus digunakan sebagai numeric_restricts ganda (8 byte), dengan aliasnya. Dalam format cf:col->alias.
  • upsertMaxBatchSize: Jumlah maksimum upsert yang akan di-buffer sebelum meng-upsert batch ke Indeks Vector Search. Batch akan dikirim saat ada upsertBatchSize data yang siap, atau waktu tunggu upsertBatchDelay untuk data apa pun telah berlalu. Contoh, 10. Nilai defaultnya adalah: 10.
  • upsertMaxBufferDuration: Penundaan maksimum sebelum batch upsert dikirim ke Vector Search.Batch akan dikirim saat ada upsertBatchSize data yang siap, atau waktu tunggu upsertBatchDelay telah berlalu untuk data apa pun. Format yang diizinkan adalah: Ns (untuk detik, contoh: 5s), Nm (untuk menit, contoh: 12m), Nh (untuk jam, contoh: 2h). Contoh, 10s. Nilai defaultnya adalah: 10 detik.
  • deleteMaxBatchSize: Jumlah maksimum penghapusan yang akan di-buffer sebelum menghapus batch dari Indeks Penelusuran Vektor. Batch akan dikirim saat ada deleteBatchSize data yang siap, atau waktu tunggu deleteBatchDelay untuk data apa pun telah berlalu. Contoh, 10. Nilai defaultnya adalah: 10.
  • deleteMaxBufferDuration: Penundaan maksimum sebelum batch penghapusan dikirim ke Vector Search.Batch akan dikirim saat ada deleteBatchSize rekaman yang siap, atau waktu deleteBatchDelay telah berlalu untuk rekaman apa pun. Format yang diizinkan adalah: Ns (untuk detik, contoh: 5s), Nm (untuk menit, contoh: 12m), Nh (untuk jam, contoh: 2h). Contoh, 10s. Nilai defaultnya adalah: 10 detik.
  • dlqDirectory: Jalur untuk menyimpan semua data yang belum diproses beserta alasan kegagalan pemrosesannya. Defaultnya adalah direktori di bawah lokasi sementara tugas Dataflow. Nilai default sudah cukup dalam sebagian besar kondisi.
  • bigtableChangeStreamMetadataInstanceId: ID instance metadata aliran perubahan Bigtable. Nilai defaultnya adalah kosong.
  • bigtableChangeStreamMetadataTableTableId: ID tabel metadata konektor aliran perubahan data Bigtable. Jika tidak diberikan, tabel metadata konektor aliran perubahan Bigtable akan dibuat secara otomatis selama eksekusi pipeline. Nilai defaultnya adalah kosong.
  • bigtableChangeStreamCharset: Nama charset aliran perubahan Bigtable. Nilai defaultnya: UTF-8.
  • bigtableChangeStreamStartTimestamp: Stempel waktu awal (https://tools.ietf.org/html/rfc3339), inklusif, yang akan digunakan untuk membaca aliran perubahan. Misalnya, 2022-05-05T07:59:59Z. Default-nya adalah stempel waktu waktu mulai pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: Daftar perubahan nama grup kolom yang dipisahkan koma untuk diabaikan. Nilai defaultnya adalah kosong.
  • bigtableChangeStreamIgnoreColumns: Daftar perubahan nama kolom yang dipisahkan koma untuk diabaikan. Contoh: "cf1:col1,cf2:col2". Nilai defaultnya adalah kosong.
  • bigtableChangeStreamName: Nama unik untuk pipeline klien. Memungkinkan Anda melanjutkan pemrosesan dari titik saat pipeline yang sebelumnya berjalan berhenti. Secara default, nama dibuat secara otomatis. Lihat log tugas Dataflow untuk nilai yang digunakan.
  • bigtableChangeStreamResume: Jika disetel ke true, pipeline baru akan melanjutkan pemrosesan dari titik saat pipeline yang sebelumnya berjalan dengan nilai bigtableChangeStreamName yang sama berhenti. Jika pipeline dengan nilai bigtableChangeStreamName yang diberikan belum pernah dijalankan, pipeline baru tidak akan dimulai. Jika disetel ke false, pipeline baru akan dimulai. Jika pipeline dengan nilai bigtableChangeStreamName yang sama telah berjalan untuk sumber tertentu, pipeline baru tidak akan dimulai. Nilai defaultnya adalah false.
  • bigtableReadChangeStreamTimeoutMs: Waktu tunggu untuk permintaan Bigtable ReadChangeStream dalam milidetik.
  • bigtableReadProjectId: Project ID Bigtable. Defaultnya adalah project untuk tugas Dataflow.

Menjalankan template

Konsol

  1. Buka halaman Dataflow Membuat tugas dari template.
  2. Buka Membuat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Template Dataflow, pilih the Bigtable Change Streams to Vector Search template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Jalankan tugas.

gcloud CLI

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • EMBEDDING_COLUMN: kolom Embedding
  • EMBEDDING_BYTE_SIZE: Ukuran byte array embedding. Dapat berupa 4 atau 8.
  • VECTOR_SEARCH_INDEX: Jalur indeks Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: ID profil aplikasi Bigtable
  • BIGTABLE_READ_INSTANCE_ID: ID Instance Bigtable sumber
  • BIGTABLE_READ_TABLE_ID: ID tabel Bigtable sumber

API

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Ganti kode berikut:

  • PROJECT_ID: Google Cloud Project ID tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • EMBEDDING_COLUMN: kolom Embedding
  • EMBEDDING_BYTE_SIZE: Ukuran byte array embedding. Dapat berupa 4 atau 8.
  • VECTOR_SEARCH_INDEX: Jalur indeks Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: ID profil aplikasi Bigtable
  • BIGTABLE_READ_INSTANCE_ID: ID Instance Bigtable sumber
  • BIGTABLE_READ_TABLE_ID: ID tabel Bigtable sumber