Die Vorlage "BigQuery für Elasticsearch" ist eine Batchpipeline, die Daten aus einer BigQuery-Tabelle als Dokumente in Elasticsearch aufnimmt. Die Vorlage kann entweder die gesamte Tabelle oder bestimmte Datensätze mithilfe einer angegebenen Abfrage lesen.
Pipelineanforderungen
- Die BigQuery-Quelltabelle
- Ein Elasticsearch-Host auf einer Google Cloud-Instanz oder in Elastic Cloud mit Elasticsearch Version 7.0 oder höher. Muss über die Dataflow-Worker-Maschinen zugänglich sein.
Vorlagenparameter
Erforderliche Parameter
- connectionUrl: Die Elasticsearch-URL im Format
https://hostname:[port]. Wenn Sie Elastic Cloud verwenden, geben Sie die CloudID an. Beispiel:https://elasticsearch-host:9200 - apiKey: Der Base64-codierte API-Schlüssel für die Authentifizierung.
- index: Der Elasticsearch-Index, an den die Anfragen gesendet werden. Beispiel:
my-index.
Optionale Parameter
- inputTableSpec: Die BigQuery-Tabelle, aus der gelesen werden soll. Wenn Sie
inputTableSpecangeben, liest die Vorlage die Daten mithilfe der BigQuery Storage Read API direkt aus dem BigQuery-Speicher (https://cloud.google.com/bigquery/docs/reference/storage). Informationen zu Einschränkungen in der Storage Read API finden Sie unter https://cloud.google.com/bigquery/docs/reference/storage#limitations. Sie müssen entwederinputTableSpecoderqueryangeben. Wenn Sie beide Parameter festlegen, verwendet die Vorlage den Parameterquery. Beispiel:<BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>. - outputDeadletterTable: Die BigQuery-Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn eine Tabelle nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Falls nichts angegeben wird, wird
<outputTableSpec>_error_recordsverwendet. Beispiel:<PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE> - query: Die SQL-Abfrage zum Lesen von Daten aus BigQuery. Wenn sich das BigQuery-Dataset in einem anderen Projekt als der Dataflow-Job befindet, geben Sie den vollständigen Dataset-Namen in der SQL-Abfrage an, z. B. <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. Standardmäßig wird für den Parameter
queryGoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql) verwendet, sofernuseLegacySqlnichttrueist. Sie müssen entwederinputTableSpecoderqueryangeben. Wenn Sie beide Parameter festlegen, verwendet die Vorlage den Parameterquery. Beispiel:select * from sampledb.sample_table. - useLegacySql: Legen Sie
truefest, um Legacy-SQL zu verwenden. Dieser Parameter gilt nur, wenn der Parameterqueryverwendet wird. Die Standardeinstellung istfalse. - queryLocation: Erforderlich, wenn Daten aus einer autorisierten Ansicht ohne die Berechtigung der zugrunde liegenden Tabelle gelesen werden. Beispiel:
US. - queryTempDataset: Mit dieser Option können Sie ein vorhandenes Dataset festlegen, in dem die temporäre Tabelle zum Speichern der Abfrageergebnisse erstellt werden soll. Beispiel:
temp_dataset. - KMSEncryptionKey: Wenn Daten aus BigQuery mit einer Abfragequelle gelesen werden, wird dieser Cloud KMS-Schlüssel zum Verschlüsseln aller erstellten temporären Tabellen verwendet. Beispiel:
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key. - elasticsearchUsername: Der Elasticsearch-Nutzername, mit dem Sie sich authentifizieren möchten. Wenn dieses angegeben ist, wird der Wert von
apiKeyignoriert. - elasticsearchPassword: Das Elasticsearch-Passwort, mit dem Sie sich authentifizieren. Wenn dieses angegeben ist, wird der Wert von
apiKeyignoriert. - batchSize: Die Batchgröße in der Anzahl an Dokumenten. Die Standardeinstellung ist
1000. - batchSizeBytes: Die Batchgröße in Anzahl der Byte. Die Standardeinstellung ist
5242880(5 MB). - maxRetryAttempts: Die maximale Anzahl der Wiederholungsversuche. Muss größer als Null (0) sein. Die Standardeinstellung ist
no retries. - maxRetryDuration: Die maximale Wiederholungsdauer in Millisekunden. Muss größer als Null (0) sein. Die Standardeinstellung ist
no retries. - propertyAsIndex: Das Attribut im indexierten Dokument, dessen Wert die
_index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer_index-UDF. Die Standardeinstellung istnone. - javaScriptIndexFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die
_index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone. - javaScriptIndexFnName: Der Name der UDF-JavaScript-Funktion, die
_index-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone. - propertyAsId: Ein Attribut im indexierten Dokument, dessen Wert die
_id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Hat Vorrang vor einer_id-UDF. Die Standardeinstellung istnone. - javaScriptIdFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die
_id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone. - javaScriptIdFnName: Der Name der UDF-JavaScript-Funktion, die die
_id-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone. - javaScriptTypeFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die
_type-Metadaten angibt, die in Bulk-Anfragen in Dokumenten aufgenommen werden sollen. Die Standardeinstellung istnone. - javaScriptTypeFnName: Der Name der UDF-JavaScript-Funktion, die die
_type-Metadaten angibt, die in Bulk-Anfragen in das Dokument aufgenommen werden sollen. Die Standardeinstellung istnone. - javaScriptIsDeleteFnGcsPath: Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für die Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von
trueoderfalsezurück. Die Standardeinstellung istnone. - javaScriptIsDeleteFnName: Der Name der UDF-JavaScript-Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion gibt einen Stringwert von
trueoderfalsezurück. Die Standardeinstellung istnone. - usePartialUpdate: Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Die Standardeinstellung ist
false. - bulkInsertMethod: Gibt an, ob
INDEX(Indexieren, Upserts sind zulässig) oderCREATE(Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Die Standardeinstellung istCREATE. - trustSelfSignedCerts: Gibt an, ob selbst signierten Zertifikaten vertraut werden soll. Eine installierte Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Aktivieren Sie diese Option, um die Validierung des SSL-Zertifikats zu umgehen. (Standardeinstellung:
false). - disableCertificateValidation: Wenn
true, wird dem selbstsignierten SSL-Zertifikat vertraut. Eine Elasticsearch-Instanz hat möglicherweise ein selbstsigniertes Zertifikat. Wenn die Validierung für das Zertifikat umgangen werden soll, setzen Sie diesen Parameter auftrue. Die Standardeinstellung istfalse. - apiKeyKMSEncryptionKey: Der Cloud KMS-Schlüssel zum Entschlüsseln des API-Schlüssels. Dieser Parameter ist erforderlich, wenn
apiKeySourceaufKMSfestgelegt ist. Wenn dieser Parameter angegeben wird, muss ein verschlüsselterapiKey-String übergeben werden. Verschlüsseln Sie Parameter mit dem Verschlüsselungsendpunkt der KMS API. Verwenden Sie für den Schlüssel das Formatprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Siehe https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt, z. B.projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name. - apiKeySecretId: Die Secret Manager-Secret-ID für den API-Schlüssel. Geben Sie diesen Parameter an, wenn
apiKeySourceaufSECRET_MANAGERfestgelegt ist. Verwenden Sie das Formatprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: Die Quelle des API-Schlüssels. Zulässige Werte sind
PLAINTEXT,KMSundSECRET_MANAGER. Dieser Parameter ist erforderlich, wenn Sie Secret Manager oder KMS verwenden. WennapiKeySourceaufKMSfestgelegt ist, müssenapiKeyKMSEncryptionKeyund der verschlüsselte API-Schlüssel angegeben werden. WennapiKeySourceaufSECRET_MANAGERfestgelegt ist, mussapiKeySecretIdangegeben werden. WennapiKeySourceaufPLAINTEXTfestgelegt ist, mussapiKeyangegeben werden. Standardeinstellung: PLAINTEXT. - socketTimeout: Wenn festgelegt, wird das standardmäßige maximale Zeitlimit für Wiederholungen und das standardmäßige Socket-Zeitlimit (30.000 ms) im Elastic RestClient überschrieben.
- javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel:
gs://my-bucket/my-udfs/my_file.js. - javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }ist, lautet der FunktionsnamemyTransform. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
Benutzerdefinierte Funktionen
Diese Vorlage unterstützt benutzerdefinierte Funktionen (UDFs) an mehreren Stellen in der Pipeline, wie unten beschrieben. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Indexfunktion
Gibt den Index zurück, zu dem das Dokument gehört.
Vorlagenparameter:
javaScriptIndexFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIndexFnName: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_indexdes Dokuments.
Funktion „Dokument-ID“
Gibt die Dokument-ID zurück.
Vorlagenparameter:
javaScriptIdFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIdFnName: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_iddes Dokuments.
Funktion zum Löschen von Dokumenten
Gibt an, ob ein Dokument gelöscht werden soll. Wenn Sie diese Funktion verwenden möchten, legen Sie den Bulk-Eingabemodus auf INDEX fest und geben Sie eine Funktion für die Dokument-ID an.
Vorlagenparameter:
javaScriptIsDeleteFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.javaScriptIsDeleteFnName: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Geben Sie den String
"true"zurück, um das Dokument zu löschen, oder"false", um das Dokument zu aktualisieren.
Funktion für den Abgleichstyp
Gibt den Zuordnungstyp des Dokuments zurück.
Vorlagenparameter:
javaScriptTypeFnGcsPath: Der Cloud Storage-URI der JavaScript-Datei.javaScriptTypeFnName: Der Name der JavaScript-Funktion.
Funktionsspezifikation:
- Eingabe: Das Elasticsearch-Dokument, serialisiert als JSON-String.
- Ausgabe: Der Wert des Metadatenfelds
_typedes Dokuments.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the BigQuery to Elasticsearch templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \ --parameters \ inputTableSpec=INPUT_TABLE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX
Ersetzen Sie Folgendes:
PROJECT_ID: Die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME: ein eindeutiger Jobname Ihrer WahlREGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1VERSION: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latestzur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
INPUT_TABLE_SPEC: ist der BigQuery-TabellennameCONNECTION_URL: ist die Elasticsearch-URLAPIKEY: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX: ist ihr Elasticsearch-Index
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch", } }
Ersetzen Sie Folgendes:
PROJECT_ID: Die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME: ein eindeutiger Jobname Ihrer WahlLOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1VERSION: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latestzur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
INPUT_TABLE_SPEC: ist der BigQuery-TabellennameCONNECTION_URL: ist die Elasticsearch-URLAPIKEY: ist der base64-codierte API-Schlüssel für die AuthentifizierungINDEX: ist ihr Elasticsearch-Index
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.