Daten nach Pub/Sub exportieren (Reverse ETL)

Für den Export von Daten nach Pub/Sub sind kontinuierliche BigQuery-Abfragen erforderlich.

In diesem Dokument wird beschrieben, wie Sie einen Workflow für Reverse Extrahieren, Transformieren und Laden (Reverse ETL) von BigQuery nach Pub/Sub einrichten. Dazu können Sie die Anweisung EXPORT DATA in einer kontinuierlichen Abfrage verwenden, um Daten aus BigQuery in ein Pub/Sub-Thema zu exportieren.

Mit einem RETL-Workflow für Pub/Sub können Sie die Analysefunktionen von BigQuery mit dem asynchronen und skalierbaren globalen Messaging-Dienst von Pub/Sub kombinieren. Mit diesem Workflow können Sie Daten ereignisgesteuert für Downstream-Anwendungen und -Dienste bereitstellen.

Vorbereitung

Sie müssen ein Dienstkonto erstellen. Für die Ausführung einer kontinuierlichen Abfrage, mit der Ergebnisse in ein Pub/Sub-Thema exportiert werden, ist ein Dienstkonto erforderlich.

Sie müssen ein Pub/Sub-Thema erstellen, um die Ergebnisse der kontinuierlichen Abfrage als Nachrichten zu empfangen, und ein Pub/Sub-Abo, das die Zielanwendung zum Empfangen dieser Nachrichten verwenden kann.

Erforderliche Rollen

Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die für das Nutzerkonto erforderlich sind, mit dem die kontinuierliche Abfrage erstellt wird, sowie zum Dienstkonto, das die kontinuierliche Abfrage ausführt.

Nutzerkontoberechtigungen

Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create:

Wenn Sie einen Job senden möchten, der mit einem Dienstkonto ausgeführt wird, muss das Nutzerkonto die Rolle Dienstkontonutzer (roles/iam.serviceAccountUser) haben. Wenn Sie dasselbe Nutzerkonto zum Erstellen des Dienstkontos verwenden, muss das Nutzerkonto die Rolle Service Account Admin (roles/iam.serviceAccountAdmin) haben. Informationen dazu, wie Sie den Zugriff eines Nutzers auf ein einzelnes Dienstkonto anstelle aller Dienstkonten in einem Projekt einschränken, finden Sie unter Einzelne Rolle zuweisen.

Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall mit kontinuierlichen Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin) haben.

Dienstkontoberechtigungen

Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.export haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export:

Damit das Dienstkonto auf Pub/Sub zugreifen kann, müssen Sie ihm die folgenden beiden IAM-Rollen zuweisen:

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen erhalten.

Hinweise

Enable the BigQuery and Pub/Sub APIs.

Roles required to enable APIs

To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

Enable the APIs

Nach Pub/Sub exportieren

Mit der EXPORT DATA-Anweisung können Sie Daten in ein Pub/Sub-Thema exportieren:

Console

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  2. Klicken Sie im Abfrageeditor auf Mehr > Abfrageeinstellungen.

  3. Klicken Sie im Bereich Kontinuierliche Abfrage das Kästchen Modus für kontinuierliche Abfragen verwenden an.

  4. Wählen Sie im Feld Dienstkonto das erstellte Dienstkonto aus.

  5. Klicken Sie auf Speichern.

  6. Geben Sie im Abfrageeditor die folgende Anweisung ein:

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID'
    ) AS
    (
    QUERY
    );

    Dabei gilt:

    • PROJECT_ID: Ihre Projekt-ID.
    • TOPIC_ID: die Pub/Sub-Themen-ID. Sie können die Themen-ID in der Console auf der Seite Google Cloud abrufen.
    • QUERY: Die SQL-Anweisung zum Auswählen der zu exportierenden Daten. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten. Sie müssen die Funktion APPENDS in der FROM-Klausel einer kontinuierlichen Abfrage verwenden, um den Zeitpunkt anzugeben, an dem die Verarbeitung von Daten beginnen soll.
  7. Klicken Sie auf Ausführen.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. Führen Sie die kontinuierliche Abfrage in der Befehlszeile mit dem Befehl bq query mit den folgenden Flags aus:

    • Setzen Sie das Flag --continuous auf true, um die Abfrage kontinuierlich auszuführen.
    • Verwenden Sie das Flag --connection_property, um ein zu verwendendes Dienstkonto anzugeben.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie können die E‑Mail-Adresse des Dienstkontos in der Google Cloud Console auf der Seite Dienstkonten abrufen.
    • QUERY: Die SQL-Anweisung zum Auswählen der zu exportierenden Daten. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten. Sie müssen die Funktion APPENDS in der FROM-Klausel einer kontinuierlichen Abfrage verwenden, um den Zeitpunkt anzugeben, an dem die Verarbeitung von Daten beginnen soll.
  3. API

    1. Führen Sie die Continuous Query aus, indem Sie die Methode jobs.insert aufrufen. Legen Sie in der JobConfigurationQuery-Ressource der Job-Ressource, die Sie übergeben, die folgenden Felder fest:

      • Setzen Sie das Feld continuous auf true, um die Abfrage kontinuierlich zu machen.
      • Verwenden Sie das Feld connection_property, um ein zu verwendendes Dienstkonto anzugeben.
      curl --request POST \
        'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs'
        --header 'Authorization: Bearer $(gcloud auth print-access-token) \
        --header 'Accept: application/json' \
        --header 'Content-Type: application/json' \
        --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
        --compressed

      Ersetzen Sie Folgendes:

      • PROJECT_ID: Ihre Projekt-ID.
      • QUERY: Die SQL-Anweisung zum Auswählen der zu exportierenden Daten. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten. Sie müssen die Funktion APPENDS in der FROM-Klausel einer kontinuierlichen Abfrage verwenden, um den Zeitpunkt anzugeben, an dem die Verarbeitung von Daten beginnen soll.
      • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie können die E‑Mail-Adresse des Dienstkontos in der Google Cloud Console auf der Seite Dienstkonten abrufen.

Mehrere Spalten nach Pub/Sub exportieren

Wenn Sie mehrere Spalten in die Ausgabe aufnehmen möchten, können Sie eine Strukturspalte erstellen, die die Spaltenwerte enthält, und den Strukturwert dann mithilfe der TO_JSON_STRING-Funktion in einen JSON-Strong konvertieren. Im folgenden Beispiel werden Daten aus vier Spalten exportiert, die als JSON-String formatiert sind:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Exportoptimierung

Wenn die Leistung Ihres kontinuierlichen Abfragejobs durch die verfügbaren Rechenressourcen eingeschränkt zu sein scheint, versuchen Sie, die Größe der BigQuery-CONTINUOUS-Slot-Reservierungszuweisung zu erhöhen.

Beschränkungen

  • Die exportierten Daten müssen aus einer einzelnen Spalte vom Typ STRING oder BYTES bestehen. Der Spaltenname kann beliebig gewählt werden.
  • Für den Export nach Pub/Sub ist eine kontinuierliche Abfrage erforderlich.
  • Sie können kein Schema an ein Pub/Sub-Thema in der kontinuierlichen Abfrage übergeben.
  • Sie können keine Daten in ein Pub/Sub-Thema exportieren, für das ein Schema verwendet wird.
  • Beim Exportieren nach Pub/Sub können Sie Datensätze im JSON-Format exportieren, in denen einige Werte NULL sind. Sie können jedoch keine Datensätze exportieren, die nur aus NULL-Werten bestehen. Sie können NULL-Datensätze aus den Abfrageergebnissen ausschließen, indem Sie einen WHERE message IS NOT NULL-Filter in die Continuous Query einfügen.
  • Wenn Sie Daten in ein Pub/Sub-Thema exportieren, das mit einem Standortendpunkt konfiguriert ist, muss der Endpunkt innerhalb derselben Google Cloud regionalen Google Cloud Grenze wie das BigQuery-Dataset konfiguriert sein, das die Tabelle enthält, die Sie abfragen.
  • Die exportierten Daten dürfen die Pub/Sub-Kontingente nicht überschreiten.

Preise

Wenn Sie Daten in einer kontinuierlichen Abfrage exportieren, werden Ihnen die Kosten gemäß den BigQuery-Kapazitätsrechenpreisen in Rechnung gestellt. Zum Ausführen kontinuierlicher Abfragen benötigen Sie eine Reservierung mit dem Enterprise oder Enterprise Plus und eine Reservierungszuweisung mit dem CONTINUOUS-Jobtyp.

Nachdem die Daten exportiert wurden, wird die Nutzung von Pub/Sub in Rechnung gestellt. Weitere Informationen finden Sie unter Preise für Cloud Pub/Sub.