스키마 레지스트리를 사용하여 Avro 메시지 생성
스키마 레지스트리(미리보기)를 사용하여 Apache Avro 메시지를 생성하는 Java 프로듀서 애플리케이션을 개발하는 방법을 알아봅니다. 애플리케이션이 Managed Service for Apache Kafka 클러스터에 메시지를 씁니다.
시작하기 전에
이 튜토리얼을 시작하기 전에 Apache Kafka용 관리형 서비스 클러스터를 새로 만드세요. 클러스터가 이미 있으면 이 단계를 건너뛸 수 있습니다.
클러스터를 만드는 방법
콘솔
- Managed Service for Apache Kafka > 클러스터 페이지로 이동합니다.
- 만들기를 클릭합니다.
- 클러스터 이름 체크박스에 클러스터 이름을 입력합니다.
- 리전 목록에서 클러스터의 위치를 선택합니다.
-
네트워크 구성에서 클러스터에 액세스할 수 있는 서브넷을 구성합니다.
- 프로젝트에서 사용자 프로젝트를 선택합니다.
- 네트워크에서 VPC 네트워크를 선택합니다.
- 서브넷에서 서브넷을 선택합니다.
- 완료를 클릭합니다.
- 만들기를 클릭합니다.
만들기를 클릭하면 클러스터 상태가 Creating가 됩니다. 클러스터가 준비되면 상태는 Active입니다.
gcloud
Kafka 클러스터를 만들려면 managed-kafka clusters
create 명령어를 실행합니다.
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
다음을 바꿉니다.
KAFKA_CLUSTER: Kafka 클러스터의 이름REGION: 클러스터의 위치PROJECT_ID: 프로젝트 ID입니다.SUBNET_NAME: 클러스터를 만들려는 서브넷입니다(예:default).
지원되는 위치에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 위치를 참고하세요.
이 명령어는 비동기적으로 실행되고 작업 ID를 반환합니다.
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
생성 작업의 진행 상황을 추적하려면 gcloud managed-kafka
operations describe 명령어를 사용합니다.
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
클러스터가 준비되면 이 명령어의 출력에 state:
ACTIVE 항목이 포함됩니다. 자세한 내용은 클러스터 생성 작업 모니터링을 참고하세요.
필요한 역할
클라이언트 VM을 만들고 구성하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.
-
Compute 인스턴스 관리자(v1)(
roles/compute.instanceAdmin.v1) -
프로젝트 IAM 관리자(
roles/resourcemanager.projectIamAdmin) -
역할 뷰어 (
roles/iam.roleViewer) -
서비스 계정 사용자(
roles/iam.serviceAccountUser)
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
클라이언트 VM 설정
Kafka 클러스터에 액세스할 수 있는 Compute Engine의 Linux 가상 머신 (VM) 인스턴스를 만듭니다. VM을 구성할 때 다음 옵션을 설정합니다.
Region 사용). Kafka 클러스터와 동일한 리전에 VM을 만듭니다.
서브넷. Kafka 클러스터 구성에서 사용한 서브넷과 동일한 VPC 네트워크에 VM을 만듭니다. 자세한 내용은 클러스터의 서브넷 보기를 참고하세요.
액세스 범위 VM에
https://www.googleapis.com/auth/cloud-platform액세스 범위를 할당합니다. 이 범위는 VM이 관리형 Kafka API에 요청을 보낼 수 있도록 승인합니다.
다음 단계에서는 이러한 옵션을 설정하는 방법을 보여줍니다.
콘솔
Google Cloud 콘솔에서 인스턴스 만들기 페이지로 이동합니다.
머신 구성 창에서 다음을 수행합니다.
이름 필드에 인스턴스 이름을 지정합니다. 자세한 내용은 리소스 이름 지정 규칙을 참조하세요.
리전 목록에서 Kafka 클러스터와 동일한 리전을 선택합니다.
영역 목록에서 영역을 선택합니다.
탐색 메뉴에서 네트워킹을 클릭합니다. 네트워킹 창이 표시되면 다음을 수행합니다.
네트워크 인터페이스 섹션으로 이동합니다.
기본 네트워크 인터페이스를 펼치려면 화살표를 클릭합니다.
네트워크 필드에서 VPC 네트워크를 선택합니다.
서브네트워크 목록에서 서브넷을 선택합니다.
완료를 클릭합니다.
탐색 메뉴에서 보안을 클릭합니다. 보안 창이 표시되면 다음을 수행합니다.
액세스 범위에서 각 API에 액세스 설정을 선택합니다.
액세스 범위 목록에서 Cloud Platform 드롭다운 목록을 찾아 사용 설정됨을 선택합니다.
만들기를 클릭하여 VM을 만듭니다.
gcloud
VM 인스턴스를 만들려면 gcloud compute instances create 명령어를 사용합니다.
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
다음을 바꿉니다.
- VM_NAME: VM의 이름
- PROJECT_ID: 프로젝트 ID입니다.
- REGION: Kafka 클러스터를 만든 리전입니다(예:
us-central1). - SUBNET: 클러스터 구성에서 사용한 서브넷과 동일한 VPC 네트워크의 서브넷
- ZONE: 클러스터를 만든 리전의 영역(예:
us-central1-c)
VM 만들기에 대한 자세한 내용은 특정 서브넷에 VM 인스턴스 만들기를 참고하세요.
IAM 역할 부여
Compute Engine 기본 서비스 계정에 다음 Identity and Access Management (IAM) 역할을 부여합니다.
- 관리형 Kafka 클라이언트 (
roles/managedkafka.client) - Schema Registry 관리자 (
roles/managedkafka.schemaRegistryAdmin) - 서비스 계정 토큰 생성자 (
roles/iam.serviceAccountTokenCreator) 서비스 계정 OpenID 토큰 생성자 (
roles/iam.serviceAccountOpenIdTokenCreator)
콘솔
Google Cloud 콘솔에서 IAM 페이지로 이동합니다.
Compute Engine 기본 서비스 계정 행을 찾아 주 구성원 수정을 클릭합니다.
다른 역할 추가를 클릭하고 관리형 Kafka 클라이언트 역할을 선택합니다. 스키마 레지스트리 관리자, 서비스 계정 토큰 생성자, 서비스 계정 OpenID 토큰 생성자 역할에 대해서도 이 단계를 반복합니다.
저장을 클릭합니다.
gcloud
IAM 역할을 부여하려면 gcloud projects add-iam-policy-binding 명령어를 사용합니다.
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.schemaRegistryAdmin
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
다음을 바꿉니다.
PROJECT_ID: 프로젝트 ID입니다.
PROJECT_NUMBER: 프로젝트 번호
프로젝트 번호를 가져오려면 gcloud projects describe 명령어를 실행합니다.
gcloud projects describe PROJECT_ID
자세한 내용은 프로젝트 이름, 번호, ID 찾기를 참고하세요.
VM에 연결
SSH를 사용하여 VM 인스턴스에 연결합니다.
콘솔
VM 인스턴스 페이지로 이동합니다.
VM 인스턴스 목록에서 VM 이름을 찾아 SSH를 클릭합니다.
gcloud
VM에 연결하려면 gcloud compute ssh 명령어를 사용합니다.
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
다음을 바꿉니다.
- VM_NAME: VM의 이름
- PROJECT_ID: 프로젝트 ID입니다.
- ZONE: VM을 만든 영역
SSH를 처음 사용하는 경우 추가 구성이 필요할 수 있습니다. 자세한 내용은 SSH 연결 정보를 참고하세요.
Apache Maven 프로젝트 설정
SSH 세션에서 다음 명령어를 실행하여 Maven 프로젝트를 설정합니다.
다음 명령어를 사용하여 Java 및 Maven을 설치합니다.
sudo apt-get install maven openjdk-17-jdkApache Maven 프로젝트를 설정합니다.
다음 명령어를 사용하여
demo이라는 디렉터리에com.google.example패키지를 만듭니다.mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\ -DarchetypeArtifactId=maven-archetype-quickstart\ -DarchetypeVersion=1.5 -DinteractiveMode=false
스키마와 Java 구현 정의
이 예에서 메시지는 이름과 선택적 ID가 있는 '사용자'를 나타냅니다. 이는 string 유형의 필수 필드 name와 선택적 정수 id.의 두 필드가 있는 Avro 스키마에 해당합니다. Java 프로그램에서 이 스키마를 사용하려면 이 스키마에 해당하는 객체의 Java 구현도 생성해야 합니다.
cd demo를 사용하여 프로젝트 디렉터리로 변경합니다.코드에서 스키마 파일을 저장할 폴더를 만듭니다.
mkdir -p src/main/avro다음 코드를
src/main/avro/User.avsc이라는 파일에 붙여넣어 Avro 스키마 정의를 만듭니다.{ "namespace": "com.google.example", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "id", "type": ["int", "null"]} ] }pom.xml.의build노드에 다음을 추가하여 Avro Java 코드 생성 플러그인을 사용하도록 Maven 프로젝트를 구성합니다.pom.xml에는pluginManagement노드 내에 다른plugins노드가 있을 수 있습니다. 이 단계에서는pluginManagement노드를 변경하지 마세요.plugins노드는pluginManagement와 동일한 수준에 있어야 합니다.<plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.1</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins>pom.xml의project/dependencies노드 끝에 다음을 추가하여 Avro를 종속 항목으로 추가합니다.pom.xml에는 이미dependencyManagement태그 내에dependencies노드가 있습니다. 이 단계에서는dependencyManagement노드를 변경하지 마세요.<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.1</version> </dependency>Java 소스 생성
mvn generate-sources다음 명령어를 실행하여 구현 소스 파일이 생성되었는지 확인합니다. 소스는
User객체의 생성자, 접근자, 직렬화기, 역직렬화기를 구현하는 Java 클래스 파일입니다. 이 클래스는 프로듀서 코드에서 사용됩니다.cat src/main/java/com/google/example/User.java
Apache Avro에 대한 자세한 내용은 Apache Avro 시작 가이드를 참고하세요.
생산자 클라이언트 만들기
이 섹션에서는 프로듀서 클라이언트를 작성하고 빌드하고 실행하는 단계를 안내합니다.
프로듀서 구현
생산자는 KafkaAvroSerializer.java를 사용하여 메시지를 인코딩하고 스키마를 관리합니다. 직렬화 프로그램은 스키마 레지스트리에 자동으로 연결되고, 주제 아래에 스키마를 등록하고, ID를 가져온 다음 Avro를 사용하여 메시지를 직렬화합니다. 여전히 프로듀서와 직렬화 프로그램을 구성해야 합니다.
src/main/java/com/google/example/UserProducer.java이라는 새 파일에 다음 코드를 붙여넣어 프로듀서 클라이언트 클래스를 만듭니다.package com.google.example; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; public class UserProducer { private static Properties configure() throws Exception { Properties p = new Properties(); p.load(new java.io.FileReader("client.properties")); p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); return p; } public static void main(String[] args) throws Exception { Properties p = configure(); KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p); final User u = new User("SchemaEnthusiast", 42); final String topicName = "newUsers"; ProducerRecord<String, User> message = new ProducerRecord<String, User>(topicName, "", u); producer.send(message, new SendCallback()); producer.close(); } }src/main/java/com/google/example/SendCallback.java에서 콜백 클래스를 정의합니다.package com.google.example; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; class SendCallback implements Callback { public void onCompletion(RecordMetadata m, Exception e){ if (e == null){ System.out.println("Produced a message successfully."); } else { System.out.println(e.getMessage()); } } }이 코드를 컴파일하려면
org.apache.kafka.clients패키지와 직렬 변환기 코드가 필요합니다. 직렬화기 Maven 아티팩트는 맞춤 저장소를 통해 배포됩니다. 이 저장소를 구성하려면pom.xml의project노드에 다음 노드를 추가합니다.<repositories> <repository> <id>confluent</id> <name>Confluent</name> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>pom.xml파일의dependencies노드에 다음을 추가합니다.<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>7.8.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.2</version> </dependency>모든 종속 항목이 올바르게 해결되었는지 확인하려면 클라이언트를 컴파일하세요.
mvn compile
스키마 레지스트리 만들기
스키마 레지스트리를 만들려면 다음 명령어를 실행합니다.
gcloud beta managed-kafka schema-registries create REGISTRY_ID \
--location=REGION
다음을 바꿉니다.
REGISTRY_ID: 새 스키마 레지스트리의 고유 식별자입니다. 이는 레지스트리의 리소스 이름의 일부를 구성합니다. 이름은 문자로 시작해야 하고 문자
(a-z, A-Z), 숫자(0-9), 밑줄(_)만 포함해야 하며 63자(영문 기준) 이하여야 합니다.REGION:스키마 레지스트리가 생성될 Google Cloud 리전입니다. 이 위치는 이 레지스트리를 사용하는 Kafka 클러스터의 리전과 일치해야 합니다.
생성한 스키마 정의가 아직 레지스트리에 업로드되지 않았습니다. 생산자 클라이언트는 다음 단계에서 처음 실행될 때 이를 실행합니다.
프로듀서 구성 및 실행
이 시점에서는 프로듀서가 완전히 구성되지 않았으므로 실행되지 않습니다. 프로듀서를 구성하려면 Kafka 및 스키마 레지스트리 구성을 모두 제공하세요.
pom.xml과 동일한 디렉터리에client.properties라는 파일을 만들고 다음 콘텐츠를 추가합니다.bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID bearer.auth.credentials.source=CUSTOM bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProviderkafka-avro-serializer종속 항목 위의pom.xmldependencies노드에 다음을 삽입하여 Kafka 및 스키마 레지스트리 인증 핸들러 종속 항목을 Maven 프로젝트에 추가합니다.<dependency> <groupId>com.google.cloud.hosted.kafka</groupId> <artifactId>managed-kafka-auth-login-handler</artifactId> <version>1.0.6</version> <exclusions> <exclusion> <groupId>io.confluent</groupId> <artifactId>kafka-schema-registry-client</artifactId> </exclusion> </exclusions> </dependency>맞춤 스키마 레지스트리 인증 핸들러 인증 핸들러의 구현을 확인하려면 GcpBearerAuthCredentialProvider 클래스를 참고하세요.
프로듀서 클라이언트를 컴파일하고 실행합니다.
mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer모두 잘 진행되면
SendCallback클래스에서 생성된Produced a message successfully출력이 표시됩니다.
출력 검사
User스키마가 주제 및 스키마 이름에서 파생된 주제 이름으로 등록되었는지 확인합니다.SR_DOMAIN=https://managedkafka.googleapis.com SR_PATH=/v1/projects/PROJECT_ID/locations/REGION SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)"\ $SR_HOST이 명령어의 출력은 다음과 같이 표시됩니다.
["newUsers-value"]저장소에 등록된 스키마가
User와 동일한지 확인합니다.curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)" \ $SR_HOST/newUsers-value/versions/1명령어 출력은 다음과 같이 표시됩니다.
{ "subject": "newUsers-value", "version": 1, "id": 2, "schemaType": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}", "references": [] }
삭제
이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 다음 단계를 수행합니다.
콘솔
VM 인스턴스 삭제
VM 인스턴스 페이지로 이동합니다.
VM을 선택하고 삭제를 클릭합니다.
스키마 레지스트리를 삭제합니다.
스키마 레지스트리 페이지로 이동합니다.
스키마 레지스트리 이름을 클릭합니다.
삭제를 클릭합니다.
Kafka 클러스터를 삭제합니다.
Managed Service for Apache Kafka > 클러스터 페이지로 이동합니다.
Kafka 클러스터를 선택하고 삭제를 클릭합니다.
gcloud
VM을 삭제하려면
gcloud compute instances delete명령어를 사용합니다.gcloud compute instances delete VM_NAME --zone=ZONE스키마 레지스트리를 삭제하려면
/sdk/gcloud/reference/managed-kafka/schema-registries/delete명령어를 사용합니다.gcloud beta managed-kafka schema-registries delete REGISTRY_ID \ --location=REGIONKafka 클러스터를 삭제하려면
gcloud managed-kafka clusters delete명령어를 사용합니다.gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
다음 단계
- 스키마 레지스트리 개요
- Apache Kafka용 관리형 서비스 개요
- Managed Kafka API에 인증
- Kafka Connect를 사용하여 Avro 메시지를 BigQuery에 쓰기