Die Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub" ist eine Streamingpipeline, die Pub/Sub-Nachrichten mit Änderungsdaten aus einer MySQL-Datenbank liest und die Datensätze in BigQuery schreibt. Ein Debezium-Connector erfasst Änderungen an der MySQL-Datenbank und veröffentlicht die geänderten Daten in Pub/Sub. Die Vorlage liest dann die Pub/Sub-Nachrichten und schreibt sie in BigQuery.
Über diese Vorlage können Sie MySQL-Datenbanken und BigQuery-Tabellen miteinander synchronisieren. Die Pipeline schreibt die geänderten Daten in eine BigQuery-Staging-Tabelle und aktualisiert in regelmäßigen Abständen eine BigQuery-Tabelle zu Replikation der MySQL-Datenbank.
Pipelineanforderungen
- Der Debezium-Connector muss bereitgestellt sein.
- Die Pub/Sub-Nachrichten müssen in einer Beam Row serialisiert sein.
Vorlagenparameter
Erforderliche Parameter
- inputSubscriptions: Die durch Kommas getrennte Liste der Pub/Sub-Eingabeabos, aus denen gelesen werden soll, im Format
<SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, .... - changeLogDataset: Das BigQuery-Dataset, in dem die Staging-Tabellen gespeichert werden sollen, im Format <DATASET_NAME>.
- replicaDataset: Der Speicherort des BigQuery-Datasets, in dem die Replikattabellen gespeichert werden sollen, im Format <DATASET_NAME>.
Optionale Parameter
- inputTopics: Durch Kommas getrennte Liste von PubSub-Themen, an die CDC-Daten übertragen werden.
- updateFrequencySecs: Das Intervall, in dem die Pipeline die BigQuery-Tabelle zur Replikation der MySQL-Datenbank aktualisiert.
- useSingleTopic: Setzen Sie diesen Wert auf
true, wenn Sie den Debezium-Connector so konfiguriert haben, dass alle Tabellenaktualisierungen in einem einzigen Thema veröffentlicht werden. Die Standardeinstellung ist "false". - useStorageWriteApi: Wenn „true“, verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist
false. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die "Mindestens einmal"-Semantik verwenden möchten (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf
truefest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auffalsefest. Dieser Parameter gilt nur, wennuseStorageWriteApitrueist. Der Standardwert istfalse. - numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn
useStorageWriteApitrueunduseStorageWriteApiAtLeastOncefalseist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0. - storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn
useStorageWriteApitrueunduseStorageWriteApiAtLeastOncefalseist, müssen Sie diesen Parameter festlegen.
Führen Sie die Vorlage aus.
Führen Sie die folgenden Schritte aus, um diese Vorlage auszuführen:
- Klonen Sie das DataflowTemplates-Repository auf Ihren lokalen Computer.
- Wechseln Sie zum Verzeichnis
v2/cdc-parent. - Achten Sie darauf, dass der Debezium-Connector bereitgestellt ist.
- Führen Sie mit Maven die Dataflow-Vorlage aus.
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=CHANGELOG_DATASET \ --replicaDataset=REPLICA_DATASET \ --project=PROJECT_ID \ --region=REGION_NAME"
Ersetzen Sie Folgendes:
PROJECT_ID: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenSUBSCRIPTIONS: Ihre durch Kommas getrennte Liste von Pub/Sub-Abonamen.CHANGELOG_DATASET: Ihr BigQuery-Dataset für Änderungslogdaten.REPLICA_DATASET: Ihr BigQuery-Dataset für Replikattabellen.
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.