將資料匯出至 Pub/Sub (反向 ETL)

如要將資料匯出至 Pub/Sub,必須使用 BigQuery 持續查詢

本文說明如何從 BigQuery 設定反向擷取、轉換及載入 (RETL) 至 Pub/Sub。您可以在持續查詢中使用 EXPORT DATA 陳述式,將資料從 BigQuery 匯出至 Pub/Sub 主題

您可以透過 RETL 工作流程將資料發布至 Pub/Sub,結合 BigQuery 的分析功能與 Pub/Sub 的非同步可擴充全域訊息服務。這項工作流程可讓您以事件驅動的方式,將資料提供給下游應用程式和服務。

必要條件

您必須建立服務帳戶。如要執行持續查詢,並將結果匯出至 Pub/Sub 主題,您必須擁有服務帳戶

您必須建立 Pub/Sub 主題,才能以訊息形式接收持續查詢結果,並建立 Pub/Sub 訂閱項目,供目標應用程式接收這些訊息。

必要的角色

本節說明建立持續查詢的使用者帳戶,以及執行持續查詢的服務帳戶,分別需要哪些角色和權限。

使用者帳戶權限

如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create IAM 權限。下列 IAM 角色都授予 bigquery.jobs.create 權限:

如要提交使用服務帳戶執行的工作,使用者帳戶必須具備「服務帳戶使用者」(roles/iam.serviceAccountUser)角色。如果您使用同一個使用者帳戶建立服務帳戶,則該使用者帳戶必須具備服務帳戶管理員 (roles/iam.serviceAccountAdmin) 角色。如要瞭解如何限制使用者存取單一服務帳戶,而非專案中的所有服務帳戶,請參閱「授予單一角色」。

如果使用者帳戶必須啟用持續查詢用例所需的 API,則該帳戶必須具備「服務使用管理員」角色 (roles/serviceusage.serviceUsageAdmin)。

服務帳戶權限

如要從 BigQuery 資料表匯出資料,服務帳戶必須具備 bigquery.tables.export IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export 權限:

如要讓服務帳戶存取 Pub/Sub,您必須授予服務帳戶下列兩個 IAM 角色:

您或許也能透過自訂角色取得必要權限。

事前準備

Enable the BigQuery and Pub/Sub APIs.

Roles required to enable APIs

To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

Enable the APIs

匯出至 Pub/Sub

使用 EXPORT DATA 陳述式將資料匯出至 Pub/Sub 主題:

控制台

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往「BigQuery」

  2. 在查詢編輯器中,依序點選「更多」>「查詢設定」

  3. 在「連續查詢」部分,勾選「使用連續查詢模式」核取方塊。

  4. 在「Service account」(服務帳戶) 方塊中,選取您建立的服務帳戶。

  5. 按一下 [儲存]

  6. 在查詢編輯器中輸入下列陳述式:

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID'
    ) AS
    (
    QUERY
    );

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • TOPIC_ID:Pub/Sub 主題 ID。您可以前往 Google Cloud 控制台的「主題」頁面取得主題 ID。
    • QUERY:用於選取要匯出資料的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您必須在連續查詢的 FROM 子句中使用 APPENDS 函式,指定開始處理資料的時間點。
  7. 按一下「執行」

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 在指令列中,使用 bq query 指令和下列旗標執行連續查詢:

    • --continuous 旗標設為 true,即可持續查詢。
    • 使用 --connection_property 旗標指定要使用的服務帳戶。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以在 Google Cloud 控制台的「服務帳戶」頁面取得服務帳戶電子郵件地址。
    • QUERY:用於選取要匯出資料的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您必須在連續查詢的 FROM 子句中使用 APPENDS 函式,指定開始處理資料的時間點。
  3. API

    1. 呼叫 jobs.insert 方法,執行連續查詢。在您傳遞的 Job 資源JobConfigurationQuery 資源中設定下列欄位:

      • continuous 欄位設為 true,即可讓查詢持續執行。
      • 使用 connection_property 欄位指定要使用的服務帳戶。
      curl --request POST \
        'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs'
        --header 'Authorization: Bearer $(gcloud auth print-access-token) \
        --header 'Accept: application/json' \
        --header 'Content-Type: application/json' \
        --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
        --compressed

      更改下列內容:

      • PROJECT_ID:您的專案 ID。
      • QUERY:用於選取要匯出資料的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您必須在連續查詢的 FROM 子句中使用 APPENDS 函式,指定開始處理資料的時間點。
      • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以在 Google Cloud 控制台的「服務帳戶」頁面取得服務帳戶電子郵件地址。

將多個資料欄匯出至 Pub/Sub

如要在輸出內容中加入多個資料欄,可以建立包含資料欄值的 struct 資料欄,然後使用 TO_JSON_STRING 函式將 struct 值轉換為 JSON 字串。 以下範例會匯出四個資料欄的資料,並以 JSON 字串格式呈現:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

匯出最佳化

如果連續查詢工作效能似乎受到可用運算資源限制,請嘗試增加 BigQuery CONTINUOUS 運算單元預留指派作業的大小。

限制

定價

匯出連續查詢中的資料時,系統會按照 BigQuery 容量運算價格計費。如要執行連續查詢,您必須擁有使用 Enterprise 或 Enterprise Plus 版本預留位置,以及使用 CONTINUOUS 工作類型的預留位置指派

匯出資料後,系統會向您收取 Pub/Sub 使用費。 詳情請參閱「Pub/Sub 定價」。