Il modello Da Pub/Sub a Elasticsearch è una pipeline di inserimento flussi che legge i messaggi da una sottoscrizione Pub/Sub , esegue una funzione definita dall'utente (UDF) e li scrive in Elasticsearch come documenti. Il modello Dataflow utilizza la funzionalità dei flussi di dati di Elasticsearch per archiviare i dati delle serie temporali in più indici, fornendoti al contempo una singola risorsa denominata per le richieste. I flussi di dati sono adatti per log, metriche, tracce e altri dati generati continuamente archiviati in Pub/Sub.
Il template crea uno stream di dati denominato
logs-gcp.DATASET-NAMESPACE, dove:
- DATASET è il valore del parametro del modello
datasetopubsubse non specificato. - NAMESPACE è il valore del parametro del modello
namespaceodefaultse non specificato.
Requisiti della pipeline
- Deve esistere la sottoscrizione Pub/Sub di origine e i messaggi devono essere codificati in un formato JSON valido.
- Un host Elasticsearch raggiungibile pubblicamente su un' Google Cloud istanza o su Elastic Cloud con Elasticsearch versione 7.0 o successive. Per maggiori dettagli, consulta Integrazione di Google Cloud per Elastic.
- Un argomento Pub/Sub per l'output degli errori.
Parametri del modello
Parametri obbligatori
- inputSubscription: sottoscrizione Pub/Sub da cui consumare l'input. Ad esempio,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. - errorOutputTopic: l'argomento di output Pub/Sub per la pubblicazione dei record non riusciti, nel formato
projects/<PROJECT_ID>/topics/<TOPIC_NAME>. - connectionUrl: l'URL di Elasticsearch nel formato
https://hostname:[port]. Se utilizzi Elastic Cloud, specifica il CloudID. Ad esempio,https://elasticsearch-host:9200. - apiKey: la chiave API codificata in Base64 da utilizzare per l'autenticazione.
Parametri facoltativi
- dataset: il tipo di log inviati tramite Pub/Sub, per i quali disponiamo di una dashboard predefinita. I valori dei tipi di log noti sono
audit,vpcflowefirewall. Il valore predefinito èpubsub. - namespace: un raggruppamento arbitrario, ad esempio un ambiente (sviluppo, produzione o controllo qualità), un team o un'unità aziendale strategica. Il valore predefinito è
default. - elasticsearchTemplateVersion: identificatore della versione del modello Dataflow, in genere definito da Google Cloud. Il valore predefinito è 1.0.0.
- javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) da utilizzare. Ad esempio,
gs://my-bucket/my-udfs/my_file.js. - javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione èmyTransform. Per esempi di UDF JavaScript, consulta Esempi di UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: specifica la frequenza di ricaricamento della UDF, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e ricarica la UDF se il file viene modificato. Questo parametro ti consente di aggiornare la UDF mentre la pipeline è in esecuzione, senza dover riavviare il job. Se il valore è
0, il ricaricamento della UDF è disabilitato. Il valore predefinito è0. - elasticsearchUsername: il nome utente di Elasticsearch per l'autenticazione. Se specificato, il valore di
apiKeyviene ignorato. - elasticsearchPassword: la password di Elasticsearch per l'autenticazione. Se specificato, il valore di
apiKeyviene ignorato. - batchSize: le dimensioni del batch in numero di documenti. Il valore predefinito è
1000. - batchSizeBytes: le dimensioni del batch in numero di byte. Il valore predefinito è
5242880(5 MB). - maxRetryAttempts: il numero massimo di tentativi. Deve essere maggiore di zero. Il valore predefinito è
5. - maxRetryDuration: la durata massima dei tentativi in millisecondi. Deve essere maggiore di zero. Il valore predefinito è
60000(1 minuto). - propertyAsIndex: la proprietà nel documento indicizzato il cui valore specifica i metadati
_indexda includere nel documento nelle richieste collettive. Ha la precedenza su una UDF_index. Il valore predefinito ènone. - javaScriptIndexFnGcsPath: il percorso Cloud Storage all'origine della UDF JavaScript per una funzione che specifica i metadati
_indexda includere nel documento nelle richieste collettive. Il valore predefinito ènone. - javaScriptIndexFnName: il nome della funzione JavaScript UDF che specifica i metadati
_indexda includere nel documento nelle richieste collettive. Il valore predefinito ènone. - propertyAsId: una proprietà nel documento indicizzato il cui valore specifica i metadati
_idda includere nel documento nelle richieste collettive. Ha la precedenza su una UDF_id. Il valore predefinito ènone. - javaScriptIdFnGcsPath: il percorso Cloud Storage all'origine della UDF JavaScript per la funzione che specifica i metadati
_idda includere nel documento nelle richieste collettive. Il valore predefinito ènone. - javaScriptIdFnName: il nome della funzione JavaScript UDF che specifica i metadati
_idda includere nel documento nelle richieste collettive. Il valore predefinito ènone. - javaScriptTypeFnGcsPath: il percorso Cloud Storage all'origine della UDF JavaScript per una funzione che specifica i metadati
_typeda includere nei documenti nelle richieste collettive. Il valore predefinito ènone. - javaScriptTypeFnName: il nome della funzione JavaScript UDF che specifica i metadati
_typeda includere nel documento nelle richieste collettive. Il valore predefinito ènone. - javaScriptIsDeleteFnGcsPath: il percorso Cloud Storage all'origine della UDF JavaScript per la funzione che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore stringa
trueofalse. Il valore predefinito ènone. - javaScriptIsDeleteFnName: il nome della funzione JavaScript UDF che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore stringa
trueofalse. Il valore predefinito ènone. - usePartialUpdate: indica se utilizzare gli aggiornamenti parziali (aggiorna anziché creare o indicizzare, consentendo documenti parziali) con le richieste Elasticsearch. Il valore predefinito è
false. - bulkInsertMethod: indica se utilizzare
INDEX(indice, consente upsert) oCREATE(crea, errori su _id duplicato) con le richieste collettive Elasticsearch. Il valore predefinito èCREATE. - trustSelfSignedCerts: indica se considerare attendibili i certificati autofirmati. Un'istanza di Elasticsearch installata potrebbe avere un certificato autofirmato. Imposta questo valore su true per ignorare la convalida del certificato SSL. (Il valore predefinito è
false). - disableCertificateValidation: se
true, considera attendibile il certificato SSL autofirmato. Un'istanza di Elasticsearch potrebbe avere un certificato autofirmato. Per ignorare la convalida del certificato, imposta questo parametro sutrue. Il valore predefinito èfalse. - apiKeyKMSEncryptionKey: la chiave Cloud KMS per decriptare la chiave API. Questo parametro è obbligatorio se
apiKeySourceè impostato suKMS. Se questo parametro viene fornito, inserisci una stringaapiKeycriptata. Cripta i parametri utilizzando l'endpoint di crittografia dell'API KMS. Per la chiave, utilizza il formatoprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulta: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Ad esempio,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name. - apiKeySecretId: l'ID secret di Secret Manager per apiKey. Se
apiKeySourceè impostato suSECRET_MANAGER, fornisci questo parametro. Utilizza il formatoprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: l'origine della chiave API. I valori consentiti sono
PLAINTEXT,KMSoandSECRET_MANAGER. Questo parametro è obbligatorio quando utilizzi Secret Manager o KMS. SeapiKeySourceè impostato suKMS, devi fornireapiKeyKMSEncryptionKeye la chiave API criptata. SeapiKeySourceè impostato suSECRET_MANAGER, devi fornireapiKeySecretId. SeapiKeySourceè impostato suPLAINTEXT, devi fornireapiKey. Il valore predefinito è PLAINTEXT. - socketTimeout: se impostato, sovrascrive il timeout massimo predefinito per i tentativi e il timeout predefinito del socket (30000 ms) in Elastic RestClient.
Funzioni definite dall'utente
Questo modello supporta le funzioni definite dall'utente (UDF) in diversi punti della pipeline, descritti di seguito. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.
Funzione di trasformazione del testo
Trasforma il messaggio Pub/Sub in un documento Elasticsearch.
Parametri del modello:
javascriptTextTransformGcsPath: l'URI Cloud Storage del file JavaScript.javascriptTextTransformFunctionName: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: il campo dati del messaggio Pub/Sub, serializzato come stringa JSON.
- Output: un documento JSON convertito in stringa da inserire in Elasticsearch.
Funzione di indice
Restituisce l'indice a cui appartiene il documento.
Parametri del modello:
javaScriptIndexFnGcsPath: l'URI Cloud Storage del file JavaScript.javaScriptIndexFnName: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: il valore del campo metadati
_indexdel documento.
Funzione ID documento
Restituisce l'ID documento.
Parametri del modello:
javaScriptIdFnGcsPath: l'URI Cloud Storage del file JavaScript.javaScriptIdFnName: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: il valore del campo metadati
_iddel documento.
Funzione di eliminazione dei documenti
Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento collettivo su INDEX e fornisci una
funzione ID documento.
Parametri del modello:
javaScriptIsDeleteFnGcsPath: l'URI Cloud Storage del file JavaScript.javaScriptIsDeleteFnName: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: restituisce la stringa
"true"per eliminare il documento o"false"per eseguire l'upsert del documento.
Funzione di tipo di mappatura
Restituisce il tipo di mappatura del documento.
Parametri del modello:
javaScriptTypeFnGcsPath: l'URI Cloud Storage del file JavaScript.javaScriptTypeFnName: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: il valore del campo metadati
_typedel documento.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome job univoco.
- (Facoltativo) Per Endpoint regionale, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona il modello Da Pub/Sub a Elasticsearch.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/ \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
Sostituisci quanto segue:
PROJECT_ID: l'ID progetto in cui vuoi eseguire il job Dataflow Google CloudJOB_NAME: un nome job univoco a tua sceltaREGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1VERSION: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latestper utilizzare la versione più recente del modello, disponibile nella cartella principale non datata nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC: l'argomento Pub/Sub per l'output degli erroriSUBSCRIPTION_NAME: il nome della sottoscrizione Pub/SubCONNECTION_URL: l'URL di ElasticsearchDATASET: il tipo di logNAMESPACE: lo spazio dei nomi per il set di datiAPIKEY: la chiave API codificata in Base64 per l'autenticazione
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta HTTP POST. Per ulteriori informazioni sull'
API e sui relativi ambiti di autorizzazione, consulta
projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/", } }
Sostituisci quanto segue:
PROJECT_ID: l'ID progetto in cui vuoi eseguire il job Dataflow Google CloudJOB_NAME: un nome job univoco a tua sceltaLOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1VERSION: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latestper utilizzare la versione più recente del modello, disponibile nella cartella principale non datata nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC: l'argomento Pub/Sub per l'output degli erroriSUBSCRIPTION_NAME: il nome della sottoscrizione Pub/SubCONNECTION_URL: l'URL di ElasticsearchDATASET: il tipo di logNAMESPACE: lo spazio dei nomi per il set di datiAPIKEY: la chiave API codificata in Base64 per l'autenticazione
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.