데이터 엔지니어링 파이프라인 빌드

이 가이드에서는 Antigravity용 Google Cloud Data Agent Kit 확장 프로그램에서 오케스트레이션 파이프라인을 만들고 배포하는 방법을 설명합니다.

예시 파이프라인은 Managed Service for Apache Spark에서 PySpark 스크립트를 실행합니다.

Antigravity에서 오케스트레이션 파이프라인을 로컬 버전으로 또는 main 브랜치에 변경사항을 병합할 때와 같은 GitHub 작업을 통해 배포할 수 있습니다. 이 문서에서는 오케스트레이션 파이프라인의 로컬 버전을 배포하는 방법을 보여줍니다.

시작하기 전에

시작하기 전에 다음을 완료하세요.

  1. Antigravity용 Data Agent Kit 확장 프로그램을 설치합니다.
  2. 설정을 구성합니다.
  3. 오케스트레이션 파이프라인 및 스크립트와 같은 애셋을 저장할 GitHub 저장소를 Antigravity 작업공간에 추가합니다.

필수 IAM 역할 검토

프로젝트에서 리소스를 만들고 오케스트레이션 파이프라인을 배포하고 실행할 수 있는 권한을 얻으려면 관리자에게 필요한 역할을 부여해 달라고 요청하세요.

Managed Service for Apache Airflow 환경을 만들고 관리하며 연결된 버킷에서 객체를 관리하려면 다음 역할이 필요합니다. 이러한 사용자 역할에 대한 자세한 내용은 Managed Service for Apache Airflow 문서에서 사용자에게 역할 부여를 참조하세요.

  • 환경 및 스토리지 객체 관리자(composer.environmentAndStorageObjectAdmin)
  • 서비스 계정 사용자(iam.serviceAccountUser)

BigQuery 및 Cloud Storage 리소스를 사용하려면 다음 역할이 필요합니다.

  • BigQuery 데이터 편집자 (roles/bigquery.dataEditor)
  • 스토리지 객체 관리자 (roles/storage.objectAdmin)

액세스하려는 리소스에 따라 확장 프로그램을 사용하고 오케스트레이션 파이프라인을 사용할 수 있는 역할 외에 추가 역할이 필요할 수 있습니다.

서비스 계정 만들기 및 IAM 역할 부여

Managed Airflow 3세대 환경에 고유한 서비스 계정을 사용합니다. 서비스 계정은 Managed Airflow 3세대 환경을 만들고 배포하는 모든 오케스트레이션 파이프라인을 실행합니다.

관리자에게 다음 단계를 완료해 달라고 요청하세요.

  1. IAM 문서에 설명된 대로 서비스 계정을 만듭니다.
  2. 서비스 계정에 Composer 작업자 (composer.worker) 역할을 부여합니다. 이 역할은 대부분의 경우 필요한 권한을 제공합니다.

권장사항으로, 프로젝트의 다른 리소스에 액세스해야 하는 경우 오케스트레이션 파이프라인 작업에 필요한 경우에만 이 서비스 계정에 추가 권한을 부여하세요.Google Cloud

오케스트레이션 파이프라인용 리소스 만들기 Google Cloud

이 단계에서는 오케스트레이션 파이프라인용 리소스를 만듭니다. Google Cloud

관리형 Airflow 3세대 환경 만들기

다음 구성으로 Managed Airflow 3세대 환경을 만듭니다.

  • 환경 이름: 나중에 오케스트레이션 파이프라인을 구성하는 데 사용할 이름을 입력합니다. 예: example-pipeline-scheduler.
  • 위치: 위치를 선택합니다. 이 가이드의 모든 리소스를 동일한 위치에 만드는 것이 좋습니다. 예: us-central1.
  • 서비스 계정: 이 환경에 대해 만든 서비스 계정을 선택합니다.

다음 Google Cloud CLI 명령어 예시에서는 구문을 보여줍니다.

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

스케줄러 구성에 환경 매개변수 추가

오케스트레이션 파이프라인을 실행할 Managed Airflow 환경의 연결 세부정보를 제공합니다.

Google Cloud Data Agent Kit 설정 편집기를 사용하여 만든 환경의 구성 매개변수를 추가합니다.

  1. 작업 표시줄에서 Google Cloud Data Agent Kit 아이콘을 클릭합니다.
  2. 설정을 펼친 후 설정을 클릭합니다.
  3. 스케줄러 를 선택합니다.
  4. 이전에 만든 Managed Airflow 3세대 환경의 매개변수를 입력합니다.
    • 프로젝트 ID: 환경이 있는 프로젝트의 이름입니다. 예: example-project.
    • 리전: 환경이 있는 리전입니다. 예: us-central1.
    • 환경: 환경의 이름입니다. 예: example-pipeline-scheduler.
  5. 저장 을 클릭합니다.

파이프라인 아티팩트용 버킷 만들기

Cloud Storage 버킷을 Managed Airflow 환경과 동일한 프로젝트에 만들고 example-pipelines-bucket과 비슷한 이름을 지정합니다. 이 버킷은 Managed Service for Apache Spark 작업을 저장하는 데 필요합니다.

일부 파이프라인 작업(예: Cloud Storage 버킷에 결과 출력)이 있습니다.

BigQuery에서 새 데이터 세트 및 테이블 만들기

이 가이드에서는 BigQuery 테이블에 데이터를 쓰는 파이프라인을 보여줍니다. 프로젝트에서 다음 BigQuery 리소스를 만듭니다.

  1. 새 데이터 세트wordcount_dataset이라는 이름으로 만듭니다.
  2. 새 BigQuery 테이블 wordcount_output을(를) 만듭니다.

파이프라인 애셋 추가

이 가이드에서는 BigQuery에서 읽고, 데이터를 변환 (단어 수)하고, BigQuery에 다시 로드하는 PySpark를 사용하여 일반적인 데이터 엔지니어링 작업(ETL: 추출, 변환, 로드)을 보여줍니다.

비에이전트형

다음 파일을 저장소의 /scripts 폴더에 추가합니다. 나중에 Managed Service for Apache Spark에서 이 스크립트를 실행하는 파이프라인 작업을 추가합니다.

예시 wordcount.py 파일:

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

다음을 바꿉니다.

  • ARTIFACTS_BUCKET_NAME: 이전에 만든 Cloud Storage 버킷의 이름입니다. 예: example-pipelines-bucket.
  • PROJECT_ID: 환경이 있는 프로젝트의 이름입니다. 예: example-project.

에이전트형

에이전트에게 저장소의 /scripts 폴더에 샘플 PySpark 스크립트를 생성하라는 메시지를 표시합니다. 나중에 Managed Service for Apache Spark에서 이 스크립트를 실행하는 파이프라인 작업을 추가합니다.

다음과 비슷한 프롬프트를 입력합니다.

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

저장소에서 오케스트레이션 파이프라인 초기화

오케스트레이션 파이프라인을 초기화하면 Antigravity용 Data Agent Kit 확장 프로그램이 다음을 포함하는 스캐폴딩을 만듭니다.

  • 오케스트레이션 파이프라인 YAML 파일: 일정이 포함되어 있지만 정의된 작업이 없는 파이프라인 정의의 예입니다.
  • deployment.yaml: 파이프라인을 배포하는 방법을 정의하는 파이프라인 배포 구성의 예입니다. 이 파일은 Managed Airflow 환경, 아티팩트 버킷, 파이프라인 작업에서 사용하는 기타 리소스에 필요한 구성을 보여줍니다.
  • .github/workflows/deploy.yaml: GitHub 저장소의 main 브랜치에 변경사항을 병합할 때 파이프라인을 배포하는 GitHub 작업을 설정합니다.
  • .github/workflows/validate.yaml: 배포된 후 파이프라인의 유효성을 검사하는 GitHub 작업을 설정합니다.

이 문서의 뒷부분에서는 Antigravity용 Data Agent Kit 확장 프로그램을 사용하여 이러한 정의를 확장하여 오케스트레이션 파이프라인을 로컬로 만들고 배포합니다.

비에이전트형

오케스트레이션 파이프라인을 초기화하려면 다음 단계를 따르세요.

  1. 작업 표시줄에서 Google Cloud Data Agent Kit 아이콘을 클릭합니다.
  2. 데이터 엔지니어링을 펼친 후 오케스트레이션 파이프라인 초기화를 클릭합니다.
  3. 새 오케스트레이션 파이프라인의 매개변수를 입력합니다.
  4. 파이프라인 ID: 파이프라인의 ID를 입력합니다. 예: example-pipeline.
  5. Google Cloud 프로젝트 ID: 환경이 있는 프로젝트의 이름입니다. 예: example-project.
  6. 리전: 환경이 있는 리전입니다. 예: us-central1.
  7. 환경 ID: 개발하려는 환경의 이름입니다. 예: dev/staging.
  8. 스케줄러 Managed Service for Apache Airflow 환경: 파이프라인을 오케스트레이션하려는 환경의 이름입니다. 이 문서에서는 이 매개변수에 동일한 환경을 지정합니다.

  9. 아티팩트 버킷: 파이프라인 아티팩트에 사용되는 버킷의 이름입니다(gs:// 프리픽스 제외). 예: example-pipelines-bucket.

  10. 다음 을 클릭합니다.

  11. 초기화 를 클릭합니다.

  12. 파이프라인을 초기화할 작업공간을 지정합니다.

에이전트형

에이전트에게 저장소의 오케스트레이션 파이프라인용 스캐폴딩을 만들어 달라고 요청합니다.

다음과 비슷한 프롬프트를 입력합니다.

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

저장소에서 파이프라인을 초기화한 후에는 새 스캐폴딩이 변경한 구성을 덮어쓰므로 다시 초기화할 수 없습니다. 프로젝트에서 새 파이프라인 정의 파일을 만들고 배포 구성에 추가하여 새 파이프라인을 추가할 수 있습니다.

파이프라인에 새 태스크 추가

초기 파이프라인 구성에는 작업이 없으므로 PySpark 스크립트를 실행하는 작업을 추가합니다.

비에이전트형

파이프라인을 수정하려면 다음 단계를 따르세요.

  1. 작업 표시줄에서 Google Cloud Data Agent Kit 아이콘을 클릭합니다.
  2. 데이터 엔지니어링을 펼친 후 오케스트레이션 파이프라인을 펼칩니다.
  3. example-pipeline.yaml을 선택합니다. 선택한 파이프라인의 파이프라인 편집기가 열립니다.
  4. 선택사항: 일정 트리거 노드를 선택합니다. 크론과 같은 표현식과 일정 시작 및 종료 시간을 지정하여 파이프라인의 일정을 조정할 수 있습니다. 새로 초기화된 파이프라인의 기본 일정은 매일 오전 2시에 실행되는 0 2 * * *입니다.
  1. 새 태스크를 추가합니다. 이 가이드에서는 이전에 추가한 PySpark 스크립트를 실행하는 PySpark 태스크를 추가합니다.

    1. 첫 번째 태스크 추가 를 클릭하여 새 태스크 노드를 추가합니다.
    2. PySpark 스크립트 실행script/wordcount.py 파일을 선택합니다.

    PySpark 스크립트 실행 패널이 열립니다.

    1. Spark 클러스터 모드에서 서버리스 Spark 를 선택합니다.
    2. 위치에서 환경이 있는 위치를 지정합니다. 예: us-central1.
    3. 저장 을 클릭합니다.

에이전트형

다음 프롬프트를 실행합니다.

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

파이프라인의 로컬 버전 배포

파이프라인의 로컬 버전을 배포하여 올바르게 구성되었는지 확인합니다.

오케스트레이션 파이프라인의 로컬 버전을 배포하면 Antigravity용 Data Agent Kit 확장 프로그램이 파이프라인 번들의 로컬 버전을 Managed Airflow 환경에 업로드하고 실행합니다. 로컬 배포는 개발 환경에서 작업할 때 사용하기 위한 것입니다.

배포 명령어는 일시중지되지 않은 일정을 배포합니다. 이를 방지하려면 파이프라인 관리 창에서 일정을 수동으로 일시중지하면 됩니다. 파이프라인 YAML 파일을 수정하여 triggers: - schedule 블록을 주석 처리하거나 삭제할 수도 있습니다.

비에이전트형

예시 오케스트레이션 파이프라인의 로컬 버전을 배포하려면 다음 단계를 따르세요.

  1. 작업 표시줄에서 Google Cloud Data Agent Kit 아이콘을 클릭합니다.
  2. 데이터 엔지니어링 을 펼친 후 오케스트레이션 파이프라인 을 펼칩니다.
  3. example-pipeline.yaml을 선택합니다. 선택한 파이프라인의 파이프라인 편집기가 열립니다.
  4. 파이프라인 실행 을 선택한 후 이전에 만든 개발 또는 스테이징 환경을 선택합니다.

에이전트형

다음 프롬프트를 실행합니다.

Deploy my pipeline

파이프라인 실행 모니터링 및 실행 로그 확인

파이프라인이 배포된 후에는 파이프라인의 세부정보, 파이프라인 실행 기록, 파이프라인 실행 로그를 확인할 수 있습니다.

  1. 작업 표시줄에서 Google Cloud Data Agent Kit 아이콘을 클릭합니다.
  2. 데이터 엔지니어링을 펼친 후 파이프라인 관리를 선택합니다.
  3. 파이프라인 이름 (example-pipeline)을 클릭하여 실행 기록을 확인합니다. 특정 날짜의 실행 목록에서 개별 파이프라인 실행과 각 파이프라인 실행 내의 개별 작업 분석을 확인할 수 있습니다.
  4. 작업 ID를 클릭하여 작업 실행 로그를 확인합니다. 예시 PySpark 스크립트는 Managed Service for Apache Spark에서 실행되었으므로 작업 로그에는 배치 로그 링크가 있습니다.

파이프라인 실패 문제 해결 및 수정

파이프라인이 실패하면 파이프라인 관리 창에 진단 버튼이 표시됩니다.

에이전트형

진단 버튼을 클릭하면 에이전트가 파이프라인 실패 문제를 해결하기 위한 프롬프트를 생성합니다. 프롬프트는 클립보드에 복사되거나 새 채팅 세션에서 열립니다.

에이전트는 로그 수집, 배포된 코드와 작업공간 교차 확인, 근본 원인 분석 (RCA) 생성에 중점을 두고 전문 기술을 사용하여 파이프라인 문제를 해결합니다.

RCA를 받은 후 가능한 다음 단계는 다음과 같습니다.

  • 현재 작업공간에 근본 원인 분석을 적용합니다.
  • 에이전트에게 새 브랜치를 만들고 변경사항을 적용해 달라고 요청합니다.
  • RCA 세부정보가 포함된 Cloud Customer Care 티켓을 엽니다.

확장 프로그램 문제 해결에 대한 도움말은 Antigravity용 Data Agent Kit 확장 프로그램 문제 해결을 참조하세요.

다음 단계