이 페이지에서는 Apache Kafka에서 읽고 BigQuery에 쓰는 Dataflow 스트리밍 작업의 성능 특성을 설명합니다. 상태를 추적하거나 스트림에서 요소를 그룹화하지 않고 메시지별 변환을 수행하는 맵 전용 파이프라인의 벤치마크 테스트 결과를 제공합니다.
ETL, 필드 유효성 검사, 스키마 매핑을 비롯한 많은 데이터 통합 워크로드가 맵 전용 카테고리에 속합니다. 파이프라인이 이 패턴을 따르는 경우 이러한 벤치마크를 사용하여 성능이 우수한 참조 구성과 비교하여 Dataflow 작업을 평가할 수 있습니다.
테스트 방법
벤치마크는 다음 리소스를 사용하여 수행되었습니다.
Managed Service for Apache Kafka 클러스터. 메시지는 스트리밍 데이터 생성기 템플릿을 사용하여 생성되었습니다.
- 메시지 비율: 초당 약 1,000,000개의 메시지
- 입력 로드: 1GiB/초
- 메시지 형식: 고정 스키마가 있는 임의로 생성된 JSON 텍스트
- 메시지 크기: 메시지당 약 1KiB
- Kafka 파티션: 1000
Apache Kafka to BigQuery 템플릿을 사용하는 Dataflow 스트리밍 파이프라인. 이 파이프라인은 최소한의 필수 파싱 및 스키마 매핑을 수행합니다. 커스텀 사용자 정의 함수 (UDF)는 사용되지 않았습니다.
수평 확장이 안정화되고 파이프라인이 정상 상태에 도달한 후 파이프라인은 약 하루 동안 실행되도록 허용되었으며, 그 후 결과가 수집되고 분석되었습니다.
Dataflow 파이프라인
이 벤치마크는 JSON 메시지의 간단한 매핑 및 변환을 수행하는 맵 전용 파이프라인을 사용합니다. 파이프라인은 정확히 한 번 모드와 적어도 한 번 모드를 모두 사용하여 테스트되었습니다. 적어도 한 번 처리는 더 나은 처리량을 제공합니다. 하지만 중복 레코드가 허용되거나 다운스트림 싱크가 중복 삭제를 처리하는 경우에만 사용해야 합니다.
작업 구성
다음 표에서는 Dataflow 작업이 구성된 방식을 보여줍니다.
| 설정 | 값 |
|---|---|
| 작업자 머신 유형 | e2-standard-2 |
| 작업자 머신 vCPU | 2 |
| 작업자 머신 RAM | 8 GB |
| 작업자 머신 영구 디스크 | 표준 영구 디스크 (HDD), 30GB |
| 최대 작업자 | 120 |
| Streaming Engine | 예 |
| 수평식 자동 확장 | 예 |
| 청구 모델 | 리소스 기반 청구 |
| Storage Write API 사용 설정됨 | 예 |
| Storage Write API 스트림 | 400 |
| Storage Write API 트리거링 빈도 | 5초 |
| 메시지 형식 | JSON |
| Kafka 인증 모드 |
애플리케이션 기본 사용자 인증 정보 (ADC). 자세한 내용은 Kafka 브로커의 인증 유형을 참조하세요. |
BigQuery Storage Write API는 스트리밍 파이프라인에 권장됩니다. Storage Write API와 함께 정확히 한 번 모드를 사용하는 경우 다음 설정을 조정할 수 있습니다.
쓰기 스트림 수. 쓰기 단계에서 충분한 키 병렬 처리 를 보장하려면 스트림당 처리량 권장사항 을 따르면서 Storage Write API 스트림 수 를 작업자 CPU 수보다 큰 값으로 설정합니다.
트리거링 빈도. 한 자리 수의 초 값은 처리량이 많은 파이프라인에 적합합니다.
자세한 내용은 Dataflow에서 BigQuery로 쓰기를 참조하세요.
Apache Kafka 파티션 수에도 특별한 주의를 기울여야 합니다. 읽기 단계에서 충분한 키 병렬 처리를 보장하려면 파티션 수가 작업자 vCPU의 총 수와 같거나 커야 합니다. 자세한 내용은 다음 자료를 참조하세요. Apache Kafka에서 Dataflow로 읽기
벤치마크 결과
이 섹션에서는 벤치마크 테스트 결과를 설명합니다.
처리량 및 리소스 사용량
다음 표에서는 파이프라인 처리량 및 리소스 사용량에 대한 테스트 결과를 보여줍니다.
| 결과 | 단 한 번 | 적어도 한 번 |
|---|---|---|
| 작업자당 입력 처리량 | 평균: 15MBps, n=3 | 평균: 18MBps, n=3 |
| 모든 작업자의 평균 CPU 사용률 | 평균: 70%, n=3 | 평균: 75%, n=3 |
| 워커 노드 수 | 평균: 63, n=3 | 평균: 53, n=3 |
| 시간당 Streaming Engine 컴퓨팅 단위 | 평균: 58, n=3 | 평균: 0, n=3 |
자동 확장 알고리즘은 대상 CPU 사용률 수준에 영향을 줄 수 있습니다. 대상 CPU 사용률을 높이거나 낮추려면 자동 확장 범위 또는 작업자 사용률 힌트를 설정하면 됩니다. 사용률 대상이 높을수록 비용이 절감될 수 있지만 특히 부하가 변동하는 경우 꼬리 지연 시간이 더 길어질 수 있습니다.
지연 시간
다음 표에서는 입력 단계를 제외한 정확히 한 번 모드의 파이프라인 지연 시간에 대한 벤치마크 결과를 보여줍니다.
| 입력 단계를 제외한 총 단계 엔드 투 엔드 지연 시간 | 단 한 번 |
|---|---|
| P50 | 평균: 1,200ms, n=3 |
| P95 | 평균: 3,000ms, n=3 |
| P99 | 평균: 5,400ms, n=3 |
테스트에서는 3개의 장기 실행 테스트 실행에서 단계별 엔드 투 엔드 지연 시간 (job/streaming_engine/stage_end_to_end_latencies
측정항목)을 측정했습니다. 이 측정항목은 Streaming Engine이 각 파이프라인 단계에서 소비하는 시간을 측정합니다. 여기에는 다음과 같은 파이프라인의 모든 내부 단계가 포함됩니다.
- 처리를 위한 메시지 셔플링 및 대기열 처리
- 실제 처리 시간(예: 메시지를 행 객체로 변환)
- 영구 상태 쓰기 및 영구 상태 쓰기를 위해 대기열에 추가하는 데 걸린 시간
측정항목의 제한으로 인해 입력 단계 지연 시간이 보고되지 않습니다. 따라서 총계에 포함되지 않습니다.
여기에 표시된 벤치마크는 기준선을 나타냅니다. 지연 시간은 파이프라인 복잡성에 매우 민감합니다. 커스텀 UDF, 추가 변환, 복잡한 창 처리 로직은 모두 지연 시간을 늘릴 수 있습니다.
예상 비용
가격 계산기를 사용하여 다음과 같이 리소스 기반 청구로 자체 비교 가능한 파이프라인의 기준 비용을 추정할 수 있습니다.Google Cloud
- 가격 계산기를 엽니다.
- 합산하여 추정 을 클릭합니다.
- Dataflow를 선택합니다.
- 서비스 유형으로 'Dataflow Classic'을 선택합니다.
- 고급 설정 을 선택하여 전체 옵션 세트를 표시합니다.
- 작업이 실행되는 위치를 선택합니다.
- 작업 유형으로 "스트리밍"을 선택합니다.
- Streaming Engine 사용 설정 을 선택합니다.
- 작업 실행 시간, 작업자 노드, 작업자 머신, 영구 디스크 스토리지에 대한 정보를 입력합니다.
- Streaming Engine 컴퓨팅 단위의 예상 개수를 입력합니다.
리소스 사용량과 비용은 입력 처리량에 따라 거의 선형적으로 확장되지만 작업자가 몇 명에 불과한 소규모 작업의 경우 총비용은 고정 비용이 지배합니다. 시작점으로 벤치마크 결과에서 워커 노드 수와 리소스 소비를 추정할 수 있습니다.
예를 들어 입력 데이터 전송률이 100MiB/초인 정확히 한 번 모드에서 맵 전용 파이프라인을 실행한다고 가정해 보겠습니다. 1GiB/초 파이프라인의 벤치마크 결과를 기반으로 다음과 같이 리소스 요구사항을 추정할 수 있습니다.
- 확장 계수: (100MiB/초) / (1GiB/초) = 0.1
- 예상 작업자 노드: 작업자 63명 × 0.1 = 작업자 6.3명
- 시간당 예상 Streaming Engine 컴퓨팅 단위 수: 58 × 0.1 = 시간당 5.8개 단위
이 값은 초기 추정치로만 사용해야 합니다. 실제 처리량과 비용은 머신 유형, 메시지 크기 분포, 사용자 코드, 집계 유형, 키 병렬 처리, 창 크기와 같은 요인에 따라 크게 달라질 수 있습니다. 자세한 내용은 Dataflow 비용 최적화 권장사항을 참조하세요.
테스트 파이프라인 실행
이 섹션에서는
gcloud dataflow flex-template run
명령어를 보여줍니다.
정확히 한 번 모드
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400
적어도 한 번 모드
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--additional-experiments=streaming_mode_at_least_once \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true
다음을 바꿉니다.
JOB_NAME: Dataflow 작업 이름PROJECT_ID: 프로젝트 IDKAFKA_BOOTSTRAP_ADDRESS: Apache Kafka 클러스터의 부트스트랩 주소KAFKA_TOPIC: Kafka 주제의 이름BQ_DATASET: BigQuery 데이터 세트의 이름BQ_TABLE_NAME: BigQuery 테이블의 이름
테스트 데이터 생성하기
테스트 데이터를 생성하려면 다음 명령어를 사용하여 스트리밍 데이터 생성기 템플릿을 실행합니다.
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--max-workers=140 \
--parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON
다음을 바꿉니다.
JOB_NAME: Dataflow 작업 이름PROJECT_ID: 프로젝트 IDSCHEMA_LOCATION: Cloud Storage의 스키마 파일 경로KAFKA_BOOTSTRAP_ADDRESS: Apache Kafka 클러스터의 부트스트랩 주소KAFKA_TOPIC: Kafka 주제의 이름
스트리밍 데이터 생성기 템플릿은 JSON 데이터 생성기 파일을 사용하여 메시지 스키마를 정의합니다. 벤치마크 테스트에서는 다음과 유사한 메시지 스키마를 사용했습니다.
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
다음 단계
- Dataflow 작업 모니터링 인터페이스 사용
- Dataflow 비용 최적화 권장사항
- 느리거나 중단된 스트리밍 작업 문제 해결
- Apache Kafka에서 Dataflow로 읽기
- Dataflow에서 BigQuery로 쓰기