Protobuf 열로 데이터 내보내기

이 문서에서는 BigQuery 사용자 정의 함수(UDF)를 사용하여 BigQuery 데이터를 프로토콜 버퍼(Protobuf) 열로 내보내는 방법을 설명합니다.

Protobuf 열을 사용하는 경우

BigQuery는 선택한 데이터의 형식을 지정하는 여러 가지 기본 제공 함수를 제공합니다. 그 중 한 옵션은 여러 열 값을 단일 Protobuf 값으로 병합하는 것으로 여기에는 다음과 같은 이점이 있습니다.

  • 객체 유형 안전성
  • JSON보다 개선된 압축, 데이터 전송 시간, 비용.
  • 유연성, 대부분의 프로그래밍 언어에는 Protobuf를 처리할 수 있는 라이브러리가 있음
  • 여러 열에서 읽고 단일 객체를 빌드할 때 오버헤드 감소.

다른 열 유형도 유형 안전성을 제공할 수 있지만 Protobuf 열을 사용하면 완전히 형식이 지정된 객체가 제공되어 애플리케이션 계층 또는 파이프라인의 다른 부분에서 처리해야 하는 작업량을 줄일 수 있습니다.

그러나 BigQuery 데이터를 Protobuf 열로 내보내는 데는 몇 가지 제한사항이 있습니다.

  • Protobuf 열은 색인 생성 또는 필터링이 원활하지 않습니다. Protobuf 열의 콘텐츠로 검색하면 효과적이지 않을 수 있습니다.
  • Protobuf 형식의 데이터는 정렬하기 어려울 수 있습니다.

이러한 제한사항이 내보내기 워크플로에 적용되는 경우 다음과 같이 BigQuery 데이터를 내보내는 다른 방법을 고려해야 할 수 있습니다.

  • EXPORT DATA으로 예약된 쿼리를 사용하여 내보낸 BigQuery 데이터를 날짜 또는 시간별로 정렬하고 내보내기를 반복적으로 예약합니다. BigQuery는 데이터를 Avro, CSV, JSON, Parquet 형식으로 내보내는 것을 지원합니다.
  • Dataflow를 사용하여 BigQuery 데이터를 Avro 또는 CSV 파일 형식으로 내보냅니다.

필요한 역할

BigQuery 데이터를 Protobuf 열로 내보내는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.

역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

UDF 만들기

BigQuery STRUCT 데이터 유형을 Protobuf 열로 변환하는 UDF를 만듭니다.

  1. 명령줄에서 bigquery-utils.git 저장소를 클론합니다.

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Protobuf 내보내기 폴더로 이동합니다.

    cd bigquery-utils/tools/protobuf_export
    
  3. cp 명령 또는 운영체제의 파일 브라우저를 사용하여 프로토 파일을 ./protos 하위 폴더에 복사합니다.

    ./protos 폴더에 이미 dummy.proto라는 샘플 프로토 파일이 있습니다.

  4. GitHub 저장소에서 필요한 패키지를 설치합니다.

    npm install
    
  5. webpack을 사용하여 패키지를 번들로 묶습니다.

    npx webpack --config webpack.config.js --stats-error-details
    
  6. ./dist 하위 폴더에서 pbwrapper.js 파일을 찾은 다음 Cloud Storage 버킷에 파일을 업로드합니다.

  7. BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  8. 쿼리 편집기를 사용하여 기존 BigQuery 테이블 열에서 Protobuf 열을 빌드하는 toMyProtoMessage라는 UDF를 만듭니다.

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    다음을 바꿉니다.

    • DATASET_ID: UDF를 포함할 데이터 세트의 ID입니다.
    • INPUT_FIELDS: proto 파일의 proto 메시지 유형에 사용되는 필드입니다(field_name_1 field_type_1 [, field_name_2 field_type_2, ...] 형식).

      밑줄을 사용하는 메시지 유형 필드는 대신 카멜 표기법을 사용하도록 변환해야 합니다. 예를 들어 메시지 유형이 다음과 같은 경우 입력 필드 값은 itemId int64, itemDescription string이어야 합니다.

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME: pbwrapper.js 파일이 포함된 Cloud Storage 버킷의 이름입니다.

    • PROTO_PACKAGE: proto 파일의 패키지입니다.

    • PROTO_MESSAGE: proto 파일의 메시지 유형입니다.

    예를 들어 제공된 dummy.proto 파일을 사용하는 경우 CREATE FUNCTION 문은 다음과 같습니다.

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

열 형식을 Protobuf 값으로 지정

toMyProtoMessage UDF를 실행하여 BigQuery 테이블 열의 형식을 Protobuf 값으로 지정합니다.

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

다음을 바꿉니다.

  • UDF_DATASET_ID: UDF가 포함된 데이터 세트의 ID
  • INPUT_COLUMNS: Protobuf 값으로 형식을 지정할 열의 이름으로, column_name_1 [, column_name_2, ...] 형식입니다. 열은 지원되는 모든 스칼라 값 유형 또는 비스칼라 유형(ARRAYSTRUCT 포함)일 수 있습니다. 입력 열은 프로토 메시지 유형 필드의 유형 및 수와 일치해야 합니다.
  • PROJECT_ID: 테이블이 포함된 프로젝트의 ID입니다. 데이터 세트가 현재 프로젝트에 있는 경우 프로젝트 식별을 건너뛸 수 있습니다.
  • DATASET_ID: 테이블이 포함된 데이터 세트의 ID입니다.
  • TABLE_NAME: 서식을 지정할 열이 포함된 테이블의 이름입니다.

예를 들어 dummy.proto을 기반으로 하는 toMyProtoMessage UDF를 사용하는 경우 다음 SELECT 문이 작동합니다.

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

Protobuf 값 사용

Protobuf 형식으로 내보낸 BigQuery 데이터를 사용하여 이제 완전히 형식이 지정된 객체 또는 구조체로 데이터를 작업할 수 있습니다.

다음 코드 샘플은 내보낸 데이터를 처리하거나 사용할 수 있는 몇 가지 방법의 예시를 제공합니다.

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

자바

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )