Daten nach Pub/Sub exportieren (Reverse ETL)

Für den Export von Daten nach Pub/Sub sind kontinuierliche BigQuery Abfragen erforderlich.

In diesem Dokument wird beschrieben, wie Sie Reverse ETL (Extract-Transform-Load) von BigQuery nach Pub/Sub einrichten. Dazu können Sie die Anweisung EXPORT DATA in einer kontinuierlichen Abfrage verwenden, um Daten aus BigQuery in ein Pub/Sub-Thema zu exportieren.

Mit einem RETL-Workflow für Pub/Sub können Sie die Analysefunktionen von BigQuery mit dem asynchronen und skalierbaren globalen Messaging-Dienst von Pub/Sub kombinieren. Mit diesem Workflow können Sie Daten ereignisgesteuert für nachgelagerte Anwendungen und Dienste bereitstellen.

Vorbereitung

Sie müssen ein Dienstkonto erstellen. Ein Dienstkonto ist erforderlich, um eine kontinuierliche Abfrage auszuführen, mit der Ergebnisse in ein Pub/Sub-Thema exportiert werden.

Sie müssen ein Pub/Sub-Thema erstellen, um die Ergebnisse der kontinuierlichen Abfrage als Nachrichten zu empfangen, und ein Pub/Sub-Abo , mit dem die Zielanwendung diese Nachrichten empfangen kann.

Erforderliche Rollen

Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die für das Nutzerkonto erforderlich sind, mit dem die kontinuierliche Abfrage erstellt wird, sowie zum Dienstkonto, das die kontinuierliche Abfrage ausführt.

Nutzerkontoberechtigungen

Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create:

Zum Senden eines Jobs, der mit einem Dienstkonto ausgeführt wird, muss das Nutzerkonto die Rolle Dienstkontonutzer (roles/iam.serviceAccountUser) haben. Wenn Sie dasselbe Nutzerkonto zum Erstellen des Dienstkontos verwenden, muss das Nutzerkonto die Rolle Service Account Admin (roles/iam.serviceAccountAdmin) haben. Informationen zum Beschränken des Nutzerzugriffs auf ein einzelnes Dienstkonto, anstelle aller Dienstkonten in einem Projekt, finden Sie unter Einzelne Rolle gewähren.

Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall mit kontinuierlichen Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin) haben.

Dienstkontoberechtigungen

Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.export haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export:

Damit das Dienstkonto auf Pub/Sub zugreifen kann, müssen Sie dem Dienstkonto beide folgenden IAM-Rollen gewähren:

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen erhalten.

Hinweis

Aktivieren Sie die BigQuery API und die Pub/Sub API.

Rollen, die zum Aktivieren von APIs erforderlich sind

Zum Aktivieren von APIs benötigen Sie die IAM-Rolle Service Usage-Administrator (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Informationen zum Zuweisen von Rollen.

APIs aktivieren

Nach Pub/Sub exportieren

Verwenden Sie die EXPORT DATA Anweisung um Daten in ein Pub/Sub-Thema zu exportieren:

Console

  1. Rufen Sie in der Google Cloud Console die Seite BigQuery auf.

    BigQuery aufrufen

  2. Klicken Sie im Abfrageeditor auf Bearbeiten > Abfrageeinstellungen.

  3. Klicken Sie im Bereich Kontinuierliche Abfrage das Kästchen Modus für kontinuierliche Abfragen verwenden an.

  4. Wählen Sie im Feld Dienstkonto das erstellte Dienstkonto aus.

  5. Klicken Sie auf Speichern.

  6. Geben Sie im Abfrageeditor die folgende Anweisung ein:

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID'
    ) AS
    (
    QUERY
    );

    Dabei gilt:

    • PROJECT_ID: Ihre Projekt-ID.
    • TOPIC_ID: die Pub/Sub-Themen-ID. Sie finden die Themen-ID in der Console auf der Seite **Themen**. Google Cloud
    • QUERY: die SQL-Anweisung zum Auswählen der zu exportierenden Daten. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten. Sie müssen die APPENDS Funktion in der FROM Klausel einer kontinuierlichen Abfrage verwenden, um den Zeitpunkt anzugeben, an dem die Datenverarbeitung beginnen soll.
  7. Klicken Sie auf Ausführen.

bq

  1. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

    Unten in der Google Cloud Console wird eine Cloud Shell Sitzung gestartet und eine Befehlszeilenaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

  2. Führen Sie in der Befehlszeile die kontinuierliche Abfrage mit dem bq query Befehl mit den folgenden Flags aus:

    • Setzen Sie das Flag --continuous auf true, um die Abfrage kontinuierlich zu machen.
    • Geben Sie mit dem Flag --connection_property ein zu verwendendes Dienstkonto an.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie finden die E-Mail-Adresse des Dienstkontos in der Console auf der Seite **Dienstkonten**. Google Cloud
    • QUERY: die SQL-Anweisung zum Auswählen der zu exportierenden Daten. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten. Sie müssen die APPENDS Funktion in der FROM Klausel einer kontinuierlichen Abfrage verwenden, um den Zeitpunkt anzugeben, an dem die Datenverarbeitung beginnen soll.

API

  1. Führen Sie die kontinuierliche Abfrage aus, indem Sie die jobs.insert Methode aufrufen. Legen Sie die folgenden Felder in der JobConfigurationQuery Ressource der Job Ressource fest, die Sie übergeben:

    • Setzen Sie das Feld continuous auf true, um die Abfrage kontinuierlich zu machen.
    • Geben Sie mit dem Feld connection_property ein zu verwendendes Dienstkonto an.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs'
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • QUERY: die SQL-Anweisung zum Auswählen der zu exportierenden Daten. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten. Sie müssen die APPENDS Funktion in der FROM Klausel einer kontinuierlichen Abfrage verwenden, um den Zeitpunkt anzugeben, an dem die Datenverarbeitung beginnen soll.
    • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie finden die E-Mail-Adresse des Dienstkontos in der Console auf der Seite **Dienstkonten**. Google Cloud

Mehrere Spalten nach Pub/Sub exportieren

Wenn Sie mehrere Spalten in die Ausgabe aufnehmen möchten, können Sie eine Strukturspalte erstellen, die die Spaltenwerte enthält, und den Strukturwert dann mithilfe der TO_JSON_STRING-Funktion in einen JSON-Strong konvertieren. Im folgenden Beispiel werden Daten aus vier Spalten als JSON-String exportiert:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Exportoptimierung

Wenn die Leistung Ihres kontinuierlichen Abfragejobs durch die verfügbaren Rechenressourcen eingeschränkt zu sein scheint, versuchen Sie, die Größe der BigQuery-CONTINUOUS-Slot-Reservierungszuweisung zu erhöhen.

Beschränkungen

  • Die exportierten Daten müssen aus einer einzelnen STRING oder BYTES Spalte bestehen. Der Spaltenname kann beliebig sein.
  • Sie müssen eine kontinuierliche Abfrage verwenden, um Daten nach Pub/Sub zu exportieren.
  • Sie können in der kontinuierlichen Abfrage kein Schema an ein Pub/Sub-Thema übergeben.
  • Sie können keine Daten in ein Pub/Sub-Thema exportieren, das ein Schema verwendet.
  • Beim Exportieren nach Pub/Sub können Sie JSON-formatierte Datensätze exportieren, bei denen einige Werte NULL sind. Datensätze, die nur aus NULL-Werten bestehen, können jedoch nicht exportiert werden. Sie können NULL Datensätze aus den Abfrageergebnissen ausschließen, indem Sie einen WHERE message IS NOT NULL Filter in der kontinuierlichen Abfrage einfügen.
  • Wenn Sie Daten in ein Pub/Sub-Thema exportieren, das mit einem standortbezogenen Endpunktkonfiguriert ist, muss der Endpunkt innerhalb derselben Google Cloud regionalen Grenze wie das BigQuery-Dataset konfiguriert sein, das die Tabelle enthält, die Sie abfragen.
  • Die exportierten Daten dürfen die Pub/Sub-Kontingente nicht überschreiten.

Preise

Wenn Sie Daten in einer kontinuierlichen Abfrage exportieren, werden Ihnen die Kosten gemäß den BigQuery-Kapazitätsrechenpreisen in Rechnung gestellt. Zum Ausführen kontinuierlicher Abfragen benötigen Sie eine Reservierung mit dem Enterprise oder Enterprise Plus und eine Reservierungszuweisung mit dem CONTINUOUS-Jobtyp.

Nach dem Export der Daten werden Ihnen die Kosten für die Nutzung von Pub/Sub in Rechnung gestellt. Weitere Informationen finden Sie unter Preise für Cloud Pub/Sub.