Membuat konektor Sink BigQuery

Konektor Sink BigQuery memungkinkan Anda melakukan streaming data dari Kafka ke BigQuery, sehingga memungkinkan penyerapan dan analisis data secara real-time dalam BigQuery. Konektor BigQuery Sink menggunakan kumpulan data dari satu atau beberapa topik Kafka, dan menulis data ke satu atau beberapa tabel dalam satu set data BigQuery.

Sebelum memulai

Sebelum membuat konektor BigQuery Sink, pastikan Anda memiliki hal berikut:

Peran dan izin yang diperlukan

Untuk mendapatkan izin yang diperlukan untuk membuat konektor BigQuery Sink, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor Sink BigQuery. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat konektor Sink BigQuery:

  • Berikan izin pembuatan konektor di cluster Connect induk: managedkafka.connectors.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Connector Editor, lihat Peran standar Managed Service for Apache Kafka.

Jika cluster Managed Service for Apache Kafka Anda berada dalam project yang sama dengan cluster Connect, tidak diperlukan izin lebih lanjut. Jika cluster berada di project lain, lihat Membuat Cluster Connect di project lain.

Memberikan izin untuk menulis ke tabel BigQuery

Akun layanan cluster Connect, yang mengikuti format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, memerlukan izin untuk menulis ke tabel BigQuery. Untuk melakukannya, berikan peran BigQuery Data Editor (roles/bigquery.dataEditor) kepada akun layanan cluster Connect di project yang berisi tabel BigQuery.

Skema untuk konektor Sink BigQuery

Konektor BigQuery Sink menggunakan konverter nilai yang dikonfigurasi (value.converter) untuk mengurai nilai rekaman Kafka menjadi kolom. Kemudian, kolom tersebut akan menulis kolom dengan nama yang sama ke tabel BigQuery.

Konektor memerlukan skema untuk beroperasi. Skema dapat diberikan dengan cara berikut:

Bagian berikutnya menjelaskan opsi ini.

Skema berbasis pesan

Dalam mode ini, setiap rekaman Kafka menyertakan skema JSON. Konektor menggunakan skema untuk menulis data rekaman sebagai baris tabel BigQuery.

Untuk menggunakan skema berbasis pesan, tetapkan properti berikut pada konektor:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=true

Contoh nilai rekaman Kafka:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "user",
        "type": "string",
        "optional": false
      },
      {
        "field": "age",
        "type": "int64",
        "optional": false
      }
    ]
  },
  "payload": {
    "user": "userId",
    "age": 30
  }
}

Jika tabel tujuan sudah ada, skema tabel BigQuery harus kompatibel dengan skema pesan sematan. Jika autoCreateTables=true, konektor akan otomatis membuat tabel tujuan jika diperlukan. Untuk mengetahui informasi selengkapnya, lihat Pembuatan tabel.

Jika Anda ingin konektor memperbarui skema tabel BigQuery saat skema pesan berubah, tetapkan allowNewBigQueryFields, allowSchemaUnionization, atau allowBigQueryRequiredFieldRelaxation ke true.

Skema berbasis tabel

Dalam mode ini, rekaman Kafka berisi data JSON biasa tanpa skema eksplisit. Konektor menyimpulkan skema dari tabel tujuan.

Persyaratan:

  • Tabel BigQuery harus sudah ada.
  • Data rekaman Kafka harus kompatibel dengan skema tabel.
  • Mode ini tidak mendukung pembaruan skema dinamis berdasarkan pesan masuk.

Untuk menggunakan skema berbasis tabel, tetapkan properti berikut pada konektor:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=false
  • bigQueryPartitionDecorator=false

Jika tabel BigQuery menggunakan partisi berbasis waktu dengan partisi harian, bigQueryPartitionDecorator dapat berupa true. Jika tidak, tetapkan properti ini ke false.

Contoh nilai rekaman Kafka:

{
  "user": "userId",
  "age": 30
}

Registry skema

Dalam mode ini, setiap rekaman Kafka berisi data Apache Avro, dan skema pesan disimpan dalam registry skema.

Untuk menggunakan konektor BigQuery Sink dengan registry skema, tetapkan properti berikut pada konektor:

  • value.converter=io.confluent.connect.avro.AvroConverter
  • value.converter.schema.registry.url=SCHEMA_REGISTRY_URL

Ganti SCHEMA_REGISTRY_URL dengan URL schema registry.

Untuk menggunakan konektor dengan Managed Service for Apache Kafka schema registry, tetapkan properti berikut:

  • value.converter.bearer.auth.credentials.source=GCP

Untuk mengetahui informasi selengkapnya, lihat Menggunakan Kafka Connect dengan registry skema.

Tabel BigLake untuk Apache Iceberg di BigQuery

Konektor BigQuery Sink mendukung tabel BigLake untuk Apache Iceberg di BigQuery (selanjutnya, tabel BigLake Iceberg di BigQuery) sebagai target sink.

Tabel BigLake Iceberg di BigQuery memberikan fondasi untuk membangun lakehouse format terbuka di Google Cloud. Tabel Iceberg BigLake di BigQuery menawarkan pengalaman terkelola sepenuhnya yang sama seperti tabel BigQuery, tetapi menyimpan data di bucket penyimpanan milik pelanggan menggunakan Parquet agar dapat beroperasi dengan format tabel terbuka Apache Iceberg.

Untuk mengetahui informasi tentang cara membuat tabel Apache Iceberg, lihat Membuat tabel Apache Iceberg.

Membuat konektor Sink BigQuery

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect tempat Anda ingin membuat konektor.

  3. Klik Create connector.

  4. Untuk nama konektor, masukkan string.

    Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.

  5. Untuk Connector plugin, pilih BigQuery Sink.

  6. Di bagian Topics, tentukan topik Kafka yang akan dibaca. Anda dapat menentukan daftar topik atau ekspresi reguler yang akan dicocokkan dengan nama topik.

    • Opsi 1: Pilih Pilih daftar topik Kafka. Dalam daftar Kafka topics, pilih satu atau beberapa topik. Klik OK.

    • Opsi 2: Pilih Gunakan regex topik. Di kolom Topic regex, masukkan regular expression.

  7. Klik Set data dan tentukan set data BigQuery. Anda dapat memilih set data yang ada atau membuat set data baru.

  8. Opsional: Di kotak Configurations, tambahkan properti konfigurasi atau edit properti default. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.

  9. Pilih Kebijakan mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.

  10. Klik Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ganti kode berikut:

    • CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.

    • LOCATION: Lokasi tempat Anda membuat konektor. Lokasi ini harus sama dengan lokasi tempat Anda membuat Connect cluster.

    • CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.

    • CONFIG_FILE: Jalur ke file konfigurasi YAML untuk konektor BigQuery Sink.

    Berikut adalah contoh file konfigurasi untuk konektor Sink BigQuery:

    name: "BQ_SINK_CONNECTOR_ID"
    project: "GCP_PROJECT_ID"
    topics: "GMK_TOPIC_ID"
    tasks.max: 3
    connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    defaultDataset: "BQ_DATASET_ID"
    

    Ganti kode berikut:

    • BQ_SINK_CONNECTOR_ID: ID atau nama konektor BigQuery Sink. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service for Apache Kafka. Nama konektor tidak dapat diubah.

    • GCP_PROJECT_ID: ID project Google Cloud tempat set data BigQuery Anda berada.

    • GMK_TOPIC_ID: ID topik Managed Service for Apache Kafka yang menjadi sumber aliran data ke konektor BigQuery Sink.

    • BQ_DATASET_ID: ID set data BigQuery yang berfungsi sebagai sink untuk pipeline.

  3. Terraform

    Anda dapat menggunakan resource Terraform untuk membuat konektor.

    resource "google_managed_kafka_connector" "example-bigquery-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-bigquery-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "name"                           = "my-bigquery-sink-connector"
        "project"                        = data.google_project.default.project_id
        "topics"                         = "GMK_TOPIC_ID"
        "tasks.max"                      = "3"
        "connector.class"                = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "defaultDataset"                 = "BQ_DATASET_ID"
      }
    
      provider = google-beta
    }

    Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.

    Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createBigQuerySinkConnector creates a BigQuery Sink connector.
    func createBigQuerySinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, tasksMax, keyConverter, valueConverter, valueConverterSchemasEnable, defaultDataset string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "BQ_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// tasksMax := "3"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// defaultDataset := "BQ_DATASET_ID"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// BigQuery Sink sample connector configuration
    	config := map[string]string{
    		"name":                           connectorID,
    		"project":                        projectID,
    		"topics":                         topics,
    		"tasks.max":                      tasksMax,
    		"connector.class":                "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    		"key.converter":                  keyConverter,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"defaultDataset":                 defaultDataset,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created BigQuery sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateBigQuerySinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-bigquery-sink-connector";
        String bigqueryProjectId = "my-bigquery-project-id";
        String datasetName = "my-dataset";
        String kafkaTopicName = "kafka-topic";
        String maxTasks = "3";
        String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        createBigQuerySinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bigqueryProjectId,
            datasetName,
            kafkaTopicName,
            maxTasks,
            connectorClass,
            keyConverter,
            valueConverter,
            valueSchemasEnable);
      }
    
      public static void createBigQuerySinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bigqueryProjectId,
          String datasetName,
          String kafkaTopicName,
          String maxTasks,
          String connectorClass,
          String keyConverter,
          String valueConverter,
          String valueSchemasEnable)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("name", connectorId);
        configMap.put("project", bigqueryProjectId);
        configMap.put("topics", kafkaTopicName);
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("key.converter", keyConverter);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("defaultDataset", datasetName);
    
        Connector connector =
            Connector.newBuilder()
                .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
                .putAllConfigs(configMap)
                .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request =
              CreateConnectorRequest.newBuilder()
                  .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
                  .setConnectorId(connectorId)
                  .setConnector(connector)
                  .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "name": connector_id,
        "project": project_id,
        "topics": topics,
        "tasks.max": tasks_max,
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": key_converter,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "defaultDataset": default_dataset,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.

Mengonfigurasi konektor

Bagian ini menjelaskan beberapa properti konfigurasi yang dapat Anda tetapkan di konektor. Untuk mengetahui daftar lengkap properti yang khusus untuk konektor ini, lihat Konfigurasi konektor Sink BigQuery.

Nama tabel

Secara default, konektor menggunakan nama topik sebagai nama tabel BigQuery. Untuk menggunakan nama tabel yang berbeda, tetapkan properti topic2TableMap dengan format berikut:

topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...

Pembuatan tabel

Konektor BigQuery Sink dapat membuat tabel tujuan jika tabel tersebut tidak ada.

  • Jika autoCreateTables=true, konektor akan mencoba membuat tabel BigQuery yang belum ada. Setelan ini adalah perilaku default.

  • Jika autoCreateTables=false, konektor tidak membuat tabel apa pun. Jika tabel tujuan tidak ada, akan terjadi error.

Jika autoCreateTables adalah true, Anda dapat menggunakan properti konfigurasi berikut untuk kontrol yang lebih terperinci tentang cara konektor membuat dan mengonfigurasi tabel baru:

  • allBQFieldsNullable
  • clusteringPartitionFieldNames
  • convertDoubleSpecialValues
  • partitionExpirationMs
  • sanitizeFieldNames
  • sanitizeTopics
  • timestampPartitionFieldName

Untuk mengetahui informasi tentang properti ini, lihat Konfigurasi konektor Sink BigQuery.

Metadata Kafka

Anda dapat memetakan data tambahan dari Kafka seperti informasi metadata dan informasi utama ke dalam tabel BigQuery dengan mengonfigurasi kolom kafkaDataFieldName dan kafkaKeyFieldName masing-masing. Contoh informasi metadata mencakup topik, partisi, offset, dan waktu penyisipan Kafka.

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.