Pub/Sub Proto to BigQuery 템플릿

Pub/Sub proto to BigQuery 템플릿은 Pub/Sub 구독에서 BigQuery 테이블로 proto 데이터를 수집하는 스트리밍 파이프라인입니다. BigQuery 테이블에 쓰는 동안 발생하는 모든 오류는 Pub/Sub 처리되지 않은 주제로 스트리밍됩니다.

JavaScript 사용자 정의 함수(UDF)를 제공하여 데이터를 변환할 수 있습니다. UDF 실행 중 오류는 개별 Pub/Sub 주제 또는 BigQuery 오류와 동일한 미처리 주제로 전송될 수 있습니다.

이 시나리오에 Dataflow 파이프라인을 실행하기 전에 UDF가 있는 Pub/Sub BigQuery 구독이 요구사항을 충족하는지 고려하세요.

파이프라인 요구사항

  • 입력 Pub/Sub 구독이 있어야 합니다.
  • Proto 레코드의 스키마 파일이 Cloud Storage에 있어야 합니다.
  • 출력 Pub/Sub 주제가 있어야 합니다.
  • 출력 BigQuery 데이터 세트가 있어야 합니다.
  • BigQuery 테이블이 있으면 createDisposition 값에 관계없이 Proto 데이터와 일치하는 스키마가 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • protoSchemaPath(Proto 스키마 파일의 Cloud Storage 경로): 자체 포함된 설명자 세트 파일의 Cloud Storage 경로입니다. 예: gs://MyBucket/schema.pb Proto를 컴파일하는 protoc 명령어에 --descriptor_set_out=schema.pb를 추가하여 schema.pb를 생성할 수 있습니다. --include_imports 플래그를 사용하면 파일이 독립 실행형 파일임을 보장할 수 있습니다.
  • fullMessageName(전체 Proto 메시지 이름): 전체 메시지 이름입니다(예: package.name.MessageName). 메일이 다른 메일 내에 중첩되어 있는 경우 '.' 구분 기호를 사용하여 모든 메일을 포함합니다(예: package.name.OuterMessage.InnerMessage). 'package.name'은 java_package 문이 아닌 package 문에서 가져와야 합니다.
  • inputSubscription(Pub/Sub 입력 구독): 'projects/your-project-id/subscriptions/your-subscription-name' 형식의 입력을 읽어올 Pub/Sub 구독입니다(예: projects/your-project-id/subscriptions/your-subscription-name).
  • outputTableSpec(BigQuery 출력 테이블): 출력을 작성할 BigQuery 테이블 위치입니다. 이름은 <project>:<dataset>.<table_name> 형식이어야 합니다. 테이블 스키마가 입력 객체와 일치해야 합니다.
  • outputTopic(출력 Pub/Sub 주제): 데이터를 게시해야 하는 주제의 이름이며 형식은 'projects/your-project-id/topics/your-topic-name'입니다(예: projects/your-project-id/topics/your-topic-name).

선택적 매개변수

  • preserveProtoFieldNames(Proto 필드 이름 유지): Proto 필드 이름을 유지할지 아니면 lowerCamelCase로 변환할지 제어하는 플래그입니다. 테이블이 이미 있는 경우 테이블의 스키마와 일치하는 항목을 기반으로 해야 합니다. 그렇지 않으면 생성된 테이블의 열 이름을 결정합니다. proto snake_case를 유지하려면 true입니다. False는 필드를 lowerCamelCase로 변환합니다. (기본값: false).
  • bigQueryTableSchemaPath(BigQuery 테이블 스키마 경로): BigQuery 스키마 JSON 파일에 대한 Cloud Storage 경로입니다. 설정되지 않으면 스키마가 Proto 스키마에서 추론됩니다. (예: gs://MyBucket/bq_schema.json).
  • udfOutputTopic(UDF 실패를 위한 Pub/Sub 출력 주제): UDF 실패를 전송할 선택적 출력 주제입니다. 이 옵션을 설정하지 않으면 실패가 BigQuery 실패와 동일한 주제에 기록됩니다. (예: projects/your-project-id/topics/your-topic-name)
  • writeDisposition(BigQuery에 사용할 쓰기 처리): BigQuery WriteDisposition입니다. 예를 들면 WRITE_APPEND, WRITE_EMPTY 또는 WRITE_TRUNCATE입니다. 기본값은 WRITE_APPEND입니다.
  • createDisposition(BigQuery에 사용할 생성 처리): BigQuery CreateDisposition입니다. 예를 들면 CREATE_IF_NEEDED, CREATE_NEVER입니다. 기본값은 CREATE_IF_NEEDED입니다.
  • javascriptTextTransformGcsPath(JavaScript UDF 소스의 Cloud Storage 경로): 사용자 정의 함수가 포함된 JavaScript 코드의 Cloud Storage 경로 패턴입니다. (예: gs://your-bucket/your-function.js)
  • javascriptTextTransformFunctionName(UDF JavaScript 함수 이름): JavaScript 파일에서 호출할 함수의 이름입니다. 문자, 숫자, 밑줄만 사용합니다. (예: 'transform' 또는 'transform_udf1')
  • javascriptTextTransformReloadIntervalMinutes(JavaScript UDF 자동 새로고침 간격(분)): 작업자가 파일을 새로고침하기 위해 JavaScript UDF 변경사항을 확인할 수 있는 간격을 정의합니다. 기본값은 0입니다.
  • useStorageWriteApi(BigQuery Storage Write API 사용): true이면 파이프라인은 BigQuery에 데이터를 쓸 때 Storage Write API를 사용합니다(https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api 참조). 기본값은 false입니다. 정확히 한 번 모드에서 Storage Write API를 사용할 때는 'BigQuery Storage Write API의 스트림 수' 및 'BigQuery Storage Write API의 트리거 빈도(초)'와 같은 파라미터를 설정해야 합니다. Dataflow 적어도 한 번 모드를 사용 설정하거나 useStorageWriteApiAtLeastOnce 파라미터를 true로 설정하면 스트림 수나 트리거 빈도를 설정할 필요가 없습니다.
  • useStorageWriteApiAtLeastOnce(BigQuery Storage Write API에서 1회 이상 실행되는 시맨틱스 사용): 이 파라미터는 'BigQuery Storage Write API 사용'이 설정된 경우에만 적용됩니다. 사용 설정하면 Storage Write API에 최소 1회의 시맨틱스가 사용되고 그렇지 않은 경우 정확히 한 번의 시맨틱스가 사용됩니다. 기본값은 false입니다.
  • numStorageWriteApiStreams(BigQuery Storage Write API의 스트림 수): 스트림 수는 BigQueryIO Write 변환의 동시 로드를 정의하며 파이프라인에서 사용될 Storage Write API의 스트림 수와 대략 일치합니다. 권장 값은 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api를 참조하세요. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec(BigQuery Storage Write API의 트리거 빈도(초)): 트리거 빈도는 BigQuery에서 쿼리하는 데이터가 표시되는 속도를 결정합니다. 권장 값은 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api를 참조하세요.

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: JSON 문자열로 직렬화된 Pub/Sub 메시지 데이터 필드입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.
  • 템플릿 실행

    콘솔

    1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
    2. 템플릿에서 작업 만들기로 이동
    3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
    4. (선택사항) 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

      Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

    5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Proto to BigQuery template을 선택합니다.
    6. 제공된 파라미터 필드에 파라미터 값을 입력합니다.
    7. 작업 실행을 클릭합니다.

    gcloud

    셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    다음을 바꿉니다.

    • JOB_NAME: 선택한 고유한 작업 이름
    • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • SCHEMA_PATH: Proto 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Proto 메시지 이름(예: package.name.MessageName)
    • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
    • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
    • UNPROCESSED_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

    API

    REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • SCHEMA_PATH: Proto 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Proto 메시지 이름(예: package.name.MessageName)
    • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
    • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
    • UNPROCESSED_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

    다음 단계