이 페이지에서는 Apache Kafka에서 읽고 BigQuery에 쓰는 Dataflow 스트리밍 작업의 성능 특성을 설명합니다. 스트림 전체에서 상태를 추적하거나 요소를 그룹화하지 않고 메시지별 변환을 실행하는 맵 전용 파이프라인의 벤치마크 테스트 결과를 제공합니다.
ETL, 필드 검증, 스키마 매핑을 비롯한 많은 데이터 통합 워크로드가 매핑 전용 카테고리에 속합니다. 파이프라인이 이 패턴을 따르는 경우 이러한 벤치마크를 사용하여 성능이 우수한 참조 구성에 대해 Dataflow 작업을 평가할 수 있습니다.
테스트 방법
벤치마크는 다음 리소스를 사용하여 진행되었습니다.
Apache Kafka용 관리형 서비스 클러스터 메시지는 스트리밍 데이터 생성기 템플릿을 사용하여 생성되었습니다.
- 메시지 비율: 초당 약 1,000,000개의 메시지
- 입력 부하: 1GiB/s
- 메시지 형식: 고정 스키마가 있는 무작위로 생성된 JSON 텍스트
- 메일 크기: 메일당 약 1KiB
- Kafka 파티션: 1000
표준 BigQuery 테이블
Apache Kafka to BigQuery 템플릿을 사용한 Dataflow 스트리밍 파이프라인 이 파이프라인은 필요한 최소한의 파싱과 스키마 매핑을 실행합니다. 맞춤 사용자 정의 함수 (UDF)가 사용되지 않았습니다.
수평 확장 안정화 후 파이프라인이 정상 상태에 도달하면 파이프라인이 약 하루 동안 실행되도록 허용되었으며, 그 후 결과가 수집되고 분석되었습니다.
Dataflow 파이프라인
이 벤치마크는 JSON 메시지의 간단한 매핑과 변환을 실행하는 map 전용 파이프라인을 사용합니다. 파이프라인은 정확히 한 번 모드와 적어도 한 번 모드를 모두 사용하여 테스트되었습니다. 최소 1회 처리하면 처리량이 향상됩니다. 하지만 중복 레코드가 허용되거나 다운스트림 싱크에서 중복 삭제를 처리하는 경우에만 사용해야 합니다.
작업 구성
다음 표에는 Dataflow 작업이 구성된 방식이 나와 있습니다.
| 설정 | 값 |
|---|---|
| 작업자 머신 유형 | e2-standard-2 |
| 작업자 머신 vCPU | 2 |
| 작업자 머신 RAM | 8GB |
| 작업자 머신 영구 디스크 | 표준 영구 디스크 (HDD), 30GB |
| 최대 작업자 | 120 |
| Streaming Engine | 예 |
| 수평식 자동 확장 | 예 |
| 청구 모델 | 리소스 기반 결제 |
| Storage Write API가 사용 설정되어 있나요? | 예 |
| Storage Write API 스트림 | 400 |
| Storage Write API 트리거 빈도 | 5초 |
| 메시지 형식 | JSON |
| Kafka 인증 모드 |
애플리케이션 기본 사용자 인증 정보 (ADC) 자세한 내용은 Kafka 브로커의 인증 유형을 참고하세요. |
스트리밍 파이프라인에는 BigQuery Storage Write API가 권장됩니다. Storage Write API에서 정확히 한 번 모드를 사용하는 경우 다음 설정을 조정할 수 있습니다.
쓰기 스트림 수 쓰기 단계에서 충분한 키 병렬 처리를 보장하려면 스트림당 처리량 권장사항을 따르면서 스토리지 쓰기 API 스트림 수를 작업자 CPU 수보다 큰 값으로 설정하세요.
트리거 빈도 단일 숫자 초 값은 처리량이 많은 파이프라인에 적합합니다.
자세한 내용은 Dataflow에서 BigQuery로 쓰기를 참고하세요.
Apache Kafka 파티션 수에도 특별한 고려가 필요합니다. 읽기 단계에서 충분한 키 병렬 처리를 보장하려면 파티션 수가 작업자 vCPU의 총수와 같아야 합니다. 자세한 내용은 Apache Kafka에서 Dataflow로 읽기를 참고하세요.
벤치마크 결과
이 섹션에서는 벤치마크 테스트 결과를 설명합니다.
처리량 및 리소스 사용량
다음 표에서는 파이프라인 처리량과 리소스 사용량에 대한 테스트 결과를 보여줍니다.
| Result | 단 한 번 | 적어도 한 번 |
|---|---|---|
| 작업자당 입력 처리량 | 평균: 15MBps, n=3 | 평균: 18MBps, n=3 |
| 모든 작업자의 평균 CPU 사용률 | 평균: 70%, n=3 | 평균: 75%, n=3 |
| 워커 노드 수 | 평균: 63, n=3 | 평균: 53, n=3 |
| 시간당 Streaming Engine 컴퓨팅 단위 | 평균: 58, n=3 | 평균: 0, n=3 |
자동 확장 알고리즘은 목표 CPU 사용률 수준에 영향을 줄 수 있습니다. 목표 CPU 사용률을 높이거나 낮추려면 자동 확장 범위 또는 작업자 사용률 힌트를 설정하면 됩니다. 활용률 타겟이 높을수록 비용이 절감될 수 있지만 특히 부하가 변동하는 경우 테일 지연 시간이 악화될 수도 있습니다.
지연 시간
다음 표에서는 입력 단계를 제외한 정확히 한 번 모드의 파이프라인 지연 시간 벤치마크 결과를 보여줍니다.
| 입력 단계를 제외한 총 단계 엔드 투 엔드 지연 시간 | 단 한 번 |
|---|---|
| P50 | 평균: 1,200ms, n=3 |
| P95 | 평균: 3,000ms, n=3 |
| P99 | 평균: 5,400ms, n=3 |
테스트에서는 3개의 장기 실행 테스트 실행에서 단계별 엔드 투 엔드 지연 시간 (job/streaming_engine/stage_end_to_end_latencies 측정항목)을 측정했습니다. 이 측정항목은 스트리밍 엔진이 각 파이프라인 단계에서 소비하는 시간을 측정합니다. 여기에는 다음과 같은 파이프라인의 모든 내부 단계가 포함됩니다.
- 처리를 위해 메시지 셔플링 및 큐 추가
- 실제 처리 시간(예: 메시지를 행 객체로 변환)
- 영구 상태 쓰기 및 영구 상태 쓰기를 위해 대기열에 추가하는 데 소요된 시간
측정항목의 제한으로 인해 입력 단계 지연 시간이 보고되지 않습니다. 따라서 총계에 포함되지 않습니다.
여기에 표시된 벤치마크는 기준을 나타냅니다. 지연 시간은 파이프라인 복잡성에 매우 민감합니다. 맞춤 UDF, 추가 변환, 복잡한 윈도우화 로직은 모두 지연 시간을 늘릴 수 있습니다.
예상 비용
다음과 같이 Google Cloud Platform 가격 계산기를 사용하여 리소스 기반 청구로 자체적인 유사한 파이프라인의 기준 비용을 추정할 수 있습니다.
- 가격 계산기를 엽니다.
- 합산하여 추정을 클릭합니다.
- Dataflow를 선택합니다.
- 서비스 유형으로 'Dataflow Classic'을 선택합니다.
- 고급 설정을 선택하여 전체 옵션을 표시합니다.
- 작업이 실행되는 위치를 선택합니다.
- 작업 유형으로 '스트리밍'을 선택합니다.
- Streaming Engine 사용 설정을 선택합니다.
- 작업 실행 시간, 작업자 노드, 작업자 머신, 영구 디스크 스토리지에 관한 정보를 입력합니다.
- Streaming Engine 컴퓨팅 단위의 예상 수를 입력합니다.
리소스 사용량과 비용은 입력 처리량에 따라 대략 선형으로 확장되지만 작업자가 몇 명에 불과한 소규모 작업의 경우 총비용은 고정 비용이 지배합니다. 시작점으로 벤치마크 결과에서 작업자 노드 수와 리소스 소비를 추정할 수 있습니다.
예를 들어 정확히 한 번 모드에서 맵 전용 파이프라인을 실행하고 입력 데이터 속도가 100MiB/s라고 가정해 보겠습니다. 1GiB/s 파이프라인의 벤치마크 결과를 기반으로 리소스 요구사항을 다음과 같이 추정할 수 있습니다.
- 배율: (100MiB/초) / (1GiB/초) = 0.1
- 예상 워커 노드: 63개 워커 × 0.1 = 6.3개 워커
- 시간당 예상 Streaming Engine 컴퓨팅 단위 수: 58 × 0.1 = 시간당 5.8단위
이 값은 초기 추정치로만 사용해야 합니다. 실제 처리량과 비용은 머신 유형, 메시지 크기 분포, 사용자 코드, 집계 유형, 키 병렬 처리, 윈도우 크기 등의 요인에 따라 크게 달라질 수 있습니다. 자세한 내용은 Dataflow 비용 최적화 권장사항을 참고하세요.
테스트 파이프라인 실행
이 섹션에서는 맵 전용 파이프라인을 실행하는 데 사용된 gcloud dataflow flex-template run 명령어를 보여줍니다.
단 한 번 모드
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400
적어도 한 번 모드
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--additional-experiments=streaming_mode_at_least_once \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true
다음을 바꿉니다.
JOB_NAME: Dataflow 작업 이름PROJECT_ID: 프로젝트 ID입니다.KAFKA_BOOTSTRAP_ADDRESS: Apache Kafka 클러스터의 부트스트랩 주소KAFKA_TOPIC: Kafka 주제의 이름BQ_DATASET: BigQuery 데이터 세트의 이름BQ_TABLE_NAME: BigQuery 테이블의 이름
테스트 데이터 생성
테스트 데이터를 생성하려면 다음 명령어를 사용하여 스트리밍 데이터 생성기 템플릿을 실행합니다.
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--max-workers=140 \
--parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON
다음을 바꿉니다.
JOB_NAME: Dataflow 작업 이름PROJECT_ID: 프로젝트 ID입니다.SCHEMA_LOCATION: Cloud Storage의 스키마 파일 경로KAFKA_BOOTSTRAP_ADDRESS: Apache Kafka 클러스터의 부트스트랩 주소KAFKA_TOPIC: Kafka 주제의 이름
스트리밍 데이터 생성기 템플릿은 JSON 데이터 생성기 파일을 사용하여 메시지 스키마를 정의합니다. 벤치마크 테스트에서는 다음과 유사한 메시지 스키마를 사용했습니다.
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
다음 단계
- Dataflow 작업 모니터링 인터페이스 사용
- Dataflow 비용 최적화 권장사항
- 느리거나 중단된 스트리밍 작업 문제 해결
- Apache Kafka에서 Dataflow로 읽기
- Dataflow에서 BigQuery로 쓰기