연속 쿼리 만들기
이 문서에서는 BigQuery에서 연속 쿼리를 실행하는 방법을 설명합니다.
BigQuery 연속 쿼리는 지속적으로 실행되는 SQL 문입니다. 연속 쿼리를 사용하면 BigQuery에서 수신 데이터를 실시간으로 분석한 후 결과를 Bigtable, Pub/Sub 또는 Spanner로 내보내거나 결과를 BigQuery 테이블에 쓸 수 있습니다.
계정 유형 선택
사용자 계정을 사용하여 연속 쿼리 작업을 만들고 실행하거나 사용자 계정을 사용하여 연속 쿼리 작업을 만든 후 서비스 계정을 사용하여 실행할 수 있습니다. 결과를 Pub/Sub 주제로 내보내는 연속 쿼리를 실행하려면 서비스 계정을 사용해야 합니다.
사용자 계정을 사용하면 연속 쿼리가 최대 2일 동안 실행됩니다. 서비스 계정을 사용하면 연속 쿼리가 최대 150일 동안 실행됩니다. 자세한 내용은 승인을 참조하세요.
필수 권한
이 섹션에서는 연속 쿼리를 만들고 실행하는 데 필요한 권한에 대해 설명합니다. 언급된 Identity and Access Management(IAM) 역할 대신 커스텀 역할을 통해 필요한 권한을 가져올 수 있습니다.
사용자 계정 사용 시 권한
이 섹션에서는 사용자 계정을 사용하여 연속 쿼리를 만들고 실행하는 데 필요한 역할 및 권한에 대한 정보를 제공합니다.
BigQuery에서 작업을 만들려면 사용자 계정에 bigquery.jobs.create
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.jobs.create
권한을 부여합니다.
- BigQuery 사용자(
roles/bigquery.user
) - BigQuery 작업 사용자(
roles/bigquery.jobUser
) - BigQuery 관리자(
roles/bigquery.admin
)
BigQuery 테이블에서 데이터를 내보내려면 사용자 계정에 bigquery.tables.export
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.export
권한을 부여합니다.
- BigQuery 데이터 뷰어(
roles/bigquery.dataViewer
) - BigQuery 데이터 편집자(
roles/bigquery.dataEditor
) - BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) - BigQuery 관리자(
roles/bigquery.admin
)
BigQuery 테이블의 데이터를 업데이트하려면 사용자 계정에 bigquery.tables.updateData
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.updateData
권한을 부여합니다.
- BigQuery 데이터 편집자(
roles/bigquery.dataEditor
) - BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) - BigQuery 관리자(
roles/bigquery.admin
)
사용자 계정에 연속 쿼리 사용 사례에 필요한 API를 사용 설정해야 하는 경우 사용자 계정에 서비스 사용량 관리자(roles/serviceusage.serviceUsageAdmin
) 역할이 있어야 합니다.
서비스 계정 사용 시 권한
이 섹션에서는 연속 쿼리를 만드는 사용자 계정과 연속 쿼리를 실행하는 서비스 계정에 필요한 역할 및 권한에 대한 정보를 제공합니다.
사용자 계정 권한
BigQuery에서 작업을 만들려면 사용자 계정에 bigquery.jobs.create
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.jobs.create
권한을 부여합니다.
- BigQuery 사용자(
roles/bigquery.user
) - BigQuery 작업 사용자(
roles/bigquery.jobUser
) - BigQuery 관리자(
roles/bigquery.admin
)
서비스 계정을 사용하여 실행되는 작업을 제출하려면 사용자 계정에 서비스 계정 사용자(roles/iam.serviceAccountUser
) 역할이 있어야 합니다. 동일한 사용자 계정을 사용하여 서비스 계정을 만드는 경우 사용자 계정에 서비스 계정 관리자(roles/iam.serviceAccountAdmin
) 역할이 있어야 합니다. 프로젝트 내의 모든 서비스 계정이 아닌 단일 서비스 계정에 대한 사용자 액세스를 제한하는 방법은 단일 역할 부여를 참고하세요.
사용자 계정에 연속 쿼리 사용 사례에 필요한 API를 사용 설정해야 하는 경우 사용자 계정에 서비스 사용량 관리자(roles/serviceusage.serviceUsageAdmin
) 역할이 있어야 합니다.
서비스 계정 권한
BigQuery 테이블에서 데이터를 내보내려면 서비스 계정에 bigquery.tables.export
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.export
권한을 부여합니다.
- BigQuery 데이터 뷰어(
roles/bigquery.dataViewer
) - BigQuery 데이터 편집자(
roles/bigquery.dataEditor
) - BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) - BigQuery 관리자(
roles/bigquery.admin
)
bigquery.tables.updateData
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.updateData
권한을 부여합니다.
- BigQuery 데이터 편집자(
roles/bigquery.dataEditor
) - BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) - BigQuery 관리자(
roles/bigquery.admin
)
시작하기 전에
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.
예약 만들기
Enterprise 또는 Enterprise Plus 버전 예약을 만든 후 CONTINUOUS
작업 유형으로 예약 할당을 만듭니다. 이 예약은 자동 확장 및 유휴 슬롯 공유를 사용할 수 있습니다.
연속 쿼리의 예약 할당에는 적용되는 예약 제한사항이 있습니다.
Pub/Sub로 내보내기
데이터를 Pub/Sub으로 내보내려면 추가 API, IAM 권한, Google Cloud 리소스가 필요합니다. 자세한 내용은 Pub/Sub로 내보내기를 참조하세요.
Pub/Sub 메시지에 맞춤 속성을 메타데이터로 삽입
Pub/Sub 속성을 사용하여 우선순위, 출처, 대상 또는 추가 메타데이터와 같은 메시지에 관한 추가 정보를 제공할 수 있습니다. 속성을 사용하여 구독에서 메시지를 필터링할 수도 있습니다.
연속 쿼리 결과 내에서 열 이름이 _ATTRIBUTES
인 경우 값이 Pub/Sub 메시지 속성에 복사됩니다.
_ATTRIBUTES
내에 제공된 필드는 속성 키로 사용됩니다.
_ATTRIBUTES
열은 ARRAY<STRUCT<STRING, STRING>>
또는 STRUCT<STRING>
형식의 JSON
유형이어야 합니다.
예시는 Pub/Sub 주제로 데이터 내보내기를 참조하세요.
Bigtable로 내보내기
데이터를 Bigtable로 내보내려면 추가 API, IAM 권한, Google Cloud리소스가 필요합니다. 자세한 내용은 Bigtable로 내보내기를 참조하세요.
Spanner로 내보내기
데이터를 Spanner로 내보내려면 추가 API, IAM 권한, Google Cloud리소스가 필요합니다. 자세한 내용은 Spanner로 내보내기 (역방향 ETL)를 참고하세요.
BigQuery 테이블에 데이터 쓰기
INSERT
문을 사용하여 BigQuery 테이블에 데이터를 쓸 수 있습니다.
AI 함수 사용
연속 쿼리에서 지원되는 AI 함수를 사용하려면 추가 API, IAM 권한 및 Google Cloud리소스가 필요합니다. 자세한 내용은 사용 사례에 따라 다음 주제 중 하나를 참고하세요.
ML.GENERATE_TEXT
함수를 사용하여 텍스트 생성ML.GENERATE_EMBEDDING
함수를 사용하여 텍스트 임베딩 생성ML.UNDERSTAND_TEXT
함수를 사용하여 텍스트 이해하기ML.TRANSLATE
함수를 사용하여 텍스트 번역
연속 쿼리에서 AI 함수를 사용하는 경우 쿼리 출력이 함수의 할당량 내에 유지되는지 여부를 고려하세요. 할당량을 초과하면 처리되지 않은 레코드를 별도로 처리해야 할 수 있습니다.
시작점 지정
연속 쿼리의 FROM
절에 APPENDS
함수를 사용하여 처리할 가장 오래된 데이터를 지정해야 합니다. 예를 들어 APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
는 연속 쿼리가 시작되기 최대 10분 전에 테이블 my_table
에 추가된 데이터를 처리하도록 BigQuery에 지시합니다.
my_table
에 추가된 데이터는 들어오는 대로 처리됩니다. 데이터 처리에는 지연이 적용되지 않습니다.
연속 쿼리에서 APPENDS
함수를 사용하는 경우 end_timestamp
인수를 제공하지 마세요.
다음 예는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블을 쿼리할 때 APPENDS
함수를 사용하여 특정 시점부터 연속 쿼리를 시작하는 방법을 보여줍니다.
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute');
현재 시간보다 이전 시작점을 지정합니다.
현재 시점 이전의 데이터를 처리하려면 APPENDS
함수를 사용하여 쿼리의 이전 시작점을 지정하면 됩니다. 지정하는 시작점은 선택하려는 테이블의 시간 이동 기간 내에 있어야 합니다. 시간 이동 기간은 기본적으로 지난 7일을 포함합니다.
시간 이동 기간 밖의 데이터를 포함하려면 표준 쿼리를 사용하여 특정 시점까지 데이터를 삽입하거나 내보낸 다음 해당 시점부터 연속 쿼리를 시작하세요.
예
다음 예는 특정 시점까지 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블에서 이전 데이터를 테이블에 로드한 후 이전 데이터의 차단 지점부터 연속 쿼리를 시작하는 방법을 보여줍니다.
특정 시점까지 데이터를 채우려면 표준 쿼리를 실행하세요.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` -- Include all data inserted into the table up to this point in time. -- This timestamp must be within the time travel window. FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC' WHERE ride_status = 'dropoff';
쿼리가 중지된 시점부터 연속 쿼리를 실행합니다.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to start processing -- data right where the batch query left off. -- This timestamp must be within the time travel window. TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND) WHERE ride_status = 'dropoff';
사용자 계정을 사용하여 연속 쿼리 실행
이 섹션에서는 사용자 계정을 사용하여 연속 쿼리를 실행하는 방법을 설명합니다. 연속 쿼리가 실행된 후 쿼리 실행을 중단하지 않고 Google Cloud 콘솔, 터미널 창 또는 애플리케이션을 닫을 수 있습니다. 사용자 계정으로 실행되는 연속 쿼리는 최대 2일 동안 실행된 후 자동으로 중지됩니다. 새로 수신되는 데이터를 계속 처리하려면 새 연속 쿼리를 시작하고 시작점을 지정하세요. 이 프로세스를 자동화하려면 실패한 쿼리 재시도를 참고하세요.
연속 쿼리를 실행하려면 다음 단계를 따르세요.
콘솔
Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.
쿼리 편집기에서
더보기를 클릭합니다.- 쿼리 모드 선택 섹션에서 연속 쿼리를 선택합니다.
- 확인을 클릭합니다.
- 선택사항: 쿼리 실행 시간을 제어하려면 쿼리 설정을 클릭하고 작업 제한 시간을 밀리초 단위로 설정합니다.
쿼리 편집기에서 연속 쿼리에 대한 SQL 문을 입력합니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
실행을 클릭합니다.
bq
-
In the Google Cloud console, activate Cloud Shell.
Cloud Shell에서
--continuous
플래그와 함께bq query
명령어를 사용하여 연속 쿼리를 실행합니다.bq query --use_legacy_sql=false --continuous=true 'QUERY'
QUERY
을 연속 쿼리의 SQL 문으로 바꿉니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.--job_timeout_ms
플래그를 사용하여 쿼리 실행 시간을 제어할 수 있습니다.PROJECT_ID
: 프로젝트 ID입니다.QUERY
: 연속 쿼리에 대한 SQL 문입니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
API
jobs.insert
메서드를 호출하여 연속 쿼리를 실행합니다.
전달하는 Job
리소스의 JobConfigurationQuery
에서 continuous
필드를 true
로 설정해야 합니다.
선택적으로 jobTimeoutMs
필드를 설정하여 쿼리 실행 시간을 제어할 수 있습니다.
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \ --compressed
다음을 바꿉니다.
서비스 계정을 사용하여 연속 쿼리 실행
이 섹션에서는 서비스 계정을 사용하여 연속 쿼리를 실행하는 방법을 설명합니다. 연속 쿼리가 실행된 후 쿼리 실행을 중단하지 않고 Google Cloud 콘솔, 터미널 창 또는 애플리케이션을 닫을 수 있습니다. 서비스 계정을 사용하여 실행되는 연속 쿼리는 최대 150일 동안 실행된 후 자동으로 중지됩니다. 새로 수신되는 데이터를 계속 처리하려면 새 연속 쿼리를 시작하고 시작점을 지정하세요. 이 프로세스를 자동화하려면 실패한 쿼리 재시도를 참고하세요.
서비스 계정을 사용하여 연속 쿼리를 실행하려면 다음 단계를 따르세요.
콘솔
- 서비스 계정을 만듭니다.
- 서비스 계정에 필요한 권한을 부여합니다.
Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.
쿼리 편집기에서 더보기를 클릭합니다.
쿼리 모드 선택 섹션에서 연속 쿼리를 선택합니다.
확인을 클릭합니다.
쿼리 편집기에서 더보기 > 쿼리 설정을 클릭합니다.
연속 쿼리 섹션에서 서비스 계정 상자를 사용하여 생성한 서비스 계정을 선택합니다.
선택사항: 쿼리 실행 시간을 제어하려면 작업 제한 시간을 밀리초 단위로 설정합니다.
저장을 클릭합니다.
쿼리 편집기에서 연속 쿼리에 대한 SQL 문을 입력합니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
실행을 클릭합니다.
bq
- 서비스 계정을 만듭니다.
- 서비스 계정에 필요한 권한을 부여합니다.
-
In the Google Cloud console, activate Cloud Shell.
명령줄에서 다음 플래그와 함께
bq query
명령어를 사용하여 연속 쿼리를 실행합니다.--continuous
플래그를true
로 설정하여 쿼리를 연속으로 만듭니다.--connection_property
플래그를 사용하여 사용할 서비스 계정을 지정합니다.- 선택사항:
--job_timeout_ms
플래그를 설정하여 쿼리 런타임을 제한합니다.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 ID입니다.SERVICE_ACCOUNT_EMAIL
: 서비스 계정 이메일입니다. Google Cloud 콘솔의 서비스 계정 페이지에서 서비스 계정 이메일을 가져올 수 있습니다.QUERY
: 연속 쿼리에 대한 SQL 문입니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
- 서비스 계정을 만듭니다.
- 서비스 계정에 필요한 권한을 부여합니다.
jobs.insert
메서드를 호출하여 연속 쿼리를 실행합니다. 전달하는Job
리소스의JobConfigurationQuery
리소스에서 다음 필드를 설정합니다.continuous
필드를true
로 설정하여 쿼리를 연속으로 만듭니다.connectionProperties
필드를 사용하여 사용할 서비스 계정을 지정합니다.
선택적으로
JobConfiguration
리소스에서jobTimeoutMs
필드를 설정하여 쿼리 실행 시간을 제어할 수 있습니다.curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \ --compressed
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 ID입니다.QUERY
: 연속 쿼리에 대한 SQL 문입니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.SERVICE_ACCOUNT_EMAIL
: 서비스 계정 이메일입니다. Google Cloud 콘솔의 서비스 계정 페이지에서 서비스 계정 이메일을 가져올 수 있습니다.
API
커스텀 작업 ID 만들기
모든 쿼리 작업에는 작업을 검색하고 관리하는 데 사용할 수 있는 작업 ID가 할당됩니다. 기본적으로 작업 ID는 무작위로 생성됩니다. 작업 기록 또는 작업 탐색기를 사용하여 연속 쿼리의 작업 ID를 더 쉽게 검색할 수 있도록 맞춤 작업 ID 접두사를 할당할 수 있습니다.
Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.
쿼리 편집기에서 더보기를 클릭합니다.
쿼리 모드 선택 섹션에서 연속 쿼리를 선택합니다.
확인을 클릭합니다.
쿼리 편집기에서 더보기 > 쿼리 설정을 클릭합니다.
맞춤 작업 ID 접두사 섹션에 맞춤 이름 접두사를 입력합니다.
저장을 클릭합니다.
예시
다음 SQL 예는 연속 쿼리의 일반적인 사용 사례를 보여줍니다.
Pub/Sub 주제로 데이터 내보내기
다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블에서 데이터를 필터링하고 메시지 속성과 함께 데이터를 실시간으로 Pub/Sub 주제에 게시하는 연속 쿼리를 보여줍니다.
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
Bigtable 테이블로 데이터 내보내기
다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블에서 데이터를 필터링하고 데이터를 실시간으로 Bigtable 테이블로 내보내는 연속 쿼리를 보여줍니다.
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
Spanner 테이블로 데이터 내보내기
다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블에서 데이터를 필터링한 후 데이터를 실시간으로 Spanner 테이블로 내보내는 연속 쿼리를 보여줍니다.
EXPORT DATA OPTIONS ( format = 'CLOUD_SPANNER', uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides', spanner_options ="""{ "table": "rides", -- To ensure data is written to Spanner in the correct sequence -- during a continuous export, use the change_timestamp_column -- option. This should be mapped to a timestamp column from your -- BigQuery data. If your source data lacks a timestamp, the -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function -- will be automatically mapped to the "change_timestamp" column. "change_timestamp_column": "change_timestamp" }""" ) AS ( SELECT ride_id, latitude, longitude, meter_reading, ride_status, passenger_count FROM APPENDS( TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
BigQuery 테이블에 데이터 쓰기
다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블에서 데이터를 필터링하고 변환한 후 데이터를 실시간으로 다른 BigQuery 테이블에 쓰는 연속 쿼리를 보여줍니다. 이렇게 하면 데이터를 추가 다운스트림 분석에 사용할 수 있습니다.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'dropoff';
Vertex AI 모델을 사용하여 데이터 처리
다음 예시는 Vertex AI 모델을 사용하여 현재 위도 및 경도를 기준으로 택시 탑승자를 위한 광고를 생성한 후 결과를 실시간으로 Pub/Sub 주제로 내보내는 연속 쿼리를 보여줍니다.
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you -- want to start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
연속 쿼리의 SQL 수정
연속 쿼리 작업이 실행되는 동안에는 연속 쿼리에 사용되는 SQL을 업데이트할 수 없습니다. 연속 쿼리 작업을 취소하고 SQL을 수정한 다음 원래 연속 쿼리 작업을 중지한 지점부터 새 연속 쿼리 작업을 시작해야 합니다.
연속 쿼리에 사용되는 SQL을 수정하려면 다음 단계를 따르세요.
- 업데이트하려는 연속 쿼리 작업의 작업 세부정보를 보고 작업 ID를 기록합니다.
- 가능한 경우 업스트림 데이터 수집을 일시 정지합니다. 이렇게 할 수 없으면 연속 쿼리가 다시 시작될 때 일부 데이터 중복이 발생할 수 있습니다.
- 수정할 연속 쿼리를 취소합니다.
INFORMATION_SCHEMA
JOBS
보기를 사용하여 원래 연속 쿼리 작업의end_time
값을 가져옵니다.SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 ID입니다.REGION
: 프로젝트에 사용되는 리전입니다.JOB_ID
: 1단계에서 식별한 연속 쿼리 작업 ID입니다.
5단계에서 가져온
end_time
값을 시작 값으로 사용하여 특정 시점에서 연속 쿼리를 시작하도록 연속 쿼리 SQL 문을 수정합니다.필요한 변경사항이 반영되도록 연속 쿼리 SQL 문을 수정합니다.
수정된 연속 쿼리를 실행합니다.
연속 쿼리 취소
다른 작업과 마찬가지로 연속 쿼리 작업을 취소할 수 있습니다. 작업이 취소된 후 쿼리가 중지되는 데 최대 1분이 걸릴 수 있습니다.
쿼리를 취소한 후 다시 시작하면 다시 시작된 쿼리는 독립적인 새 쿼리처럼 작동합니다. 다시 시작된 쿼리는 이전 작업이 중지된 위치에서 데이터 처리를 시작하지 않으며 이전 쿼리의 결과를 참조할 수 없습니다. 특정 시점부터 연속 쿼리 시작을 참고하세요.
쿼리 모니터링 및 오류 처리
데이터 불일치, 스키마 변경, 일시적인 서비스 중단 또는 유지보수와 같은 요인으로 인해 연속 쿼리가 중단될 수 있습니다. BigQuery에서 일부 일시적인 오류를 처리하지만 작업 복원력을 개선하기 위한 권장사항은 다음과 같습니다.