Trasmettere messaggi in streaming da Pub/Sub utilizzando Dataflow e Cloud Storage

Dataflow è un servizio completamente gestito per la trasformazione e l'arricchimento dei dati in modalità flusso (in tempo reale) e batch con uguale affidabilità ed espressività. Fornisce un ambiente di sviluppo delle pipeline semplificato utilizzando l'SDK Apache Beam, che dispone di un ricco set di primitive per windowing e analisi delle sessioni, nonché di un ecosistema di connettori di origine e sink. Questa guida rapida mostra come utilizzare Dataflow per:

  • Leggere i messaggi pubblicati in un argomento Pub/Sub
  • Raggruppa i messaggi per timestamp
  • Scrivi i messaggi in Cloud Storage

Questa guida rapida introduce l'utilizzo di Dataflow in Java e Python. È supportato anche SQL. Questa guida rapida è disponibile anche come tutorial di Google Cloud Skills Boost che offre credenziali temporanee per iniziare.

Puoi anche iniziare utilizzando i modelli Dataflow basati su UI se non intendi eseguire l'elaborazione personalizzata dei dati.

Prima di iniziare

  1. Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. Installa Google Cloud CLI.

  3. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  4. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  5. Crea o seleziona un Google Cloud progetto.

    Ruoli richiesti per selezionare o creare un progetto

    • Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
    • Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto (roles/resourcemanager.projectCreator), che contiene l'autorizzazione resourcemanager.projects.create. Scopri come concedere i ruoli.
    • Creare un progetto Google Cloud :

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del progetto Google Cloud .

  6. Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .

  7. Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud API JSON Storage, Pub/Sub, Resource Manager e Cloud Scheduler:

    Ruoli richiesti per abilitare le API

    Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere i ruoli.

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  8. Configura l'autenticazione:

    1. Assicurati di disporre del ruolo IAM Creazione account di servizio (roles/iam.serviceAccountCreator) e del ruolo Amministratore IAM progetto (roles/resourcemanager.projectIamAdmin). Scopri come concedere i ruoli.
    2. Crea l'account di servizio:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Sostituisci SERVICE_ACCOUNT_NAME con un nome per il account di servizio.

    3. Concedi ruoli al account di servizio. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Sostituisci quanto segue:

      • SERVICE_ACCOUNT_NAME: il nome del account di servizio
      • PROJECT_ID: l'ID progetto in cui hai creato il account di servizio
      • ROLE: il ruolo da concedere
    4. Concedi il ruolo richiesto all'entità che collegherà ilaccount di serviziot ad altre risorse.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Sostituisci quanto segue:

      • SERVICE_ACCOUNT_NAME: il nome del account di servizio
      • PROJECT_ID: l'ID progetto in cui hai creato il account di servizio
      • USER_EMAIL: l'indirizzo email di un Account Google
  9. Installa Google Cloud CLI.

  10. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  11. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  12. Crea o seleziona un Google Cloud progetto.

    Ruoli richiesti per selezionare o creare un progetto

    • Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
    • Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto (roles/resourcemanager.projectCreator), che contiene l'autorizzazione resourcemanager.projects.create. Scopri come concedere i ruoli.
    • Creare un progetto Google Cloud :

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del progetto Google Cloud .

  13. Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .

  14. Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud API JSON Storage, Pub/Sub, Resource Manager e Cloud Scheduler:

    Ruoli richiesti per abilitare le API

    Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere i ruoli.

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  15. Configura l'autenticazione:

    1. Assicurati di disporre del ruolo IAM Creazione account di servizio (roles/iam.serviceAccountCreator) e del ruolo Amministratore IAM progetto (roles/resourcemanager.projectIamAdmin). Scopri come concedere i ruoli.
    2. Crea l'account di servizio:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Sostituisci SERVICE_ACCOUNT_NAME con un nome per il account di servizio.

    3. Concedi ruoli al account di servizio. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Sostituisci quanto segue:

      • SERVICE_ACCOUNT_NAME: il nome del account di servizio
      • PROJECT_ID: l'ID progetto in cui hai creato il account di servizio
      • ROLE: il ruolo da concedere
    4. Concedi il ruolo richiesto all'entità che collegherà ilaccount di serviziot ad altre risorse.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Sostituisci quanto segue:

      • SERVICE_ACCOUNT_NAME: il nome del account di servizio
      • PROJECT_ID: l'ID progetto in cui hai creato il account di servizio
      • USER_EMAIL: l'indirizzo email di un Account Google
  16. Crea credenziali di autenticazione locali per il tuo account utente:

    gcloud auth application-default login

    Se viene restituito un errore di autenticazione e utilizzi un provider di identità (IdP) esterno, verifica di aver acceduto a gcloud CLI con la tua identità federata.

Configurare il progetto Pub/Sub

  1. Crea variabili per il bucket, il progetto e la regione. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. Seleziona una regione Dataflow vicina a quella in cui esegui i comandi in questa guida rapida. Il valore della variabile REGION deve essere un nome di regione valido. Per saperne di più su regioni e località, consulta Località di Dataflow.

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
  2. Crea un bucket Cloud Storage di proprietà di questo progetto:

    gcloud storage buckets create gs://$BUCKET_NAME
  3. Crea un argomento Pub/Sub in questo progetto:

    gcloud pubsub topics create $TOPIC_ID
  4. Crea un job Cloud Scheduler in questo progetto. Il job pubblica un messaggio in un argomento Pub/Sub a intervalli di un minuto.

    Se non esiste un'app App Engine per il progetto, questo passaggio ne creerà una.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    Avvia il lavoro.

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. Utilizza i seguenti comandi per clonare il repository di avvio rapido e passare alla directory delcodice campioneo:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Trasmettere flussi di messaggi da Pub/Sub a Cloud Storage

Esempio di codice

Questo codice campione utilizza Dataflow per:

  • Leggi i messaggi Pub/Sub.
  • Raggruppa i messaggi in intervalli di dimensioni fisse in base ai timestamp di pubblicazione.
  • Scrivi i messaggi in ogni finestra nei file di Cloud Storage.

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Avvia la pipeline

Per avviare la pipeline, esegui questo comando:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

Il comando precedente viene eseguito localmente e avvia un job Dataflow che viene eseguito nel cloud. Quando il comando restituisce JOB_MESSAGE_DETAILED: Workers have started successfully, esci dal programma locale utilizzando Ctrl+C.

Osserva l'avanzamento del job e della pipeline

Puoi osservare l'avanzamento del job nella console Dataflow.

Vai alla console Dataflow

Osserva l&#39;avanzamento del job

Apri la visualizzazione dei dettagli del job per visualizzare:

  • Struttura del job
  • Log job
  • Metriche della fase

Osserva l&#39;avanzamento del job

Potresti dover attendere alcuni minuti prima di visualizzare i file di output in Cloud Storage.

Osserva l&#39;avanzamento del job

In alternativa, utilizza la riga di comando riportata di seguito per verificare quali file sono stati scritti.

gcloud storage ls gs://${BUCKET_NAME}/samples/

L'output dovrebbe essere simile al seguente:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.

  1. Elimina il job Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job --location=$REGION
  2. Nella console Dataflow, arresta il job. Annulla la pipeline senza svuotarla.

  3. Elimina l'argomento.

    gcloud pubsub topics delete $TOPIC_ID
  4. Elimina i file creati dalla pipeline.

    gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error
    gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
  5. Rimuovi il bucket Cloud Storage.

    gcloud storage rm gs://${BUCKET_NAME} --recursive

  6. Elimina il account di servizio:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. (Facoltativo) Revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.

    gcloud auth application-default revoke
  8. (Facoltativo) Revoca le credenziali da gcloud CLI.

    gcloud auth revoke

Passaggi successivi