Cloud Storage에 저장된 이벤트 데이터를 처리하는 Cloud Run 작업 실행

Workflows를 사용해서 보다 복잡한 데이터 처리를 수행하거나 기존 작업 시스템을 조정하는 워크플로의 일부로 Cloud Run 작업을 실행할 수 있습니다.

이 튜토리얼에서는 Workflows를 사용하여 Cloud Storage 버킷에 저장된 이벤트 데이터를 처리하는 Cloud Run 작업을 실행하는 방법을 보여줍니다. Cloud Storage 버킷에 이벤트 페이로드를 저장하면 고객 관리 암호화 키를 사용해서 데이터를 암호화할 수 있지만 Cloud Run 작업에 이벤트 데이터를 환경 변수로 전달할 때는 이렇게 할 수 없습니다.

아래 다이어그램은 대략적인 개요를 보여줍니다.

Pub/Sub 이벤트는 Eventarc 트리거에 의해 Workflows로 라우팅되고 Cloud Storage 버킷에 저장됩니다. Cloud Run 작업은 버킷에 저장된 이벤트 데이터를 처리합니다.

Cloud Run 작업 만들기

이 튜토리얼에서는 GitHub에서 찾을 수 있는 샘플 코드가 사용됩니다. 배포 스크립트는 Cloud Run 작업을 만들기 위한 컨테이너 이미지를 빌드합니다. 이 스크립트는 또한 Cloud Storage 버킷을 만듭니다. Cloud Run 작업은 Cloud Storage 버킷에 저장된 이벤트 데이터를 읽은 후 이벤트 데이터를 출력합니다.

  1. Cloud Shell에서 배포 스크립트를 실행 중이며 Compute Engine 기본 서비스 계정에 편집자 역할이 없는 경우 프로젝트에 대한 다음 역할을 Compute Engine 기본 서비스 계정에 부여합니다. (그렇지 않으면 이 단계를 건너뛰고 다음 단계에서 샘플 앱 저장소를 클론하면 됩니다.)

    1. Artifact Registry 작성자 역할(roles/artifactregistry.writer)을 부여합니다.

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/artifactregistry.writer

      여기에서 PROJECT_NUMBER를 Google Cloud프로젝트 번호로 바꿉니다. Google Cloud 콘솔의 시작 페이지에서 또는 다음 명령어를 실행하여 프로젝트 번호를 찾을 수 있습니다.

      gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    2. 스토리지 객체 사용자 역할(roles/storage.objectUser)을 부여합니다.

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/storage.objectUser
    3. Logging 로그 작성자 역할(roles/logging.logWriter)을 부여합니다.

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/logging.logWriter
  2. 샘플 앱 저장소를 로컬 머신에 클론하여 샘플 코드를 가져옵니다.

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git

    또는 샘플을 ZIP 파일로 다운로드할 수 있습니다.

  3. 샘플 코드가 있는 디렉토리로 변경합니다.

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
  4. 배포 스크립트를 실행하여 Cloud Run 작업을 만듭니다.

    ./deploy-job.sh

이 스크립트는 message-payload-PROJECT_ID라는 Cloud Storage 버킷을 만듭니다. 여기서 PROJECT_ID는 Google Cloud 프로젝트의 ID입니다. message-payload-job이라는 Cloud Run 작업도 생성됩니다.

Cloud Run 작업을 실행하는 워크플로 배포

바로 전에 만든 Cloud Run 작업을 실행하는 워크플로를 정의하고 배포합니다. 워크플로 정의는 Workflows 문법을을 사용하여 기술된 일련의 단계로 구성됩니다.

이 워크플로는 이벤트를 수신하고, 이벤트 데이터를 Cloud Storage 버킷에 저장한 후 Cloud Run을 실행해서 이벤트 데이터를 처리합니다.

콘솔

  1. Google Cloud 콘솔에서 Workflows 페이지로 이동합니다.

    Workflows로 이동

  2. 만들기를 클릭합니다.

  3. 새 워크플로의 이름을 입력합니다(예: message-payload-workflow).

  4. 적합한 리전을 선택합니다. 예: us-central1.

  5. 서비스 계정 필드에서 앞에서 만든 서비스 계정을 선택합니다.

    서비스 계정은 워크플로의 ID로 작동합니다. 다음 역할이 서비스 계정에 이미 부여되어 있어야 합니다.

    • Cloud Run 관리자: Cloud Run 작업을 실행합니다.
    • 로그 작성자: 로그 항목을 작성합니다.
    • 스토리지 객체 생성자: Cloud Storage에서 객체를 만듭니다.
  6. 다음을 클릭합니다.

  7. 워크플로 편집기에서 다음 워크플로 정의를 입력합니다.

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. 배포를 클릭합니다.

gcloud

  1. 워크플로의 소스 코드 파일을 만듭니다.

    touch message-payload-workflow.yaml
  2. 다음 워크플로 정의를 message-payload-workflow.yaml에 복사합니다.

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. 다음 명령어를 입력하여 워크플로를 배포합니다.

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    다음을 바꿉니다.

    • SERVICE_ACCOUNT_NAME: 이전에 만든 서비스 계정의 이름
    • PROJECT_ID:Google Cloud 프로젝트의 ID

    서비스 계정은 워크플로의 ID로 작동합니다. 다음 역할이 서비스 계정에 이미 부여되어 있어야 합니다.

    • roles/logging.logWriter: 로그 항목을 기록합니다.
    • roles/run.admin: Cloud Run 작업을 실행합니다.
    • roles/storage.objectCreator: Cloud Storage에서 객체를 만듭니다.

워크플로는 다음을 수행합니다.

  1. init 단계는 이벤트를 인수로 수락하고 필요한 변수를 설정합니다.

  2. log_event 단계는 sys.log 함수를 사용하여 Cloud Logging에 로그 항목을 만듭니다.

  3. write_payload_to_gcs 단계는 HTTP POST 요청을 수행하고 이벤트 페이로드 데이터를 Cloud Storage 버킷 파일에 기록합니다.

  4. run_job_to_process_payload 단계는 Cloud Run Admin API 커넥터 메서드인 googleapis.run.v1.namespaces.jobs.run을 사용하여 작업을 실행합니다. Cloud Storage 버킷 및 데이터 파일 이름은 워크플로에서 작업으로 재정의 변수로 전달됩니다.

  5. finish 단계: 워크플로의 결과로 작업 실행에 대한 정보를 반환합니다.

Pub/Sub 주제 만들기

메시지를 게시할 수 있도록 Pub/Sub 주제를 만듭니다. Pub/Sub 이벤트는 Workflows를 사용해서 이벤트를 라우팅하고 Cloud Run 작업이 이벤트 데이터를 처리할 수 있도록 이벤트를 Cloud Storage에 저장하는 방법을 보여줍니다.

콘솔

  1. Google Cloud 콘솔에서 주제 페이지로 이동합니다.

    주제로 이동

  2. 주제 만들기를 클릭합니다.

  3. 주제 ID 필드에 message-payload-topic과 같은 주제의 ID를 입력합니다.

  4. 기본 구독 추가 옵션을 유지합니다.

  5. 다른 옵션은 선택하지 마세요.

  6. 만들기를 클릭합니다.

gcloud

ID가 message-payload-topic인 주제를 만들려면 gcloud pubsub topics create 명령어를 실행합니다.

gcloud pubsub topics create message-payload-topic

워크플로에 이벤트를 라우팅하는 Eventarc 트리거 만들기

워크플로를 자동으로 실행하고 Cloud Run 작업을 실행하려면 Pub/Sub 이벤트에 응답하고 이벤트를 워크플로로 라우팅하는 Eventarc 트리거를 만듭니다. 메시지가 Pub/Sub 주제에 기록될 때마다 이벤트가 워크플로 실행을 트리거합니다.

콘솔

  1. Google Cloud 콘솔에서 Workflows 페이지로 이동합니다.

    Workflows로 이동

  2. 워크플로의 이름을 클릭합니다(예: message-payload-workflow).

  3. 워크플로 세부정보 페이지에서 수정을 클릭합니다.

  4. 워크플로 수정 페이지의 트리거 섹션에서 새 트리거 추가 > Eventarc를 클릭합니다.

    Eventarc 트리거 창이 열립니다.

  5. 트리거 이름 필드에 트리거 이름을 입력합니다(예: message-payload-trigger).

  6. 이벤트 제공자 목록에서 Cloud Pub/Sub를 선택합니다.

  7. 이벤트 목록에서 google.cloud.pubsub.topic.v1.messagePublished를 선택합니다.

  8. Cloud Pub/Sub 주제 목록에서 이전에 만든 Pub/Sub 주제를 선택합니다.

  9. 서비스 계정 필드에서 앞에서 만든 서비스 계정을 선택합니다.

    서비스 계정은 트리거의 ID로 작동합니다. 다음 역할이 서비스 계정에 이미 부여되어 있어야 합니다.

    • Eventarc 이벤트 수신자: 이벤트를 수신합니다.
    • Workflows 호출자: 워크플로를 실행합니다.
  10. 트리거 저장을 클릭합니다.

    이제 워크플로 수정 페이지의 트리거 섹션에 Eventarc 트리거가 표시됩니다.

  11. 다음을 클릭합니다.

  12. 배포를 클릭합니다.

gcloud

다음 명령어를 실행하여 Eventarc 트리거를 만듭니다.

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

다음을 바꿉니다.

  • PROJECT_ID: Google Cloud 프로젝트 ID입니다.
  • SERVICE_ACCOUNT_NAME: 이전에 만든 서비스 계정의 이름

서비스 계정은 트리거의 ID로 작동합니다. 다음 역할이 서비스 계정에 이미 부여되어 있어야 합니다.

  • roles/eventarc.eventReceiver: 이벤트 수신
  • roles/workflows.invoker: 워크플로를 실행합니다.

워크플로 트리거

Pub/Sub 주제에 메시지를 게시하고 이벤트를 생성하여 엔드 투 엔드 시스템을 테스트합니다. 자세한 내용은 이벤트 또는 Pub/Sub 메시지로 워크플로 트리거를 참조하세요.

  1. Pub/Sub 주제에 메시지를 게시하여 이벤트를 생성합니다.

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    이벤트 메시지를 로깅하고, Cloud Storage 버킷에 이벤트 데이터를 저장하고, Cloud Storage에 저장된 데이터를 처리하도록 Cloud Run 작업을 실행하는 워크플로에 이벤트가 라우팅됩니다. 이 작업은 1분 정도 걸릴 수 있습니다.

  2. 작업 실행을 확인해서 Cloud Run 작업이 예상한 대로 실행되었는지 확인합니다.

    gcloud run jobs executions list --job=message-payload-job

    출력에 새 작업 실행이 표시됩니다.

  3. 워크플로를 트리거하여 생성된 이벤트 관련 로그 항목을 보려면 다음 명령어를 실행합니다.

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. 다음과 같은 로그 항목을 찾습니다.

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. Cloud Storage 버킷 객체에서 이벤트 데이터를 보고 결과가 예상한 대로인지 확인할 수 있습니다.

    1. 버킷 이름을 검색합니다.

      gcloud storage ls

      출력은 다음과 비슷합니다.

      gs://message-payload-PROJECT_ID/

    2. 버킷의 객체를 나열합니다.

      gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive

      출력은 다음과 비슷하게 표시됩니다.

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      다음 단계에서 사용할 OBJECT_ID를 확인합니다.

    3. 버킷의 객체를 파일로 다운로드합니다.

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      OBJECT_ID를 이전 단계에서 반환된 ID로 바꿉니다.

    4. 텍스트 편집기에서 message-event.txt 파일을 엽니다. 파일에 기록된 이벤트 본문이 다음과 비슷하게 표시됩니다.

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      해당 Base64 형식에서 SGVsbG8gV29ybGQ= 데이터 값을 디코딩할 경우 "Hello World"가 반환됩니다.