This tutorial shows how to integrate Kafka Connect with a schema registry so that connectors can use registered schemas to serialize and deserialize messages.
Before you start this tutorial, complete the steps in Produce Avro messages with the schema registry. After you complete them, you have the following components that are needed for this tutorial:
- A schema registry with a registered Avro schema.
- A Kafka cluster with a topic that contains Avro messages.
Overview
A schema defines the structure of a message or data payload. A schema registry provides a centralized location to store schemas. By using a schema registry, you can ensure consistent encoding and decoding of your data as it moves from producers to consumers.
This tutorial demonstrates the following scenario:

1. A Java producer client writes Apache Avro messages to a Managed Service for Apache Kafka cluster. The Avro message schema is stored in a schema registry.
2. A BigQuery Sink connector reads the incoming messages from Kafka and writes them to BigQuery. The connector automatically creates a BigQuery table that is compatible with the Avro schema.
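For reference, the schema registered by the producer is an Avro record schema. The exact record and field names depend on what you registered in the prerequisite tutorial; the following is only a hypothetical sketch of what such a schema can look like:

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}

Because the connector creates a table that is compatible with this schema, each field in the record typically becomes a column in the BigQuery table.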
Create a Connect cluster
Perform the following steps to create a Connect cluster. Creating a Connect cluster can take up to 30 minutes.
Console
1. Go to the Managed Service for Apache Kafka > Connect Clusters page.
2. Click Create.
3. For the Connect cluster name, enter a string. Example: my-connect-cluster.
4. For Primary Kafka cluster, select the Kafka cluster.
5. Click Create.

While the Connect cluster is being created, the cluster state is Creating. When the cluster has finished being created, the state is Active.
gcloud
To create a Connect cluster, run the gcloud managed-kafka connect-clusters create command.
gcloud managed-kafka connect-clusters create CONNECT_CLUSTER \
--location=REGION \
--cpu=12 \
--memory=12GiB \
--primary-subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--kafka-cluster=KAFKA_CLUSTER \
--async
Replace the following:
- CONNECT_CLUSTER: a name for the Connect cluster
- REGION: the region where you created the Kafka cluster
- PROJECT_ID: your project ID
- SUBNET_NAME: the subnet where you created the Kafka cluster
- KAFKA_CLUSTER: the name of your Kafka cluster
The command runs asynchronously and returns an operation ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
To track the progress of the create operation, use the gcloud managed-kafka operations describe command:
gcloud managed-kafka operations describe OPERATION_ID \
--location=REGION
For more information, see Monitor the cluster creation operation.
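Optionally, to confirm that the Connect cluster is ready before you continue, describe the cluster and check its state. The following is a sketch that assumes the gcloud managed-kafka connect-clusters describe command is available in your version of the Google Cloud CLI:

gcloud managed-kafka connect-clusters describe CONNECT_CLUSTER \
    --location=REGION

Wait until the output reports an ACTIVE state before you create connectors on the cluster.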
Grant IAM roles
Grant the BigQuery Data Editor Identity and Access Management (IAM) role to the Managed Kafka service account. This role allows the connector to write to BigQuery.
Console
1. In the Google Cloud console, go to the IAM page.
2. Select Include Google-provided role grants.
3. Find the row for Managed Kafka Service Account and click Edit principal.
4. Click Add another role and select the role BigQuery Data Editor.
5. Click Save.
For more information about granting roles, see Grant an IAM role by using the console.
gcloud
To grant IAM roles to the service account, run the gcloud projects add-iam-policy-binding command.
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
--role=roles/bigquery.dataEditor
Replace the following:
- PROJECT_ID: your project ID
- PROJECT_NUMBER: your project number
To find your project number, use the gcloud projects describe command.
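For example, the following command prints only the project number, which you can then substitute into the --member flag; value(projectNumber) is a standard gcloud output format expression:

gcloud projects describe PROJECT_ID --format="value(projectNumber)"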
Create a BigQuery dataset
In this step, you create a dataset to hold the BigQuery table. The BigQuery Sink connector automatically creates the table.
To create a dataset, perform the following steps.
Console
1. Open the BigQuery page.
2. In the Explorer panel, select the project where you want to create the dataset.
3. Expand the View actions option and click Create dataset.
4. On the Create dataset page:
   - For Dataset ID, enter a name for the dataset.
   - For Location type, choose a geographic location for the dataset.
gcloud
To create a new dataset, use the bq mk command with the --dataset flag.
bq mk --location REGION \
--dataset PROJECT_ID:DATASET_NAME
Replace the following:
- PROJECT_ID: your project ID
- DATASET_NAME: the name of the dataset
- REGION: the dataset's location
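Optionally, to confirm that the dataset was created in the expected project and location, you can inspect it with the bq show command, for example:

bq show --format=prettyjson PROJECT_ID:DATASET_NAME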
Create a BigQuery Sink connector
In this step, you create a BigQuery Sink connector, which reads from Kafka and writes to BigQuery. You configure the connector to deserialize the Kafka messages by using the Avro schema stored in your schema registry.
Console
1. Go to the Managed Service for Apache Kafka > Connect Clusters page.
2. Click the name of the Connect cluster.
3. Click Create connector.
4. For the Connector name, enter a string. Example: bigquery-connector.
5. In the Connector plugin list, select BigQuery Sink.
6. For Topics, select the Kafka topic named newUsers. This topic is created by the Java producer client.
7. For Dataset, enter the name of the BigQuery dataset, in the following format: PROJECT_ID.DATASET_NAME. Example: my-project.dataset1.
8. In the Configurations edit box, replace the existing configuration with the following:

   tasks.max=3
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.bearer.auth.credentials.source=GCP
   value.converter.schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGISTRY_REGION/schemaRegistries/REGISTRY_NAME

   Replace the following:

   - PROJECT_ID: your project ID
   - REGISTRY_REGION: the region where you created your schema registry
   - REGISTRY_NAME: the name of your schema registry

9. Click Create.
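As an illustration only, if your project were named my-project and your schema registry were named my-registry in the us-central1 region (hypothetical values), the configuration in the Configurations edit box would look similar to the following:

tasks.max=3
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.bearer.auth.credentials.source=GCP
value.converter.schema.registry.url=https://managedkafka.googleapis.com/v1/projects/my-project/locations/us-central1/schemaRegistries/my-registry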
gcloud
To create a BigQuery Sink connector, run the gcloud managed-kafka connectors create command.
export REGISTRY_PATH=projects/PROJECT_ID/locations/REGISTRY_REGION/schemaRegistries/REGISTRY_NAME
gcloud managed-kafka connectors create CONNECTOR_NAME \
--location=REGION \
--connect-cluster=CONNECT_CLUSTER \
--configs=connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector,\
key.converter=org.apache.kafka.connect.storage.StringConverter,\
value.converter=io.confluent.connect.avro.AvroConverter,\
value.converter.bearer.auth.credentials.source=GCP,\
value.converter.schema.registry.url=https://managedkafka.googleapis.com/v1/$REGISTRY_PATH,\
tasks.max=3,\
project=PROJECT_ID,\
defaultDataset=DATASET_NAME,\
topics=newUsers
Replace the following:
- REGISTRY_REGION: the region where you created your schema registry
- REGISTRY_NAME: the name of your schema registry
- CONNECTOR_NAME: a name for the connector, such as bigquery-connector
- CONNECT_CLUSTER: the name of your Connect cluster
- REGION: the region where you created the Connect cluster
- PROJECT_ID: your project ID
- DATASET_NAME: the name of your BigQuery dataset
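After you create the connector, you can optionally check its configuration and status. The following sketch assumes that the gcloud managed-kafka connectors describe command is available and accepts the same --connect-cluster and --location flags as the create command:

gcloud managed-kafka connectors describe CONNECTOR_NAME \
    --connect-cluster=CONNECT_CLUSTER \
    --location=REGION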
View results
To view the results in BigQuery, run a query on the table as follows. It might take a few minutes for the first messages to be written to the table.
Console
1. Open the BigQuery page.
2. In the query editor, run the following query:

   SELECT * FROM `PROJECT_ID.DATASET_NAME.newUsers`
Replace the following variables:
- PROJECT_ID: the name of your Google Cloud project
- DATASET_NAME: the name of your BigQuery dataset
gcloud
Use the bq query command to run a query on the table:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.newUsers`'
Replace the following variables:
- PROJECT_ID: the name of your Google Cloud project
- DATASET_NAME: the name of your BigQuery dataset
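For example, to check only that messages are arriving, you can count the rows instead of selecting them all. The project and dataset names below are hypothetical; the table is named after the newUsers topic:

bq query --use_legacy_sql=false 'SELECT COUNT(*) AS row_count FROM `my-project.dataset1.newUsers`'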