Creare una pipeline di streaming utilizzando un modello Dataflow
Questa guida rapida mostra come creare una pipeline di streaming utilizzando un modello Dataflow fornito da Google. Nello specifico, questa guida rapida utilizza il modello Da Pub/Sub a BigQuery come esempio.
Il modello Da Pub/Sub a BigQuery è una pipeline di streaming che può leggere i messaggi in formato JSON da un argomento Pub/Sub e scriverli in una tabella BigQuery.
Per seguire le indicazioni dettagliate per questa attività direttamente nella console Google Cloud , fai clic su Procedura guidata:
Prima di iniziare
Completa i seguenti passaggi prima di eseguire la pipeline.
Configura il progetto
- Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
Ruoli obbligatori
Per completare questa guida rapida, devi disporre dei seguenti ruoli Identity and Access Management (IAM).
Per ottenere le autorizzazioni necessarie per completare questa guida rapida, chiedi all'amministratore di concederti i seguenti ruoli IAM nel tuo progetto:
-
Utente BigQuery (
roles/bigquery.user) -
Dataflow Admin (
roles/dataflow.admin) -
Utente Service Account (
roles/iam.serviceAccountUser) -
Storage Admin (
roles/storage.admin)
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Per assicurarti che il account di servizio predefinito di Compute Engine disponga delle autorizzazioni necessarie per eseguire il job Dataflow, chiedi all'amministratore di concedere i seguenti ruoli IAM al account di servizio predefinito di Compute Engine sul tuo progetto:
-
Editor dati BigQuery (
roles/bigquery.dataEditor) -
Dataflow Worker (
roles/dataflow.worker) -
Editor Pub/Sub (
roles/pubsub.editor) -
Storage Object Admin (
roles/storage.objectAdmin) -
Visualizzatore (
roles/viewer)
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
L'amministratore potrebbe anche essere in grado di concedere al account di servizio predefinito di Compute Engine le autorizzazioni richieste tramite ruoli personalizzati o altri ruoli predefiniti.
Crea un bucket Cloud Storage
Prima di poter eseguire una pipeline, devi creare un bucket Cloud Storage.
Crea un bucket Cloud Storage:
- Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.
- Fai clic su Crea.
- Nella pagina Crea un bucket, inserisci le informazioni del bucket. Per andare al passaggio
successivo, fai clic su Continua.
- Per Assegna un nome al bucket, inserisci un nome univoco per il bucket. Non includere informazioni sensibili nel nome del bucket, poiché lo spazio dei nomi dei bucket è globale e visibile pubblicamente.
-
Nella sezione Scegli dove archiviare i tuoi dati, segui questi passaggi:
- Seleziona un Tipo di località.
- Scegli una posizione in cui i dati del bucket vengono archiviati in modo permanente dal menu a discesa Tipo di località.
- Se selezioni il tipo di località a doppia regione, puoi anche scegliere di attivare la replica turbo utilizzando la casella di controllo pertinente.
- Per configurare la replica tra bucket, seleziona
Aggiungi una replica tra bucket mediante Storage Transfer Service e
segui questi passaggi:
Configura la replica tra bucket
- Nel menu Bucket, seleziona un bucket.
Nella sezione Impostazioni di replica, fai clic su Configura per configurare le impostazioni per il job di replica.
Viene visualizzato il riquadro Configura replica tra bucket.
- Per filtrare gli oggetti da replicare in base al prefisso del nome dell'oggetto, inserisci un prefisso da cui includere o escludere gli oggetti, quindi fai clic su Aggiungi un prefisso.
- Per impostare una classe di archiviazione per gli oggetti replicati, seleziona una classe di archiviazione dal menu Classe di archiviazione. Se salti questo passaggio, gli oggetti replicati utilizzeranno per impostazione predefinita la classe di archiviazione del bucket di destinazione.
- Fai clic su Fine.
-
Nella sezione Scegli come archiviare i tuoi dati, segui questi passaggi:
- Nella sezione Impostare una classe predefinita, seleziona quanto segue: Standard.
- Per attivare lo spazio dei nomi gerarchico, nella sezione Ottimizza l'archiviazione per workload con uso intensivo dei dati, seleziona Abilita uno spazio dei nomi gerarchico in questo bucket.
- Nella sezione Scegli come controllare l'accesso agli oggetti, seleziona se il bucket applica o meno la prevenzione dell'accesso pubblico e seleziona un metodo di controllo dell'accesso per gli oggetti del bucket.
-
Nella sezione Scegli come proteggere i dati degli oggetti, segui questi passaggi:
- Seleziona una delle opzioni in Protezione dei dati che vuoi impostare per il bucket.
- Per attivare l'eliminazione temporanea, fai clic sulla casella di controllo Criterio di eliminazione temporanea (per il recupero dei dati) e specifica il numero di giorni per cui vuoi conservare gli oggetti dopo l'eliminazione.
- Per impostare il controllo delle versioni degli oggetti, seleziona la casella di controllo Controllo delle versioni degli oggetti (per il controllo delle versioni) e specifica il numero massimo di versioni per oggetto e il numero di giorni dopo i quali scadono le versioni non correnti.
- Per abilitare il criterio di conservazione su oggetti e bucket, seleziona la casella di controllo Conservazione (per la conformità), quindi procedi nel seguente modo:
- Per attivare il blocco della conservazione degli oggetti, fai clic sulla casella di controllo Abilita conservazione degli oggetti.
- Per attivare Bucket Lock, fai clic sulla casella di controllo Imposta criterio di conservazione del bucket e scegli un'unità di tempo e una durata per il periodo di conservazione.
- Per scegliere come verranno criptati i dati degli oggetti, espandi la sezione Crittografia dei dati () e seleziona un metodo di crittografia dei dati.
- Seleziona una delle opzioni in Protezione dei dati che vuoi impostare per il bucket.
- Fai clic su Crea.
Copia quanto segue, perché ti servirà in una sezione successiva:
- Il nome del bucket Cloud Storage.
- L'ID del tuo progetto Google Cloud .
Per trovare questo ID, consulta Identificazione dei progetti.
Rete VPC
Per impostazione predefinita, ogni nuovo progetto viene avviato con una rete
predefinita. Se la rete predefinita per il tuo progetto è disattivata o è stata eliminata, devi disporre di una rete nel tuo progetto per la quale il tuo account utente ha il ruolo Utente di rete Compute (roles/compute.networkUser).
Creare un set di dati e una tabella BigQuery
Crea un set di dati e una tabella BigQuery con lo schema appropriato per l'argomento Pub/Sub utilizzando la console Google Cloud .
In questo esempio, il nome del set di dati è taxirides e il nome della
tabella è realtime. Per creare questo set di dati e questa tabella:
- Vai alla pagina BigQuery.
Vai a BigQuery - Nel riquadro Explorer, accanto al progetto in cui vuoi creare il set di dati, fai clic su Visualizza azioni, quindi fai clic su Crea set di dati.
- Nel riquadro Crea set di dati, segui questi passaggi:
- In ID set di dati, inserisci
taxirides. Gli ID set di dati sono univoci per ogni Google Cloud progetto. - Per Tipo di località, scegli Più regioni e poi Stati Uniti (più regioni negli Stati Uniti). I set di dati pubblici sono archiviati nella
località multiregionale
US. Per semplicità, inserisci il tuo set di dati nella stessa località. - Lascia invariate le altre impostazioni predefinite e fai clic su Crea set di dati.
- Nel riquadro
Explorer , espandi il progetto. - Accanto al set di dati
taxirides, fai clic su Visualizza azioni, quindi fai clic su Crea tabella. - Nel riquadro Crea tabella, segui questi passaggi:
- Nella sezione Origine, per Crea tabella da, seleziona Tabella vuota.
- Nella sezione Destinazione, in Tabella, inserisci
realtime. - Nella sezione Schema, fai clic sull'opzione di attivazione/disattivazione Modifica come testo e incolla la seguente definizione di schema nel riquadro:
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- Nella sezione Impostazioni di partizionamento e clustering, per Partizionamento, seleziona il campo timestamp.
- Lascia invariate le altre impostazioni predefinite e fai clic su Crea tabella.
esegui la pipeline.
Esegui una pipeline in modalità flusso utilizzando il modello Da Pub/Sub a BigQuery fornito da Google. La pipeline riceve i dati in entrata dall'argomento di input.
- Vai alla pagina Job Dataflow:
Vai a Job - Fai clic su
Crea job da modello . - Inserisci
taxi-datacome Nome job per il tuo job Dataflow. - In Modello Dataflow, seleziona il modello Da Pub/Sub a BigQuery.
- In Tabella di output BigQuery, inserisci quanto segue:
PROJECT_ID:taxirides.realtime
Sostituisci
PROJECT_IDcon l'ID del progetto in cui hai creato il set di dati BigQuery. - Nella sezione Parametri facoltativi dell'origine, nel campo Argomento di input Pub/Sub, fai clic su Inserisci argomento manualmente.
- Nella finestra di dialogo, in Nome argomento, inserisci quanto segue e quindi fai clic su Salva:
projects/pubsub-public-data/topics/taxirides-realtime
Questo argomento Pub/Sub disponibile pubblicamente si basa sul set di dati aperto della NYC Taxi & Limousine Commission. Di seguito è riportato un messaggio di esempio di questo argomento, in formato JSON:
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- Per Località temporanea, inserisci quanto segue:
gs://BUCKET_NAME/temp/
Sostituisci
BUCKET_NAMEcon il nome del tuo bucket Cloud Storage. La cartellatemparchivia i file temporanei, ad esempio il job della pipeline di gestione temporanea. - Se il tuo progetto non ha una rete predefinita, inserisci una rete e una subnet. Per saperne di più, consulta Specificare una rete e una subnet.
- Fai clic su Esegui job.
Visualizza i tuoi risultati
Per visualizzare i dati scritti nella tabellarealtime:
Vai alla pagina BigQuery.
Fai clic su Crea una nuova query. Si apre una nuova scheda Editor.
SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
Sostituisci
PROJECT_IDcon l'ID progetto del progetto in cui hai creato il set di dati BigQuery. La visualizzazione dei dati nella tabella può richiedere fino a cinque minuti.Fai clic su Esegui.
La query restituisce le righe che sono state aggiunte alla tabella nelle ultime 24 ore. Puoi anche eseguire query utilizzando SQL standard.
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.
Elimina il progetto
Il modo più semplice per eliminare la fatturazione è eliminare il progetto Google Cloud che hai creato per la guida rapida.- Nella console Google Cloud , vai alla pagina Gestisci risorse.
- Nell'elenco dei progetti, seleziona quello che vuoi eliminare, quindi fai clic su Elimina.
- Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.
Elimina le singole risorse
Se vuoi conservare il progetto Google Cloud che hai utilizzato in questa guida rapida, elimina le singole risorse:
- Vai alla pagina Job Dataflow:
Vai a Job - Seleziona il tuo lavoro di streaming dall'elenco dei lavori.
- Nella navigazione, fai clic su Stop.
- Nella finestra di dialogo Arresta job, annulla o svuota la pipeline, quindi fai clic su Arresta job.
- Vai alla pagina BigQuery.
Vai a BigQuery - Nel riquadro Explorer, espandi il progetto.
- Accanto al set di dati che vuoi eliminare, fai clic su Visualizza azioni, quindi fai clic su Apri.
- Nel riquadro dei dettagli, fai clic su Elimina set di dati, quindi segui le istruzioni.
- Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.
- Fai clic sulla casella di controllo del bucket da eliminare.
- Per eliminare il bucket, fai clic su Elimina, quindi segui le istruzioni.